http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 1a9c8f9..08da49e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -23,22 +23,25 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Field; import java.lang.reflect.Modifier; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; -import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerUtil; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -53,22 +56,55 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { private static final long serialVersionUID = 1L; - private final Class<T> 
clazz; + // -------------------------------------------------------------------------------------------- + // PojoSerializer parameters + // -------------------------------------------------------------------------------------------- - private final TypeSerializer<Object>[] fieldSerializers; + /** The POJO type class. */ + private final Class<T> clazz; + /** + * Fields of the POJO and their serializers. + * + * <p>The fields are kept as a separate transient member, with their serialization + * handled with the {@link #readObject(ObjectInputStream)} and {@link #writeObject(ObjectOutputStream)} + * methods. + * + * <p>These may be reconfigured in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}. + */ + private transient Field[] fields; + private TypeSerializer<Object>[] fieldSerializers; private final int numFields; - private final Map<Class<?>, Integer> registeredClasses; - - private final TypeSerializer<?>[] registeredSerializers; - + /** + * Registered subclasses and their serializers. + * Each subclass to their registered class tag is maintained as a separate map ordered by the class tag. + * + * <p>These may be reconfigured in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}. + */ + private LinkedHashMap<Class<?>, Integer> registeredClasses; + private TypeSerializer<?>[] registeredSerializers; + + /** + * Cache of non-registered subclasses to their serializers, created on-the-fly. + * + * <p>This cache is persisted and will be repopulated with reconfigured serializers + * in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}. + */ + private transient HashMap<Class<?>, TypeSerializer<?>> subclassSerializerCache; + + // -------------------------------------------------------------------------------------------- + + /** + * Configuration of the current execution. 
+ * + * <p>Nested serializers created using this will have the most up-to-date configuration, + * and can be resolved for backwards compatibility with previous configuration + * snapshots in {@link #ensureCompatibility(TypeSerializerConfigSnapshot)}. + */ private final ExecutionConfig executionConfig; - private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache; private transient ClassLoader cl; - // We need to handle these ourselves in writeObject()/readObject() - private transient Field[] fields; @SuppressWarnings("unchecked") public PojoSerializer( @@ -83,93 +119,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { this.numFields = fieldSerializers.length; this.executionConfig = checkNotNull(executionConfig); - LinkedHashSet<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes(); - for (int i = 0; i < numFields; i++) { this.fields[i].setAccessible(true); } cl = Thread.currentThread().getContextClassLoader(); - subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>(); - // We only want those classes that are not our own class and are actually sub-classes. 
- List<Class<?>> cleanedTaggedClasses = new ArrayList<Class<?>>(registeredPojoTypes.size()); - for (Class<?> registeredClass: registeredPojoTypes) { - if (registeredClass.equals(clazz)) { - continue; - } - if (!clazz.isAssignableFrom(registeredClass)) { - continue; - } - cleanedTaggedClasses.add(registeredClass); - - } - this.registeredClasses = new LinkedHashMap<Class<?>, Integer>(cleanedTaggedClasses.size()); - registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()]; + LinkedHashSet<Class<?>> registeredSubclasses = + getRegisteredSubclassesFromExecutionConfig(clazz, executionConfig); - int id = 0; - for (Class<?> registeredClass: cleanedTaggedClasses) { - this.registeredClasses.put(registeredClass, id); - TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(registeredClass); - registeredSerializers[id] = typeInfo.createSerializer(executionConfig); + this.registeredClasses = createRegisteredSubclassTags(registeredSubclasses); + this.registeredSerializers = createRegisteredSubclassSerializers(registeredSubclasses, executionConfig); - id++; - } - } - - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - out.writeInt(fields.length); - for (Field field: fields) { - FieldSerializer.serializeField(field, out); - } - } - - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - int numFields = in.readInt(); - fields = new Field[numFields]; - for (int i = 0; i < numFields; i++) { - fields[i] = FieldSerializer.deserializeField(in); - } - - cl = Thread.currentThread().getContextClassLoader(); - subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>(); - } - - private TypeSerializer<?> getSubclassSerializer(Class<?> subclass) { - TypeSerializer<?> result = subclassSerializerCache.get(subclass); - if (result == null) { - - TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(subclass); - result = 
typeInfo.createSerializer(executionConfig); - if (result instanceof PojoSerializer) { - PojoSerializer<?> subclassSerializer = (PojoSerializer<?>) result; - subclassSerializer.copyBaseFieldOrder(this); - } - subclassSerializerCache.put(subclass, result); - - } - return result; - } - - @SuppressWarnings("unused") - private boolean hasField(Field f) { - for (Field field: fields) { - if (f.equals(field)) { - return true; - } - } - return false; - } - - private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) { - // do nothing for now, but in the future, adapt subclass serializer to have same - // ordering as base class serializer so that binary comparison on base class fields - // can work + this.subclassSerializerCache = new HashMap<>(); } @Override @@ -296,7 +259,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { } } } catch (IllegalAccessException e) { - throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before."); + throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e); } } else { TypeSerializer subclassSerializer = getSubclassSerializer(actualType); @@ -341,13 +304,15 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { target.writeByte(flags); + // if its a registered subclass, write the class tag id, otherwise write the full classname if ((flags & IS_SUBCLASS) != 0) { target.writeUTF(actualClass.getName()); } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { target.writeByte(subclassTag); } - + // if its a subclass, use the corresponding subclass serializer, + // otherwise serialize each field with our field serializers if ((flags & NO_SUBCLASS) != 0) { try { for (int i = 0; i < numFields; i++) { @@ -360,8 +325,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { } } } catch (IllegalAccessException e) { - throw new RuntimeException("Error during POJO copy, this should not happen since we check 
the fields" + "before."); - + throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e); } } else { // subclass @@ -418,8 +382,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { } } } catch (IllegalAccessException e) { - throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" + "before."); - + throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e); } } else { if (subclassSerializer != null) { @@ -493,8 +456,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { } } } catch (IllegalAccessException e) { - throw new RuntimeException( - "Error during POJO copy, this should not happen since we check the fields before."); + throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.", e); } } else { if (subclassSerializer != null) { @@ -574,4 +536,527 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { public boolean canEqual(Object obj) { return obj instanceof PojoSerializer; } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public PojoSerializerConfigSnapshot<T> snapshotConfiguration() { + return buildConfigSnapshot( + clazz, + registeredClasses, + registeredSerializers, + fields, + fieldSerializers, + subclassSerializerCache); + } + + @SuppressWarnings("unchecked") + @Override + public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof PojoSerializerConfigSnapshot) { + final PojoSerializerConfigSnapshot<T> config = (PojoSerializerConfigSnapshot<T>) configSnapshot; + + if (clazz.equals(config.getTypeClass())) { + if 
(this.numFields == config.getFieldToSerializerConfigSnapshot().size()) { + + CompatibilityResult<?> compatResult; + + // ----------- check field order and compatibility of field serializers ----------- + + // reordered fields and their serializers; + // this won't be applied to this serializer until all compatibility checks have been completed + final Field[] reorderedFields = new Field[this.numFields]; + final TypeSerializer<Object>[] reorderedFieldSerializers = + (TypeSerializer<Object>[]) new TypeSerializer<?>[this.numFields]; + + int i = 0; + for (Map.Entry<Field, TypeSerializerConfigSnapshot> fieldToConfigSnapshotEntry + : config.getFieldToSerializerConfigSnapshot().entrySet()) { + + int fieldIndex = findField(fieldToConfigSnapshotEntry.getKey()); + if (fieldIndex != -1) { + reorderedFields[i] = fieldToConfigSnapshotEntry.getKey(); + + compatResult = fieldSerializers[fieldIndex].ensureCompatibility(fieldToConfigSnapshotEntry.getValue()); + if (compatResult.requiresMigration()) { + return CompatibilityResult.requiresMigration(null); + } else { + reorderedFieldSerializers[i] = fieldSerializers[fieldIndex]; + } + } else { + return CompatibilityResult.requiresMigration(null); + } + + i++; + } + + // ---- check subclass registration order and compatibility of registered serializers ---- + + // reordered subclass registrations and their serializers; + // this won't be applied to this serializer until all compatibility checks have been completed + final LinkedHashMap<Class<?>, Integer> reorderedRegisteredSubclassesToClasstags; + final TypeSerializer<?>[] reorderedRegisteredSubclassSerializers; + + final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> previousRegistrations = + config.getRegisteredSubclassesToSerializerConfigSnapshots(); + + // the reconfigured list of registered subclasses will be the previous registered + // subclasses in the original order with new subclasses appended at the end + LinkedHashSet<Class<?>> reorderedRegisteredSubclasses = new 
LinkedHashSet<>(); + reorderedRegisteredSubclasses.addAll(previousRegistrations.keySet()); + reorderedRegisteredSubclasses.addAll( + getRegisteredSubclassesFromExecutionConfig(clazz, executionConfig)); + + // re-establish the registered class tags and serializers + reorderedRegisteredSubclassesToClasstags = createRegisteredSubclassTags(reorderedRegisteredSubclasses); + reorderedRegisteredSubclassSerializers = createRegisteredSubclassSerializers( + reorderedRegisteredSubclasses, executionConfig); + + i = 0; + for (TypeSerializerConfigSnapshot previousRegisteredSerializerConfig : previousRegistrations.values()) { + // check compatibility of subclass serializer + compatResult = reorderedRegisteredSubclassSerializers[i].ensureCompatibility(previousRegisteredSerializerConfig); + if (compatResult.requiresMigration()) { + return CompatibilityResult.requiresMigration(null); + } + + i++; + } + + // ------------------ ensure compatibility of non-registered subclass serializers ------------------ + + // the rebuilt cache for non-registered subclass serializers; + // this won't be applied to this serializer until all compatibility checks have been completed + HashMap<Class<?>, TypeSerializer<?>> rebuiltCache = new HashMap<>(); + + for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> previousCachedEntry + : config.getNonRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) { + + TypeSerializer<?> cachedSerializer = createSubclassSerializer(previousCachedEntry.getKey()); + + // check compatibility of cached subclass serializer + compatResult = cachedSerializer.ensureCompatibility(previousCachedEntry.getValue()); + if (compatResult.requiresMigration()) { + return CompatibilityResult.requiresMigration(null); + } else { + rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer); + } + } + + // completed compatibility checks; up to this point, we can just reconfigure + // the serializer so that it is compatible and migration is not required + + this.fields = 
reorderedFields; + this.fieldSerializers = reorderedFieldSerializers; + + this.registeredClasses = reorderedRegisteredSubclassesToClasstags; + this.registeredSerializers = reorderedRegisteredSubclassSerializers; + + this.subclassSerializerCache = rebuiltCache; + + return CompatibilityResult.compatible(); + } + } + } + + return CompatibilityResult.requiresMigration(null); + } + + public static final class PojoSerializerConfigSnapshot<T> extends GenericTypeSerializerConfigSnapshot<T> { + + private static final int VERSION = 1; + + /** + * Ordered map of POJO fields to the configuration snapshots of their corresponding serializers. + * + * <p>Ordering of the fields is kept so that new Pojo serializers for previous data + * may reorder the fields in case they are different. The order of the fields need to + * stay the same for binary compatibility, as the field order is part of the serialization format. + */ + private LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot; + + /** + * Ordered map of registered subclasses to the configuration snapshots of their corresponding serializers. + * + * <p>Ordering of the registered subclasses is kept so that new Pojo serializers for previous data + * may retain the same class tag used for registered subclasses. Newly registered subclasses that + * weren't present before should be appended with the next available class tag. + */ + private LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots; + + /** + * Configuration snapshots of previously cached non-registered subclass serializers. + * + * <p>This is kept so that new Pojo serializers may eagerly repopulate their + * cache with reconfigured subclass serializers. + */ + private HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots; + + /** This empty nullary constructor is required for deserializing the configuration. 
*/ + public PojoSerializerConfigSnapshot() {} + + public PojoSerializerConfigSnapshot( + Class<T> pojoType, + LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshot, + LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots, + HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots) { + + super(pojoType); + + this.fieldToSerializerConfigSnapshot = + Preconditions.checkNotNull(fieldToSerializerConfigSnapshot); + this.registeredSubclassesToSerializerConfigSnapshots = + Preconditions.checkNotNull(registeredSubclassesToSerializerConfigSnapshots); + this.nonRegisteredSubclassesToSerializerConfigSnapshots = + Preconditions.checkNotNull(nonRegisteredSubclassesToSerializerConfigSnapshots); + } + + @Override + public void write(DataOutputView out) throws IOException { + super.write(out); + + // --- write fields and their serializers, in order + + out.writeInt(fieldToSerializerConfigSnapshot.size()); + for (Map.Entry<Field, TypeSerializerConfigSnapshot> entry + : fieldToSerializerConfigSnapshot.entrySet()) { + out.writeUTF(entry.getKey().getName()); + TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue()); + } + + // --- write registered subclasses and their serializers, in registration order + + out.writeInt(registeredSubclassesToSerializerConfigSnapshots.size()); + for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> entry + : registeredSubclassesToSerializerConfigSnapshots.entrySet()) { + out.writeUTF(entry.getKey().getName()); + TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue()); + } + + // --- write snapshot of non-registered subclass serializer cache + + out.writeInt(nonRegisteredSubclassesToSerializerConfigSnapshots.size()); + for (Map.Entry<Class<?>, TypeSerializerConfigSnapshot> entry + : nonRegisteredSubclassesToSerializerConfigSnapshots.entrySet()) { + out.writeUTF(entry.getKey().getName()); + 
TypeSerializerUtil.writeSerializerConfigSnapshot(out, entry.getValue()); + } + } + + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + + // --- read fields and their serializers, in order + + int numFields = in.readInt(); + this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields); + String fieldName; + Field field; + for (int i = 0; i < numFields; i++) { + fieldName = in.readUTF(); + + // search all superclasses for the field + Class<?> clazz = getTypeClass(); + field = null; + while (clazz != null) { + try { + field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + break; + } catch (NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } + + if (field == null) { + // the field no longer exists in the POJO + throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName()); + } else { + fieldToSerializerConfigSnapshot.put( + field, + TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader())); + } + } + + // --- read registered subclasses and their serializers, in registration order + + int numRegisteredSubclasses = in.readInt(); + this.registeredSubclassesToSerializerConfigSnapshots = new LinkedHashMap<>(numRegisteredSubclasses); + String registeredSubclassname; + Class<?> registeredSubclass; + for (int i = 0; i < numRegisteredSubclasses; i++) { + registeredSubclassname = in.readUTF(); + try { + registeredSubclass = Class.forName(registeredSubclassname, true, getUserCodeClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException("Cannot find requested class " + registeredSubclassname + " in classpath.", e); + } + + this.registeredSubclassesToSerializerConfigSnapshots.put( + registeredSubclass, + TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader())); + } + + // --- read snapshot of non-registered subclass serializer cache + + int numCachedSubclassSerializers = in.readInt(); + 
this.nonRegisteredSubclassesToSerializerConfigSnapshots = new HashMap<>(numCachedSubclassSerializers); + String cachedSubclassname; + Class<?> cachedSubclass; + for (int i = 0; i < numCachedSubclassSerializers; i++) { + cachedSubclassname = in.readUTF(); + try { + cachedSubclass = Class.forName(cachedSubclassname, true, getUserCodeClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException("Cannot find requested class " + cachedSubclassname + " in classpath.", e); + } + + this.nonRegisteredSubclassesToSerializerConfigSnapshots.put( + cachedSubclass, + TypeSerializerUtil.readSerializerConfigSnapshot(in, getUserCodeClassLoader())); + } + } + + @Override + public int getVersion() { + return VERSION; + } + + public LinkedHashMap<Field, TypeSerializerConfigSnapshot> getFieldToSerializerConfigSnapshot() { + return fieldToSerializerConfigSnapshot; + } + + public LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> getRegisteredSubclassesToSerializerConfigSnapshots() { + return registeredSubclassesToSerializerConfigSnapshots; + } + + public HashMap<Class<?>, TypeSerializerConfigSnapshot> getNonRegisteredSubclassesToSerializerConfigSnapshots() { + return nonRegisteredSubclassesToSerializerConfigSnapshots; + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj) + && (obj instanceof PojoSerializerConfigSnapshot) + && fieldToSerializerConfigSnapshot.equals(((PojoSerializerConfigSnapshot) obj).getFieldToSerializerConfigSnapshot()) + && registeredSubclassesToSerializerConfigSnapshots.equals(((PojoSerializerConfigSnapshot) obj).getRegisteredSubclassesToSerializerConfigSnapshots()) + && nonRegisteredSubclassesToSerializerConfigSnapshots.equals(((PojoSerializerConfigSnapshot) obj).nonRegisteredSubclassesToSerializerConfigSnapshots); + } + + @Override + public int hashCode() { + return super.hashCode() + + Objects.hash( + fieldToSerializerConfigSnapshot, + registeredSubclassesToSerializerConfigSnapshots, + 
nonRegisteredSubclassesToSerializerConfigSnapshots); + } + } + + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) + throws IOException, ClassNotFoundException { + out.defaultWriteObject(); + out.writeInt(fields.length); + for (Field field: fields) { + FieldSerializer.serializeField(field, out); + } + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + int numFields = in.readInt(); + fields = new Field[numFields]; + for (int i = 0; i < numFields; i++) { + fields[i] = FieldSerializer.deserializeField(in); + } + + cl = Thread.currentThread().getContextClassLoader(); + subclassSerializerCache = new HashMap<Class<?>, TypeSerializer<?>>(); + } + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + /** + * Extracts the subclasses of the base POJO class registered in the execution config. + */ + private static LinkedHashSet<Class<?>> getRegisteredSubclassesFromExecutionConfig( + Class<?> basePojoClass, + ExecutionConfig executionConfig) { + + LinkedHashSet<Class<?>> subclassesInRegistrationOrder = new LinkedHashSet<>(executionConfig.getRegisteredPojoTypes().size()); + for (Class<?> registeredClass : executionConfig.getRegisteredPojoTypes()) { + if (registeredClass.equals(basePojoClass)) { + continue; + } + if (!basePojoClass.isAssignableFrom(registeredClass)) { + continue; + } + subclassesInRegistrationOrder.add(registeredClass); + } + + return subclassesInRegistrationOrder; + } + + /** + * Builds map of registered subclasses to their class tags. + * Class tags will be integers starting from 0, assigned incrementally with the order of provided subclasses. 
+ */ + private static LinkedHashMap<Class<?>, Integer> createRegisteredSubclassTags(LinkedHashSet<Class<?>> registeredSubclasses) { + final LinkedHashMap<Class<?>, Integer> classToTag = new LinkedHashMap<>(); + + int id = 0; + for (Class<?> registeredClass : registeredSubclasses) { + classToTag.put(registeredClass, id); + id ++; + } + + return classToTag; + } + + /** + * Creates an array of serializers for provided list of registered subclasses. + * Order of returned serializers will correspond to order of provided subclasses. + */ + private static TypeSerializer<?>[] createRegisteredSubclassSerializers( + LinkedHashSet<Class<?>> registeredSubclasses, + ExecutionConfig executionConfig) { + + final TypeSerializer<?>[] subclassSerializers = new TypeSerializer[registeredSubclasses.size()]; + + int i = 0; + for (Class<?> registeredClass : registeredSubclasses) { + subclassSerializers[i] = TypeExtractor.createTypeInfo(registeredClass).createSerializer(executionConfig); + i++; + } + + return subclassSerializers; + } + + /** + * Fetches cached serializer for a non-registered subclass; + * also creates the serializer if it doesn't exist yet. + * + * This method is also exposed to package-private access + * for testing purposes. + */ + TypeSerializer<?> getSubclassSerializer(Class<?> subclass) { + TypeSerializer<?> result = subclassSerializerCache.get(subclass); + if (result == null) { + result = createSubclassSerializer(subclass); + subclassSerializerCache.put(subclass, result); + } + return result; + } + + private TypeSerializer<?> createSubclassSerializer(Class<?> subclass) { + TypeSerializer<?> serializer = TypeExtractor.createTypeInfo(subclass).createSerializer(executionConfig); + + if (serializer instanceof PojoSerializer) { + PojoSerializer<?> subclassSerializer = (PojoSerializer<?>) serializer; + subclassSerializer.copyBaseFieldOrder(this); + } + + return serializer; + } + + /** + * Finds and returns the order (0-based) of a POJO field. 
+ * Returns -1 if the field does not exist for this POJO. + */ + private int findField(Field f) { + int foundIndex = 0; + for (Field field : fields) { + if (f.equals(field)) { + return foundIndex; + } + + foundIndex++; + } + + return -1; + } + + private void copyBaseFieldOrder(PojoSerializer<?> baseSerializer) { + // do nothing for now, but in the future, adapt subclass serializer to have same + // ordering as base class serializer so that binary comparison on base class fields + // can work + } + + /** + * Build and return a snapshot of the serializer's parameters and currently cached serializers. + */ + private static <T> PojoSerializerConfigSnapshot<T> buildConfigSnapshot( + Class<T> pojoType, + LinkedHashMap<Class<?>, Integer> registeredSubclassesToTags, + TypeSerializer<?>[] registeredSubclassSerializers, + Field[] fields, + TypeSerializer<?>[] fieldSerializers, + HashMap<Class<?>, TypeSerializer<?>> nonRegisteredSubclassSerializerCache) { + + final LinkedHashMap<Field, TypeSerializerConfigSnapshot> fieldToSerializerConfigSnapshots = + new LinkedHashMap<>(fields.length); + + for (int i = 0; i < fields.length; i++) { + fieldToSerializerConfigSnapshots.put(fields[i], fieldSerializers[i].snapshotConfiguration()); + } + + final LinkedHashMap<Class<?>, TypeSerializerConfigSnapshot> registeredSubclassesToSerializerConfigSnapshots = + new LinkedHashMap<>(registeredSubclassesToTags.size()); + + for (Map.Entry<Class<?>, Integer> entry : registeredSubclassesToTags.entrySet()) { + registeredSubclassesToSerializerConfigSnapshots.put( + entry.getKey(), + registeredSubclassSerializers[entry.getValue()].snapshotConfiguration()); + } + + final HashMap<Class<?>, TypeSerializerConfigSnapshot> nonRegisteredSubclassesToSerializerConfigSnapshots = + new LinkedHashMap<>(nonRegisteredSubclassSerializerCache.size()); + + for (Map.Entry<Class<?>, TypeSerializer<?>> entry : nonRegisteredSubclassSerializerCache.entrySet()) { + 
nonRegisteredSubclassesToSerializerConfigSnapshots.put(entry.getKey(), entry.getValue().snapshotConfiguration()); + } + + return new PojoSerializerConfigSnapshot<>( + pojoType, + fieldToSerializerConfigSnapshots, + registeredSubclassesToSerializerConfigSnapshots, + nonRegisteredSubclassesToSerializerConfigSnapshots); + } + + // -------------------------------------------------------------------------------------------- + // Test utilities + // -------------------------------------------------------------------------------------------- + + @VisibleForTesting + Field[] getFields() { + return fields; + } + + @VisibleForTesting + TypeSerializer<?>[] getFieldSerializers() { + return fieldSerializers; + } + + @VisibleForTesting + LinkedHashMap<Class<?>, Integer> getRegisteredClasses() { + return registeredClasses; + } + + @VisibleForTesting + TypeSerializer<?>[] getRegisteredSerializers() { + return registeredSerializers; + } + + @VisibleForTesting + HashMap<Class<?>, TypeSerializer<?>> getSubclassSerializerCache() { + return subclassSerializerCache; + } }
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java index dbd5d3a..5770dac 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java @@ -18,12 +18,17 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerUtil; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.Row; import java.io.IOException; +import java.io.ObjectInputStream; import java.util.Arrays; import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask; @@ -35,12 +40,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * Serializer for {@link Row}. 
*/ @Internal -public class RowSerializer extends TypeSerializer<Row> { +public final class RowSerializer extends TypeSerializer<Row> { private static final long serialVersionUID = 1L; - private final boolean[] nullMask; + private final TypeSerializer<Object>[] fieldSerializers; + private transient boolean[] nullMask; + + @SuppressWarnings("unchecked") public RowSerializer(TypeSerializer<?>[] fieldSerializers) { this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers); this.nullMask = new boolean[fieldSerializers.length]; @@ -231,4 +239,76 @@ public class RowSerializer extends TypeSerializer<Row> { public int hashCode() { return Arrays.hashCode(fieldSerializers); } + + // -------------------------------------------------------------------------------------------- + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.nullMask = new boolean[fieldSerializers.length]; + } + + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & compatibility + // -------------------------------------------------------------------------------------------- + + @Override + public RowSerializerConfigSnapshot snapshotConfiguration() { + return new RowSerializerConfigSnapshot(TypeSerializerUtil.snapshotConfigurations(fieldSerializers)); + } + + @Override + public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof RowSerializerConfigSnapshot) { + TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots = + ((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializerConfigSnapshots(); + + if (fieldSerializerConfigSnapshots.length == fieldSerializers.length) { + boolean requireMigration = false; + TypeSerializer<?>[] convertDeserializers = new TypeSerializer<?>[fieldSerializers.length]; + + CompatibilityResult<?> compatResult; +
for (int i = 0; i < fieldSerializers.length; i++) { + compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]); + if (compatResult.requiresMigration()) { + requireMigration = true; + + if (compatResult.getConvertDeserializer() == null) { + // one of the field serializers cannot provide a fallback deserializer + return CompatibilityResult.requiresMigration(null); + } else { + convertDeserializers[i] = compatResult.getConvertDeserializer(); + } + } else { + // compatible field: its current serializer reads the previous data as-is; never leave a null slot + convertDeserializers[i] = fieldSerializers[i]; + } + } + + if (requireMigration) { + return CompatibilityResult.requiresMigration(new RowSerializer(convertDeserializers)); + } else { + return CompatibilityResult.compatible(); + } + } + } + + return CompatibilityResult.requiresMigration(null); + } + + public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + + private static final int VERSION = 1; + + /** This empty nullary constructor is required for deserializing the configuration. */ + public RowSerializerConfigSnapshot() {} + + public RowSerializerConfigSnapshot(TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) { + super(fieldSerializerConfigSnapshots); + } + + @Override + public int getVersion() { + return VERSION; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java index afc4aa2..68d5aa8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java @@ -19,7 +19,10 @@ package 
org.apache.flink.api.java.typeutils.runtime; import
org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -115,4 +118,43 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 	public boolean canEqual(Object obj) {
 		return obj instanceof TupleSerializerBase;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public TupleSerializerConfigSnapshot<T> snapshotConfiguration() {
+		return new TupleSerializerConfigSnapshot<>(
+				tupleClass,
+				TypeSerializerUtil.snapshotConfigurations(fieldSerializers));
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof TupleSerializerConfigSnapshot) {
+			final TupleSerializerConfigSnapshot<T> config = (TupleSerializerConfigSnapshot<T>) configSnapshot;
+
+			if (tupleClass.equals(config.getTupleClass())) {
+				TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots =
+					config.getNestedSerializerConfigSnapshots();
+
+				if (fieldSerializerConfigSnapshots.length == fieldSerializers.length) {
+					// no convert deserializer is built here, so every field must be compatible as-is
+					CompatibilityResult<?> compatResult;
+					for (int i = 0; i < fieldSerializers.length; i++) {
+						compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
+						if (compatResult.requiresMigration()) {
+							return CompatibilityResult.requiresMigration(null);
+						}
+					}
+
+					return CompatibilityResult.compatible();
+				}
+			}
+		}
+
+		return CompatibilityResult.requiresMigration(null);
+	}
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
new file mode 100644
index 0000000..6d2bb5f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Snapshot of a tuple serializer's configuration: the tuple class and the config snapshots of its field serializers.
+ */
+@Internal
+public final class TupleSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	private Class<T> tupleClass;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public TupleSerializerConfigSnapshot() {}
+
+	public TupleSerializerConfigSnapshot(
+			Class<T> tupleClass,
+			TypeSerializerConfigSnapshot[] fieldSerializerConfigSnapshots) {
+
+		super(fieldSerializerConfigSnapshots);
+
+		this.tupleClass = Preconditions.checkNotNull(tupleClass);
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
+
+		InstantiationUtil.serializeObject(new DataOutputViewStream(out), tupleClass);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		super.read(in);
+
+		try {
+			tupleClass = InstantiationUtil.deserializeObject(new DataInputViewStream(in), getUserCodeClassLoader());
+		} catch (ClassNotFoundException e) {
+			throw new IOException("Could not find requested tuple class in classpath.", e);
+		}
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	public Class<T> getTupleClass() {
+		return tupleClass;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return super.equals(obj)
+				&& (obj instanceof TupleSerializerConfigSnapshot)
+				&& (tupleClass.equals(((TupleSerializerConfigSnapshot) obj).getTupleClass()));
+	}
+
+	@Override
+	public int hashCode() {
+		return super.hashCode() * 31 + tupleClass.hashCode();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 56e204c..10e2330 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -19,15 +19,20 @@ package org.apache.flink.api.java.typeutils.runtime;
 
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.LinkedHashMap;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
 
 import com.esotericsoftware.kryo.Kryo;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -39,12 +44,23 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <T> The type serialized.
  */
 @Internal
-public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
+public final class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 
 	private static final long serialVersionUID = 1L;
 
 	private final Class<T> type;
-	
+
+	/**
+	 * Map of class tag (using classname as tag) to their Kryo registration.
+	 *
+	 * <p>This map serves as a preview of the final registration result of
+	 * the Kryo instance, taking into account registration overwrites.
+	 *
+	 * <p>Currently, we only have one single registration for the value type.
+	 * Nevertheless, we keep this information here for future compatibility.
+	 */
+	private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
+
 	private transient Kryo kryo;
 
 	private transient T copyInstance;
@@ -53,6 +69,7 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 
 	public ValueSerializer(Class<T> type) {
 		this.type = checkNotNull(type);
+		this.kryoRegistrations = asKryoRegistrations(type);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -126,7 +143,8 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 			kryo.setInstantiatorStrategy(instantiatorStrategy);
 
 			this.kryo.setAsmEnabled(true);
-			this.kryo.register(type);
+
+			KryoUtils.applyRegistrations(this.kryo, kryoRegistrations.values());
 		}
 	}
 
@@ -152,4 +170,66 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	public boolean canEqual(Object obj) {
 		return obj instanceof ValueSerializer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public ValueSerializerConfigSnapshot<T> snapshotConfiguration() {
+		return new ValueSerializerConfigSnapshot<>(type);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof ValueSerializerConfigSnapshot) {
+			final ValueSerializerConfigSnapshot<T> config = (ValueSerializerConfigSnapshot<T>) configSnapshot;
+
+			if (type.equals(config.getTypeClass())) {
+				// currently, simply checking the type of the value class is sufficient;
+				// in the future, if there are more Kryo registrations, we should try to resolve that
+				return CompatibilityResult.compatible();
+			}
+		}
+
+		return CompatibilityResult.requiresMigration(null);
+	}
+
+	public static class ValueSerializerConfigSnapshot<T extends Value> extends KryoRegistrationSerializerConfigSnapshot<T> {
+
+		private static final int VERSION = 1;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public ValueSerializerConfigSnapshot() {}
+
+		public ValueSerializerConfigSnapshot(Class<T> valueTypeClass) {
+			super(valueTypeClass, asKryoRegistrations(valueTypeClass));
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		// kryoRegistrations may be null if this value serializer is deserialized from an old version
+		if (kryoRegistrations == null) {
+			this.kryoRegistrations = asKryoRegistrations(type);
+		}
+	}
+
+	private static LinkedHashMap<String, KryoRegistration> asKryoRegistrations(Class<?> type) {
+		Preconditions.checkNotNull(type);
+		// tag is the value class's own name; type.getClass() would always yield "java.lang.Class"
+		LinkedHashMap<String, KryoRegistration> registration = new LinkedHashMap<>(1);
+		registration.put(type.getName(), new KryoRegistration(type));
+
+		return registration;
+	}
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index cba0c84..a172b72 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -21,18 +21,22 @@ package org.apache.flink.api.java.typeutils.runtime.kryo;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 
 import org.apache.avro.generic.GenericData;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.KryoUtils;
 import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
@@ -45,6 +49,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
@@ -72,11 +77,16 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
 	//
------------------------------------------------------------------------
 
-	private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
-	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
 	private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
 	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
-	private final LinkedHashSet<Class<?>> registeredTypes;
+
+	/**
+	 * Map of class tag (using classname as tag) to their Kryo registration.
+	 *
+	 * <p>This map serves as a preview of the final registration result of
+	 * the Kryo instance, taking into account registration overwrites.
+	 */
+	private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
 
 	private final Class<T> type;
 
@@ -93,26 +103,35 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	private transient Output output;
 
 	// ------------------------------------------------------------------------
+	// legacy fields; these fields cannot yet be removed to retain backwards compatibility
+
+	private LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
+	private LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
+	private LinkedHashSet<Class<?>> registeredTypes;
+
+	// ------------------------------------------------------------------------
 
 	public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
 		this.type = checkNotNull(type);
 
 		this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
 		this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
-		this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
-		this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
-		this.registeredTypes = executionConfig.getRegisteredKryoTypes();
+
+		this.kryoRegistrations = buildKryoRegistrations(
+				this.type,
+				executionConfig.getRegisteredKryoTypes(),
+				executionConfig.getRegisteredTypesWithKryoSerializerClasses(),
+				executionConfig.getRegisteredTypesWithKryoSerializers());
 	}
 
 	/**
 	 * Copy-constructor that does not copy transient fields. They will be initialized once required.
 	 */
 	protected KryoSerializer(KryoSerializer<T> toCopy) {
-		registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
-		registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
 		defaultSerializers = toCopy.defaultSerializers;
 		defaultSerializerClasses = toCopy.defaultSerializerClasses;
-		registeredTypes = toCopy.registeredTypes;
+
+		kryoRegistrations = toCopy.kryoRegistrations;
 
 		type = toCopy.type;
 		if(type == null){
@@ -272,11 +291,9 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 		if (obj instanceof KryoSerializer) {
 			KryoSerializer<?> other = (KryoSerializer<?>) obj;
 
-			// we cannot include the Serializers here because they don't implement the equals method
 			return other.canEqual(this) &&
 				type == other.type &&
-				registeredTypes.equals(other.registeredTypes) &&
-				registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
+				kryoRegistrations.equals(other.kryoRegistrations) &&
 				defaultSerializerClasses.equals(other.defaultSerializerClasses);
 		} else {
 			return false;
@@ -334,7 +351,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 			// This is due to a know issue with Kryo's JavaSerializer. See FLINK-6025 for details.
 			kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
 
-			// Add default serializers first, so that they type registrations without a serializer
+			// Add default serializers first, so that the type registrations without a serializer
 			// are registered with a default serializer
 			for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) {
 				kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
@@ -344,59 +361,156 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 				kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
 			}
 
-			// register the type of our class
-			kryo.register(type);
+			KryoUtils.applyRegistrations(this.kryo, kryoRegistrations.values());
 
-			// register given types. we do this first so that any registration of a
-			// more specific serializer overrides this
-			for (Class<?> type : registeredTypes) {
-				kryo.register(type);
-			}
+			kryo.setRegistrationRequired(false);
+			kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+		}
+	}
 
-			// register given serializer classes
-			for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
-				Class<?> typeClass = e.getKey();
-				Class<? extends Serializer<?>> serializerClass = e.getValue();
+	// --------------------------------------------------------------------------------------------
+	// Serializer configuration snapshotting & compatibility
+	// --------------------------------------------------------------------------------------------
 
-				Serializer<?> serializer =
-						ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
-				kryo.register(typeClass, serializer);
-			}
+	@Override
+	public KryoSerializerConfigSnapshot<T> snapshotConfiguration() {
+		return new KryoSerializerConfigSnapshot<>(type, kryoRegistrations);
+	}
 
-			// register given serializers
-			for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
-				kryo.register(e.getKey(), e.getValue().getSerializer());
+	@SuppressWarnings("unchecked")
+	@Override
+	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		if (configSnapshot instanceof KryoSerializerConfigSnapshot) {
+			final KryoSerializerConfigSnapshot<T> config = (KryoSerializerConfigSnapshot<T>) configSnapshot;
+
+			if (type.equals(config.getTypeClass())) {
+				// work on an order-preserving copy, so that the restored config snapshot itself is not modified
+				LinkedHashMap<String, KryoRegistration> reconfiguredRegistrations =
+					new LinkedHashMap<>(config.getKryoRegistrations());
+
+				// reconfigure by assuring that classes which were previously registered are registered
+				// again in the exact same order; new class registrations will be appended.
+				// this also overwrites any dummy placeholders that the restored old configuration has
+				reconfiguredRegistrations.putAll(kryoRegistrations);
+
+				// check if there is still any dummy placeholders even after reconfiguration;
+				// if so, then this new Kryo serializer cannot read old data and is therefore incompatible
+				for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : reconfiguredRegistrations.entrySet()) {
+					if (reconfiguredRegistrationEntry.getValue().isDummy()) {
+						LOG.warn("The Kryo registration for a previously registered class {} does not have a " +
+							"proper serializer, because its previous serializer cannot be loaded or is no " +
+							"longer valid but a new serializer is not available", reconfiguredRegistrationEntry.getKey());
+
+						return CompatibilityResult.requiresMigration(null);
+					}
+				}
+
+				// there's actually no way to tell if new Kryo serializers are compatible with
+				// the previous ones they overwrite; we can only signal compatibility and hope for the best
+				this.kryoRegistrations = reconfiguredRegistrations;
+				// drop a Kryo instance built from the previous registrations; it is re-created lazily on next use
+				this.kryo = null;
+				return CompatibilityResult.compatible();
 			}
-			// this is needed for Avro but can not be added on demand.
-			kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());
+		}
 
-			kryo.setRegistrationRequired(false);
-			kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+		return CompatibilityResult.requiresMigration(null);
+	}
+
+	public static final class KryoSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
+
+		private static final int VERSION = 1;
+
+		/** This empty nullary constructor is required for deserializing the configuration. */
+		public KryoSerializerConfigSnapshot() {}
+
+		public KryoSerializerConfigSnapshot(
+				Class<T> typeClass,
+				LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
+
+			super(typeClass, kryoRegistrations);
+		}
+
+		@Override
+		public int getVersion() {
+			return VERSION;
 		}
 	}
 
 	// --------------------------------------------------------------------------------------------
-	// For testing
+	// Utilities
 	// --------------------------------------------------------------------------------------------
-	
-	public Kryo getKryo() {
-		checkKryoInitialized();
-		return this.kryo;
+
+	/**
+	 * Utility method that takes the registered types and their serializers and resolves them into
+	 * a single map (keyed by class name) such that the result resembles the final registration
+	 * result in Kryo, later registrations of the same class overwriting earlier ones.
+	 */
+	private static LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(
+			Class<?> serializedType,
+			LinkedHashSet<Class<?>> registeredTypes,
+			LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses,
+			LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers) {
+
+		final LinkedHashMap<String, KryoRegistration> kryoRegistrations = new LinkedHashMap<>();
+
+		kryoRegistrations.put(serializedType.getName(), new KryoRegistration(serializedType));
+
+		for (Class<?> registeredType : checkNotNull(registeredTypes)) {
+			kryoRegistrations.put(registeredType.getName(), new KryoRegistration(registeredType));
+		}
+
+		for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> registeredTypeWithSerializerClassEntry :
+				checkNotNull(registeredTypesWithSerializerClasses).entrySet()) {
+
+			kryoRegistrations.put(
+					registeredTypeWithSerializerClassEntry.getKey().getName(),
+					new KryoRegistration(
+							registeredTypeWithSerializerClassEntry.getKey(),
+							registeredTypeWithSerializerClassEntry.getValue()));
+		}
+
+		for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypeWithSerializerEntry :
+				checkNotNull(registeredTypesWithSerializers).entrySet()) {
+
+			kryoRegistrations.put(
+					registeredTypeWithSerializerEntry.getKey().getName(),
+					new KryoRegistration(
+							registeredTypeWithSerializerEntry.getKey(),
+							registeredTypeWithSerializerEntry.getValue()));
+		}
+
+		kryoRegistrations.put(
+				GenericData.Array.class.getName(),
+				new KryoRegistration(
+						GenericData.Array.class,
+						new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+
+		return kryoRegistrations;
 	}
 
-	@Override
-	public boolean canRestoreFrom(TypeSerializer<?> other) {
-		if (other instanceof KryoSerializer) {
-			KryoSerializer<?> otherKryo = (KryoSerializer<?>) other;
+	// --------------------------------------------------------------------------------------------
 
-			// we cannot include the Serializers here because they don't implement the equals method
-			return other.canEqual(this) &&
-					type == otherKryo.type &&
-					(registeredTypes.equals(otherKryo.registeredTypes) || otherKryo.registeredTypes.isEmpty()) &&
-					(registeredTypesWithSerializerClasses.equals(otherKryo.registeredTypesWithSerializerClasses) || otherKryo.registeredTypesWithSerializerClasses.isEmpty()) &&
-					(defaultSerializerClasses.equals(otherKryo.defaultSerializerClasses) || otherKryo.defaultSerializerClasses.isEmpty());
-		} else {
-			return false;
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		// kryoRegistrations may be null if this Kryo serializer is deserialized from an old version
+		if (kryoRegistrations == null) {
+			this.kryoRegistrations = buildKryoRegistrations(
+					type,
+					registeredTypes,
+					registeredTypesWithSerializerClasses,
+					registeredTypesWithSerializers);
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// For testing
+	// --------------------------------------------------------------------------------------------
+
+	@VisibleForTesting
+	public Kryo getKryo() {
+		checkKryoInitialized();
+		return this.kryo;
+	}
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 91c6145..a846703 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -31,6 +32,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 
@@ -85,6 +89,38 @@ public abstract class SerializerTestBase<T> extends TestLogger {
 			fail("Exception in test: " + e.getMessage());
 		}
 	}
+
+	@Test
+	public
void testConfigSnapshotInstantiation() { + TypeSerializerConfigSnapshot configSnapshot = getSerializer().snapshotConfiguration(); + + InstantiationUtil.instantiate(configSnapshot.getClass()); + } + + @Test + public void testSnapshotConfigurationAndReconfigure() throws Exception { + final TypeSerializerConfigSnapshot configSnapshot = getSerializer().snapshotConfiguration(); + + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerUtil.writeSerializerConfigSnapshot( + new DataOutputViewStreamWrapper(out), configSnapshot); + serializedConfig = out.toByteArray(); + } + + TypeSerializerConfigSnapshot restoredConfig; + try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + restoredConfig = TypeSerializerUtil.readSerializerConfigSnapshot( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + + CompatibilityResult strategy = getSerializer().ensureCompatibility(restoredConfig); + assertFalse(strategy.requiresMigration()); + + // also verify that the serializer's reconfigure implementation detects incompatibility + strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot()); + assertTrue(strategy.requiresMigration()); + } @Test public void testGetLength() { @@ -477,4 +513,21 @@ public abstract class SerializerTestBase<T> extends TestLogger { } } } + + public static final class TestIncompatibleSerializerConfigSnapshot extends TypeSerializerConfigSnapshot { + @Override + public int getVersion() { + return 0; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof TestIncompatibleSerializerConfigSnapshot; + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java new file mode 100644 index 0000000..0783bb6 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshotTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Objects; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Unit tests related to {@link TypeSerializerConfigSnapshot}.
+ */ +public class TypeSerializerConfigSnapshotTest { + + /** + * Verifies that reading and writing configuration snapshots work correctly. + */ + @Test + public void testSerializeConfigurationSnapshots() throws Exception { + TestConfigSnapshot configSnapshot1 = new TestConfigSnapshot(1, "foo"); + TestConfigSnapshot configSnapshot2 = new TestConfigSnapshot(2, "bar"); + + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerUtil.writeSerializerConfigSnapshots( + new DataOutputViewStreamWrapper(out), + configSnapshot1, + configSnapshot2); + + serializedConfig = out.toByteArray(); + } + + TypeSerializerConfigSnapshot[] restoredConfigs; + try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + restoredConfigs = TypeSerializerUtil.readSerializerConfigSnapshots( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + + assertEquals(2, restoredConfigs.length); + assertEquals(configSnapshot1, restoredConfigs[0]); + assertEquals(configSnapshot2, restoredConfigs[1]); + } + + /** + * Verifies that deserializing config snapshots fails if the config class could not be found.
+ */ + @Test + public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception { + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerUtil.writeSerializerConfigSnapshot( + new DataOutputViewStreamWrapper(out), new TestConfigSnapshot(123, "foobar")); + serializedConfig = out.toByteArray(); + } + + try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig); + URLClassLoader dummyClassLoader = new URLClassLoader(new URL[0], null)) { + // read using a dummy classloader (no URLs, bootstrap parent), closed by try-with-resources + TypeSerializerUtil.readSerializerConfigSnapshot( + new DataInputViewStreamWrapper(in), dummyClassLoader); + fail("Expected a ClassNotFoundException wrapped in IOException"); + } catch (IOException expected) { + // test passes + } + } + + /** Config snapshot carrying an int and a string; equals/hashCode are null-safe because the nullary constructor leaves {@code msg} unset. */ + public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot { + + static final int VERSION = 1; + + private int val; + private String msg; + + public TestConfigSnapshot() {} + + public TestConfigSnapshot(int val, String msg) { + this.val = val; + this.msg = msg; + } + + @Override + public void write(DataOutputView out) throws IOException { + super.write(out); + out.writeInt(val); + out.writeUTF(msg); + } + + @Override + public void read(DataInputView in) throws IOException { + super.read(in); + val = in.readInt(); + msg = in.readUTF(); + } + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof TestConfigSnapshot)) { + return false; + } + + TestConfigSnapshot other = (TestConfigSnapshot) obj; + return val == other.val && Objects.equals(msg, other.msg); + } + + @Override + public int hashCode() { + return 31 * val + Objects.hashCode(msg); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java ---------------------------------------------------------------------- diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java index 5e2e733..5c615de 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/EnumSerializerTest.java @@ -18,11 +18,25 @@ package org.apache.flink.api.common.typeutils.base; +import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.SerializerTestInstance; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerUtil; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class EnumSerializerTest extends TestLogger { @Test @@ -41,6 +55,102 @@ public class EnumSerializerTest extends TestLogger { new EnumSerializer<>(EmptyEnum.class); } + @Test + public void testReconfiguration() { + // mock the previous ordering of enum constants to be BAR, PAULA, NATHANIEL + PublicEnum[] mockPreviousOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL}; + + // now, the actual order of FOO, BAR, PETER, NATHANIEL, EMMA, PAULA will be the "new wrong order" + EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class); + + // verify that the serializer is first using the "wrong order" (i.e., the initial new configuration) + assertConstantOrder(serializer, PublicEnum.values()); + + // reconfigure and verify compatibility + CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility( + new EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, mockPreviousOrder)); + assertFalse(compatResult.requiresMigration()); + + // after reconfiguration, the order should be first the original BAR, PAULA, NATHANIEL, + // followed by the "new enum constants" FOO, PETER, EMMA + PublicEnum[] expectedOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL, PublicEnum.FOO, PublicEnum.PETER, PublicEnum.EMMA}; + assertConstantOrder(serializer, expectedOrder); + } +
+ @Test + public void testConfigurationSnapshotSerialization() throws Exception { + EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class); + + byte[] serializedConfig; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + TypeSerializerUtil.writeSerializerConfigSnapshot( + new DataOutputViewStreamWrapper(out), serializer.snapshotConfiguration()); + serializedConfig = out.toByteArray(); + } + + TypeSerializerConfigSnapshot restoredConfig; + try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) { + restoredConfig = TypeSerializerUtil.readSerializerConfigSnapshot( + new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader()); + } + + CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility(restoredConfig); + assertFalse(compatResult.requiresMigration()); + + assertConstantOrder(serializer, PublicEnum.values()); + } +
+ @Test + public void testSerializeEnumSerializer() throws Exception { + EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class); + + // verify original transient parameters + assertConstantOrder(serializer, PublicEnum.values()); + + byte[] serializedSerializer = InstantiationUtil.serializeObject(serializer); + + // deserialize and re-verify transient parameters + serializer = InstantiationUtil.deserializeObject(serializedSerializer, Thread.currentThread().getContextClassLoader()); + assertConstantOrder(serializer, PublicEnum.values()); + } +
+ @Test + public void testSerializeReconfiguredEnumSerializer() throws Exception { + // mock the previous ordering of enum constants to be BAR, PAULA, NATHANIEL + PublicEnum[] mockPreviousOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL}; + + // now, the actual order of FOO, BAR, PETER, NATHANIEL, EMMA, PAULA will be the "new wrong order" + EnumSerializer<PublicEnum> serializer = new EnumSerializer<>(PublicEnum.class); + + // verify that the serializer is first using the "wrong order" (i.e., the initial new configuration) + assertConstantOrder(serializer, PublicEnum.values()); + + // reconfigure and verify compatibility + CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility( + new EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, mockPreviousOrder)); + assertFalse(compatResult.requiresMigration()); + + // serialize and deserialize again the serializer + byte[] serializedSerializer = InstantiationUtil.serializeObject(serializer); + serializer = InstantiationUtil.deserializeObject(serializedSerializer, Thread.currentThread().getContextClassLoader()); + + // verify that after the serializer was read, the reconfigured constant ordering is untouched + PublicEnum[] expectedOrder = {PublicEnum.BAR, PublicEnum.PAULA, PublicEnum.NATHANIEL, PublicEnum.FOO, PublicEnum.PETER, PublicEnum.EMMA}; + assertConstantOrder(serializer, expectedOrder); + } +
+ /** + * Asserts that {@code serializer} maps each constant to its index in {@code expectedOrder} + * and that {@link EnumSerializer#getValues()} returns exactly {@code expectedOrder}. + */ + private static void assertConstantOrder(EnumSerializer<PublicEnum> serializer, PublicEnum[] expectedOrder) { + for (int i = 0; i < expectedOrder.length; i++) { + assertEquals(i, serializer.getValueToOrdinal().get(expectedOrder[i]).intValue()); + } + + assertTrue(Arrays.equals(expectedOrder, serializer.getValues())); + } + @SafeVarargs public final <T extends Enum<T>> void testEnumSerializer(T...
data) { @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java index 9fda3d0..3301aa2 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializerTest.java @@ -20,8 +20,6 @@ package org.apache.flink.api.common.typeutils.base.array; import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.array.CharPrimitiveArraySerializer; -import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerializer; /** * A test for the {@link LongPrimitiveArraySerializer}.
