http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index a8368c4..2311158 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -34,13 +34,20 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerUtil; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -134,6 +141,25 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { this.subclassSerializerCache = new HashMap<>(); } + + public PojoSerializer( + Class<T> clazz, + Field[] fields, + TypeSerializer<Object>[] fieldSerializers, + LinkedHashMap<Class<?>, Integer> registeredClasses, + TypeSerializer<?>[] registeredSerializers, + HashMap<Class<?>, TypeSerializer<?>> subclassSerializerCache) { + + this.clazz = checkNotNull(clazz); + this.fields = checkNotNull(fields); + this.numFields = fields.length; + this.fieldSerializers = checkNotNull(fieldSerializers); + this.registeredClasses = checkNotNull(registeredClasses); + this.registeredSerializers = checkNotNull(registeredSerializers); + this.subclassSerializerCache = checkNotNull(subclassSerializerCache); + + this.executionConfig = null; + } @Override public boolean isImmutableType() { @@ -558,6 +584,8 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { if (configSnapshot instanceof PojoSerializerConfigSnapshot) { final PojoSerializerConfigSnapshot<T> config = (PojoSerializerConfigSnapshot<T>) configSnapshot; + boolean requiresMigration = false; + if (clazz.equals(config.getTypeClass())) { if (this.numFields == config.getFieldToSerializerConfigSnapshot().size()) { @@ -572,16 +600,27 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { (TypeSerializer<Object>[]) new TypeSerializer<?>[this.numFields]; int i = 0; - for (Map.Entry<Field, TypeSerializerConfigSnapshot> fieldToConfigSnapshotEntry + for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToConfigSnapshotEntry : config.getFieldToSerializerConfigSnapshot().entrySet()) { int fieldIndex = findField(fieldToConfigSnapshotEntry.getKey()); if (fieldIndex != -1) { reorderedFields[i] = fieldToConfigSnapshotEntry.getKey(); - compatResult = fieldSerializers[fieldIndex].ensureCompatibility(fieldToConfigSnapshotEntry.getValue()); + compatResult = CompatibilityUtil.resolveCompatibilityResult( + fieldToConfigSnapshotEntry.getValue().f0, + UnloadableDummyTypeSerializer.class, + fieldToConfigSnapshotEntry.getValue().f1, + fieldSerializers[fieldIndex]); + if (compatResult.isRequiresMigration()) { - return CompatibilityResult.requiresMigration(); + requiresMigration = true; + + if (compatResult.getConvertDeserializer() != null) { + reorderedFieldSerializers[i] = (TypeSerializer<Object>) compatResult.getConvertDeserializer(); + } else { + return CompatibilityResult.requiresMigration(); + } } else { reorderedFieldSerializers[i] = fieldSerializers[fieldIndex]; } @@ -599,7 +638,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { final LinkedHashMap<Class<?>, Integer> reorderedRegisteredSubclassesToClasstags; final TypeSerializer<?>[] reorderedRegisteredSubclassSerializers; - final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> previousRegistrations = + final LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousRegistrations = config.getRegisteredSubclassesToSerializerConfigSnapshots(); // the reconfigured list of registered subclasses will be the previous registered @@ -615,11 +654,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { reorderedRegisteredSubclasses, executionConfig); i = 0; - for (TypeSerializerConfigSnapshot previousRegisteredSerializerConfig : previousRegistrations.values()) { + for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousRegisteredSerializerConfig : previousRegistrations.values()) { // check compatibility of subclass serializer - compatResult = reorderedRegisteredSubclassSerializers[i].ensureCompatibility(previousRegisteredSerializerConfig); + compatResult = CompatibilityUtil.resolveCompatibilityResult( + previousRegisteredSerializerConfig.f0, + UnloadableDummyTypeSerializer.class, + previousRegisteredSerializerConfig.f1, + reorderedRegisteredSubclassSerializers[i]); + if (compatResult.isRequiresMigration()) { - return CompatibilityResult.requiresMigration(); + requiresMigration = true; + + if (compatResult.getConvertDeserializer() == null) { + return CompatibilityResult.requiresMigration(); + } } i++; @@ -631,15 +679,26 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { // this won't be applied to this serializer until all compatibility checks have been completed HashMap<Class<?>, TypeSerializer<?>> rebuiltCache = new HashMap<>(); - for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> previousCachedEntry + for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousCachedEntry : config.getNonRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) { TypeSerializer<?> cachedSerializer = createSubclassSerializer(previousCachedEntry.getKey()); // check compatibility of cached subclass serializer - compatResult = cachedSerializer.ensureCompatibility(previousCachedEntry.getValue()); + compatResult = CompatibilityUtil.resolveCompatibilityResult( + previousCachedEntry.getValue().f0, + UnloadableDummyTypeSerializer.class, + previousCachedEntry.getValue().f1, + cachedSerializer); + if (compatResult.isRequiresMigration()) { - return CompatibilityResult.requiresMigration(); + requiresMigration = true; + + if (compatResult.getConvertDeserializer() != null) { + rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer); + } else { + return CompatibilityResult.requiresMigration(); + } } else { rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer); } @@ -648,15 +707,26 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { // completed compatibility checks; up to this point, we can just reconfigure // the serializer so that it is compatible and migration is not required - this.fields = reorderedFields; - this.fieldSerializers = reorderedFieldSerializers; + if (!requiresMigration) { + this.fields = reorderedFields; + this.fieldSerializers = reorderedFieldSerializers; - this.registeredClasses = reorderedRegisteredSubclassesToClasstags; - this.registeredSerializers = reorderedRegisteredSubclassSerializers; + this.registeredClasses = reorderedRegisteredSubclassesToClasstags; + this.registeredSerializers = reorderedRegisteredSubclassSerializers; - this.subclassSerializerCache = rebuiltCache; + this.subclassSerializerCache = rebuiltCache; - return CompatibilityResult.compatible(); + return CompatibilityResult.compatible(); + } else { + return CompatibilityResult.requiresMigration( + new PojoSerializer<>( + clazz, + reorderedFields, + reorderedFieldSerializers, + reorderedRegisteredSubclassesToClasstags, + reorderedRegisteredSubclassSerializers, + rebuiltCache)); + } } } } @@ -669,39 +739,56 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { private static final int VERSION = 1; /** - * Ordered map of POJO fields to the configuration snapshots of their corresponding serializers. + * Ordered map of POJO fields to their corresponding serializers and its configuration snapshots. * * <p>Ordering of the fields is kept so that new Pojo serializers for previous data * may reorder the fields in case they are different. The order of the fields need to * stay the same for binary compatibility, as the field order is part of the serialization format. */ - private LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot; + private LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot; /** - * Ordered map of registered subclasses to the configuration snapshots of their corresponding serializers. + * Ordered map of registered subclasses to their corresponding serializers and its configuration snapshots. * * <p>Ordering of the registered subclasses is kept so that new Pojo serializers for previous data * may retain the same class tag used for registered subclasses. Newly registered subclasses that * weren't present before should be appended with the next available class tag. */ - private LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots; + private LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots; /** - * Configuration snapshots of previously cached non-registered subclass serializers. + * Previously cached non-registered subclass serializers and its configuration snapshots. * * <p>This is kept so that new Pojo serializers may eagerly repopulate their * cache with reconfigured subclass serializers. */ - private HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots; + private HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots; + + private boolean ignoreTypeSerializerSerialization; /** This empty nullary constructor is required for deserializing the configuration. */ public PojoSerializerConfigSnapshot() {} public PojoSerializerConfigSnapshot( Class<T> pojoType, - LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot, - LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots, - HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots) { + LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot, + LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots, + HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots) { + + this( + pojoType, + fieldToSerializerConfigSnapshot, + registeredSubclassesToSerializerConfigSnapshots, + nonRegisteredSubclassesToSerializerConfigSnapshots, + false); + } + + public PojoSerializerConfigSnapshot( + Class<T> pojoType, + LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot, + LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots, + HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots, + boolean ignoreTypeSerializerSerialization) { super(pojoType); @@ -711,37 +798,71 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { Preconditions.checkNotNull(registeredSubclassesToSerializerConfigSnapshots); this.nonRegisteredSubclassesToSerializerConfigSnapshots = Preconditions.checkNotNull(nonRegisteredSubclassesToSerializerConfigSnapshots); + + this.ignoreTypeSerializerSerialization = ignoreTypeSerializerSerialization; } @Override public void write(DataOutputView out) throws IOException { super.write(out); - // --- write fields and their serializers, in order + try ( + ByteArrayOutputStreamWithPos outWithPos = new ByteArrayOutputStreamWithPos(); + DataOutputViewStreamWrapper outViewWrapper = new DataOutputViewStreamWrapper(outWithPos)) { - out.writeInt(fieldToSerializerConfigSnapshot.size()); - for (Map.Entry<Field, TypeSerializerConfigSnapshot> entry - : fieldToSerializerConfigSnapshot.entrySet()) { - out.writeUTF(entry.getKey().getName()); - TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue()); - } + // --- write fields and their serializers, in order - // --- write registered subclasses and their serializers, in registration order + out.writeInt(fieldToSerializerConfigSnapshot.size()); + for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry + : fieldToSerializerConfigSnapshot.entrySet()) { - out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size()); - for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> entry - : registeredSubclassesToSerializerConfigSnapshots.entrySet()) { - out.writeUTF(entry.getKey().getName()); - TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue()); - } + outViewWrapper.writeUTF(entry.getKey().getName()); + + out.writeInt(outWithPos.getPosition()); + if (!ignoreTypeSerializerSerialization) { + TypeSerializerSerializationUtil.writeSerializer(outViewWrapper, entry.getValue().f0); + } + + out.writeInt(outWithPos.getPosition()); + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, entry.getValue().f1); + } + + // --- write registered subclasses and their serializers, in registration order + + out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size()); + for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry + : registeredSubclassesToSerializerConfigSnapshots.entrySet()) { + + outViewWrapper.writeUTF(entry.getKey().getName()); + + out.writeInt(outWithPos.getPosition()); + if (!ignoreTypeSerializerSerialization) { + TypeSerializerSerializationUtil.writeSerializer(outViewWrapper, entry.getValue().f0); + } + + out.writeInt(outWithPos.getPosition()); + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, entry.getValue().f1); + } + + // --- write snapshot of non-registered subclass serializer cache + + out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size()); + for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry + : nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) { + + outViewWrapper.writeUTF(entry.getKey().getName()); + + out.writeInt(outWithPos.getPosition()); + if (!ignoreTypeSerializerSerialization) { + TypeSerializerSerializationUtil.writeSerializer(outViewWrapper, entry.getValue().f0); + } - // --- write snapshot of non-registered subclass serializer cache + out.writeInt(outWithPos.getPosition()); + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outViewWrapper, entry.getValue().f1); + } - out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size()); - for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> entry - : nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) { - out.writeUTF(entry.getKey().getName()); - TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue()); + out.writeInt(outWithPos.getPosition()); + out.write(outWithPos.getBuf(), 0 , outWithPos.getPosition()); } } @@ -749,74 +870,126 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { public void read(DataInputView in) throws IOException { super.read(in); - // --- read fields and their serializers, in order - int numFields = in.readInt(); - this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields); - String fieldName; - Field field; + int[] fieldSerializerOffsets = new int[numFields * 2]; for (int i = 0; i < numFields; i++) { - fieldName = in.readUTF(); + fieldSerializerOffsets[i * 2] = in.readInt(); + fieldSerializerOffsets[i * 2 + 1] = in.readInt(); + } - // search all superclasses for the field - Class<?> clazz = getTypeClass(); - field = null; - while (clazz != null) { - try { - field = clazz.getDeclaredField(fieldName); - field.setAccessible(true); - break; - } catch (NoSuchFieldException e) { - clazz = clazz.getSuperclass(); + + int numRegisteredSubclasses = in.readInt(); + int[] registeredSerializerOffsets = new int[numRegisteredSubclasses * 2]; + for (int i = 0; i < numRegisteredSubclasses; i++) { + registeredSerializerOffsets[i * 2] = in.readInt(); + registeredSerializerOffsets[i * 2 + 1] = in.readInt(); + } + + int numCachedSubclassSerializers = in.readInt(); + int[] cachedSerializerOffsets = new int[numCachedSubclassSerializers * 2]; + for (int i = 0; i < numCachedSubclassSerializers; i++) { + cachedSerializerOffsets[i * 2] = in.readInt(); + cachedSerializerOffsets[i * 2 + 1] = in.readInt(); + } + + int totalBytes = in.readInt(); + byte[] buffer = new byte[totalBytes]; + in.readFully(buffer); + + try ( + ByteArrayInputStreamWithPos inWithPos = new ByteArrayInputStreamWithPos(buffer); + DataInputViewStreamWrapper inViewWrapper = new DataInputViewStreamWrapper(inWithPos)) { + + // --- read fields and their serializers, in order + + this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields); + String fieldName; + Field field; + TypeSerializer<?> fieldSerializer; + TypeSerializerConfigSnapshot fieldSerializerConfigSnapshot; + for (int i = 0; i < numFields; i++) { + fieldName = inViewWrapper.readUTF(); + + // search all superclasses for the field + Class<?> clazz = getTypeClass(); + field = null; + while (clazz != null) { + try { + field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + break; + } catch (NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } } - } - if (field == null) { - // the field no longer exists in the POJO - throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName()); - } else { - fieldToSerializerConfigSnapshot.put( + if (field == null) { + // the field no longer exists in the POJO + throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName()); + } else { + inWithPos.setPosition(fieldSerializerOffsets[i * 2]); + fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader()); + + inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]); + fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader()); + + fieldToSerializerConfigSnapshot.put( field, - TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader())); + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, fieldSerializerConfigSnapshot)); + } } - } - // --- read registered subclasses and their serializers, in registration order + // --- read registered subclasses and their serializers, in registration order - int numRegisteredSubclasses = in.readInt(); - this.registeredSubclassesToSerializerConfigSnapshots = new LinkedHashMap<>(numRegisteredSubclasses); - String registeredSubclassname; - Class<?> registeredSubclass; - for (int i = 0; i < numRegisteredSubclasses; i++) { - registeredSubclassname = in.readUTF(); - try { - registeredSubclass = Class.forName(registeredSubclassname, true, getUserCodeClassLoader()); - } catch (ClassNotFoundException e) { - throw new IOException("Cannot find requested class " + registeredSubclassname + " in classpath.", e); - } + this.registeredSubclassesToSerializerConfigSnapshots = new LinkedHashMap<>(numRegisteredSubclasses); + String registeredSubclassname; + Class<?> registeredSubclass; + TypeSerializer<?> registeredSubclassSerializer; + TypeSerializerConfigSnapshot registeredSubclassSerializerConfigSnapshot; + for (int i = 0; i < numRegisteredSubclasses; i++) { + registeredSubclassname = inViewWrapper.readUTF(); + try { + registeredSubclass = Class.forName(registeredSubclassname, true, getUserCodeClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException("Cannot find requested class " + registeredSubclassname + " in classpath.", e); + } - this.registeredSubclassesToSerializerConfigSnapshots.put( - registeredSubclass, - TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader())); - } + inWithPos.setPosition(registeredSerializerOffsets[i * 2]); + registeredSubclassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader()); - // --- read snapshot of non-registered subclass serializer cache + inWithPos.setPosition(registeredSerializerOffsets[i * 2 + 1]); + registeredSubclassSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader()); - int numCachedSubclassSerializers = in.readInt(); - this.nonRegisteredSubclassesToSerializerConfigSnapshots = new HashMap<>(numCachedSubclassSerializers); - String cachedSubclassname; - Class<?> cachedSubclass; - for (int i = 0; i < numCachedSubclassSerializers; i++) { - cachedSubclassname = in.readUTF(); - try { - cachedSubclass = Class.forName(cachedSubclassname, true, getUserCodeClassLoader()); - } catch (ClassNotFoundException e) { - throw new IOException("Cannot find requested class " + cachedSubclassname + " in classpath.", e); + this.registeredSubclassesToSerializerConfigSnapshots.put( + registeredSubclass, + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(registeredSubclassSerializer, registeredSubclassSerializerConfigSnapshot)); } - this.nonRegisteredSubclassesToSerializerConfigSnapshots.put( + // --- read snapshot of non-registered subclass serializer cache + + this.nonRegisteredSubclassesToSerializerConfigSnapshots = new HashMap<>(numCachedSubclassSerializers); + String cachedSubclassname; + Class<?> cachedSubclass; + TypeSerializer<?> cachedSubclassSerializer; + TypeSerializerConfigSnapshot cachedSubclassSerializerConfigSnapshot; + for (int i = 0; i < numCachedSubclassSerializers; i++) { + cachedSubclassname = inViewWrapper.readUTF(); + try { + cachedSubclass = Class.forName(cachedSubclassname, true, getUserCodeClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException("Cannot find requested class " + cachedSubclassname + " in classpath.", e); + } + + inWithPos.setPosition(cachedSerializerOffsets[i * 2]); + cachedSubclassSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader()); + + inWithPos.setPosition(cachedSerializerOffsets[i * 2 + 1]); + cachedSubclassSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader()); + + this.nonRegisteredSubclassesToSerializerConfigSnapshots.put( cachedSubclass, - TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader())); + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(cachedSubclassSerializer, cachedSubclassSerializerConfigSnapshot)); + } } } @@ -825,15 +998,15 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { return VERSION; } - public LinkedHashMap<Field, TypeSerializerConfigSnapshot> getFieldToSerializerConfigSnapshot() { + public LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() { return fieldToSerializerConfigSnapshot; } - public LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> getRegisteredSubclassesToSerializerConfigSnapshots() { + public LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getRegisteredSubclassesToSerializerConfigSnapshots() { return registeredSubclassesToSerializerConfigSnapshots; } - public HashMap<Class<?>, TypeSerializerConfigSnapshot> getNonRegisteredSubclassesToSerializerConfigSnapshots() { + public HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getNonRegisteredSubclassesToSerializerConfigSnapshots() { return nonRegisteredSubclassesToSerializerConfigSnapshots; } @@ -1001,27 +1174,35 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { TypeSerializer<?>[] fieldSerializers, HashMap<Class<?>, TypeSerializer<?>> nonRegisteredSubclassSerializerCache) { - final LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshots = + final LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots = new LinkedHashMap<>(fields.length); for (int i = 0; i < fields.length; i++) { - fieldToSerializerConfigSnapshots.put(fields[i], fieldSerializers[i].snapshotConfiguration()); + fieldToSerializerConfigSnapshots.put( + fields[i], + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializers[i], fieldSerializers[i].snapshotConfiguration())); } - final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots = + final LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots = new LinkedHashMap<>(registeredSubclassesToTags.size()); for (Map.Entry<Class<?>, Integer> entry : registeredSubclassesToTags.entrySet()) { registeredSubclassesToSerializerConfigSnapshots.put( entry.getKey(), - registeredSubclassSerializers[entry.getValue()].snapshotConfiguration()); + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + registeredSubclassSerializers[entry.getValue()], + registeredSubclassSerializers[entry.getValue()].snapshotConfiguration())); } - final HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots = + final HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots = new LinkedHashMap<>(nonRegisteredSubclassSerializerCache.size()); for (Map.Entry<Class<?>, TypeSerializer<?>> entry : nonRegisteredSubclassSerializerCache.entrySet()) { - nonRegisteredSubclassesToSerializerConfigSnapshots.put(entry.getKey(), entry.getValue().snapshotConfiguration()); + nonRegisteredSubclassesToSerializerConfigSnapshots.put( + entry.getKey(), + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + entry.getValue(), + entry.getValue().snapshotConfiguration())); } return new PojoSerializerConfigSnapshot<>(
http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java index ba41d4b..bd08b04 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java @@ -19,11 +19,13 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerUtil; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.Row; @@ -31,6 +33,7 @@ import org.apache.flink.types.Row; import java.io.IOException; import java.io.ObjectInputStream; import java.util.Arrays; +import java.util.List; import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask; import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask; @@ -254,22 +257,28 @@ public final class RowSerializer extends TypeSerializer<Row> { @Override public RowSerializerConfigSnapshot snapshotConfiguration() { - return new RowSerializerConfigSnapshot(TypeSerializerUtil.snapshotConfigurations(fieldSerializers)); + return new RowSerializerConfigSnapshot(fieldSerializers); } @Override public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { if (configSnapshot instanceof RowSerializerConfigSnapshot) { - TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots = - ((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots(); + List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousFieldSerializersAndConfigs = + ((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - if (fieldSerializerConfigSnapshots.length == fieldSerializers.length) { + if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) { boolean requireMigration = false; TypeSerializer<?>[] convertDeserializers = new TypeSerializer<?>[fieldSerializers.length]; CompatibilityResult<?> compatResult; - for (int i = 0; i < fieldSerializers.length; i++) { - compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]); + int i = 0; + for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> f : previousFieldSerializersAndConfigs) { + compatResult = CompatibilityUtil.resolveCompatibilityResult( + f.f0, + UnloadableDummyTypeSerializer.class, + f.f1, + fieldSerializers[i]); + if (compatResult.isRequiresMigration()) { requireMigration = true; @@ -281,6 +290,8 @@ public final class RowSerializer extends TypeSerializer<Row> { new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()); } } + + i++; } if (requireMigration) { @@ -301,8 +312,8 @@ public final class RowSerializer extends TypeSerializer<Row> { /** This empty nullary constructor is required for deserializing the configuration. */ public RowSerializerConfigSnapshot() {} - public RowSerializerConfigSnapshot(TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) { - super(fieldSerializerConfigSnapshots); + public RowSerializerConfigSnapshot(TypeSerializer[] fieldSerializers) { + super(fieldSerializers); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java index f485c3e..911c96f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java @@ -156,4 +156,9 @@ public class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> { throw new RuntimeException("Cannot instantiate tuple.", e); } } + + @Override + protected TupleSerializerBase<T> createSerializerInstance(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) { + return new TupleSerializer<>(tupleClass, fieldSerializers); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java index 032c3f1..f12dcd9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java @@ -20,14 +20,18 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerUtil; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -125,9 +129,7 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { @Override public TupleSerializerConfigSnapshot<T> snapshotConfiguration() { - return new TupleSerializerConfigSnapshot<>( - tupleClass, - TypeSerializerUtil.snapshotConfigurations(fieldSerializers)); + return new TupleSerializerConfigSnapshot<>(tupleClass, fieldSerializers); } @SuppressWarnings("unchecked") @@ -137,24 +139,48 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { final TupleSerializerConfigSnapshot<T> config = (TupleSerializerConfigSnapshot<T>) configSnapshot; if (tupleClass.equals(config.getTupleClass())) { - TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots = - ((TupleSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots(); + List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousFieldSerializersAndConfigs = + ((TupleSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); + + if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) { + + TypeSerializer<Object>[] convertFieldSerializers = new TypeSerializer[fieldSerializers.length]; + boolean requiresMigration = false; + CompatibilityResult<Object> compatResult; + int i = 0; + for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> f : previousFieldSerializersAndConfigs) { + compatResult = CompatibilityUtil.resolveCompatibilityResult( + f.f0, + UnloadableDummyTypeSerializer.class, + f.f1, + fieldSerializers[i]); - if (fieldSerializerConfigSnapshots.length == fieldSerializers.length) { - - CompatibilityResult compatResult; - for (int i = 0; i < fieldSerializers.length; i++) { - compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]); if (compatResult.isRequiresMigration()) { - return CompatibilityResult.requiresMigration(); + requiresMigration = true; + + if (compatResult.getConvertDeserializer() != null) { + convertFieldSerializers[i] = + new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()); + } else { + return CompatibilityResult.requiresMigration(); + } } + + i++; } - return CompatibilityResult.compatible(); + if (!requiresMigration) { + return CompatibilityResult.compatible(); + } else { + return CompatibilityResult.requiresMigration( + createSerializerInstance(tupleClass, convertFieldSerializers)); + } } } } return CompatibilityResult.requiresMigration(); } + + protected abstract TupleSerializerBase<T> createSerializerInstance(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers); } http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java index 6d2bb5f..1e7701c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java @@ -20,7 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.util.InstantiationUtil; @@ -41,11 +41,8 @@ public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSeriali /** This empty nullary constructor is required for deserializing the configuration. */ public TupleSerializerConfigSnapshot() {} - public TupleSerializerConfigSnapshot( - Class<T> tupleClass, - TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) { - - super(fieldSerializerConfigSnapshots); + public TupleSerializerConfigSnapshot(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) { + super(fieldSerializers); this.tupleClass = Preconditions.checkNotNull(tupleClass); } http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java index 73c4379..57015c7 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java @@ -109,14 +109,14 @@ public abstract class SerializerTestBase<T> extends TestLogger { byte[] serializedConfig; try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - TypeSerializerUtil.writeSerializerConfigSnapshot( + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot( new DataOutputViewStreamWrapper(out), configSnapshot); serializedConfig = out.toByteArray(); } TypeSerializerConfigSnapshot restoredConfig; try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { - restoredConfig = TypeSerializerUtil.readSerializerConfigSnapshot( + restoredConfig = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java deleted file mode 100644 index 0783bb6..0000000 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.typeutils; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.URL; -import java.net.URLClassLoader; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -/** - * Unit tests related to {@link TypeSerializerConfigSnapshot}. - */ -public class TypeSerializerConfigSnapshotTest { - - /** - * Verifies that reading and writing configuration snapshots work correctly. - */ - @Test - public void testSerializeConfigurationSnapshots() throws Exception { - TestConfigSnapshot configSnapshot1 = new TestConfigSnapshot(1, "foo"); - TestConfigSnapshot configSnapshot2 = new TestConfigSnapshot(2, "bar"); - - byte[] serializedConfig; - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - TypeSerializerUtil.writeSerializerConfigSnapshots( - new DataOutputViewStreamWrapper(out), - configSnapshot1, - configSnapshot2); - - serializedConfig = out.toByteArray(); - } - - TypeSerializerConfigSnapshot[] restoredConfigs; - try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { - restoredConfigs = TypeSerializerUtil.readSerializerConfigSnapshots( - new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); - } - - assertEquals(2, restoredConfigs.length); - assertEquals(configSnapshot1, restoredConfigs[0]); - assertEquals(configSnapshot2, restoredConfigs[1]); - } - - /** - * Verifies that deserializing config snapshots fail if the config class could not be found. - */ - @Test - public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception { - byte[] serializedConfig; - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - TypeSerializerUtil.writeSerializerConfigSnapshot( - new DataOutputViewStreamWrapper(out), new TestConfigSnapshot(123, "foobar")); - serializedConfig = out.toByteArray(); - } - - try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { - // read using a dummy classloader - TypeSerializerUtil.readSerializerConfigSnapshot( - new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null)); - fail("Expected a ClassNotFoundException wrapped in IOException"); - } catch (IOException expected) { - // test passes - } - } - - public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot { - - static final int VERSION = 1; - - private int val; - private String msg; - - public TestConfigSnapshot() {} - - public TestConfigSnapshot(int val, String msg) { - this.val = val; - this.msg = msg; - } - - @Override - public void write(DataOutputView out) throws IOException { - super.write(out); - out.writeInt(val); - out.writeUTF(msg); - } - - @Override - public void read(DataInputView in) throws IOException { - super.read(in); - val = in.readInt(); - msg = in.readUTF(); - } - - @Override - public int getVersion() { - return VERSION; - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (obj == null) { - return false; - } - - if (obj instanceof TestConfigSnapshot) { - return val == ((TestConfigSnapshot) obj).val && msg.equals(((TestConfigSnapshot) obj).msg); - } else { - return false; - } - } - - @Override - public int hashCode() { - return 31 * val + msg.hashCode(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java deleted file mode 100644 index db1b4ef..0000000 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.typeutils; - -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; -import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.util.InstantiationUtil; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.io.IOException; -import java.io.InvalidClassException; -import java.net.URL; -import java.net.URLClassLoader; - -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(InstantiationUtil.class) -public class TypeSerializerSerializationProxyTest { - - @Test - public void testStateSerializerSerializationProxy() throws Exception { - - TypeSerializer<?> serializer = IntSerializer.INSTANCE; - - TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer); - - byte[] serialized; - try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { - proxy.write(new DataOutputViewStreamWrapper(out)); - serialized = out.toByteArray(); - } - - proxy = new TypeSerializerSerializationProxy<>(Thread.currentThread().getContextClassLoader()); - - try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { - proxy.read(new DataInputViewStreamWrapper(in)); - } - - Assert.assertEquals(serializer, proxy.getTypeSerializer()); - } - - @Test - public void testStateSerializerSerializationProxyClassNotFound() throws Exception { - - TypeSerializer<?> serializer = IntSerializer.INSTANCE; - - TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer); - - byte[] serialized; - try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { - proxy.write(new DataOutputViewStreamWrapper(out)); - serialized = out.toByteArray(); - } - - proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null)); - - try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { - proxy.read(new DataInputViewStreamWrapper(in)); - fail("ClassNotFoundException expected, leading to IOException"); - } catch (IOException expected) { - - } - - proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null), true); - - try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { - proxy.read(new DataInputViewStreamWrapper(in)); - } - - Assert.assertTrue(proxy.getTypeSerializer() instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer); - - Assert.assertArrayEquals( - InstantiationUtil.serializeObject(serializer), - ((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) proxy.getTypeSerializer()).getActualBytes()); - } - - @Test - public void testStateSerializerSerializationProxyInvalidClass() throws Exception { - - TypeSerializer<?> serializer = IntSerializer.INSTANCE; - - TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer); - - byte[] serialized; - try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { - proxy.write(new DataOutputViewStreamWrapper(out)); - serialized = out.toByteArray(); - } - - PowerMockito.spy(InstantiationUtil.class); - PowerMockito - .doThrow(new InvalidClassException("test invalid class exception")) - .when(InstantiationUtil.class, "deserializeObject", any(byte[].class), any(ClassLoader.class)); - - proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null)); - - try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { - proxy.read(new DataInputViewStreamWrapper(in)); - fail("InvalidClassException expected, leading to IOException"); - } catch (IOException expected) { - - } - - proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null), true); - - try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { - proxy.read(new DataInputViewStreamWrapper(in)); - } - - Assert.assertTrue(proxy.getTypeSerializer() instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer); - - Assert.assertArrayEquals( - InstantiationUtil.serializeObject(serializer), - ((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) proxy.getTypeSerializer()).getActualBytes()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java new file mode 100644 index 0000000..738644b --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.common.typeutils.base.DoubleSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.InstantiationUtil; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InvalidClassException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +/** + * Unit tests for {@link TypeSerializerSerializationUtil}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(TypeSerializerSerializationUtil.class) +public class TypeSerializerSerializationUtilTest { + + /** + * Verifies that reading and writing serializers work correctly. + */ + @Test + public void testSerializerSerialization() throws Exception { + + TypeSerializer<?> serializer = IntSerializer.INSTANCE; + + byte[] serialized; + try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { + TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer); + serialized = out.toByteArray(); + } + + TypeSerializer<?> deserializedSerializer; + try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { + deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + + Assert.assertEquals(serializer, deserializedSerializer); + } + + /** + * Verifies deserialization failure cases when reading a serializer from bytes, in the + * case of a {@link ClassNotFoundException}. + */ + @Test + public void testSerializerSerializationWithClassNotFound() throws Exception { + + TypeSerializer<?> serializer = IntSerializer.INSTANCE; + + byte[] serialized; + try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { + TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer); + serialized = out.toByteArray(); + } + + TypeSerializer<?> deserializedSerializer; + + try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { + deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer( + new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null)); + } + Assert.assertEquals(null, deserializedSerializer); + + try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { + deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer( + new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null), true); + } + Assert.assertTrue(deserializedSerializer instanceof UnloadableDummyTypeSerializer); + + Assert.assertArrayEquals( + InstantiationUtil.serializeObject(serializer), + ((UnloadableDummyTypeSerializer<?>) deserializedSerializer).getActualBytes()); + } + + /** + * Verifies deserialization failure cases when reading a serializer from bytes, in the + * case of a {@link InvalidClassException}. + */ + @Test + public void testSerializerSerializationWithInvalidClass() throws Exception { + + TypeSerializer<?> serializer = IntSerializer.INSTANCE; + + byte[] serialized; + try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { + TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer); + serialized = out.toByteArray(); + } + + TypeSerializer<?> deserializedSerializer; + + // mock failure when deserializing serializers + TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy = + mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class); + doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); + PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + + try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { + deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + Assert.assertEquals(null, deserializedSerializer); + } + + /** + * Verifies that reading and writing configuration snapshots work correctly. + */ + @Test + public void testSerializeConfigurationSnapshots() throws Exception { + TypeSerializerSerializationUtilTest.TestConfigSnapshot configSnapshot1 = + new TypeSerializerSerializationUtilTest.TestConfigSnapshot(1, "foo"); + + TypeSerializerSerializationUtilTest.TestConfigSnapshot configSnapshot2 = + new TypeSerializerSerializationUtilTest.TestConfigSnapshot(2, "bar"); + + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerSerializationUtil.writeSerializerConfigSnapshots( + new DataOutputViewStreamWrapper(out), + configSnapshot1, + configSnapshot2); + + serializedConfig = out.toByteArray(); + } + + TypeSerializerConfigSnapshot[] restoredConfigs; + try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + restoredConfigs = TypeSerializerSerializationUtil.readSerializerConfigSnapshots( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + + assertEquals(2, restoredConfigs.length); + assertEquals(configSnapshot1, restoredConfigs[0]); + assertEquals(configSnapshot2, restoredConfigs[1]); + } + + /** + * Verifies that deserializing config snapshots fail if the config class could not be found. + */ + @Test + public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception { + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot( + new DataOutputViewStreamWrapper(out), new TypeSerializerSerializationUtilTest.TestConfigSnapshot(123, "foobar")); + serializedConfig = out.toByteArray(); + } + + try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + // read using a dummy classloader + TypeSerializerSerializationUtil.readSerializerConfigSnapshot( + new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null)); + fail("Expected a ClassNotFoundException wrapped in IOException"); + } catch (IOException expected) { + // test passes + } + } + + /** + * Verifies resilience to serializer deserialization failures when writing and reading + * serializer and config snapshot pairs. + */ + @Test + public void testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures() throws Exception { + List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs = Arrays.asList( + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + IntSerializer.INSTANCE, IntSerializer.INSTANCE.snapshotConfiguration()), + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + DoubleSerializer.INSTANCE, DoubleSerializer.INSTANCE.snapshotConfiguration())); + + byte[] serializedSerializersAndConfigs; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( + new DataOutputViewStreamWrapper(out), serializersAndConfigs); + serializedSerializersAndConfigs = out.toByteArray(); + } + + // mock failure when deserializing serializers + TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy = + mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class); + doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); + PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + + List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> restored; + try (ByteArrayInputStream in = new ByteArrayInputStream(serializedSerializersAndConfigs)) { + restored = TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + + Assert.assertEquals(2, restored.size()); + Assert.assertEquals(null, restored.get(0).f0); + Assert.assertEquals(IntSerializer.INSTANCE.snapshotConfiguration(), restored.get(0).f1); + Assert.assertEquals(null, restored.get(1).f0); + Assert.assertEquals(DoubleSerializer.INSTANCE.snapshotConfiguration(), restored.get(1).f1); + } + + public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot { + + static final int VERSION = 1; + + private int val; + private String msg; + + public TestConfigSnapshot() {} + + public TestConfigSnapshot(int val, String msg) { + this.val = val; + this.msg = msg; + } + + @Override + public void write(DataOutputView out) throws IOException { + super.write(out); + out.writeInt(val); + out.writeUTF(msg); + } + + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + val = in.readInt(); + msg = in.readUTF(); + } + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null) { + return false; + } + + if (obj instanceof TypeSerializerSerializationUtilTest.TestConfigSnapshot) { + return val == ((TypeSerializerSerializationUtilTest.TestConfigSnapshot) obj).val + && msg.equals(((TypeSerializerSerializationUtilTest.TestConfigSnapshot) obj).msg); + } else { + return false; + } + } + + @Override + public int hashCode() { + return 31 * val + msg.hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java index 16ea945..e3ce3ee 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java @@ -21,7 +21,7 @@ package org.apache.flink.api.common.typeutils.base; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.SerializerTestInstance; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerUtil; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.InstantiationUtil; @@ -95,14 +95,14 @@ public class EnumSerializerTest extends TestLogger { byte[] serializedConfig; try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - TypeSerializerUtil.writeSerializerConfigSnapshot( + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot( new DataOutputViewStreamWrapper(out), serializer.snapshotConfiguration()); serializedConfig = out.toByteArray(); } TypeSerializerConfigSnapshot restoredConfig; try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { - restoredConfig = TypeSerializerUtil.readSerializerConfigSnapshot( + restoredConfig = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); } http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java index c77ffcc..10f4708 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java @@ -20,12 +20,14 @@ package org.apache.flink.api.java.typeutils.runtime; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Random; @@ -39,8 +41,9 @@ import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; import org.apache.flink.api.common.operators.Keys.ExpressionKeys; import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerUtil; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -50,14 +53,23 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; /** * A test for the {@link PojoSerializer}. */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(TypeSerializerSerializationUtil.class) public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.TestUserClass> { private TypeInformation<TestUserClass> type = TypeExtractor.getForClass(TestUserClass.class); @@ -286,7 +298,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer1.snapshotConfiguration(); byte[] serializedConfig; try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot); + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot); serializedConfig = out.toByteArray(); } @@ -295,7 +307,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te // read configuration again from bytes try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { - pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot( + pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); } @@ -322,7 +334,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration(); byte[] serializedConfig; try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot); + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot); serializedConfig = out.toByteArray(); } @@ -335,7 +347,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te // read configuration from bytes try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { - pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot( + pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); } @@ -368,7 +380,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration(); byte[] serializedConfig; try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot); + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot); serializedConfig = out.toByteArray(); } @@ -378,7 +390,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te // read configuration from bytes try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { - pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot( + pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); } @@ -426,7 +438,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer.snapshotConfiguration(); byte[] serializedConfig; try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot); + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot); serializedConfig = out.toByteArray(); } @@ -438,7 +450,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te // read configuration from bytes try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { - pojoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot( + pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); } @@ -472,14 +484,38 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te // creating this serializer just for generating config snapshots of the field serializers PojoSerializer<TestUserClass> ser = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig()); - LinkedHashMap<Field, TypeSerializerConfigSnapshot> mockOriginalFieldToSerializerConfigSnapshot = + LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot = new LinkedHashMap<>(mockOriginalFieldOrder.length); - mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[0], ser.getFieldSerializers()[3].snapshotConfiguration()); - mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[1], ser.getFieldSerializers()[2].snapshotConfiguration()); - mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[2], ser.getFieldSerializers()[5].snapshotConfiguration()); - mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[3], ser.getFieldSerializers()[0].snapshotConfiguration()); - mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[4], ser.getFieldSerializers()[1].snapshotConfiguration()); - mockOriginalFieldToSerializerConfigSnapshot.put(mockOriginalFieldOrder[5], ser.getFieldSerializers()[4].snapshotConfiguration()); + mockOriginalFieldToSerializerConfigSnapshot.put( + mockOriginalFieldOrder[0], + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + ser.getFieldSerializers()[3], + ser.getFieldSerializers()[3].snapshotConfiguration())); + mockOriginalFieldToSerializerConfigSnapshot.put( + mockOriginalFieldOrder[1], + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + ser.getFieldSerializers()[2], + ser.getFieldSerializers()[2].snapshotConfiguration())); + mockOriginalFieldToSerializerConfigSnapshot.put( + mockOriginalFieldOrder[2], + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + ser.getFieldSerializers()[5], + ser.getFieldSerializers()[5].snapshotConfiguration())); + mockOriginalFieldToSerializerConfigSnapshot.put( + mockOriginalFieldOrder[3], + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + ser.getFieldSerializers()[0], + ser.getFieldSerializers()[0].snapshotConfiguration())); + mockOriginalFieldToSerializerConfigSnapshot.put( + mockOriginalFieldOrder[4], + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + ser.getFieldSerializers()[1], + ser.getFieldSerializers()[1].snapshotConfiguration())); + mockOriginalFieldToSerializerConfigSnapshot.put( + mockOriginalFieldOrder[5], + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( + ser.getFieldSerializers()[4], + ser.getFieldSerializers()[4].snapshotConfiguration())); PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig()); @@ -494,13 +530,11 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te new PojoSerializer.PojoSerializerConfigSnapshot<>( TestUserClass.class, mockOriginalFieldToSerializerConfigSnapshot, // this mocks the previous field order - new LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot>(), // empty; irrelevant for this test - new HashMap<Class<?>, TypeSerializerConfigSnapshot>()); // empty; irrelevant for this test + new LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>(), // empty; irrelevant for this test + new HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>>()); // empty; irrelevant for this test // reconfigure - check reconfiguration result and that fields are reordered to the previous order - CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility( - - mockPreviousConfigSnapshot); + CompatibilityResult<TestUserClass> compatResult = pojoSerializer.ensureCompatibility(mockPreviousConfigSnapshot); assertFalse(compatResult.isRequiresMigration()); int i = 0; for (Field field : mockOriginalFieldOrder) { @@ -508,4 +542,74 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te i++; } } + + @SuppressWarnings("unchecked") + @Test + public void testSerializerSerializationFailureResilience() throws Exception{ + PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig()); + + // snapshot configuration and serialize to bytes + PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> config = pojoSerializer.snapshotConfiguration(); + byte[] serializedConfig; + try ( + ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), config); + serializedConfig = out.toByteArray(); + } + + // mock failure when deserializing serializers + TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy = + mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class); + doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); + PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + + // read configuration from bytes + PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> deserializedConfig; + try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + deserializedConfig = (PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass>) + TypeSerializerSerializationUtil.readSerializerConfigSnapshot( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + + Assert.assertFalse(pojoSerializer.ensureCompatibility(deserializedConfig).isRequiresMigration()); + verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(config, deserializedConfig); + } + + private static void verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure( + PojoSerializer.PojoSerializerConfigSnapshot<?> original, + PojoSerializer.PojoSerializerConfigSnapshot<?> deserializedConfig) { + + LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs = + original.getFieldToSerializerConfigSnapshot(); + for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry + : deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) { + + Assert.assertEquals(null, entry.getValue().f0); + + if (entry.getValue().f1 instanceof PojoSerializer.PojoSerializerConfigSnapshot) { + verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure( + (PojoSerializer.PojoSerializerConfigSnapshot<?>) originalFieldSerializersAndConfs.get(entry.getKey()).f1, + (PojoSerializer.PojoSerializerConfigSnapshot<?>) entry.getValue().f1); + } else { + Assert.assertEquals(originalFieldSerializersAndConfs.get(entry.getKey()).f1, entry.getValue().f1); + } + } + + LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalRegistrations = + original.getRegisteredSubclassesToSerializerConfigSnapshots(); + + for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry + : deserializedConfig.getRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) { + + Assert.assertEquals(null, entry.getValue().f0); + + if (entry.getValue().f1 instanceof PojoSerializer.PojoSerializerConfigSnapshot) { + verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure( + (PojoSerializer.PojoSerializerConfigSnapshot<?>) originalRegistrations.get(entry.getKey()).f1, + (PojoSerializer.PojoSerializerConfigSnapshot<?>) entry.getValue().f1); + } else { + Assert.assertEquals(originalRegistrations.get(entry.getKey()).f1, entry.getValue().f1); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a7bc5de9/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java index 860c560..5a404bd 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java @@ -25,7 +25,7 @@ import com.esotericsoftware.kryo.io.Output; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerUtil; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.junit.Test; @@ -53,7 +53,7 @@ public class KryoSerializerCompatibilityTest { TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializerForA.snapshotConfiguration(); byte[] serializedConfig; try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot); + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot); serializedConfig = out.toByteArray(); } @@ -61,7 +61,7 @@ public class KryoSerializerCompatibilityTest { // read configuration again from bytes try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { - kryoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot( + kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); } @@ -91,7 +91,7 @@ public class KryoSerializerCompatibilityTest { TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializer.snapshotConfiguration(); byte[] serializedConfig; try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - TypeSerializerUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot); + TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot); serializedConfig = out.toByteArray(); } @@ -104,7 +104,7 @@ public class KryoSerializerCompatibilityTest { // read configuration from bytes try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { - kryoSerializerConfigSnapshot = TypeSerializerUtil.readSerializerConfigSnapshot( + kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot( new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); }
