This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d451b7399ff8c1d9fa8497f366047dcfcae5391a
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Wed Feb 27 12:10:08 2019 +0800

    [FLINK-11772] [DataStream] Let InternalTimerServiceImpl use new serialization compatibility APIs for key / namespace serializer checks
    
    This commit lets the InternalTimerServiceImpl properly use
    TypeSerializerSchemaCompatibility /
    TypeSerializerSnapshot#resolveSchemaCompatibility when attempting to
    check the compatibility of new key and namespace serializers.
    
    This also fixes a bug in the previous check: the key / namespace
    serializers were not reassigned to the reconfigured ones returned by
    the compatibility check.
---
 .../api/operators/InternalTimerServiceImpl.java    | 42 ++++++++++++----------
 .../InternalTimersSnapshotReaderWriters.java       | 10 ++++--
 2 files changed, 31 insertions(+), 21 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index c2088d0..a7a3490 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -19,9 +19,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
@@ -142,26 +141,31 @@ public class InternalTimerServiceImpl<K, N> implements 
InternalTimerService<N>,
 
                        // the following is the case where we restore
                        if (restoredTimersSnapshot != null) {
-                               CompatibilityResult<K> 
keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-                                       this.keyDeserializer,
-                                       null,
-                                       
restoredTimersSnapshot.getKeySerializerSnapshot(),
-                                       keySerializer);
-
-                               CompatibilityResult<N> 
namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-                                       this.namespaceDeserializer,
-                                       null,
-                                       
restoredTimersSnapshot.getNamespaceSerializerSnapshot(),
-                                       namespaceSerializer);
-
-                               if 
(keySerializerCompatibility.isRequiresMigration() || 
namespaceSerializerCompatibility.isRequiresMigration()) {
-                                       throw new IllegalStateException("Tried 
to initialize restored TimerService " +
-                                               "with incompatible serializers 
than those used to snapshot its state.");
+                               TypeSerializerSchemaCompatibility<K> 
keySerializerCompatibility =
+                                       
restoredTimersSnapshot.getKeySerializerSnapshot().resolveSchemaCompatibility(keySerializer);
+
+                               if (keySerializerCompatibility.isIncompatible() 
|| keySerializerCompatibility.isCompatibleAfterMigration()) {
+                                       throw new IllegalStateException(
+                                               "Tried to initialize restored 
TimerService with new key serializer that requires migration or is 
incompatible.");
                                }
+
+                               TypeSerializerSchemaCompatibility<N> 
namespaceSerializerCompatibility =
+                                       
restoredTimersSnapshot.getNamespaceSerializerSnapshot().resolveSchemaCompatibility(namespaceSerializer);
+
+                               if 
(namespaceSerializerCompatibility.isIncompatible() || 
namespaceSerializerCompatibility.isCompatibleAfterMigration()) {
+                                       throw new IllegalStateException(
+                                               "Tried to initialize restored 
TimerService with new namespace serializer that requires migration or is 
incompatible.");
+                               }
+
+                               this.keySerializer = 
keySerializerCompatibility.isCompatibleAsIs()
+                                       ? keySerializer : 
keySerializerCompatibility.getReconfiguredSerializer();
+                               this.namespaceSerializer = 
namespaceSerializerCompatibility.isCompatibleAsIs()
+                                       ? namespaceSerializer : 
namespaceSerializerCompatibility.getReconfiguredSerializer();
+                       } else {
+                               this.keySerializer = keySerializer;
+                               this.namespaceSerializer = namespaceSerializer;
                        }
 
-                       this.keySerializer = keySerializer;
-                       this.namespaceSerializer = namespaceSerializer;
                        this.keyDeserializer = null;
                        this.namespaceDeserializer = null;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
index a937a5c..d61e542 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.typeutils.BackwardsCompatibleSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
@@ -265,8 +266,13 @@ public class InternalTimersSnapshotReaderWriters {
 
                        DataInputViewStream dis = new DataInputViewStream(in);
                        try {
-                               
restoredTimersSnapshot.setKeySerializer(InstantiationUtil.deserializeObject(dis,
 userCodeClassLoader, true));
-                               
restoredTimersSnapshot.setNamespaceSerializer(InstantiationUtil.deserializeObject(dis,
 userCodeClassLoader, true));
+                               final TypeSerializer<K> keySerializer = 
InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true);
+                               final TypeSerializer<N> namespaceSerializer = 
InstantiationUtil.deserializeObject(dis, userCodeClassLoader, true);
+
+                               
restoredTimersSnapshot.setKeySerializer(keySerializer);
+                               
restoredTimersSnapshot.setKeySerializerSnapshot(new 
BackwardsCompatibleSerializerSnapshot<>(keySerializer));
+                               
restoredTimersSnapshot.setNamespaceSerializer(namespaceSerializer);
+                               
restoredTimersSnapshot.setNamespaceSerializerSnapshot(new 
BackwardsCompatibleSerializerSnapshot<>(namespaceSerializer));
                        } catch (ClassNotFoundException exception) {
                                throw new IOException(exception);
                        }

Reply via email to