This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 68417902d3012aa40b73dfa333e5282b37e9406d Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Wed Feb 27 11:55:30 2019 +0800 [FLINK-11741] [cep] Migrate legacy NFA serializers to use new serialization compatibility abstractions Although these serializers were used for state that are no longer accessed now, they will still be snapshotted as part of the CEP state's meta info, since they still are registered. This commit updates them to use the new serialization compatibility abstractions, so that the serializers are no longer Java-serialized. --- .../main/java/org/apache/flink/cep/nfa/NFA.java | 97 ++++++++++--------- .../org/apache/flink/cep/nfa/SharedBuffer.java | 107 ++++++++++----------- ...va => NFASerializerSnapshotsMigrationTest.java} | 0 3 files changed, 106 insertions(+), 98 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index 3ddec5c..30b7b8a 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -21,14 +21,12 @@ package org.apache.flink.cep.nfa; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.functions.util.FunctionUtils; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy; @@ -878,7 +876,8 @@ public class NFA<T> { } /** - * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state. + * @deprecated This snapshot class is no longer in use, and only maintained for backwards compatibility + * purposes. It is fully replaced by {@link MigratedNFASerializerSnapshot}. */ @Deprecated public static final class NFASerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<MigratedNFA<T>> { @@ -899,6 +898,53 @@ public class NFA<T> { public int getVersion() { return VERSION; } + + @Override + public TypeSerializerSchemaCompatibility<MigratedNFA<T>> resolveSchemaCompatibility(TypeSerializer<MigratedNFA<T>> newSerializer) { + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new MigratedNFASerializerSnapshot<>(), + getNestedSerializerSnapshots()); + } + } + + /** + * A {@link TypeSerializerSnapshot} for the legacy {@link NFASerializer}. + */ + @SuppressWarnings("deprecation") + public static final class MigratedNFASerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<MigratedNFA<T>, NFASerializer<T>> { + + private static final int VERSION = 2; + + public MigratedNFASerializerSnapshot() { + super(NFASerializer.class); + } + + MigratedNFASerializerSnapshot(NFASerializer<T> legacyNfaSerializer) { + super(legacyNfaSerializer); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer<?>[] getNestedSerializers(NFASerializer<T> outerSerializer) { + return new TypeSerializer<?>[]{ outerSerializer.eventSerializer, outerSerializer.sharedBufferSerializer }; + } + + @Override + protected NFASerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + @SuppressWarnings("unchecked") + TypeSerializer<T> eventSerializer = (TypeSerializer<T>) nestedSerializers[0]; + + @SuppressWarnings("unchecked") + TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer = + (TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>>) nestedSerializers[1]; + + return new NFASerializer<>(eventSerializer, sharedBufferSerializer); + } } /** @@ -1000,43 +1046,8 @@ public class NFA<T> { } @Override - public TypeSerializerConfigSnapshot<MigratedNFA<T>> snapshotConfiguration() { - return new NFASerializerConfigSnapshot<>(eventSerializer, sharedBufferSerializer); - } - - @Override - public CompatibilityResult<MigratedNFA<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof NFASerializerConfigSnapshot) { - List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializersAndConfigs = - ((NFASerializerConfigSnapshot<?>) configSnapshot).getNestedSerializersAndConfigs(); - - CompatibilityResult<T> eventCompatResult = CompatibilityUtil.resolveCompatibilityResult( - serializersAndConfigs.get(0).f0, - UnloadableDummyTypeSerializer.class, - serializersAndConfigs.get(0).f1, - eventSerializer); - - CompatibilityResult<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufCompatResult = - CompatibilityUtil.resolveCompatibilityResult( - serializersAndConfigs.get(1).f0, - UnloadableDummyTypeSerializer.class, - serializersAndConfigs.get(1).f1, - sharedBufferSerializer); - - if (!sharedBufCompatResult.isRequiresMigration() && !eventCompatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else { - if (eventCompatResult.getConvertDeserializer() != null && - sharedBufCompatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new NFASerializer<>( - new TypeDeserializerAdapter<>(eventCompatResult.getConvertDeserializer()), - new TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer()))); - } - } - } - - return CompatibilityResult.requiresMigration(); + public MigratedNFASerializerSnapshot<T> snapshotConfiguration() { + return new MigratedNFASerializerSnapshot<>(this); } } } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index 3ebfac7..2246b27 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -18,14 +18,12 @@ package org.apache.flink.cep.nfa; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler; import org.apache.flink.cep.nfa.sharedbuffer.EventId; @@ -156,8 +154,10 @@ public class SharedBuffer<V> { } /** - * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state. + * @deprecated This snapshot class is no longer in use, and only maintained for backwards compatibility + * purposes. It is fully replaced by {@link SharedBufferSerializerSnapshot}. */ + @Deprecated public static final class SharedBufferSerializerConfigSnapshot<K, V> extends CompositeTypeSerializerConfigSnapshot<SharedBuffer<V>> { @@ -179,6 +179,50 @@ public class SharedBuffer<V> { public int getVersion() { return VERSION; } + + @Override + public TypeSerializerSchemaCompatibility<SharedBuffer<V>> resolveSchemaCompatibility(TypeSerializer<SharedBuffer<V>> newSerializer) { + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new SharedBufferSerializerSnapshot<>(), + getNestedSerializerSnapshots()); + } + } + + /** + * A {@link TypeSerializerSnapshot} for the {@link SharedBufferSerializerSnapshot}. + */ + public static final class SharedBufferSerializerSnapshot<K, V> + extends CompositeTypeSerializerSnapshot<SharedBuffer<V>, SharedBufferSerializer<K, V>> { + + private static final int VERSION = 2; + + public SharedBufferSerializerSnapshot() { + super(SharedBufferSerializer.class); + } + + public SharedBufferSerializerSnapshot(SharedBufferSerializer<K, V> sharedBufferSerializer) { + super(sharedBufferSerializer); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer<?>[] getNestedSerializers(SharedBufferSerializer<K, V> outerSerializer) { + return new TypeSerializer<?>[]{ outerSerializer.keySerializer, outerSerializer.valueSerializer, outerSerializer.versionSerializer }; + } + + @Override + @SuppressWarnings("unchecked") + protected SharedBufferSerializer<K, V> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { + TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0]; + TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1]; + TypeSerializer<DeweyNumber> versionSerializer = (TypeSerializer<DeweyNumber>) nestedSerializers[2]; + return new SharedBufferSerializer<>(keySerializer, valueSerializer, versionSerializer); + } } /** @@ -351,55 +395,8 @@ public class SharedBuffer<V> { } @Override - public TypeSerializerConfigSnapshot<SharedBuffer<V>> snapshotConfiguration() { - return new SharedBufferSerializerConfigSnapshot<>( - keySerializer, - valueSerializer, - versionSerializer); - } - - @Override - public CompatibilityResult<SharedBuffer<V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof SharedBufferSerializerConfigSnapshot) { - List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializerConfigSnapshots = - ((SharedBufferSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs(); - - CompatibilityResult<K> keyCompatResult = CompatibilityUtil.resolveCompatibilityResult( - serializerConfigSnapshots.get(0).f0, - UnloadableDummyTypeSerializer.class, - serializerConfigSnapshots.get(0).f1, - keySerializer); - - CompatibilityResult<V> valueCompatResult = CompatibilityUtil.resolveCompatibilityResult( - serializerConfigSnapshots.get(1).f0, - UnloadableDummyTypeSerializer.class, - serializerConfigSnapshots.get(1).f1, - valueSerializer); - - CompatibilityResult<DeweyNumber> versionCompatResult = CompatibilityUtil.resolveCompatibilityResult( - serializerConfigSnapshots.get(2).f0, - UnloadableDummyTypeSerializer.class, - serializerConfigSnapshots.get(2).f1, - versionSerializer); - - if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration() && - !versionCompatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else { - if (keyCompatResult.getConvertDeserializer() != null - && valueCompatResult.getConvertDeserializer() != null - && versionCompatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new SharedBufferSerializer<>( - new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()), - new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()), - new TypeDeserializerAdapter<>(versionCompatResult.getConvertDeserializer()) - )); - } - } - } - - return CompatibilityResult.requiresMigration(); + public SharedBufferSerializerSnapshot<K, V> snapshotConfiguration() { + return new SharedBufferSerializerSnapshot<>(this); } } } diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/MigratedNFASerializerSnapshotsMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerSnapshotsMigrationTest.java similarity index 100% rename from flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/MigratedNFASerializerSnapshotsMigrationTest.java rename to flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/NFASerializerSnapshotsMigrationTest.java