This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 68417902d3012aa40b73dfa333e5282b37e9406d
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Wed Feb 27 11:55:30 2019 +0800

    [FLINK-11741] [cep] Migrate legacy NFA serializers to use new serialization 
compatibility abstractions
    
    Although these serializers were used for state that are no longer accessed
    now, they will still be snapshotted as part of the CEP state's meta
    info, since they still are registered. This commit updates them to use
    the new serialization compatibility abstractions, so that the
    serializers are no longer Java-serialized.
---
 .../main/java/org/apache/flink/cep/nfa/NFA.java    |  97 ++++++++++---------
 .../org/apache/flink/cep/nfa/SharedBuffer.java     | 107 ++++++++++-----------
 ...va => NFASerializerSnapshotsMigrationTest.java} |   0
 3 files changed, 106 insertions(+), 98 deletions(-)

diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 3ddec5c..30b7b8a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -21,14 +21,12 @@ package org.apache.flink.cep.nfa;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
@@ -878,7 +876,8 @@ public class NFA<T> {
        }
 
        /**
-        * The {@link TypeSerializerConfigSnapshot} serializer configuration to 
be stored with the managed state.
+        * @deprecated This snapshot class is no longer in use, and only 
maintained for backwards compatibility
+        *             purposes. It is fully replaced by {@link 
MigratedNFASerializerSnapshot}.
         */
        @Deprecated
        public static final class NFASerializerConfigSnapshot<T> extends 
CompositeTypeSerializerConfigSnapshot<MigratedNFA<T>> {
@@ -899,6 +898,53 @@ public class NFA<T> {
                public int getVersion() {
                        return VERSION;
                }
+
+               @Override
+               public TypeSerializerSchemaCompatibility<MigratedNFA<T>> 
resolveSchemaCompatibility(TypeSerializer<MigratedNFA<T>> newSerializer) {
+                       return 
CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+                               newSerializer,
+                               new MigratedNFASerializerSnapshot<>(),
+                               getNestedSerializerSnapshots());
+               }
+       }
+
+       /**
+        * A {@link TypeSerializerSnapshot} for the legacy {@link 
NFASerializer}.
+        */
+       @SuppressWarnings("deprecation")
+       public static final class MigratedNFASerializerSnapshot<T> extends 
CompositeTypeSerializerSnapshot<MigratedNFA<T>, NFASerializer<T>> {
+
+               private static final int VERSION = 2;
+
+               public MigratedNFASerializerSnapshot() {
+                       super(NFASerializer.class);
+               }
+
+               MigratedNFASerializerSnapshot(NFASerializer<T> 
legacyNfaSerializer) {
+                       super(legacyNfaSerializer);
+               }
+
+               @Override
+               protected int getCurrentOuterSnapshotVersion() {
+                       return VERSION;
+               }
+
+               @Override
+               protected TypeSerializer<?>[] 
getNestedSerializers(NFASerializer<T> outerSerializer) {
+                       return new TypeSerializer<?>[]{ 
outerSerializer.eventSerializer, outerSerializer.sharedBufferSerializer };
+               }
+
+               @Override
+               protected NFASerializer<T> 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+                       @SuppressWarnings("unchecked")
+                       TypeSerializer<T> eventSerializer = (TypeSerializer<T>) 
nestedSerializers[0];
+
+                       @SuppressWarnings("unchecked")
+                       
TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer 
=
+                               
(TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>>) nestedSerializers[1];
+
+                       return new NFASerializer<>(eventSerializer, 
sharedBufferSerializer);
+               }
        }
 
        /**
@@ -1000,43 +1046,8 @@ public class NFA<T> {
                }
 
                @Override
-               public TypeSerializerConfigSnapshot<MigratedNFA<T>> 
snapshotConfiguration() {
-                       return new 
NFASerializerConfigSnapshot<>(eventSerializer, sharedBufferSerializer);
-               }
-
-               @Override
-               public CompatibilityResult<MigratedNFA<T>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-                       if (configSnapshot instanceof 
NFASerializerConfigSnapshot) {
-                               List<Tuple2<TypeSerializer<?>, 
TypeSerializerSnapshot<?>>> serializersAndConfigs =
-                                       ((NFASerializerConfigSnapshot<?>) 
configSnapshot).getNestedSerializersAndConfigs();
-
-                               CompatibilityResult<T> eventCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
-                                       serializersAndConfigs.get(0).f0,
-                                       UnloadableDummyTypeSerializer.class,
-                                       serializersAndConfigs.get(0).f1,
-                                       eventSerializer);
-
-                               
CompatibilityResult<org.apache.flink.cep.nfa.SharedBuffer<T>> 
sharedBufCompatResult =
-                                       
CompatibilityUtil.resolveCompatibilityResult(
-                                               serializersAndConfigs.get(1).f0,
-                                               
UnloadableDummyTypeSerializer.class,
-                                               serializersAndConfigs.get(1).f1,
-                                               sharedBufferSerializer);
-
-                               if 
(!sharedBufCompatResult.isRequiresMigration() && 
!eventCompatResult.isRequiresMigration()) {
-                                       return CompatibilityResult.compatible();
-                               } else {
-                                       if 
(eventCompatResult.getConvertDeserializer() != null &&
-                                               
sharedBufCompatResult.getConvertDeserializer() != null) {
-                                               return 
CompatibilityResult.requiresMigration(
-                                                       new NFASerializer<>(
-                                                               new 
TypeDeserializerAdapter<>(eventCompatResult.getConvertDeserializer()),
-                                                               new 
TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer())));
-                                       }
-                               }
-                       }
-
-                       return CompatibilityResult.requiresMigration();
+               public MigratedNFASerializerSnapshot<T> snapshotConfiguration() 
{
+                       return new MigratedNFASerializerSnapshot<>(this);
                }
        }
 }
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 3ebfac7..2246b27 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.cep.nfa;
 
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
@@ -156,8 +154,10 @@ public class SharedBuffer<V> {
        }
 
        /**
-        * The {@link TypeSerializerConfigSnapshot} serializer configuration to 
be stored with the managed state.
+        * @deprecated This snapshot class is no longer in use, and only 
maintained for backwards compatibility
+        *             purposes. It is fully replaced by {@link 
SharedBufferSerializerSnapshot}.
         */
+       @Deprecated
        public static final class SharedBufferSerializerConfigSnapshot<K, V>
                        extends 
CompositeTypeSerializerConfigSnapshot<SharedBuffer<V>> {
 
@@ -179,6 +179,50 @@ public class SharedBuffer<V> {
                public int getVersion() {
                        return VERSION;
                }
+
+               @Override
+               public TypeSerializerSchemaCompatibility<SharedBuffer<V>> 
resolveSchemaCompatibility(TypeSerializer<SharedBuffer<V>> newSerializer) {
+                       return 
CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+                               newSerializer,
+                               new SharedBufferSerializerSnapshot<>(),
+                               getNestedSerializerSnapshots());
+               }
+       }
+
+       /**
+        * A {@link TypeSerializerSnapshot} for the {@link 
SharedBufferSerializerSnapshot}.
+        */
+       public static final class SharedBufferSerializerSnapshot<K, V>
+                       extends 
CompositeTypeSerializerSnapshot<SharedBuffer<V>, SharedBufferSerializer<K, V>> {
+
+               private static final int VERSION = 2;
+
+               public SharedBufferSerializerSnapshot() {
+                       super(SharedBufferSerializer.class);
+               }
+
+               public SharedBufferSerializerSnapshot(SharedBufferSerializer<K, 
V> sharedBufferSerializer) {
+                       super(sharedBufferSerializer);
+               }
+
+               @Override
+               protected int getCurrentOuterSnapshotVersion() {
+                       return VERSION;
+               }
+
+               @Override
+               protected TypeSerializer<?>[] 
getNestedSerializers(SharedBufferSerializer<K, V> outerSerializer) {
+                       return new TypeSerializer<?>[]{ 
outerSerializer.keySerializer, outerSerializer.valueSerializer, 
outerSerializer.versionSerializer };
+               }
+
+               @Override
+               @SuppressWarnings("unchecked")
+               protected SharedBufferSerializer<K, V> 
createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] 
nestedSerializers) {
+                       TypeSerializer<K> keySerializer = (TypeSerializer<K>) 
nestedSerializers[0];
+                       TypeSerializer<V> valueSerializer = (TypeSerializer<V>) 
nestedSerializers[1];
+                       TypeSerializer<DeweyNumber> versionSerializer = 
(TypeSerializer<DeweyNumber>) nestedSerializers[2];
+                       return new SharedBufferSerializer<>(keySerializer, 
valueSerializer, versionSerializer);
+               }
        }
 
        /**
@@ -351,55 +395,8 @@ public class SharedBuffer<V> {
                }
 
                @Override
-               public TypeSerializerConfigSnapshot<SharedBuffer<V>> 
snapshotConfiguration() {
-                       return new SharedBufferSerializerConfigSnapshot<>(
-                               keySerializer,
-                               valueSerializer,
-                               versionSerializer);
-               }
-
-               @Override
-               public CompatibilityResult<SharedBuffer<V>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-                       if (configSnapshot instanceof 
SharedBufferSerializerConfigSnapshot) {
-                               List<Tuple2<TypeSerializer<?>, 
TypeSerializerSnapshot<?>>> serializerConfigSnapshots =
-                                       
((SharedBufferSerializerConfigSnapshot<?, ?>) 
configSnapshot).getNestedSerializersAndConfigs();
-
-                               CompatibilityResult<K> keyCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
-                                       serializerConfigSnapshots.get(0).f0,
-                                       UnloadableDummyTypeSerializer.class,
-                                       serializerConfigSnapshots.get(0).f1,
-                                       keySerializer);
-
-                               CompatibilityResult<V> valueCompatResult = 
CompatibilityUtil.resolveCompatibilityResult(
-                                       serializerConfigSnapshots.get(1).f0,
-                                       UnloadableDummyTypeSerializer.class,
-                                       serializerConfigSnapshots.get(1).f1,
-                                       valueSerializer);
-
-                               CompatibilityResult<DeweyNumber> 
versionCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-                                       serializerConfigSnapshots.get(2).f0,
-                                       UnloadableDummyTypeSerializer.class,
-                                       serializerConfigSnapshots.get(2).f1,
-                                       versionSerializer);
-
-                               if (!keyCompatResult.isRequiresMigration() && 
!valueCompatResult.isRequiresMigration() &&
-                                       
!versionCompatResult.isRequiresMigration()) {
-                                       return CompatibilityResult.compatible();
-                               } else {
-                                       if 
(keyCompatResult.getConvertDeserializer() != null
-                                               && 
valueCompatResult.getConvertDeserializer() != null
-                                               && 
versionCompatResult.getConvertDeserializer() != null) {
-                                               return 
CompatibilityResult.requiresMigration(
-                                                       new 
SharedBufferSerializer<>(
-                                                               new 
TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
-                                                               new 
TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()),
-                                                               new 
TypeDeserializerAdapter<>(versionCompatResult.getConvertDeserializer())
-                                                       ));
-                                       }
-                               }
-                       }
-
-                       return CompatibilityResult.requiresMigration();
+               public SharedBufferSerializerSnapshot<K, V> 
snapshotConfiguration() {
+                       return new SharedBufferSerializerSnapshot<>(this);
                }
        }
 }
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/MigratedNFASerializerSnapshotsMigrationTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerSnapshotsMigrationTest.java
similarity index 100%
rename from 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/MigratedNFASerializerSnapshotsMigrationTest.java
rename to 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerSnapshotsMigrationTest.java

Reply via email to