Repository: flink Updated Branches: refs/heads/release-1.3 0d9087df4 -> 09cc3f7c5
http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java index ff5a342..2be09ad 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java @@ -22,10 +22,12 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.ListMultimap; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.common.typeutils.base.EnumSerializer; import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; import org.apache.flink.api.common.typeutils.base.LongSerializer; @@ -811,7 +813,7 @@ public class NFA<T> implements Serializable { /** * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state. */ - public static final class NFASerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + public static final class NFASerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot { private static final int VERSION = 1; @@ -819,10 +821,10 @@ public class NFA<T> implements Serializable { public NFASerializerConfigSnapshot() {} public NFASerializerConfigSnapshot( - TypeSerializerConfigSnapshot sharedBufferSerializerConfigSnapshot, - TypeSerializerConfigSnapshot eventSerializerConfigSnapshot) { + TypeSerializer<T> eventSerializer, + TypeSerializer<SharedBuffer<String, T>> sharedBufferSerializer) { - super(sharedBufferSerializerConfigSnapshot, eventSerializerConfigSnapshot); + super(eventSerializer, sharedBufferSerializer); } @Override @@ -1062,29 +1064,36 @@ public class NFA<T> implements Serializable { @Override public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new NFASerializerConfigSnapshot( - eventSerializer.snapshotConfiguration(), - sharedBufferSerializer.snapshotConfiguration() - ); + return new NFASerializerConfigSnapshot<>(eventSerializer, sharedBufferSerializer); } + + @Override public CompatibilityResult<NFA<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { if (configSnapshot instanceof NFASerializerConfigSnapshot) { - TypeSerializerConfigSnapshot[] serializerConfigSnapshots = - ((NFASerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots(); + List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs = + ((NFASerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); + + CompatibilityResult<T> eventCompatResult = CompatibilityUtil.resolveCompatibilityResult( + serializersAndConfigs.get(0).f0, + UnloadableDummyTypeSerializer.class, + serializersAndConfigs.get(0).f1, + eventSerializer); - CompatibilityResult<T> elementCompatResult = - eventSerializer.ensureCompatibility(serializerConfigSnapshots[0]); CompatibilityResult<SharedBuffer<String, T>> sharedBufCompatResult = - sharedBufferSerializer.ensureCompatibility(serializerConfigSnapshots[1]); + CompatibilityUtil.resolveCompatibilityResult( + serializersAndConfigs.get(1).f0, + UnloadableDummyTypeSerializer.class, + serializersAndConfigs.get(1).f1, + sharedBufferSerializer); - if (!sharedBufCompatResult.isRequiresMigration() && !elementCompatResult.isRequiresMigration()) { + if (!sharedBufCompatResult.isRequiresMigration() && !eventCompatResult.isRequiresMigration()) { return CompatibilityResult.compatible(); } else { - if (elementCompatResult.getConvertDeserializer() != null && + if (eventCompatResult.getConvertDeserializer() != null && sharedBufCompatResult.getConvertDeserializer() != null) { return CompatibilityResult.requiresMigration( new NFASerializer<>( - new TypeDeserializerAdapter<>(elementCompatResult.getConvertDeserializer()), + new TypeDeserializerAdapter<>(eventCompatResult.getConvertDeserializer()), new TypeDeserializerAdapter<>(sharedBufCompatResult.getConvertDeserializer()))); } } http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index d0f6bf4..91fce1f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -23,10 +23,13 @@ import com.google.common.collect.ListMultimap; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.NonDuplicatingTypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -809,7 +812,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { /** * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state. */ - public static final class SharedBufferSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + public static final class SharedBufferSerializerConfigSnapshot<K, V> extends CompositeTypeSerializerConfigSnapshot { private static final int VERSION = 1; @@ -817,11 +820,11 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { public SharedBufferSerializerConfigSnapshot() {} public SharedBufferSerializerConfigSnapshot( - TypeSerializerConfigSnapshot keySerializerConfigSnapshot, - TypeSerializerConfigSnapshot valueSerializerConfigSnapshot, - TypeSerializerConfigSnapshot versionSerializerConfigSnapshot) { + TypeSerializer<K> keySerializer, + TypeSerializer<V> valueSerializer, + TypeSerializer<DeweyNumber> versionSerializer) { - super(keySerializerConfigSnapshot, valueSerializerConfigSnapshot, versionSerializerConfigSnapshot); + super(keySerializer, valueSerializer, versionSerializer); } @Override @@ -1115,22 +1118,35 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable { @Override public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new SharedBufferSerializerConfigSnapshot( - keySerializer.snapshotConfiguration(), - valueSerializer.snapshotConfiguration(), - versionSerializer.snapshotConfiguration() - ); + return new SharedBufferSerializerConfigSnapshot<>( + keySerializer, + valueSerializer, + versionSerializer); } @Override public CompatibilityResult<SharedBuffer<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { if (configSnapshot instanceof SharedBufferSerializerConfigSnapshot) { - TypeSerializerConfigSnapshot[] serializerConfigSnapshots = - ((SharedBufferSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots(); - - CompatibilityResult<K> keyCompatResult = keySerializer.ensureCompatibility(serializerConfigSnapshots[0]); - CompatibilityResult<V> valueCompatResult = valueSerializer.ensureCompatibility(serializerConfigSnapshots[1]); - CompatibilityResult<DeweyNumber> versionCompatResult = versionSerializer.ensureCompatibility(serializerConfigSnapshots[2]); + List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializerConfigSnapshots = + ((SharedBufferSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); + + CompatibilityResult<K> keyCompatResult = CompatibilityUtil.resolveCompatibilityResult( + serializerConfigSnapshots.get(0).f0, + UnloadableDummyTypeSerializer.class, + serializerConfigSnapshots.get(0).f1, + keySerializer); + + CompatibilityResult<V> valueCompatResult = CompatibilityUtil.resolveCompatibilityResult( + serializerConfigSnapshots.get(1).f0, + UnloadableDummyTypeSerializer.class, + serializerConfigSnapshots.get(1).f1, + valueSerializer); + + CompatibilityResult<DeweyNumber> versionCompatResult = CompatibilityUtil.resolveCompatibilityResult( + serializerConfigSnapshots.get(2).f0, + UnloadableDummyTypeSerializer.class, + serializerConfigSnapshots.get(2).f1, + versionSerializer); if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration() && !versionCompatResult.isRequiresMigration()) { return CompatibilityResult.compatible(); http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 2ed7245..7b6e5e3 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -22,10 +22,13 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.core.fs.FSDataInputStream; @@ -482,14 +485,20 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> @Override public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration()); + return new CollectionSerializerConfigSnapshot<>(elementSerializer); } @Override public CompatibilityResult<PriorityQueue<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { if (configSnapshot instanceof CollectionSerializerConfigSnapshot) { - CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility( - ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot()); + Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousElemSerializerAndConfig = + ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); + + CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult( + previousElemSerializerAndConfig.f0, + UnloadableDummyTypeSerializer.class, + previousElemSerializerAndConfig.f1, + elementSerializer); if (!compatResult.isRequiresMigration()) { return CompatibilityResult.compatible(); http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala index 0fd3680..caf346c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala @@ -81,8 +81,7 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali // -------------------------------------------------------------------------------------------- override def snapshotConfiguration(): TypeSerializerConfigSnapshot = { - new CRowSerializer.CRowSerializerConfigSnapshot( - rowSerializer.snapshotConfiguration()) + new CRowSerializer.CRowSerializerConfigSnapshot(rowSerializer) } override def ensureCompatibility( @@ -90,8 +89,11 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali configSnapshot match { case crowSerializerConfigSnapshot: CRowSerializer.CRowSerializerConfigSnapshot => - val compatResult = rowSerializer.ensureCompatibility( - crowSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot) + val compatResult = CompatibilityUtil.resolveCompatibilityResult( + crowSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0, + classOf[UnloadableDummyTypeSerializer[_]], + crowSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1, + rowSerializer) if (compatResult.isRequiresMigration) { if (compatResult.getConvertDeserializer != null) { @@ -114,8 +116,8 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali object CRowSerializer { class CRowSerializerConfigSnapshot( - private var rowSerializerConfigSnapshot: TypeSerializerConfigSnapshot) - extends CompositeTypeSerializerConfigSnapshot(rowSerializerConfigSnapshot) { + private val rowSerializer: TypeSerializer[Row]) + extends CompositeTypeSerializerConfigSnapshot(rowSerializer) { /** This empty nullary constructor is required for deserializing the configuration. */ def this() = this(null) http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java index 56eb7ea..3c4f4b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ArrayListSerializer.java @@ -18,10 +18,13 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -143,14 +146,20 @@ final public class ArrayListSerializer<T> extends TypeSerializer<ArrayList<T>> { @Override public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new CollectionSerializerConfigSnapshot(elementSerializer.snapshotConfiguration()); + return new CollectionSerializerConfigSnapshot<>(elementSerializer); } @Override public CompatibilityResult<ArrayList<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { if (configSnapshot instanceof CollectionSerializerConfigSnapshot) { - CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility( - ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot()); + Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousElemSerializerAndConfig = + ((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); + + CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult( + previousElemSerializerAndConfig.f0, + UnloadableDummyTypeSerializer.class, + previousElemSerializerAndConfig.f1, + elementSerializer); if (!compatResult.isRequiresMigration()) { return CompatibilityResult.compatible(); http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index 1d3af72..eec2e93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; @@ -296,8 +296,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : restoredMetaInfoSnapshots) { if (restoredMetaInfo.getPartitionStateSerializer() == null || - restoredMetaInfo.getPartitionStateSerializer() - instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) { + restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) { // must fail now if the previous serializer cannot be restored because there is no serializer // capable of reading previous state http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java index b93c9e0..066684b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashMapSerializer.java @@ -20,16 +20,20 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.common.typeutils.base.MapSerializerConfigSnapshot; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.Preconditions; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -203,19 +207,26 @@ public final class HashMapSerializer<K, V> extends TypeSerializer<HashMap<K, V>> @Override public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new MapSerializerConfigSnapshot( - keySerializer.snapshotConfiguration(), - valueSerializer.snapshotConfiguration()); + return new MapSerializerConfigSnapshot<>(keySerializer, valueSerializer); } @Override public CompatibilityResult<HashMap<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { if (configSnapshot instanceof MapSerializerConfigSnapshot) { - TypeSerializerConfigSnapshot[] keyValueSerializerConfigSnapshots = - ((MapSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots(); - - CompatibilityResult<K> keyCompatResult = keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]); - CompatibilityResult<V> valueCompatResult = valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]); + List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs = + ((MapSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); + + CompatibilityResult<K> keyCompatResult = CompatibilityUtil.resolveCompatibilityResult( + previousKvSerializersAndConfigs.get(0).f0, + UnloadableDummyTypeSerializer.class, + previousKvSerializersAndConfigs.get(0).f1, + keySerializer); + + CompatibilityResult<V> valueCompatResult = CompatibilityUtil.resolveCompatibilityResult( + previousKvSerializersAndConfigs.get(1).f0, + UnloadableDummyTypeSerializer.class, + previousKvSerializersAndConfigs.get(1).f1, + valueSerializer); if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration()) { return CompatibilityResult.compatible(); http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java index f265f78..2ff8cb6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java @@ -20,19 +20,16 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; -import org.apache.flink.api.common.typeutils.TypeSerializerUtil; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.io.VersionedIOReadableWritable; -import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; -import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -94,20 +91,10 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab super.write(out); // write in a way to be fault tolerant of read failures when deserializing the key serializer - try ( - ByteArrayOutputStreamWithPos buffer = new ByteArrayOutputStreamWithPos(); - DataOutputViewStreamWrapper bufferWrapper = new DataOutputViewStreamWrapper(buffer)){ - - new TypeSerializerSerializationProxy<>(keySerializer).write(bufferWrapper); - - // write offset of key serializer's configuration snapshot - out.writeInt(buffer.getPosition()); - TypeSerializerUtil.writeSerializerConfigSnapshot(bufferWrapper, keySerializerConfigSnapshot); - - // flush buffer - out.writeInt(buffer.getPosition()); - out.write(buffer.getBuf(), 0, buffer.getPosition()); - } + TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( + out, + Collections.singletonList( + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(keySerializer, keySerializerConfigSnapshot))); // write individual registered keyed state metainfos out.writeShort(stateMetaInfoSnapshots.size()); @@ -118,38 +105,19 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab } } + @SuppressWarnings("unchecked") @Override public void read(DataInputView in) throws IOException { super.read(in); - final TypeSerializerSerializationProxy<K> keySerializerProxy = - new TypeSerializerSerializationProxy<>(userCodeClassLoader); - // only starting from version 3, we have the key serializer and its config snapshot written if (getReadVersion() >= 3) { - int keySerializerConfigSnapshotOffset = in.readInt(); - int numBufferedBytes = in.readInt(); - byte[] keySerializerAndConfigBytes = new byte[numBufferedBytes]; - in.readFully(keySerializerAndConfigBytes); - - try ( - ByteArrayInputStreamWithPos buffer = new ByteArrayInputStreamWithPos(keySerializerAndConfigBytes); - DataInputViewStreamWrapper bufferWrapper = new DataInputViewStreamWrapper(buffer)) { - - try { - keySerializerProxy.read(bufferWrapper); - this.keySerializer = keySerializerProxy.getTypeSerializer(); - } catch (IOException e) { - this.keySerializer = null; - } - - buffer.setPosition(keySerializerConfigSnapshotOffset); - this.keySerializerConfigSnapshot = - TypeSerializerUtil.readSerializerConfigSnapshot(bufferWrapper, userCodeClassLoader); - } + Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndConfig = + TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader).get(0); + this.keySerializer = (TypeSerializer<K>) keySerializerAndConfig.f0; + this.keySerializerConfigSnapshot = keySerializerAndConfig.f1; } else { - keySerializerProxy.read(in); - this.keySerializer = keySerializerProxy.getTypeSerializer(); + this.keySerializer = TypeSerializerSerializationUtil.tryReadSerializer(in, userCodeClassLoader); this.keySerializerConfigSnapshot = null; } http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java index ac81e86..9108306 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java @@ -19,19 +19,17 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; -import org.apache.flink.api.common.typeutils.TypeSerializerUtil; -import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; -import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; +import java.util.List; /** * Readers and writers for different versions of the {@link RegisteredKeyedBackendStateMetaInfo.Snapshot}. @@ -39,8 +37,6 @@ import java.io.IOException; */ public class KeyedBackendStateMetaInfoSnapshotReaderWriters { - private static final Logger LOG = LoggerFactory.getLogger(KeyedBackendStateMetaInfoSnapshotReaderWriters.class); - // ------------------------------------------------------------------------------- // Writers // - v1: Flink 1.2.x @@ -91,8 +87,8 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters { out.writeInt(stateMetaInfo.getStateType().ordinal()); out.writeUTF(stateMetaInfo.getName()); - new TypeSerializerSerializationProxy<>(stateMetaInfo.getNamespaceSerializer()).write(out); - new TypeSerializerSerializationProxy<>(stateMetaInfo.getStateSerializer()).write(out); + TypeSerializerSerializationUtil.writeSerializer(out, stateMetaInfo.getNamespaceSerializer()); + TypeSerializerSerializationUtil.writeSerializer(out, stateMetaInfo.getStateSerializer()); } } @@ -108,25 +104,13 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters { out.writeUTF(stateMetaInfo.getName()); // write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures - try ( - ByteArrayOutputStreamWithPos outWithPos = new ByteArrayOutputStreamWithPos(); - DataOutputViewStreamWrapper outViewWrapper = new DataOutputViewStreamWrapper(outWithPos)) { - - new TypeSerializerSerializationProxy<>(stateMetaInfo.getNamespaceSerializer()).write(outViewWrapper); - - // write current offset, which represents the start offset of the state serializer - out.writeInt(outWithPos.getPosition()); - new TypeSerializerSerializationProxy<>(stateMetaInfo.getStateSerializer()).write(outViewWrapper); - - // write current offset, which represents the start of the configuration snapshots - out.writeInt(outWithPos.getPosition()); - TypeSerializerUtil.writeSerializerConfigSnapshot(outViewWrapper, stateMetaInfo.getNamespaceSerializerConfigSnapshot()); - TypeSerializerUtil.writeSerializerConfigSnapshot(outViewWrapper, stateMetaInfo.getStateSerializerConfigSnapshot()); - - // write total number of bytes and then flush - out.writeInt(outWithPos.getPosition()); - out.write(outWithPos.getBuf(), 0, outWithPos.getPosition()); - } + TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( + out, + Arrays.asList( + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + stateMetaInfo.getNamespaceSerializer(), stateMetaInfo.getNamespaceSerializerConfigSnapshot()), + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + stateMetaInfo.getStateSerializer(), stateMetaInfo.getStateSerializerConfigSnapshot()))); } } @@ -184,15 +168,8 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters { metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]); metaInfo.setName(in.readUTF()); - final TypeSerializerSerializationProxy<N> namespaceSerializerProxy = - new TypeSerializerSerializationProxy<>(userCodeClassLoader); - namespaceSerializerProxy.read(in); - metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer()); - - final TypeSerializerSerializationProxy<S> stateSerializerProxy = - new TypeSerializerSerializationProxy<>(userCodeClassLoader); - stateSerializerProxy.read(in); - metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer()); + metaInfo.setNamespaceSerializer(TypeSerializerSerializationUtil.<N>tryReadSerializer(in, userCodeClassLoader)); + metaInfo.setStateSerializer(TypeSerializerSerializationUtil.<S>tryReadSerializer(in, userCodeClassLoader)); // older versions do not contain the configuration snapshot metaInfo.setNamespaceSerializerConfigSnapshot(null); @@ -202,6 +179,7 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters { } } + @SuppressWarnings("unchecked") static class KeyedBackendStateMetaInfoReaderV3<N, S> extends AbstractKeyedBackendStateMetaInfoReader { public KeyedBackendStateMetaInfoReaderV3(ClassLoader userCodeClassLoader) { @@ -216,48 +194,14 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters { metaInfo.setStateType(StateDescriptor.Type.values()[in.readInt()]); metaInfo.setName(in.readUTF()); - // read offsets - int stateSerializerStartOffset = in.readInt(); - int configSnapshotsStartOffset = in.readInt(); - - int totalBytes = in.readInt(); - - byte[] buffer = new byte[totalBytes]; - in.readFully(buffer); - - ByteArrayInputStreamWithPos inWithPos = new ByteArrayInputStreamWithPos(buffer); - DataInputViewStreamWrapper inViewWrapper = new DataInputViewStreamWrapper(inWithPos); - - try { - final TypeSerializerSerializationProxy<N> namespaceSerializerProxy = - new TypeSerializerSerializationProxy<>(userCodeClassLoader); - namespaceSerializerProxy.read(inViewWrapper); - metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer()); - } catch (IOException e) { - LOG.warn("Deserialization of previous namespace serializer errored; setting serializer to null. ", e); - - metaInfo.setNamespaceSerializer(null); - } - - // make sure we start from the state serializer bytes position - inWithPos.setPosition(stateSerializerStartOffset); - try { - final TypeSerializerSerializationProxy<S> stateSerializerProxy = - new TypeSerializerSerializationProxy<>(userCodeClassLoader); - stateSerializerProxy.read(inViewWrapper); - metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer()); - } catch (IOException e) { - LOG.warn("Deserialization of previous state serializer errored; setting serializer to null. ", e); - - metaInfo.setStateSerializer(null); - } - - // make sure we start from the config snapshot bytes position - inWithPos.setPosition(configSnapshotsStartOffset); - metaInfo.setNamespaceSerializerConfigSnapshot( - TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, userCodeClassLoader)); - metaInfo.setStateSerializerConfigSnapshot( - TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, userCodeClassLoader)); + List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs = + TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader); + + metaInfo.setNamespaceSerializer((TypeSerializer<N>) serializersAndConfigs.get(0).f0); + metaInfo.setNamespaceSerializerConfigSnapshot(serializersAndConfigs.get(0).f1); + + metaInfo.setStateSerializer((TypeSerializer<S>) serializersAndConfigs.get(1).f0); + metaInfo.setStateSerializerConfigSnapshot(serializersAndConfigs.get(1).f1); return metaInfo; } http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java index 4f151c9..e52323f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java @@ -19,21 +19,19 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; -import org.apache.flink.api.common.typeutils.TypeSerializerUtil; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; -import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; -import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collections; /** * Readers and writers for different versions of the {@link RegisteredOperatorBackendStateMetaInfo.Snapshot}. @@ -91,7 +89,7 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { public void writeStateMetaInfo(DataOutputView out) throws IOException { out.writeUTF(stateMetaInfo.getName()); out.writeByte(stateMetaInfo.getAssignmentMode().ordinal()); - new TypeSerializerSerializationProxy<>(stateMetaInfo.getPartitionStateSerializer()).write(out); + TypeSerializerSerializationUtil.writeSerializer(out, stateMetaInfo.getPartitionStateSerializer()); } } @@ -107,22 +105,11 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { out.writeByte(stateMetaInfo.getAssignmentMode().ordinal()); // write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures - try ( - ByteArrayOutputStreamWithPos outWithPos = new ByteArrayOutputStreamWithPos(); - DataOutputViewStreamWrapper outViewWrapper = new DataOutputViewStreamWrapper(outWithPos)) { - - new TypeSerializerSerializationProxy<>(stateMetaInfo.getPartitionStateSerializer()).write(outViewWrapper); - - // write the start offset of the config snapshot - out.writeInt(outWithPos.getPosition()); - TypeSerializerUtil.writeSerializerConfigSnapshot( - outViewWrapper, - stateMetaInfo.getPartitionStateSerializerConfigSnapshot()); - - // write the total number of bytes and flush - out.writeInt(outWithPos.getPosition()); - out.write(outWithPos.getBuf(), 0, outWithPos.getPosition()); - } + TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( + out, + Collections.singletonList(new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + stateMetaInfo.getPartitionStateSerializer(), + stateMetaInfo.getPartitionStateSerializerConfigSnapshot()))); } } @@ -192,6 +179,7 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { } } + @SuppressWarnings("unchecked") public static class OperatorBackendStateMetaInfoReaderV2<S> extends AbstractOperatorBackendStateMetaInfoReader<S> { public OperatorBackendStateMetaInfoReaderV2(ClassLoader userCodeClassLoader) { @@ -206,32 +194,11 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { stateMetaInfo.setName(in.readUTF()); stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]); - // read start offset of configuration snapshot - int configSnapshotStartOffset = in.readInt(); - - int totalBytes = in.readInt(); - - byte[] buffer = new byte[totalBytes]; - in.readFully(buffer); - - ByteArrayInputStreamWithPos inWithPos = new ByteArrayInputStreamWithPos(buffer); - DataInputViewStreamWrapper inViewWrapper = new DataInputViewStreamWrapper(inWithPos); - - try { - final TypeSerializerSerializationProxy<S> partitionStateSerializerProxy = - new TypeSerializerSerializationProxy<>(userCodeClassLoader); - partitionStateSerializerProxy.read(inViewWrapper); - stateMetaInfo.setPartitionStateSerializer(partitionStateSerializerProxy.getTypeSerializer()); - } catch (IOException e) { - LOG.warn("Deserialization of previous serializer errored; setting serializer to null. ", e); - - stateMetaInfo.setPartitionStateSerializer(null); - } + Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> stateSerializerAndConfig = + TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader).get(0); - // make sure we start from the partition state serializer bytes position - inWithPos.setPosition(configSnapshotStartOffset); - stateMetaInfo.setPartitionStateSerializerConfigSnapshot( - TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, userCodeClassLoader)); + stateMetaInfo.setPartitionStateSerializer((TypeSerializer<S>) stateSerializerAndConfig.f0); + stateMetaInfo.setPartitionStateSerializerConfigSnapshot(stateSerializerAndConfig.f1); return stateMetaInfo; } http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java deleted file mode 100644 index 39bb743..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; - -/** - * Utilities related to state migration, commonly used in the state backends. - */ -public class StateMigrationUtil { - - /** - * Resolves the final compatibility result of two serializers by taking into account compound information, - * including the preceding serializer, the preceding serializer's configuration snapshot, and the new serializer. - * - * The final result is determined as follows: - * 1. If there is no configuration snapshot of the preceding serializer, - * assumes the new serializer to be compatible. - * 2. Confront the configuration snapshot with the new serializer. - * 3. If the result is compatible, just return that as the result. - * 4. If not compatible and requires migration, check if the preceding serializer is valid. - * If yes, use that as the convert deserializer for state migration. - * 5. If the preceding serializer is not valid, check if the result came with a convert deserializer. - * If yes, use that for state migration and simply return the result. - * 6. If all of above fails, state migration is required but could not be performed; throw exception. - * - * @param precedingSerializer the preceding serializer used to write the data - * @param dummySerializerClassTag any class tags that identifies the preceding serializer as a dummy placeholder - * @param precedingSerializerConfigSnapshot configuration snapshot of the preceding serializer - * @param newSerializer the new serializer to ensure compatibility with - * - * @param <T> Type of the data handled by the serializers - * - * @return the final resolved compatibility result - */ - public static <T> CompatibilityResult<T> resolveCompatibilityResult( - TypeSerializer<T> precedingSerializer, - Class<?> dummySerializerClassTag, - TypeSerializerConfigSnapshot precedingSerializerConfigSnapshot, - TypeSerializer<T> newSerializer) { - - if (precedingSerializerConfigSnapshot != null) { - CompatibilityResult<T> initialResult = newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot); - - if (!initialResult.isRequiresMigration()) { - return initialResult; - } else { - if (precedingSerializer != null && !(precedingSerializer.getClass().equals(dummySerializerClassTag))) { - // if the preceding serializer exists and is not a dummy, use - // that for converting instead of the provided convert deserializer - return CompatibilityResult.requiresMigration(precedingSerializer); - } else if (initialResult.getConvertDeserializer() != null) { - return initialResult; - } else { - throw new RuntimeException( - "State migration required, but there is no available serializer capable of reading previous data."); - } - } - } else { - // if the configuration snapshot of the preceding serializer cannot be provided, - // we can only simply assume that the new serializer is compatible - return CompatibilityResult.compatible(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 3e5645b..ada6377 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -29,8 +29,9 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -54,7 +55,6 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; -import org.apache.flink.runtime.state.StateMigrationUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.apache.flink.runtime.state.internal.InternalFoldingState; @@ -391,11 +391,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (!keySerializerRestored) { // check for key serializer compatibility; this also reconfigures the // key serializer to be compatible, if it is required and is possible - if (StateMigrationUtil.resolveCompatibilityResult( - serializationProxy.getKeySerializer(), - TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, - serializationProxy.getKeySerializerConfigSnapshot(), - keySerializer) + if (CompatibilityUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + UnloadableDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + keySerializer) .isRequiresMigration()) { // TODO replace with state migration; note that key hash codes need to remain the same after migration @@ -412,8 +412,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) { if (restoredMetaInfo.getStateSerializer() == null || - restoredMetaInfo.getStateSerializer() - instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) { + restoredMetaInfo.getStateSerializer() instanceof UnloadableDummyTypeSerializer) { // must fail now if the previous serializer cannot be restored because there is no serializer // capable of reading previous state http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java index fee97f4..f1f0406 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -61,7 +61,7 @@ import static org.mockito.Mockito.when; * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}. */ @RunWith(PowerMockRunner.class) -@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, OperatorBackendStateMetaInfoSnapshotReaderWriters.class}) +@PrepareForTest(TypeSerializerSerializationUtil.class) public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBackend> { @Override @@ -268,9 +268,10 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack "testOperator"); // mock failure when deserializing serializer - TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class); + TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy = + mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class); doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); - PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); operatorStateBackend.restore(Collections.singletonList(stateHandle)); @@ -320,9 +321,10 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader()); // mock failure when deserializing serializer - TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class); + TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy = + mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class); doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); - PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); try { restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index af5f0b2..31b75c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.testutils.OneShotLatch; @@ -64,7 +64,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) -@PrepareForTest(OperatorBackendStateMetaInfoSnapshotReaderWriters.class) +@PrepareForTest(TypeSerializerSerializationUtil.class) public class OperatorStateBackendTest { private final ClassLoader classLoader = getClass().getClassLoader(); @@ -544,9 +544,10 @@ public class OperatorStateBackendTest { "testOperator"); // mock failure when deserializing serializer - TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class); + TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy = + mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class); doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); - PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); operatorStateBackend.restore(Collections.singletonList(stateHandle)); http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java index 3d5b210..920aa69 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; import org.apache.flink.api.common.typeutils.base.DoubleSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; @@ -44,10 +44,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @RunWith(PowerMockRunner.class) -@PrepareForTest({ - KeyedBackendSerializationProxy.class, - KeyedBackendStateMetaInfoSnapshotReaderWriters.class, - OperatorBackendStateMetaInfoSnapshotReaderWriters.class}) +@PrepareForTest(TypeSerializerSerializationUtil.class) public class SerializationProxiesTest { @Test @@ -116,9 +113,10 @@ public class SerializationProxiesTest { new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader()); // mock failure when deserializing serializers - TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class); + TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy = + mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class); doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); - PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { serializationProxy.read(new DataInputViewStreamWrapper(in)); @@ -182,9 +180,10 @@ public class SerializationProxiesTest { } // mock failure when deserializing serializer - TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class); + TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy = + mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class); doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); - PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { metaInfo = KeyedBackendStateMetaInfoSnapshotReaderWriters @@ -279,9 +278,10 @@ public class SerializationProxiesTest { } // mock failure when deserializing serializer - TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class); + TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy = + mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class); doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); - PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { metaInfo = OperatorBackendStateMetaInfoSnapshotReaderWriters http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala index 9736e81..9c45276 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala @@ -23,7 +23,9 @@ import org.apache.flink.annotation.Internal import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo._ import org.apache.flink.api.common.typeutils._ +import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.java.typeutils._ +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase import org.apache.flink.api.scala.typeutils._ import org.apache.flink.types.Value import org.apache.hadoop.io.Writable @@ -152,6 +154,15 @@ private[flink] trait TypeInformationGen[C <: Context] { override def createInstance(fields: Array[AnyRef]): T = { instance.splice } + + override def createSerializerInstance( + tupleClass: Class[T], + fieldSerializers: Array[TypeSerializer[_]]) = { + this.getClass + .getConstructors()(0) + .newInstance(tupleClass, fieldSerializers) + .asInstanceOf[CaseClassSerializer[T]] + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala index 6096388..1899b13 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala @@ -106,11 +106,23 @@ package object scala { fieldSerializers(i) = types(i).createSerializer(executionConfig) } - new CaseClassSerializer[(T1, T2)](classOf[(T1, T2)], fieldSerializers) { - override def createInstance(fields: Array[AnyRef]) = { - (fields(0).asInstanceOf[T1], fields(1).asInstanceOf[T2]) - } - } + new Tuple2CaseClassSerializer[T1, T2](classOf[(T1, T2)], fieldSerializers) } } + + class Tuple2CaseClassSerializer[T1, T2]( + val clazz: Class[(T1, T2)], + fieldSerializers: Array[TypeSerializer[_]]) + extends CaseClassSerializer[(T1, T2)](clazz, fieldSerializers) { + + override def createInstance(fields: Array[AnyRef]) = { + (fields(0).asInstanceOf[T1], fields(1).asInstanceOf[T2]) + } + + override def createSerializerInstance( + tupleClass: Class[(T1, T2)], + fieldSerializers: Array[TypeSerializer[_]]) = { + new Tuple2CaseClassSerializer[T1, T2](tupleClass, fieldSerializers) + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala index 6c4378a..1095aee 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.scala.typeutils import org.apache.flink.annotation.Internal -import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeDeserializerAdapter, TypeSerializer, TypeSerializerConfigSnapshot} +import org.apache.flink.api.common.typeutils._ import org.apache.flink.api.java.typeutils.runtime.EitherSerializerConfigSnapshot import org.apache.flink.core.memory.{DataInputView, DataOutputView} @@ -110,22 +110,29 @@ class EitherSerializer[A, B, T <: Either[A, B]]( // Serializer configuration snapshotting & compatibility // -------------------------------------------------------------------------------------------- - override def snapshotConfiguration(): EitherSerializerConfigSnapshot = { - new EitherSerializerConfigSnapshot( - leftSerializer.snapshotConfiguration(), - rightSerializer.snapshotConfiguration()) + override def snapshotConfiguration(): EitherSerializerConfigSnapshot[A, B] = { + new EitherSerializerConfigSnapshot[A, B](leftSerializer, rightSerializer) } override def ensureCompatibility( configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = { configSnapshot match { - case eitherSerializerConfig: EitherSerializerConfigSnapshot => - val leftRightConfigs = - eitherSerializerConfig.getNestedSerializerConfigSnapshots - - val leftCompatResult = leftSerializer.ensureCompatibility(leftRightConfigs(0)) - val rightCompatResult = rightSerializer.ensureCompatibility(leftRightConfigs(1)) + case eitherSerializerConfig: EitherSerializerConfigSnapshot[A, B] => + val previousLeftRightSerWithConfigs = + eitherSerializerConfig.getNestedSerializersAndConfigs + + val leftCompatResult = CompatibilityUtil.resolveCompatibilityResult( + previousLeftRightSerWithConfigs.get(0).f0, + classOf[UnloadableDummyTypeSerializer[_]], + previousLeftRightSerWithConfigs.get(0).f1, + leftSerializer) + + val rightCompatResult = CompatibilityUtil.resolveCompatibilityResult( + previousLeftRightSerWithConfigs.get(1).f0, + classOf[UnloadableDummyTypeSerializer[_]], + previousLeftRightSerWithConfigs.get(1).f1, + rightSerializer) if (leftCompatResult.isRequiresMigration || rightCompatResult.isRequiresMigration) { http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala index 4b56059..8adfb5c 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala @@ -100,16 +100,20 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) // Serializer configuration snapshotting & compatibility // -------------------------------------------------------------------------------------------- - override def snapshotConfiguration(): OptionSerializer.OptionSerializerConfigSnapshot = { - new OptionSerializer.OptionSerializerConfigSnapshot(elemSerializer.snapshotConfiguration()) + override def snapshotConfiguration(): OptionSerializer.OptionSerializerConfigSnapshot[A] = { + new OptionSerializer.OptionSerializerConfigSnapshot(elemSerializer) } override def ensureCompatibility( configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Option[A]] = { configSnapshot match { - case optionSerializerConfigSnapshot: OptionSerializer.OptionSerializerConfigSnapshot => - val compatResult = elemSerializer.ensureCompatibility( - optionSerializerConfigSnapshot.getSingleNestedSerializerConfigSnapshot) + case optionSerializerConfigSnapshot + : OptionSerializer.OptionSerializerConfigSnapshot[A] => + val compatResult = CompatibilityUtil.resolveCompatibilityResult( + optionSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0, + classOf[UnloadableDummyTypeSerializer[_]], + optionSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1, + elemSerializer) if (compatResult.isRequiresMigration) { if (compatResult.getConvertDeserializer != null) { @@ -130,9 +134,9 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) object OptionSerializer { - class OptionSerializerConfigSnapshot( - private var elemSerializerConfigSnapshot: TypeSerializerConfigSnapshot) - extends CompositeTypeSerializerConfigSnapshot(elemSerializerConfigSnapshot) { + class OptionSerializerConfigSnapshot[A]( + private val elemSerializer: TypeSerializer[A]) + extends CompositeTypeSerializerConfigSnapshot(elemSerializer) { /** This empty nullary constructor is required for deserializing the configuration. */ def this() = this(null) http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala index 5de76ca..641caa1 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala @@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils._ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.KryoSerializerConfigSnapshot import org.apache.flink.core.memory.{DataInputView, DataOutputView} import scala.util.{Failure, Success, Try} @@ -105,23 +104,28 @@ class TrySerializer[A]( // -------------------------------------------------------------------------------------------- override def snapshotConfiguration(): TypeSerializerConfigSnapshot = { - new TrySerializer.TrySerializerConfigSnapshot( - elemSerializer.snapshotConfiguration(), - throwableSerializer.snapshotConfiguration()) + new TrySerializer.TrySerializerConfigSnapshot[A](elemSerializer, throwableSerializer) } override def ensureCompatibility( configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Try[A]] = { configSnapshot match { - case trySerializerConfigSnapshot: TrySerializer.TrySerializerConfigSnapshot => - val serializerConfigSnapshots = - trySerializerConfigSnapshot.getNestedSerializerConfigSnapshots - - val elemCompatRes = - elemSerializer.ensureCompatibility(serializerConfigSnapshots(0)) - val throwableCompatRes = - throwableSerializer.ensureCompatibility(serializerConfigSnapshots(1)) + case trySerializerConfigSnapshot: TrySerializer.TrySerializerConfigSnapshot[A] => + val previousSerializersAndConfigs = + trySerializerConfigSnapshot.getNestedSerializersAndConfigs + + val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult( + previousSerializersAndConfigs.get(0).f0, + classOf[UnloadableDummyTypeSerializer[_]], + previousSerializersAndConfigs.get(0).f1, + elemSerializer) + + val throwableCompatRes = CompatibilityUtil.resolveCompatibilityResult( + previousSerializersAndConfigs.get(1).f0, + classOf[UnloadableDummyTypeSerializer[_]], + previousSerializersAndConfigs.get(1).f1, + throwableSerializer) if (elemCompatRes.isRequiresMigration || throwableCompatRes.isRequiresMigration) { CompatibilityResult.requiresMigration() @@ -136,11 +140,11 @@ class TrySerializer[A]( object TrySerializer { - class TrySerializerConfigSnapshot( - private var elemSerializerConfigSnapshot: TypeSerializerConfigSnapshot, - private var throwableSerializerConfigSnapshot: KryoSerializerConfigSnapshot[Throwable]) + class TrySerializerConfigSnapshot[A]( + private var elemSerializer: TypeSerializer[A], + private var throwableSerializer: TypeSerializer[Throwable]) extends CompositeTypeSerializerConfigSnapshot( - elemSerializerConfigSnapshot, throwableSerializerConfigSnapshot) { + elemSerializer, throwableSerializer) { /** This empty nullary constructor is required for deserializing the configuration. */ def this() = this(null, null) http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java index dc23b8d..81ba33a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java @@ -23,10 +23,13 @@ import static java.util.Objects.requireNonNull; import java.io.IOException; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.streaming.api.watermark.Watermark; @@ -216,14 +219,20 @@ public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Stream @Override public MultiplexingStreamRecordSerializerConfigSnapshot snapshotConfiguration() { - return new MultiplexingStreamRecordSerializerConfigSnapshot(typeSerializer.snapshotConfiguration()); + return new MultiplexingStreamRecordSerializerConfigSnapshot<>(typeSerializer); } @Override public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { if (configSnapshot instanceof MultiplexingStreamRecordSerializerConfigSnapshot) { - CompatibilityResult<T> compatResult = typeSerializer.ensureCompatibility( - ((MultiplexingStreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot()); + Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig = + ((MultiplexingStreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); + + CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult( + previousTypeSerializerAndConfig.f0, + UnloadableDummyTypeSerializer.class, + previousTypeSerializerAndConfig.f1, + typeSerializer); if (!compatResult.isRequiresMigration()) { return CompatibilityResult.compatible(); @@ -240,7 +249,7 @@ public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Stream /** * Configuration snapshot specific to the {@link MultiplexingStreamRecordSerializer}. */ - public static final class MultiplexingStreamRecordSerializerConfigSnapshot + public static final class MultiplexingStreamRecordSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot { private static final int VERSION = 1; @@ -248,8 +257,8 @@ public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Stream /** This empty nullary constructor is required for deserializing the configuration. */ public MultiplexingStreamRecordSerializerConfigSnapshot() {} - public MultiplexingStreamRecordSerializerConfigSnapshot(TypeSerializerConfigSnapshot typeSerializerConfigSnapshot) { - super(typeSerializerConfigSnapshot); + public MultiplexingStreamRecordSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) { + super(typeSerializer); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java index 7b0390d..5c32c19 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java @@ -21,10 +21,13 @@ package org.apache.flink.migration.streaming.runtime.streamrecord; import java.io.IOException; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -155,14 +158,20 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord @Override public StreamRecordSerializerConfigSnapshot snapshotConfiguration() { - return new StreamRecordSerializerConfigSnapshot(typeSerializer.snapshotConfiguration()); + return new StreamRecordSerializerConfigSnapshot<>(typeSerializer); } @Override public CompatibilityResult<StreamRecord<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { if (configSnapshot instanceof StreamRecordSerializerConfigSnapshot) { - CompatibilityResult<T> compatResult = typeSerializer.ensureCompatibility( - ((StreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot()); + Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig = + ((StreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); + + CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult( + previousTypeSerializerAndConfig.f0, + UnloadableDummyTypeSerializer.class, + previousTypeSerializerAndConfig.f1, + typeSerializer); if (!compatResult.isRequiresMigration()) { return CompatibilityResult.compatible(); @@ -179,15 +188,15 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord /** * Configuration snapshot specific to the {@link StreamRecordSerializer}. */ - public static final class StreamRecordSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + public static final class StreamRecordSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot { private static final int VERSION = 1; /** This empty nullary constructor is required for deserializing the configuration. */ public StreamRecordSerializerConfigSnapshot() {} - public StreamRecordSerializerConfigSnapshot(TypeSerializerConfigSnapshot typeSerializerConfigSnapshot) { - super(typeSerializerConfigSnapshot); + public StreamRecordSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) { + super(typeSerializer); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/09cc3f7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index ba69fed..390ac9d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -23,10 +23,13 @@ import static java.util.Objects.requireNonNull; import java.io.IOException; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.streaming.api.watermark.Watermark; @@ -277,14 +280,20 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme @Override public StreamElementSerializerConfigSnapshot snapshotConfiguration() { - return new StreamElementSerializerConfigSnapshot(typeSerializer.snapshotConfiguration()); + return new StreamElementSerializerConfigSnapshot<>(typeSerializer); } @Override public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) { - CompatibilityResult<T> compatResult = typeSerializer.ensureCompatibility( - ((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot()); + Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig = + ((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); + + CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult( + previousTypeSerializerAndConfig.f0, + UnloadableDummyTypeSerializer.class, + previousTypeSerializerAndConfig.f1, + typeSerializer); if (!compatResult.isRequiresMigration()) { return CompatibilityResult.compatible(); @@ -301,15 +310,15 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme /** * Configuration snapshot specific to the {@link StreamElementSerializer}. */ - public static final class StreamElementSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + public static final class StreamElementSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot { private static final int VERSION = 1; /** This empty nullary constructor is required for deserializing the configuration. */ public StreamElementSerializerConfigSnapshot() {} - public StreamElementSerializerConfigSnapshot(TypeSerializerConfigSnapshot typeSerializerConfigSnapshot) { - super(typeSerializerConfigSnapshot); + public StreamElementSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) { + super(typeSerializer); } @Override
