This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7b3b7cb1a81263d40b3e2f644543a92bbca0e8f4
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Mon Feb 25 12:32:25 2019 +0800

    [FLINK-11741] [core] Remove CompositeSerializer's ensureCompatibility method using SelfResolvingTypeSerializer interface
    
    Only the TtlSerializer needs to implement the
    SelfResolvingTypeSerializer interface, because all other subclasses of
    CompositeSerializer are test serializers.
---
 .../api/common/typeutils/CompositeSerializer.java  | 49 ----------------------
 .../flink/runtime/state/ttl/TtlStateFactory.java   | 23 +++++++++-
 2 files changed, 22 insertions(+), 50 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
index 9e337c3..f58f7d6 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.common.typeutils;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
@@ -28,7 +27,6 @@ import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.List;
 
 /**
  * Base class for composite serializers.
@@ -195,53 +193,6 @@ public abstract class CompositeSerializer<T> extends 
TypeSerializer<T> {
                        && Arrays.equals(fieldSerializers, 
other.fieldSerializers);
        }
 
-       @Override
-       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-               // We can not remove this method, as long as we support 
restoring into CompositeTypeSerializerConfigSnapshot.
-               // Previously (pre 1.8), multiple composite serializers were 
using this class directly as their snapshot class.
-               if (configSnapshot instanceof ConfigSnapshot) {
-                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
-                               ((CompositeTypeSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
-                       if (previousSerializersAndConfigs.size() == 
fieldSerializers.length) {
-                               return 
ensureFieldCompatibility(previousSerializersAndConfigs);
-                       }
-               }
-               return CompatibilityResult.requiresMigration();
-       }
-
-       @SuppressWarnings("unchecked")
-       private CompatibilityResult<T> ensureFieldCompatibility(
-               List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
previousSerializersAndConfigs) {
-               TypeSerializer<Object>[] convertSerializers = new 
TypeSerializer[fieldSerializers.length];
-               boolean requiresMigration = false;
-               for (int index = 0; index < 
previousSerializersAndConfigs.size(); index++) {
-                       CompatibilityResult<Object> compatResult =
-                               
resolveFieldCompatibility(previousSerializersAndConfigs, index);
-                       if (compatResult.isRequiresMigration()) {
-                               requiresMigration = true;
-                               if (compatResult.getConvertDeserializer() != 
null) {
-                                       convertSerializers[index] = new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
-                               } else {
-                                       return 
CompatibilityResult.requiresMigration();
-                               }
-                       }
-               }
-               return requiresMigration ? 
createMigrationCompatResult(convertSerializers) : 
CompatibilityResult.compatible();
-       }
-
-       private CompatibilityResult<Object> resolveFieldCompatibility(
-               List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
previousSerializersAndConfigs, int index) {
-               return CompatibilityUtil.resolveCompatibilityResult(
-                       previousSerializersAndConfigs.get(index).f0, 
UnloadableDummyTypeSerializer.class,
-                       previousSerializersAndConfigs.get(index).f1, 
fieldSerializers[index]);
-       }
-
-       private CompatibilityResult<T> 
createMigrationCompatResult(TypeSerializer<Object>[] convertSerializers) {
-               PrecomputedParameters precomputed =
-                       
PrecomputedParameters.precompute(this.precomputed.immutableTargetType, 
convertSerializers);
-               return 
CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, 
convertSerializers));
-       }
-
        /** This class holds composite serializer parameters which can be 
precomputed in advanced for better performance. */
        protected static class PrecomputedParameters implements Serializable {
                private static final long serialVersionUID = 1L;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 37f7d79..02c0273 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -29,7 +29,10 @@ import org.apache.flink.api.common.state.StateTtlConfig;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompositeSerializer;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -238,7 +241,8 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends 
State, IS extends S> {
        /**
         * Serializer for user state value with TTL. Visibility is public for 
usage with external tools.
         */
-       public static class TtlSerializer<T> extends 
CompositeSerializer<TtlValue<T>> {
+       public static class TtlSerializer<T> extends 
CompositeSerializer<TtlValue<T>>
+                       implements 
TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer<TtlValue<T>> {
                private static final long serialVersionUID = 
131020282727167064L;
 
                @SuppressWarnings("WeakerAccess")
@@ -293,6 +297,23 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends 
State, IS extends S> {
                        return new TtlSerializerSnapshot<>(this);
                }
 
+               @Override
+               public TypeSerializerSchemaCompatibility<TtlValue<T>> 
resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass(
+                               TypeSerializerConfigSnapshot<TtlValue<T>> 
deprecatedConfigSnapshot) {
+
+                       if (deprecatedConfigSnapshot instanceof ConfigSnapshot) 
{
+                               ConfigSnapshot castedLegacyConfigSnapshot = 
(ConfigSnapshot) deprecatedConfigSnapshot;
+                               TtlSerializerSnapshot<T> newSnapshot = new 
TtlSerializerSnapshot<>();
+
+                               return 
CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+                                       this,
+                                       newSnapshot,
+                                       
castedLegacyConfigSnapshot.getNestedSerializerSnapshots());
+                       }
+
+                       return TypeSerializerSchemaCompatibility.incompatible();
+               }
+
                public static boolean isTtlStateSerializer(TypeSerializer<?> 
typeSerializer) {
                        boolean ttlSerializer = typeSerializer instanceof 
TtlStateFactory.TtlSerializer;
                        boolean ttlListSerializer = typeSerializer instanceof 
ListSerializer &&

Reply via email to