This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 336908360fa1e28f7c8ff6cb40890283ea88fc0b Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Wed Feb 27 11:51:41 2019 +0800 [FLINK-11741] [core] Remove TypeSerializerSingleton's ensureCompatibility implementation --- .../ParameterlessTypeSerializerConfig.java | 23 +++++++++++++++ .../typeutils/base/TypeSerializerSingleton.java | 34 +--------------------- 2 files changed, 24 insertions(+), 33 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java index 29da90a..28e6485 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.typeutils; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; @@ -64,6 +65,18 @@ public final class ParameterlessTypeSerializerConfig<T> extends TypeSerializerCo } @Override + public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) { + if (newSerializer instanceof TypeSerializerSingleton) { + TypeSerializerSingleton<T> singletonSerializer = (TypeSerializerSingleton<T>) newSerializer; + return isCompatibleSerializationFormatIdentifier(serializationFormatIdentifier, singletonSerializer) + ? 
TypeSerializerSchemaCompatibility.compatibleAsIs() + : TypeSerializerSchemaCompatibility.incompatible(); + } + + return super.resolveSchemaCompatibility(newSerializer); + } + + @Override public int getVersion() { return VERSION; } @@ -90,4 +103,14 @@ public final class ParameterlessTypeSerializerConfig<T> extends TypeSerializerCo public int hashCode() { return serializationFormatIdentifier.hashCode(); } + + private static boolean isCompatibleSerializationFormatIdentifier( + String identifier, TypeSerializerSingleton<?> newSingletonSerializer) { + + String name = newSingletonSerializer.getClass().getName(); + // we also need to check canonical name because some singleton serializers were using that as the identifier + String canonicalName = newSingletonSerializer.getClass().getCanonicalName(); + + return identifier.equals(name) || identifier.equals(canonicalName); + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java index 0594ef7..0c9362a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java @@ -19,10 +19,7 @@ package org.apache.flink.api.common.typeutils.base; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; @Internal public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T> { @@ -40,38 +37,9 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T> { public int hashCode() { return this.getClass().hashCode(); } - + @Override public 
boolean equals(Object obj) { return obj.getClass().equals(this.getClass()); } - - /** - * @deprecated this is kept around for backwards compatibility. - * Can only be removed when {@link ParameterlessTypeSerializerConfig} is removed. - */ - @Override - @Deprecated - public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) { - if (configSnapshot instanceof ParameterlessTypeSerializerConfig - && isCompatibleSerializationFormatIdentifier( - ((ParameterlessTypeSerializerConfig<?>) configSnapshot).getSerializationFormatIdentifier())) { - - return CompatibilityResult.compatible(); - } else { - return CompatibilityResult.requiresMigration(); - } - } - - /** - * Subclasses can override this if they know that they are also compatible with identifiers of other formats. - */ - protected boolean isCompatibleSerializationFormatIdentifier(String identifier) { - return identifier.equals(getClass().getName()) || - identifier.equals(getClass().getCanonicalName()); - } - - private String getSerializationFormatIdentifier() { - return getClass().getName(); - } }
