This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 943c934bd20d9d53bc85572b6e65bd10b98a2c81 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Wed Feb 27 11:59:47 2019 +0800 [FLINK-11772] [DataStream] Remove "config" from all serializer snapshot field / method names in InternalTimersSnapshot This renaming corresponds to the fact that TypeSerializerConfigSnapshot is now deprecated, and is fully replaced by TypeSerializerSnapshot. --- .../api/operators/InternalTimerServiceImpl.java | 4 ++-- .../api/operators/InternalTimersSnapshot.java | 28 +++++++++++----------- .../InternalTimersSnapshotReaderWriters.java | 8 +++---- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java index 3bded503..c2088d0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java @@ -145,13 +145,13 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, CompatibilityResult<K> keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult( this.keyDeserializer, null, - restoredTimersSnapshot.getKeySerializerConfigSnapshot(), + restoredTimersSnapshot.getKeySerializerSnapshot(), keySerializer); CompatibilityResult<N> namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult( this.namespaceDeserializer, null, - restoredTimersSnapshot.getNamespaceSerializerConfigSnapshot(), + restoredTimersSnapshot.getNamespaceSerializerSnapshot(), namespaceSerializer); if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java index 3dde695..9c78aa5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java @@ -33,9 +33,9 @@ import java.util.Set; public class InternalTimersSnapshot<K, N> { private TypeSerializer<K> keySerializer; - private TypeSerializerSnapshot<K> keySerializerConfigSnapshot; + private TypeSerializerSnapshot<K> keySerializerSnapshot; private TypeSerializer<N> namespaceSerializer; - private TypeSerializerSnapshot<N> namespaceSerializerConfigSnapshot; + private TypeSerializerSnapshot<N> namespaceSerializerSnapshot; private Set<TimerHeapInternalTimer<K, N>> eventTimeTimers; private Set<TimerHeapInternalTimer<K, N>> processingTimeTimers; @@ -46,16 +46,16 @@ public class InternalTimersSnapshot<K, N> { /** Constructor to use when snapshotting the timers. */ public InternalTimersSnapshot( TypeSerializer<K> keySerializer, - TypeSerializerSnapshot<K> keySerializerConfigSnapshot, + TypeSerializerSnapshot<K> keySerializerSnapshot, TypeSerializer<N> namespaceSerializer, - TypeSerializerSnapshot<N> namespaceSerializerConfigSnapshot, + TypeSerializerSnapshot<N> namespaceSerializerSnapshot, @Nullable Set<TimerHeapInternalTimer<K, N>> eventTimeTimers, @Nullable Set<TimerHeapInternalTimer<K, N>> processingTimeTimers) { this.keySerializer = Preconditions.checkNotNull(keySerializer); - this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializerConfigSnapshot); + this.keySerializerSnapshot = Preconditions.checkNotNull(keySerializerSnapshot); this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer); - this.namespaceSerializerConfigSnapshot = Preconditions.checkNotNull(namespaceSerializerConfigSnapshot); + this.namespaceSerializerSnapshot = Preconditions.checkNotNull(namespaceSerializerSnapshot); this.eventTimeTimers = eventTimeTimers; this.processingTimeTimers = processingTimeTimers; } @@ -68,12 +68,12 @@ public class InternalTimersSnapshot<K, N> { this.keySerializer = keySerializer; } - public TypeSerializerSnapshot<K> getKeySerializerConfigSnapshot() { - return keySerializerConfigSnapshot; + public TypeSerializerSnapshot<K> getKeySerializerSnapshot() { + return keySerializerSnapshot; } - public void setKeySerializerConfigSnapshot(TypeSerializerSnapshot<K> keySerializerConfigSnapshot) { - this.keySerializerConfigSnapshot = keySerializerConfigSnapshot; + public void setKeySerializerSnapshot(TypeSerializerSnapshot<K> keySerializerConfigSnapshot) { + this.keySerializerSnapshot = keySerializerConfigSnapshot; } public TypeSerializer<N> getNamespaceSerializer() { @@ -84,12 +84,12 @@ public class InternalTimersSnapshot<K, N> { this.namespaceSerializer = namespaceSerializer; } - public TypeSerializerSnapshot<N> getNamespaceSerializerConfigSnapshot() { - return namespaceSerializerConfigSnapshot; + public TypeSerializerSnapshot<N> getNamespaceSerializerSnapshot() { + return namespaceSerializerSnapshot; } - public void setNamespaceSerializerConfigSnapshot(TypeSerializerSnapshot<N> namespaceSerializerConfigSnapshot) { - this.namespaceSerializerConfigSnapshot = namespaceSerializerConfigSnapshot; + public void setNamespaceSerializerSnapshot(TypeSerializerSnapshot<N> namespaceSerializerConfigSnapshot) { + this.namespaceSerializerSnapshot = namespaceSerializerConfigSnapshot; } public Set<TimerHeapInternalTimer<K, N>> getEventTimeTimers() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java index 180e254..a937a5c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java @@ -159,8 +159,8 @@ public class InternalTimersSnapshotReaderWriters { TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( out, Arrays.asList( - Tuple2.of(timersSnapshot.getKeySerializer(), timersSnapshot.getKeySerializerConfigSnapshot()), - Tuple2.of(timersSnapshot.getNamespaceSerializer(), timersSnapshot.getNamespaceSerializerConfigSnapshot()))); + Tuple2.of(timersSnapshot.getKeySerializer(), timersSnapshot.getKeySerializerSnapshot()), + Tuple2.of(timersSnapshot.getNamespaceSerializer(), timersSnapshot.getNamespaceSerializerSnapshot()))); } } @@ -289,9 +289,9 @@ public class InternalTimersSnapshotReaderWriters { TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader); restoredTimersSnapshot.setKeySerializer((TypeSerializer<K>) serializersAndConfigs.get(0).f0); - restoredTimersSnapshot.setKeySerializerConfigSnapshot((TypeSerializerSnapshot<K>) serializersAndConfigs.get(0).f1); + restoredTimersSnapshot.setKeySerializerSnapshot((TypeSerializerSnapshot<K>) serializersAndConfigs.get(0).f1); restoredTimersSnapshot.setNamespaceSerializer((TypeSerializer<N>) serializersAndConfigs.get(1).f0); - restoredTimersSnapshot.setNamespaceSerializerConfigSnapshot((TypeSerializerSnapshot<N>) serializersAndConfigs.get(1).f1); + restoredTimersSnapshot.setNamespaceSerializerSnapshot((TypeSerializerSnapshot<N>) serializersAndConfigs.get(1).f1); } }
