This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7b3b7cb1a81263d40b3e2f644543a92bbca0e8f4 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Mon Feb 25 12:32:25 2019 +0800 [FLINK-11741] [core] Remove CompositeSerializer's ensureCompatibility method using SelfResolvingTypeSerializer interface Only the TtlSerializer needs to implement the SelfResolvingTypeSerializer interface, because all other subclasses of CompositeSerializer are test serializers. --- .../api/common/typeutils/CompositeSerializer.java | 49 ---------------------- .../flink/runtime/state/ttl/TtlStateFactory.java | 23 +++++++++- 2 files changed, 22 insertions(+), 50 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java index 9e337c3..f58f7d6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java @@ -18,7 +18,6 @@ package org.apache.flink.api.common.typeutils; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; @@ -28,7 +27,6 @@ import javax.annotation.Nonnull; import java.io.IOException; import java.io.Serializable; import java.util.Arrays; -import java.util.List; /** * Base class for composite serializers. @@ -195,53 +193,6 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> { && Arrays.equals(fieldSerializers, other.fieldSerializers); } - @Override - public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - // We can not remove this method, as long as we support restoring into CompositeTypeSerializerConfigSnapshot. - // Previously (pre 1.8), multiple composite serializers were using this class directly as their snapshot class. - if (configSnapshot instanceof ConfigSnapshot) { - List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs = - ((CompositeTypeSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - if (previousSerializersAndConfigs.size() == fieldSerializers.length) { - return ensureFieldCompatibility(previousSerializersAndConfigs); - } - } - return CompatibilityResult.requiresMigration(); - } - - @SuppressWarnings("unchecked") - private CompatibilityResult<T> ensureFieldCompatibility( - List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs) { - TypeSerializer<Object>[] convertSerializers = new TypeSerializer[fieldSerializers.length]; - boolean requiresMigration = false; - for (int index = 0; index < previousSerializersAndConfigs.size(); index++) { - CompatibilityResult<Object> compatResult = - resolveFieldCompatibility(previousSerializersAndConfigs, index); - if (compatResult.isRequiresMigration()) { - requiresMigration = true; - if (compatResult.getConvertDeserializer() != null) { - convertSerializers[index] = new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()); - } else { - return CompatibilityResult.requiresMigration(); - } - } - } - return requiresMigration ? createMigrationCompatResult(convertSerializers) : CompatibilityResult.compatible(); - } - - private CompatibilityResult<Object> resolveFieldCompatibility( - List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs, int index) { - return CompatibilityUtil.resolveCompatibilityResult( - previousSerializersAndConfigs.get(index).f0, UnloadableDummyTypeSerializer.class, - previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]); - } - - private CompatibilityResult<T> createMigrationCompatResult(TypeSerializer<Object>[] convertSerializers) { - PrecomputedParameters precomputed = - PrecomputedParameters.precompute(this.precomputed.immutableTargetType, convertSerializers); - return CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, convertSerializers)); - } - /** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */ protected static class PrecomputedParameters implements Serializable { private static final long serialVersionUID = 1L; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java index 37f7d79..02c0273 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java @@ -29,7 +29,10 @@ import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.CompositeSerializer; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; @@ -238,7 +241,8 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> { /** * Serializer for user state value with TTL. Visibility is public for usage with external tools. */ - public static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> { + public static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> + implements TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer<TtlValue<T>> { private static final long serialVersionUID = 131020282727167064L; @SuppressWarnings("WeakerAccess") @@ -293,6 +297,23 @@ public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS extends S> { return new TtlSerializerSnapshot<>(this); } + @Override + public TypeSerializerSchemaCompatibility<TtlValue<T>> resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass( + TypeSerializerConfigSnapshot<TtlValue<T>> deprecatedConfigSnapshot) { + + if (deprecatedConfigSnapshot instanceof ConfigSnapshot) { + ConfigSnapshot castedLegacyConfigSnapshot = (ConfigSnapshot) deprecatedConfigSnapshot; + TtlSerializerSnapshot<T> newSnapshot = new TtlSerializerSnapshot<>(); + + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + this, + newSnapshot, + castedLegacyConfigSnapshot.getNestedSerializerSnapshots()); + } + + return TypeSerializerSchemaCompatibility.incompatible(); + } + public static boolean isTtlStateSerializer(TypeSerializer<?> typeSerializer) { boolean ttlSerializer = typeSerializer instanceof TtlStateFactory.TtlSerializer; boolean ttlListSerializer = typeSerializer instanceof ListSerializer &&
