[FLINK-6178] [core] Introduce TypeDeserializer interface for CompatibilityResult

Previously, the CompatibilityResult class accepted a full-blown
TypeSerializer for its convert deserializer, which is actually only
ever used for deserialization.

This commit narrows down the interface by introducing a new
TypeDeserializer interface that contains only the read methods.

This closes #3834.
This closes #3804.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63c04a51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63c04a51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63c04a51

Branch: refs/heads/master
Commit: 63c04a516f40ec2dca4d8edef58e7c2ef563ce67
Parents: 8aa5e05
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Mon May 8 02:42:02 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Mon May 8 03:44:32 2017 +0800

----------------------------------------------------------------------
 .../common/typeutils/CompatibilityResult.java   |   8 +-
 .../api/common/typeutils/TypeDeserializer.java  |  88 +++++++++++++
 .../typeutils/TypeDeserializerAdapter.java      | 127 +++++++++++++++++++
 .../api/common/typeutils/TypeSerializer.java    |   4 +-
 .../typeutils/base/GenericArraySerializer.java  |   3 +-
 .../common/typeutils/base/ListSerializer.java   |   3 +-
 .../common/typeutils/base/MapSerializer.java    |   5 +-
 .../typeutils/runtime/EitherSerializer.java     |   5 +-
 .../java/typeutils/runtime/RowSerializer.java   |   4 +-
 .../AbstractKeyedCEPPatternOperator.java        |   4 +-
 .../table/runtime/types/CRowSerializer.scala    |   3 +-
 .../runtime/state/ArrayListSerializer.java      |   3 +-
 .../flink/runtime/state/HashMapSerializer.java  |   5 +-
 .../api/scala/typeutils/EitherSerializer.scala  |   6 +-
 .../api/scala/typeutils/OptionSerializer.scala  |   3 +-
 .../MultiplexingStreamRecordSerializer.java     |   4 +-
 .../streamrecord/StreamRecordSerializer.java    |   4 +-
 .../streamrecord/StreamElementSerializer.java   |   4 +-
 18 files changed, 258 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
index cfbb516..891cfe0 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java
@@ -38,7 +38,7 @@ public final class CompatibilityResult<T> {
         *
         * <p>This is only relevant if migration is required.
         */
-       private final TypeSerializer<T> convertDeserializer;
+       private final TypeDeserializer<T> convertDeserializer;
 
        /**
         * Returns a strategy that signals that the new serializer is 
compatible and no migration is required.
@@ -61,16 +61,16 @@ public final class CompatibilityResult<T> {
         *
         * @return a result that signals migration is necessary, possibly 
providing a convert deserializer.
         */
-       public static <T> CompatibilityResult<T> 
requiresMigration(TypeSerializer<T> convertDeserializer) {
+       public static <T> CompatibilityResult<T> 
requiresMigration(TypeDeserializer<T> convertDeserializer) {
                return new CompatibilityResult<>(true, convertDeserializer);
        }
 
-       private CompatibilityResult(boolean requiresMigration, 
TypeSerializer<T> convertDeserializer) {
+       private CompatibilityResult(boolean requiresMigration, 
TypeDeserializer<T> convertDeserializer) {
                this.requiresMigration = requiresMigration;
                this.convertDeserializer = convertDeserializer;
        }
 
-       public TypeSerializer<T> getConvertDeserializer() {
+       public TypeDeserializer<T> getConvertDeserializer() {
                return convertDeserializer;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializer.java
new file mode 100644
index 0000000..2ec064a
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.IOException;
+
+/**
+ * This interface describes the methods that are required for a data type to 
be read by the Flink runtime.
+ * Specifically, this interface contains the deserialization methods. In 
contrast, the {@link TypeSerializer}
+ * interface contains the complete set of methods for both serialization and 
deserialization.
+ *
+ * <p>The methods in this class are assumed to be stateless, such that it is 
effectively thread safe. Stateful
+ * implementations of the methods may lead to unpredictable side effects and 
will compromise both stability and
+ * correctness of the program.
+ *
+ * @param <T> The data type that the deserializer deserializes.
+ */
+public interface TypeDeserializer<T> {
+
+       /**
+        * Creates a deep copy of this deserializer if it is necessary, i.e. if 
it is stateful. This
+        * can return itself if the deserializer is not stateful.
+        *
+        * We need this because deserializers might be used in several threads. 
Stateless deserializers
+        * are inherently thread-safe while stateful deserializers might not be 
thread-safe.
+        */
+       TypeSerializer<T> duplicate();
+
+       /**
+        * De-serializes a record from the given source input view.
+        *
+        * @param source The input view from which to read the data.
+        * @return The deserialized element.
+        *
+        * @throws IOException Thrown, if the de-serialization encountered an 
I/O related error. Typically raised by the
+        *                     input view, which may have an underlying I/O 
channel from which it reads.
+        */
+       T deserialize(DataInputView source) throws IOException;
+
+       /**
+        * De-serializes a record from the given source input view into the 
given reuse record instance if mutable.
+        *
+        * @param reuse The record instance into which to de-serialize the data.
+        * @param source The input view from which to read the data.
+        * @return The deserialized element.
+        *
+        * @throws IOException Thrown, if the de-serialization encountered an 
I/O related error. Typically raised by the
+        *                     input view, which may have an underlying I/O 
channel from which it reads.
+        */
+       T deserialize(T reuse, DataInputView source) throws IOException;
+
+       /**
+        * Gets the length of the data type, if it is a fixed-length data type.
+        *
+        * @return The length of the data type, or <code>-1</code> for variable 
length data types.
+        */
+       int getLength();
+
+       /**
+        * Returns true if the given object can be equaled with this object. If 
not, it returns false.
+        *
+        * @param obj Object which wants to take part in the equality relation
+        * @return true if obj can be equaled with this, otherwise false
+        */
+       boolean canEqual(Object obj);
+
+       boolean equals(Object obj);
+
+       int hashCode();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
new file mode 100644
index 0000000..e02bed4
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A utility class that wraps a {@link TypeDeserializer} as a {@link 
TypeSerializer}.
+ *
+ * <p>Methods related to deserialization are directly forwarded to the wrapped 
deserializer,
+ * while serialization methods are masked and not intended for use.
+ *
+ * @param <T> The data type that the deserializer deserializes.
+ */
+public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The actual wrapped deserializer instance */
+       private final TypeDeserializer<T> deserializer;
+
+       /**
+        * Creates a {@link TypeSerializer} that wraps a {@link 
TypeDeserializer}.
+        *
+        * @param deserializer the actual deserializer to wrap.
+        */
+       public TypeDeserializerAdapter(TypeDeserializer<T> deserializer) {
+               this.deserializer = Preconditions.checkNotNull(deserializer);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Forwarded deserialization related methods
+       // 
--------------------------------------------------------------------------------------------
+
+       public T deserialize(DataInputView source) throws IOException {
+               return deserializer.deserialize(source);
+       }
+
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               return deserializer.deserialize(reuse, source);
+       }
+
+       public TypeSerializer<T> duplicate() {
+               return deserializer.duplicate();
+       }
+
+       public int getLength() {
+               return deserializer.getLength();
+       }
+
+       public boolean equals(Object obj) {
+               return deserializer.equals(obj);
+       }
+
+       public boolean canEqual(Object obj) {
+               return deserializer.canEqual(obj);
+       }
+
+       public int hashCode() {
+               return deserializer.hashCode();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Irrelevant methods not intended for use
+       // 
--------------------------------------------------------------------------------------------
+
+       public boolean isImmutableType() {
+               throw new UnsupportedOperationException(
+                       "This is a TypeDeserializerAdapter used only for 
deserialization; this method should not be used.");
+       }
+
+       public T createInstance() {
+               throw new UnsupportedOperationException(
+                       "This is a TypeDeserializerAdapter used only for 
deserialization; this method should not be used.");
+       }
+
+       public T copy(T from) {
+               throw new UnsupportedOperationException(
+                       "This is a TypeDeserializerAdapter used only for 
deserialization; this method should not be used.");
+       }
+
+       public T copy(T from, T reuse) {
+               throw new UnsupportedOperationException(
+                       "This is a TypeDeserializerAdapter used only for 
deserialization; this method should not be used.");
+       }
+
+       public void serialize(T record, DataOutputView target) throws 
IOException {
+               throw new UnsupportedOperationException(
+                       "This is a TypeDeserializerAdapter used only for 
deserialization; this method should not be used.");
+       }
+
+       public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+               throw new UnsupportedOperationException(
+                       "This is a TypeDeserializerAdapter used only for 
deserialization; this method should not be used.");
+       }
+
+       public TypeSerializerConfigSnapshot snapshotConfiguration() {
+               throw new UnsupportedOperationException(
+                       "This is a TypeDeserializerAdapter used only for 
deserialization; this method should not be used.");
+       }
+
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               throw new UnsupportedOperationException(
+                       "This is a TypeDeserializerAdapter used only for 
deserialization; this method should not be used.");
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index f0562d4..0b5a08a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -36,7 +36,7 @@ import java.io.Serializable;
  * @param <T> The data type that the serializer serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializer<T> implements Serializable {
+public abstract class TypeSerializer<T> implements TypeDeserializer<T>, 
Serializable {
        
        private static final long serialVersionUID = 1L;
 
@@ -197,7 +197,7 @@ public abstract class TypeSerializer<T> implements 
Serializable {
         *     has been reconfigured to be compatible, to continue reading 
previous data, and that the
         *     serialization schema remains the same. No migration needs to be 
performed.</li>
         *
-        *     <li>{@link 
CompatibilityResult#requiresMigration(TypeSerializer)}: this signals Flink that
+        *     <li>{@link 
CompatibilityResult#requiresMigration(TypeDeserializer)}: this signals Flink 
that
         *     migration needs to be performed, because this serializer is not 
compatible, or cannot be reconfigured to be
         *     compatible, for previous data. Furthermore, in the case that the 
preceding serializer cannot be found or
         *     restored to read the previous data to perform the migration, the 
provided convert deserializer can be

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index fe61ab3..3e592b4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -23,6 +23,7 @@ import java.lang.reflect.Array;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -213,7 +214,7 @@ public final class GenericArraySerializer<C> extends 
TypeSerializer<C[]> {
                                        return 
CompatibilityResult.requiresMigration(
                                                new GenericArraySerializer<>(
                                                        componentClass,
-                                                       
compatResult.getConvertDeserializer()));
+                                                       new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index 02d22de..1b6540c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -188,7 +189,7 @@ public final class ListSerializer<T> extends 
TypeSerializer<List<T>> {
                                return CompatibilityResult.compatible();
                        } else if (compatResult.getConvertDeserializer() != 
null) {
                                return CompatibilityResult.requiresMigration(
-                                       new 
ListSerializer<>(compatResult.getConvertDeserializer()));
+                                       new ListSerializer<>(new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
index 50900e4..182fff6 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -220,8 +221,8 @@ public final class MapSerializer<K, V> extends 
TypeSerializer<Map<K, V>> {
                        } else if (keyCompatResult.getConvertDeserializer() != 
null && valueCompatResult.getConvertDeserializer() != null) {
                                return CompatibilityResult.requiresMigration(
                                        new MapSerializer<>(
-                                               
keyCompatResult.getConvertDeserializer(),
-                                               
valueCompatResult.getConvertDeserializer()));
+                                               new 
TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
+                                               new 
TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer())));
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index c025d61..461dd87 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -212,8 +213,8 @@ public class EitherSerializer<L, R> extends 
TypeSerializer<Either<L, R>> {
                                if (leftCompatResult.getConvertDeserializer() 
!= null && rightCompatResult.getConvertDeserializer() != null) {
                                        return 
CompatibilityResult.requiresMigration(
                                                new EitherSerializer<>(
-                                                       
leftCompatResult.getConvertDeserializer(),
-                                                       
rightCompatResult.getConvertDeserializer()));
+                                                       new 
TypeDeserializerAdapter<>(leftCompatResult.getConvertDeserializer()),
+                                                       new 
TypeDeserializerAdapter<>(rightCompatResult.getConvertDeserializer())));
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index 5770dac..075c9d3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
@@ -276,7 +277,8 @@ public final class RowSerializer extends 
TypeSerializer<Row> {
                                                        // one of the field 
serializers cannot provide a fallback deserializer
                                                        return 
CompatibilityResult.requiresMigration(null);
                                                } else {
-                                                       convertDeserializers[i] 
= compatResult.getConvertDeserializer();
+                                                       convertDeserializers[i] 
=
+                                                               new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
                                                }
                                        }
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 14235dc..140e091 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import 
org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
@@ -516,7 +517,8 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                                        return CompatibilityResult.compatible();
                                } else if 
(compatResult.getConvertDeserializer() != null) {
                                        return 
CompatibilityResult.requiresMigration(
-                                               new 
PriorityQueueSerializer<>(compatResult.getConvertDeserializer(), factory));
+                                               new PriorityQueueSerializer<>(
+                                                       new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), factory));
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index 7ffa57c..122f4fb 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -96,7 +96,8 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) 
extends TypeSeriali
         if (compatResult.requiresMigration()) {
           if (compatResult.getConvertDeserializer != null) {
             CompatibilityResult.requiresMigration(
-              new CRowSerializer(compatResult.getConvertDeserializer)
+              new CRowSerializer(
+                new 
TypeDeserializerAdapter(compatResult.getConvertDeserializer))
             )
           } else {
             CompatibilityResult.requiresMigration(null)

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
index 8fbc227..c39cb9b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import 
org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
@@ -155,7 +156,7 @@ final public class ArrayListSerializer<T> extends 
TypeSerializer<ArrayList<T>> {
                                return CompatibilityResult.compatible();
                        } else if (compatResult.getConvertDeserializer() != 
null) {
                                return CompatibilityResult.requiresMigration(
-                                       new 
ArrayListSerializer<>(compatResult.getConvertDeserializer()));
+                                       new ArrayListSerializer<>(new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
index d52c207..925fe78 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot;
@@ -221,8 +222,8 @@ public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>>
                        } else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) {
                                return CompatibilityResult.requiresMigration(
                                        new HashMapSerializer<>(
-                                               keyCompatResult.getConvertDeserializer(),
-                                               valueCompatResult.getConvertDeserializer()));
+                                               new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
+                                               new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer())));
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 468fddc..88b2041 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.scala.typeutils
 
 import org.apache.flink.annotation.Internal
-import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
+import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeDeserializerAdapter, TypeSerializer, TypeSerializerConfigSnapshot}
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializerConfigSnapshot
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 
@@ -133,8 +133,8 @@ class EitherSerializer[A, B, T <: Either[A, B]](
 
             CompatibilityResult.requiresMigration(
               new EitherSerializer[A, B, T](
-                leftCompatResult.getConvertDeserializer,
-                rightCompatResult.getConvertDeserializer
+                new TypeDeserializerAdapter(leftCompatResult.getConvertDeserializer),
+                new TypeDeserializerAdapter(rightCompatResult.getConvertDeserializer)
               )
             )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index d2bb098..81b3bcc 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -114,7 +114,8 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
         if (compatResult.requiresMigration()) {
           if (compatResult.getConvertDeserializer != null) {
             CompatibilityResult.requiresMigration(
-              new OptionSerializer[A](compatResult.getConvertDeserializer))
+              new OptionSerializer[A](
+                new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
           } else {
             CompatibilityResult.requiresMigration(null)
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 53fea46..552ffd0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -228,7 +229,8 @@ public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Stream
                                return CompatibilityResult.compatible();
                        } else if (compatResult.getConvertDeserializer() != null) {
                                return CompatibilityResult.requiresMigration(
-                                       new MultiplexingStreamRecordSerializer<>(compatResult.getConvertDeserializer()));
+                                       new MultiplexingStreamRecordSerializer<>(
+                                               new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 2a87f4e..f7a661e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -167,7 +168,8 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
                                return CompatibilityResult.requiresMigration(null);
                        } else if (compatResult.getConvertDeserializer() != null) {
                                return CompatibilityResult.requiresMigration(
-                                       new StreamRecordSerializer<>(compatResult.getConvertDeserializer()));
+                                       new StreamRecordSerializer<>(
+                                               new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
                        }
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index 5c52fa6..e444ced 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
@@ -289,7 +290,8 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
                                return CompatibilityResult.compatible();
                        } else if (compatResult.getConvertDeserializer() != null) {
                                return CompatibilityResult.requiresMigration(
-                                       new StreamElementSerializer<>(compatResult.getConvertDeserializer()));
+                                       new StreamElementSerializer<>(
+                                               new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
                        }
                }
 

Reply via email to