This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 336908360fa1e28f7c8ff6cb40890283ea88fc0b
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Wed Feb 27 11:51:41 2019 +0800

    [FLINK-11741] [core] Remove TypeSerializerSingleton's ensureCompatibility implementation
---
 .../ParameterlessTypeSerializerConfig.java         | 23 +++++++++++++++
 .../typeutils/base/TypeSerializerSingleton.java    | 34 +---------------------
 2 files changed, 24 insertions(+), 33 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
index 29da90a..28e6485 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/ParameterlessTypeSerializerConfig.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
@@ -64,6 +65,18 @@ public final class ParameterlessTypeSerializerConfig<T> extends TypeSerializerCo
        }
 
        @Override
+       public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
+               if (newSerializer instanceof TypeSerializerSingleton) {
+                       TypeSerializerSingleton<T> singletonSerializer = (TypeSerializerSingleton<T>) newSerializer;
+                       return isCompatibleSerializationFormatIdentifier(serializationFormatIdentifier, singletonSerializer)
+                               ? TypeSerializerSchemaCompatibility.compatibleAsIs()
+                               : TypeSerializerSchemaCompatibility.incompatible();
+               }
+
+               return super.resolveSchemaCompatibility(newSerializer);
+       }
+
+       @Override
        public int getVersion() {
                return VERSION;
        }
@@ -90,4 +103,14 @@ public final class ParameterlessTypeSerializerConfig<T> extends TypeSerializerCo
        public int hashCode() {
                return serializationFormatIdentifier.hashCode();
        }
+
+       private static boolean isCompatibleSerializationFormatIdentifier(
+                       String identifier, TypeSerializerSingleton<?> newSingletonSerializer) {
+
+               String name = newSingletonSerializer.getClass().getName();
+               // we also need to check canonical name because some singleton serializers were using that as the identifier
+               String canonicalName = newSingletonSerializer.getClass().getCanonicalName();
+
+               return identifier.equals(name) || identifier.equals(canonicalName);
+       }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
index 0594ef7..0c9362a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -19,10 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 
 @Internal
 public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T> {
@@ -40,38 +37,9 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T> {
        public int hashCode() {
                return this.getClass().hashCode();
        }
-       
+
        @Override
        public boolean equals(Object obj) {
                return obj.getClass().equals(this.getClass());
        }
-
-       /**
-        * @deprecated this is kept around for backwards compatibility.
-        *             Can only be removed when {@link ParameterlessTypeSerializerConfig} is removed.
-        */
-       @Override
-       @Deprecated
-       public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-               if (configSnapshot instanceof ParameterlessTypeSerializerConfig
-                               && isCompatibleSerializationFormatIdentifier(
-                                               ((ParameterlessTypeSerializerConfig<?>) configSnapshot).getSerializationFormatIdentifier())) {
-
-                       return CompatibilityResult.compatible();
-               } else {
-                       return CompatibilityResult.requiresMigration();
-               }
-       }
-
-       /**
-        * Subclasses can override this if they know that they are also compatible with identifiers of other formats.
-        */
-       protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
-               return identifier.equals(getClass().getName()) ||
-                               identifier.equals(getClass().getCanonicalName());
-       }
-
-       private String getSerializationFormatIdentifier() {
-               return getClass().getName();
-       }
 }

Reply via email to