[FLINK-6178] [core] Introduce TypeDeserializer interface for CompatibilityResult
Previously, the CompatibilityResult class accepts a full-blown TypeSerializer for its convert deserializer, which will actually only ever be used for deserialization. This commit narrows down the interface by introducing a new TypeDeserializer interface that contains only the read methods. This closes #3834. This closes #3804. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63c04a51 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63c04a51 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63c04a51 Branch: refs/heads/master Commit: 63c04a516f40ec2dca4d8edef58e7c2ef563ce67 Parents: 8aa5e05 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Mon May 8 02:42:02 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Mon May 8 03:44:32 2017 +0800 ---------------------------------------------------------------------- .../common/typeutils/CompatibilityResult.java | 8 +- .../api/common/typeutils/TypeDeserializer.java | 88 +++++++++++++ .../typeutils/TypeDeserializerAdapter.java | 127 +++++++++++++++++++ .../api/common/typeutils/TypeSerializer.java | 4 +- .../typeutils/base/GenericArraySerializer.java | 3 +- .../common/typeutils/base/ListSerializer.java | 3 +- .../common/typeutils/base/MapSerializer.java | 5 +- .../typeutils/runtime/EitherSerializer.java | 5 +- .../java/typeutils/runtime/RowSerializer.java | 4 +- .../AbstractKeyedCEPPatternOperator.java | 4 +- .../table/runtime/types/CRowSerializer.scala | 3 +- .../runtime/state/ArrayListSerializer.java | 3 +- .../flink/runtime/state/HashMapSerializer.java | 5 +- .../api/scala/typeutils/EitherSerializer.scala | 6 +- .../api/scala/typeutils/OptionSerializer.scala | 3 +- .../MultiplexingStreamRecordSerializer.java | 4 +- .../streamrecord/StreamRecordSerializer.java | 4 +- .../streamrecord/StreamElementSerializer.java | 4 +- 18 files changed, 258 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java index cfbb516..891cfe0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompatibilityResult.java @@ -38,7 +38,7 @@ public final class CompatibilityResult<T> { * * <p>This is only relevant if migration is required. */ - private final TypeSerializer<T> convertDeserializer; + private final TypeDeserializer<T> convertDeserializer; /** * Returns a strategy that signals that the new serializer is compatible and no migration is required. @@ -61,16 +61,16 @@ public final class CompatibilityResult<T> { * * @return a result that signals migration is necessary, possibly providing a convert deserializer. */ - public static <T> CompatibilityResult<T> requiresMigration(TypeSerializer<T> convertDeserializer) { + public static <T> CompatibilityResult<T> requiresMigration(TypeDeserializer<T> convertDeserializer) { return new CompatibilityResult<>(true, convertDeserializer); } - private CompatibilityResult(boolean requiresMigration, TypeSerializer<T> convertDeserializer) { + private CompatibilityResult(boolean requiresMigration, TypeDeserializer<T> convertDeserializer) { this.requiresMigration = requiresMigration; this.convertDeserializer = convertDeserializer; } - public TypeSerializer<T> getConvertDeserializer() { + public TypeDeserializer<T> getConvertDeserializer() { return convertDeserializer; } http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializer.java new file mode 100644 index 0000000..2ec064a --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializer.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.core.memory.DataInputView; + +import java.io.IOException; + +/** + * This interface describes the methods that are required for a data type to be read by the Flink runtime. + * Specifically, this interface contains the deserialization methods. In contrast, the {@link TypeSerializer} + * interface contains the complete set of methods for both serialization and deserialization. + * + * <p>The methods in this class are assumed to be stateless, such that it is effectively thread safe. Stateful + * implementations of the methods may lead to unpredictable side effects and will compromise both stability and + * correctness of the program. + * + * @param <T> The data type that the deserializer deserializes. + */ +public interface TypeDeserializer<T> { + + /** + * Creates a deep copy of this deserializer if it is necessary, i.e. if it is stateful. This + * can return itself if the serializer is not stateful. + * + * We need this because deserializers might be used in several threads. Stateless deserializers + * are inherently thread-safe while stateful deserializers might not be thread-safe. + */ + TypeSerializer<T> duplicate(); + + /** + * De-serializes a record from the given source input view. + * + * @param source The input view from which to read the data. + * @return The deserialized element. + * + * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the + * input view, which may have an underlying I/O channel from which it reads. + */ + T deserialize(DataInputView source) throws IOException; + + /** + * De-serializes a record from the given source input view into the given reuse record instance if mutable. + * + * @param reuse The record instance into which to de-serialize the data. + * @param source The input view from which to read the data. + * @return The deserialized element. + * + * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the + * input view, which may have an underlying I/O channel from which it reads. + */ + T deserialize(T reuse, DataInputView source) throws IOException; + + /** + * Gets the length of the data type, if it is a fix length data type. + * + * @return The length of the data type, or <code>-1</code> for variable length data types. + */ + int getLength(); + + /** + * Returns true if the given object can be equaled with this object. If not, it returns false. + * + * @param obj Object which wants to take part in the equality relation + * @return true if obj can be equaled with this, otherwise false + */ + boolean canEqual(Object obj); + + boolean equals(Object obj); + + int hashCode(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java new file mode 100644 index 0000000..e02bed4 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeDeserializerAdapter.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A utility class that wraps a {@link TypeDeserializer} as a {@link TypeSerializer}. + * + * <p>Methods related to deserialization are directly forwarded to the wrapped deserializer, + * while serialization methods are masked and not intended for use. + * + * @param <T> The data type that the deserializer deserializes. + */ +public final class TypeDeserializerAdapter<T> extends TypeSerializer<T> { + + private static final long serialVersionUID = 1L; + + /** The actual wrapped deserializer instance */ + private final TypeDeserializer<T> deserializer; + + /** + * Creates a {@link TypeSerializer} that wraps a {@link TypeDeserializer}. + * + * @param deserializer the actual deserializer to wrap. + */ + public TypeDeserializerAdapter(TypeDeserializer<T> deserializer) { + this.deserializer = Preconditions.checkNotNull(deserializer); + } + + // -------------------------------------------------------------------------------------------- + // Forwarded deserialization related methods + // -------------------------------------------------------------------------------------------- + + public T deserialize(DataInputView source) throws IOException { + return deserializer.deserialize(source); + } + + public T deserialize(T reuse, DataInputView source) throws IOException { + return deserializer.deserialize(reuse, source); + } + + public TypeSerializer<T> duplicate() { + return deserializer.duplicate(); + } + + public int getLength() { + return deserializer.getLength(); + } + + public boolean equals(Object obj) { + return deserializer.equals(obj); + } + + public boolean canEqual(Object obj) { + return deserializer.canEqual(obj); + } + + public int hashCode() { + return deserializer.hashCode(); + } + + // -------------------------------------------------------------------------------------------- + // Irrelevant methods not intended for use + // -------------------------------------------------------------------------------------------- + + public boolean isImmutableType() { + throw new UnsupportedOperationException( + "This is a TypeDeserializerAdapter used only for deserialization; this method should not be used."); + } + + public T createInstance() { + throw new UnsupportedOperationException( + "This is a TypeDeserializerAdapter used only for deserialization; this method should not be used."); + } + + public T copy(T from) { + throw new UnsupportedOperationException( + "This is a TypeDeserializerAdapter used only for deserialization; this method should not be used."); + } + + public T copy(T from, T reuse) { + throw new UnsupportedOperationException( + "This is a TypeDeserializerAdapter used only for deserialization; this method should not be used."); + } + + public void serialize(T record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException( + "This is a TypeDeserializerAdapter used only for deserialization; this method should not be used."); + } + + public void copy(DataInputView source, DataOutputView target) throws IOException { + throw new UnsupportedOperationException( + "This is a TypeDeserializerAdapter used only for deserialization; this method should not be used."); + } + + public TypeSerializerConfigSnapshot snapshotConfiguration() { + throw new UnsupportedOperationException( + "This is a TypeDeserializerAdapter used only for deserialization; this method should not be used."); + } + + public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + throw new UnsupportedOperationException( + "This is a TypeDeserializerAdapter used only for deserialization; this method should not be used."); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java index f0562d4..0b5a08a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java @@ -36,7 +36,7 @@ import java.io.Serializable; * @param <T> The data type that the serializer serializes. */ @PublicEvolving -public abstract class TypeSerializer<T> implements Serializable { +public abstract class TypeSerializer<T> implements TypeDeserializer<T>, Serializable { private static final long serialVersionUID = 1L; @@ -197,7 +197,7 @@ public abstract class TypeSerializer<T> implements Serializable { * has been reconfigured to be compatible, to continue reading previous data, and that the * serialization schema remains the same. No migration needs to be performed.</li> * - * <li>{@link CompatibilityResult#requiresMigration(TypeSerializer)}: this signals Flink that + * <li>{@link CompatibilityResult#requiresMigration(TypeDeserializer)}: this signals Flink that * migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be * compatible, for previous data. Furthermore, in the case that the preceding serializer cannot be found or * restored to read the previous data to perform the migration, the provided convert deserializer can be http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java index fe61ab3..3e592b4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java @@ -23,6 +23,7 @@ import java.lang.reflect.Array; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; @@ -213,7 +214,7 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> { return CompatibilityResult.requiresMigration( new GenericArraySerializer<>( componentClass, - compatResult.getConvertDeserializer())); + new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java index 02d22de..1b6540c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java @@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; @@ -188,7 +189,7 @@ public final class ListSerializer<T> extends TypeSerializer<List<T>> { return CompatibilityResult.compatible(); } else if (compatResult.getConvertDeserializer() != null) { return CompatibilityResult.requiresMigration( - new ListSerializer<>(compatResult.getConvertDeserializer())); + new ListSerializer<>(new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); } } http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java index 50900e4..182fff6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java @@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils.base; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; @@ -220,8 +221,8 @@ public final class MapSerializer<K, V> extends TypeSerializer<Map<K, V>> { } else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) { return CompatibilityResult.requiresMigration( new MapSerializer<>( - keyCompatResult.getConvertDeserializer(), - valueCompatResult.getConvertDeserializer())); + new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()), + new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()))); } } http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java index c025d61..461dd87 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; @@ -212,8 +213,8 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> { if (leftCompatResult.getConvertDeserializer() != null && rightCompatResult.getConvertDeserializer() != null) { return CompatibilityResult.requiresMigration( new EitherSerializer<>( - leftCompatResult.getConvertDeserializer(), - rightCompatResult.getConvertDeserializer())); + new TypeDeserializerAdapter<>(leftCompatResult.getConvertDeserializer()), + new TypeDeserializerAdapter<>(rightCompatResult.getConvertDeserializer()))); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java index 5770dac..075c9d3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerUtil; @@ -276,7 +277,8 @@ public final class RowSerializer extends TypeSerializer<Row> { // one of the field serializers cannot provide a fallback deserializer return CompatibilityResult.requiresMigration(null); } else { - convertDeserializers[i] = compatResult.getConvertDeserializer(); + convertDeserializers[i] = + new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 14235dc..140e091 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot; @@ -516,7 +517,8 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> return CompatibilityResult.compatible(); } else if (compatResult.getConvertDeserializer() != null) { return CompatibilityResult.requiresMigration( - new PriorityQueueSerializer<>(compatResult.getConvertDeserializer(), factory)); + new PriorityQueueSerializer<>( + new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), factory)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala index 7ffa57c..122f4fb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala @@ -96,7 +96,8 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali if (compatResult.requiresMigration()) { if (compatResult.getConvertDeserializer != null) { CompatibilityResult.requiresMigration( - new CRowSerializer(compatResult.getConvertDeserializer) + new CRowSerializer( + new TypeDeserializerAdapter(compatResult.getConvertDeserializer)) ) } else { CompatibilityResult.requiresMigration(null) http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java index 8fbc227..c39cb9b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot; @@ -155,7 +156,7 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> { return CompatibilityResult.compatible(); } else if (compatResult.getConvertDeserializer() != null) { return CompatibilityResult.requiresMigration( - new ArrayListSerializer<>(compatResult.getConvertDeserializer())); + new ArrayListSerializer<>(new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); } } http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java index d52c207..925fe78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot; @@ -221,8 +222,8 @@ public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> } else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) { return CompatibilityResult.requiresMigration( new HashMapSerializer<>( - keyCompatResult.getConvertDeserializer(), - valueCompatResult.getConvertDeserializer())); + new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()), + new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()))); } } http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala index 468fddc..88b2041 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.scala.typeutils import org.apache.flink.annotation.Internal -import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot} +import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeDeserializerAdapter, TypeSerializer, TypeSerializerConfigSnapshot} import org.apache.flink.api.java.typeutils.runtime.EitherSerializerConfigSnapshot import org.apache.flink.core.memory.{DataInputView, DataOutputView} @@ -133,8 +133,8 @@ class EitherSerializer[A, B, T <: Either[A, B]]( CompatibilityResult.requiresMigration( new EitherSerializer[A, B, T]( - leftCompatResult.getConvertDeserializer, - rightCompatResult.getConvertDeserializer + new TypeDeserializerAdapter(leftCompatResult.getConvertDeserializer), + new TypeDeserializerAdapter(rightCompatResult.getConvertDeserializer) ) ) http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala index d2bb098..81b3bcc 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala @@ -114,7 +114,8 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) if (compatResult.requiresMigration()) { if (compatResult.getConvertDeserializer != null) { CompatibilityResult.requiresMigration( - new OptionSerializer[A](compatResult.getConvertDeserializer)) + new OptionSerializer[A]( + new TypeDeserializerAdapter(compatResult.getConvertDeserializer))) } else { CompatibilityResult.requiresMigration(null) } http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java index 53fea46..552ffd0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java @@ -24,6 +24,7 @@ import java.io.IOException; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; @@ -228,7 +229,8 @@ public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Stream return CompatibilityResult.compatible(); } else if (compatResult.getConvertDeserializer() != null) { return CompatibilityResult.requiresMigration( - new MultiplexingStreamRecordSerializer<>(compatResult.getConvertDeserializer())); + new MultiplexingStreamRecordSerializer<>( + new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); } } http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java index 2a87f4e..f7a661e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; @@ -167,7 +168,8 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord return CompatibilityResult.requiresMigration(null); } else if (compatResult.getConvertDeserializer() != null) { return CompatibilityResult.requiresMigration( - new StreamRecordSerializer<>(compatResult.getConvertDeserializer())); + new StreamRecordSerializer<>( + new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); } } http://git-wip-us.apache.org/repos/asf/flink/blob/63c04a51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index 5c52fa6..e444ced 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -24,6 +24,7 @@ import java.io.IOException; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.core.memory.DataInputView; @@ -289,7 +290,8 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme return CompatibilityResult.compatible(); } else if (compatResult.getConvertDeserializer() != null) { return CompatibilityResult.requiresMigration( - new StreamElementSerializer<>(compatResult.getConvertDeserializer())); + new StreamElementSerializer<>( + new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); } }
