This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a22069ff8e80678c4d7a33c3ad349f0a4499567c Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Wed Feb 27 19:56:19 2019 +0800 [FLINK-11772] [DataStream] InternalTimerServiceSerializationProxy should not be serializing timers' key / namespace serializers anymore The changes made to managed state, under which we no longer Java-serialize serializers and only write the serializer snapshot, were not reflected in how we snapshot timers. This was mainly due to the fact that timers were not handled by state backends (and were therefore not managed state) in the past, and were handled in an isolated manner by the InternalTimerServiceSerializationProxy. This closes #7849. --- .../api/operators/InternalTimerServiceImpl.java | 29 ++++--- .../InternalTimerServiceSerializationProxy.java | 18 +++- .../api/operators/InternalTimersSnapshot.java | 30 ++----- .../InternalTimersSnapshotReaderWriters.java | 96 +++++++++++++++++----- .../operators/InternalTimerServiceImplTest.java | 16 +++- 5 files changed, 124 insertions(+), 65 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java index a7a3490..dd88bc6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java @@ -265,13 +265,19 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int keyGroupIdx) { return new InternalTimersSnapshot<>( keySerializer, - keySerializer.snapshotConfiguration(), namespaceSerializer, - namespaceSerializer.snapshotConfiguration(), eventTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx), 
processingTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx)); } + public TypeSerializer<K> getKeySerializer() { + return keySerializer; + } + + public TypeSerializer<N> getNamespaceSerializer() { + return namespaceSerializer; + } + /** * Restore the timers (both processing and event time ones) for a given {@code keyGroupIdx}. * @@ -283,13 +289,17 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) { this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot; - if (areSnapshotSerializersIncompatible(restoredSnapshot)) { - throw new IllegalArgumentException("Tried to restore timers " + - "for the same service with different serializers."); + TypeSerializer<K> restoredKeySerializer = restoredTimersSnapshot.getKeySerializerSnapshot().restoreSerializer(); + if (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredKeySerializer)) { + throw new IllegalArgumentException("Tried to restore timers for the same service with different key serializers."); } + this.keyDeserializer = restoredKeySerializer; - this.keyDeserializer = restoredTimersSnapshot.getKeySerializer(); - this.namespaceDeserializer = restoredTimersSnapshot.getNamespaceSerializer(); + TypeSerializer<N> restoredNamespaceSerializer = restoredTimersSnapshot.getNamespaceSerializerSnapshot().restoreSerializer(); + if (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredNamespaceSerializer)) { + throw new IllegalArgumentException("Tried to restore timers for the same service with different namespace serializers."); + } + this.namespaceDeserializer = restoredNamespaceSerializer; checkArgument(localKeyGroupRange.contains(keyGroupIdx), "Key Group " + keyGroupIdx + " does not belong to the local range."); @@ -358,9 +368,4 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, } return result; } - - 
private boolean areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> restoredSnapshot) { - return (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) || - (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer())); - } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java index dea17f9..ca591bcd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java @@ -35,7 +35,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class InternalTimerServiceSerializationProxy<K> extends PostVersionedIOReadableWritable { - public static final int VERSION = 1; + public static final int VERSION = 2; /** The key-group timer services to write / read. 
*/ private final InternalTimeServiceManager<K> timerServicesManager; @@ -75,6 +75,12 @@ public class InternalTimerServiceSerializationProxy<K> extends PostVersionedIORe } @Override + public int[] getCompatibleVersions() { + return new int[] { VERSION, 1 }; + } + + @Override + @SuppressWarnings("unchecked") public void write(DataOutputView out) throws IOException { super.write(out); final Map<String, InternalTimerServiceImpl<K, ?>> registeredTimerServices = @@ -87,7 +93,11 @@ public class InternalTimerServiceSerializationProxy<K> extends PostVersionedIORe out.writeUTF(serviceName); InternalTimersSnapshotReaderWriters - .getWriterForVersion(VERSION, timerService.snapshotTimersForKeyGroup(keyGroupIdx)) + .getWriterForVersion( + VERSION, + timerService.snapshotTimersForKeyGroup(keyGroupIdx), + timerService.getKeySerializer(), + (TypeSerializer) timerService.getNamespaceSerializer()) .writeTimersSnapshot(out); } } @@ -115,8 +125,8 @@ public class InternalTimerServiceSerializationProxy<K> extends PostVersionedIORe @SuppressWarnings("unchecked") private <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService( String serviceName, InternalTimersSnapshot<?, ?> restoredTimersSnapshot) { - final TypeSerializer<K> keySerializer = (TypeSerializer<K>) restoredTimersSnapshot.getKeySerializer(); - final TypeSerializer<N> namespaceSerializer = (TypeSerializer<N>) restoredTimersSnapshot.getNamespaceSerializer(); + final TypeSerializer<K> keySerializer = (TypeSerializer<K>) restoredTimersSnapshot.getKeySerializerSnapshot().restoreSerializer(); + final TypeSerializer<N> namespaceSerializer = (TypeSerializer<N>) restoredTimersSnapshot.getNamespaceSerializerSnapshot().restoreSerializer(); TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer); return timerServicesManager.registerOrGetTimerService(serviceName, timerSerializer); } diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java index 9c78aa5..d5e9483 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerUtils; import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; @@ -32,9 +33,7 @@ import java.util.Set; */ public class InternalTimersSnapshot<K, N> { - private TypeSerializer<K> keySerializer; private TypeSerializerSnapshot<K> keySerializerSnapshot; - private TypeSerializer<N> namespaceSerializer; private TypeSerializerSnapshot<N> namespaceSerializerSnapshot; private Set<TimerHeapInternalTimer<K, N>> eventTimeTimers; @@ -46,28 +45,19 @@ public class InternalTimersSnapshot<K, N> { /** Constructor to use when snapshotting the timers. 
*/ public InternalTimersSnapshot( TypeSerializer<K> keySerializer, - TypeSerializerSnapshot<K> keySerializerSnapshot, TypeSerializer<N> namespaceSerializer, - TypeSerializerSnapshot<N> namespaceSerializerSnapshot, @Nullable Set<TimerHeapInternalTimer<K, N>> eventTimeTimers, @Nullable Set<TimerHeapInternalTimer<K, N>> processingTimeTimers) { - this.keySerializer = Preconditions.checkNotNull(keySerializer); - this.keySerializerSnapshot = Preconditions.checkNotNull(keySerializerSnapshot); - this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer); - this.namespaceSerializerSnapshot = Preconditions.checkNotNull(namespaceSerializerSnapshot); + Preconditions.checkNotNull(keySerializer); + this.keySerializerSnapshot = TypeSerializerUtils.snapshotBackwardsCompatible(keySerializer); + Preconditions.checkNotNull(namespaceSerializer); + this.namespaceSerializerSnapshot = TypeSerializerUtils.snapshotBackwardsCompatible(namespaceSerializer); + this.eventTimeTimers = eventTimeTimers; this.processingTimeTimers = processingTimeTimers; } - public TypeSerializer<K> getKeySerializer() { - return keySerializer; - } - - public void setKeySerializer(TypeSerializer<K> keySerializer) { - this.keySerializer = keySerializer; - } - public TypeSerializerSnapshot<K> getKeySerializerSnapshot() { return keySerializerSnapshot; } @@ -76,14 +66,6 @@ public class InternalTimersSnapshot<K, N> { this.keySerializerSnapshot = keySerializerConfigSnapshot; } - public TypeSerializer<N> getNamespaceSerializer() { - return namespaceSerializer; - } - - public void setNamespaceSerializer(TypeSerializer<N> namespaceSerializer) { - this.namespaceSerializer = namespaceSerializer; - } - public TypeSerializerSnapshot<N> getNamespaceSerializerSnapshot() { return namespaceSerializerSnapshot; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java index d61e542..9dcd959 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java @@ -56,16 +56,24 @@ public class InternalTimersSnapshotReaderWriters { // Writers // - pre-versioned: Flink 1.4.0 // - v1: Flink 1.4.1 + // - v2: Flink 1.8.0 // ------------------------------------------------------------------------------- - public static <K, N> InternalTimersSnapshotWriter getWriterForVersion(int version, InternalTimersSnapshot<K, N> timersSnapshot) { + public static <K, N> InternalTimersSnapshotWriter getWriterForVersion( + int version, + InternalTimersSnapshot<K, N> timersSnapshot, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { switch (version) { case NO_VERSION: - return new InternalTimersSnapshotWriterPreVersioned<>(timersSnapshot); + return new InternalTimersSnapshotWriterPreVersioned<>(timersSnapshot, keySerializer, namespaceSerializer); + + case 1: + return new InternalTimersSnapshotWriterV1<>(timersSnapshot, keySerializer, namespaceSerializer); case InternalTimerServiceSerializationProxy.VERSION: - return new InternalTimersSnapshotWriterV1<>(timersSnapshot); + return new InternalTimersSnapshotWriterV2<>(timersSnapshot, keySerializer, namespaceSerializer); default: // guard for future @@ -92,8 +100,16 @@ public class InternalTimersSnapshotReaderWriters { protected final InternalTimersSnapshot<K, N> timersSnapshot; - public AbstractInternalTimersSnapshotWriter(InternalTimersSnapshot<K, N> timersSnapshot) { + protected final TypeSerializer<K> keySerializer; + protected final TypeSerializer<N> namespaceSerializer; + + public AbstractInternalTimersSnapshotWriter( + InternalTimersSnapshot<K, N> timersSnapshot, + 
TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { this.timersSnapshot = checkNotNull(timersSnapshot); + this.keySerializer = checkNotNull(keySerializer); + this.namespaceSerializer = checkNotNull(namespaceSerializer); } protected abstract void writeKeyAndNamespaceSerializers(DataOutputView out) throws IOException; @@ -103,8 +119,8 @@ public class InternalTimersSnapshotReaderWriters { writeKeyAndNamespaceSerializers(out); LegacyTimerSerializer<K, N> timerSerializer = new LegacyTimerSerializer<>( - timersSnapshot.getKeySerializer(), - timersSnapshot.getNamespaceSerializer()); + keySerializer, + namespaceSerializer); // write the event time timers Set<TimerHeapInternalTimer<K, N>> eventTimers = timersSnapshot.getEventTimeTimers(); @@ -132,16 +148,19 @@ public class InternalTimersSnapshotReaderWriters { private static class InternalTimersSnapshotWriterPreVersioned<K, N> extends AbstractInternalTimersSnapshotWriter<K, N> { - public InternalTimersSnapshotWriterPreVersioned(InternalTimersSnapshot<K, N> timersSnapshot) { - super(timersSnapshot); + public InternalTimersSnapshotWriterPreVersioned( + InternalTimersSnapshot<K, N> timersSnapshot, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { + super(timersSnapshot, keySerializer, namespaceSerializer); } @Override protected void writeKeyAndNamespaceSerializers(DataOutputView out) throws IOException { // the pre-versioned format only serializes the serializers, without their configuration snapshots try (ByteArrayOutputStreamWithPos stream = new ByteArrayOutputStreamWithPos()) { - InstantiationUtil.serializeObject(stream, timersSnapshot.getKeySerializer()); - InstantiationUtil.serializeObject(stream, timersSnapshot.getNamespaceSerializer()); + InstantiationUtil.serializeObject(stream, keySerializer); + InstantiationUtil.serializeObject(stream, namespaceSerializer); out.write(stream.getBuf(), 0, stream.getPosition()); } @@ -150,8 +169,11 @@ public class 
InternalTimersSnapshotReaderWriters { private static class InternalTimersSnapshotWriterV1<K, N> extends AbstractInternalTimersSnapshotWriter<K, N> { - public InternalTimersSnapshotWriterV1(InternalTimersSnapshot<K, N> timersSnapshot) { - super(timersSnapshot); + public InternalTimersSnapshotWriterV1( + InternalTimersSnapshot<K, N> timersSnapshot, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { + super(timersSnapshot, keySerializer, namespaceSerializer); } @Override @@ -160,8 +182,24 @@ public class InternalTimersSnapshotReaderWriters { TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( out, Arrays.asList( - Tuple2.of(timersSnapshot.getKeySerializer(), timersSnapshot.getKeySerializerSnapshot()), - Tuple2.of(timersSnapshot.getNamespaceSerializer(), timersSnapshot.getNamespaceSerializerSnapshot()))); + Tuple2.of(keySerializer, timersSnapshot.getKeySerializerSnapshot()), + Tuple2.of(namespaceSerializer, timersSnapshot.getNamespaceSerializerSnapshot()))); + } + } + + private static class InternalTimersSnapshotWriterV2<K, N> extends AbstractInternalTimersSnapshotWriter<K, N> { + + public InternalTimersSnapshotWriterV2( + InternalTimersSnapshot<K, N> timersSnapshot, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer) { + super(timersSnapshot, keySerializer, namespaceSerializer); + } + + @Override + protected void writeKeyAndNamespaceSerializers(DataOutputView out) throws IOException { + TypeSerializerSnapshot.writeVersionedSnapshot(out, timersSnapshot.getKeySerializerSnapshot()); + TypeSerializerSnapshot.writeVersionedSnapshot(out, timersSnapshot.getNamespaceSerializerSnapshot()); } } @@ -178,9 +216,12 @@ public class InternalTimersSnapshotReaderWriters { case NO_VERSION: return new InternalTimersSnapshotReaderPreVersioned<>(userCodeClassLoader); - case InternalTimerServiceSerializationProxy.VERSION: + case 1: return new InternalTimersSnapshotReaderV1<>(userCodeClassLoader); + case 
InternalTimerServiceSerializationProxy.VERSION: + return new InternalTimersSnapshotReaderV2<>(userCodeClassLoader); + default: // guard for future throw new IllegalStateException( @@ -223,8 +264,8 @@ public class InternalTimersSnapshotReaderWriters { LegacyTimerSerializer<K, N> timerSerializer = new LegacyTimerSerializer<>( - restoredTimersSnapshot.getKeySerializer(), - restoredTimersSnapshot.getNamespaceSerializer()); + restoredTimersSnapshot.getKeySerializerSnapshot().restoreSerializer(), + restoredTimersSnapshot.getNamespaceSerializerSnapshot().restoreSerializer()); // read the event time timers int sizeOfEventTimeTimers = in.readInt(); @@ -269,9 +310,7 @@ public class InternalTimersSnapshotReaderWriters { final TypeSerializer<K> keySerializer = InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true); final TypeSerializer<N> namespaceSerializer = InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true); - restoredTimersSnapshot.setKeySerializer(keySerializer); restoredTimersSnapshot.setKeySerializerSnapshot(new BackwardsCompatibleSerializerSnapshot<>(keySerializer)); - restoredTimersSnapshot.setNamespaceSerializer(namespaceSerializer); restoredTimersSnapshot.setNamespaceSerializerSnapshot(new BackwardsCompatibleSerializerSnapshot<>(namespaceSerializer)); } catch (ClassNotFoundException exception) { throw new IOException(exception); @@ -294,13 +333,28 @@ public class InternalTimersSnapshotReaderWriters { List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs = TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader); - restoredTimersSnapshot.setKeySerializer((TypeSerializer<K>) serializersAndConfigs.get(0).f0); restoredTimersSnapshot.setKeySerializerSnapshot((TypeSerializerSnapshot<K>) serializersAndConfigs.get(0).f1); - restoredTimersSnapshot.setNamespaceSerializer((TypeSerializer<N>) serializersAndConfigs.get(1).f0); 
restoredTimersSnapshot.setNamespaceSerializerSnapshot((TypeSerializerSnapshot<N>) serializersAndConfigs.get(1).f1); } } + private static class InternalTimersSnapshotReaderV2<K, N> extends AbstractInternalTimersSnapshotReader<K, N> { + + public InternalTimersSnapshotReaderV2(ClassLoader userCodeClassLoader) { + super(userCodeClassLoader); + } + + @SuppressWarnings("unchecked") + @Override + protected void restoreKeyAndNamespaceSerializers( + InternalTimersSnapshot<K, N> restoredTimersSnapshot, + DataInputView in) throws IOException { + + restoredTimersSnapshot.setKeySerializerSnapshot(TypeSerializerSnapshot.readVersionedSnapshot(in, userCodeClassLoader)); + restoredTimersSnapshot.setNamespaceSerializerSnapshot(TypeSerializerSnapshot.readVersionedSnapshot(in, userCodeClassLoader)); + } + } + /** * A {@link TypeSerializer} used to serialize/deserialize a {@link TimerHeapInternalTimer}. */ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java index f2da6da..99d2700 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java @@ -620,10 +620,14 @@ public class InternalTimerServiceImplTest { Map<Integer, byte[]> snapshot = new HashMap<>(); for (Integer keyGroupIndex : testKeyGroupRange) { try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) { - InternalTimersSnapshot<?, ?> timersSnapshot = timerService.snapshotTimersForKeyGroup(keyGroupIndex); + InternalTimersSnapshot<Integer, String> timersSnapshot = timerService.snapshotTimersForKeyGroup(keyGroupIndex); InternalTimersSnapshotReaderWriters - .getWriterForVersion(snapshotVersion, timersSnapshot) + .getWriterForVersion( + snapshotVersion, + 
timersSnapshot, + timerService.getKeySerializer(), + timerService.getNamespaceSerializer()) .writeTimersSnapshot(new DataOutputViewStreamWrapper(outStream)); snapshot.put(keyGroupIndex, outStream.toByteArray()); @@ -701,10 +705,14 @@ public class InternalTimerServiceImplTest { Map<Integer, byte[]> snapshot2 = new HashMap<>(); for (Integer keyGroupIndex : testKeyGroupRange) { try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) { - InternalTimersSnapshot<?, ?> timersSnapshot = timerService.snapshotTimersForKeyGroup(keyGroupIndex); + InternalTimersSnapshot<Integer, String> timersSnapshot = timerService.snapshotTimersForKeyGroup(keyGroupIndex); InternalTimersSnapshotReaderWriters - .getWriterForVersion(snapshotVersion, timersSnapshot) + .getWriterForVersion( + snapshotVersion, + timersSnapshot, + timerService.getKeySerializer(), + timerService.getNamespaceSerializer()) .writeTimersSnapshot(new DataOutputViewStreamWrapper(outStream)); if (subKeyGroupRange1.contains(keyGroupIndex)) {
