[FLINK-6801] [core] Relax missing fields check when reading PojoSerializerConfigSnapshot
Prior to this commit, when reading the PojoSerializerConfigSnapshot, if the underlying POJO type has a missing field, then the read would fail. Failing the deserialization of the config snapshot is too severe, because that would leave no opportunity to restore the checkpoint at all, whereas we should be able to restore the config and provide it to the new PojoSerializer for the chance of getting a convert deserializer. This commit changes this by only restoring the field names when reading the PojoSerializerConfigSnapshot. In PojoSerializer.ensureCompatibility, the field name is used to look up the fields of the new PojoSerializer. This change does not change the serialization format of the PojoSerializerConfigSnapshot. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8f239d7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8f239d7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8f239d7 Branch: refs/heads/release-1.3 Commit: b8f239d723aa4dd145835f6f8de33b89b4bded92 Parents: deaf4bf Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Sun Jun 4 12:30:58 2017 +0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue Jun 13 07:48:32 2017 +0200 ---------------------------------------------------------------------- .../java/typeutils/runtime/PojoSerializer.java | 65 +++++++------------- .../typeutils/runtime/PojoSerializerTest.java | 18 +++--- 2 files changed, 32 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b8f239d7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 2311158..7818897 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -600,18 +600,18 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { (TypeSerializer<Object>[]) new TypeSerializer<?>[this.numFields]; int i = 0; - for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToConfigSnapshotEntry + for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToConfigSnapshotEntry : config.getFieldToSerializerConfigSnapshot().entrySet()) { int fieldIndex = findField(fieldToConfigSnapshotEntry.getKey()); if (fieldIndex != -1) { - reorderedFields[i] = fieldToConfigSnapshotEntry.getKey(); + reorderedFields[i] = fields[fieldIndex]; compatResult = CompatibilityUtil.resolveCompatibilityResult( - fieldToConfigSnapshotEntry.getValue().f0, - UnloadableDummyTypeSerializer.class, - fieldToConfigSnapshotEntry.getValue().f1, - fieldSerializers[fieldIndex]); + fieldToConfigSnapshotEntry.getValue().f0, + UnloadableDummyTypeSerializer.class, + fieldToConfigSnapshotEntry.getValue().f1, + fieldSerializers[fieldIndex]); if (compatResult.isRequiresMigration()) { requiresMigration = true; @@ -745,7 +745,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { * may reorder the fields in case they are different. The order of the fields need to * stay the same for binary compatibility, as the field order is part of the serialization format. 
*/ - private LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot; + private LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot; /** * Ordered map of registered subclasses to their corresponding serializers and its configuration snapshots. @@ -771,7 +771,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { public PojoSerializerConfigSnapshot( Class<T> pojoType, - LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot, + LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot, LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots, HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots) { @@ -785,7 +785,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { public PojoSerializerConfigSnapshot( Class<T> pojoType, - LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot, + LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot, LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> registeredSubclassesToSerializerConfigSnapshots, HashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nonRegisteredSubclassesToSerializerConfigSnapshots, boolean ignoreTypeSerializerSerialization) { @@ -813,10 +813,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { // --- write fields and their serializers, in order out.writeInt(fieldToSerializerConfigSnapshot.size()); - for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry + for (Map.Entry<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> entry : fieldToSerializerConfigSnapshot.entrySet()) { - outViewWrapper.writeUTF(entry.getKey().getName()); + outViewWrapper.writeUTF(entry.getKey()); out.writeInt(outWithPos.getPosition()); if (!ignoreTypeSerializerSerialization) { @@ -904,39 +904,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { this.fieldToSerializerConfigSnapshot = new LinkedHashMap<>(numFields); String fieldName; - Field field; TypeSerializer<?> fieldSerializer; TypeSerializerConfigSnapshot fieldSerializerConfigSnapshot; for (int i = 0; i < numFields; i++) { fieldName = inViewWrapper.readUTF(); - // search all superclasses for the field - Class<?> clazz = getTypeClass(); - field = null; - while (clazz != null) { - try { - field = clazz.getDeclaredField(fieldName); - field.setAccessible(true); - break; - } catch (NoSuchFieldException e) { - clazz = clazz.getSuperclass(); - } - } + inWithPos.setPosition(fieldSerializerOffsets[i * 2]); + fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader()); - if (field == null) { - // the field no longer exists in the POJO - throw new IOException("Can't find field " + fieldName + " in POJO class " + getTypeClass().getName()); - } else { - inWithPos.setPosition(fieldSerializerOffsets[i * 2]); - fieldSerializer = TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, getUserCodeClassLoader()); - - inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]); - fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader()); + inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]); + fieldSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, getUserCodeClassLoader()); - fieldToSerializerConfigSnapshot.put( - field, - new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, fieldSerializerConfigSnapshot)); - } + 
fieldToSerializerConfigSnapshot.put( + fieldName, + new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, fieldSerializerConfigSnapshot)); } // --- read registered subclasses and their serializers, in registration order @@ -998,7 +979,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { return VERSION; } - public LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() { + public LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() { return fieldToSerializerConfigSnapshot; } @@ -1144,10 +1125,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { * Finds and returns the order (0-based) of a POJO field. * Returns -1 if the field does not exist for this POJO. */ - private int findField(Field f) { + private int findField(String fieldName) { int foundIndex = 0; for (Field field : fields) { - if (f.equals(field)) { + if (fieldName.equals(field.getName())) { return foundIndex; } @@ -1174,12 +1155,12 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { TypeSerializer<?>[] fieldSerializers, HashMap<Class<?>, TypeSerializer<?>> nonRegisteredSubclassSerializerCache) { - final LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots = + final LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots = new LinkedHashMap<>(fields.length); for (int i = 0; i < fields.length; i++) { fieldToSerializerConfigSnapshots.put( - fields[i], + fields[i].getName(), new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializers[i], fieldSerializers[i].snapshotConfiguration())); } http://git-wip-us.apache.org/repos/asf/flink/blob/b8f239d7/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java index 10f4708..e5315aa 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java @@ -484,35 +484,35 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te // creating this serializer just for generating config snapshots of the field serializers PojoSerializer<TestUserClass> ser = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig()); - LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot = + LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot = new LinkedHashMap<>(mockOriginalFieldOrder.length); mockOriginalFieldToSerializerConfigSnapshot.put( - mockOriginalFieldOrder[0], + mockOriginalFieldOrder[0].getName(), new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( ser.getFieldSerializers()[3], ser.getFieldSerializers()[3].snapshotConfiguration())); mockOriginalFieldToSerializerConfigSnapshot.put( - mockOriginalFieldOrder[1], + mockOriginalFieldOrder[1].getName(), new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( ser.getFieldSerializers()[2], ser.getFieldSerializers()[2].snapshotConfiguration())); mockOriginalFieldToSerializerConfigSnapshot.put( - mockOriginalFieldOrder[2], + mockOriginalFieldOrder[2].getName(), new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( ser.getFieldSerializers()[5], ser.getFieldSerializers()[5].snapshotConfiguration())); mockOriginalFieldToSerializerConfigSnapshot.put( - mockOriginalFieldOrder[3], + 
mockOriginalFieldOrder[3].getName(), new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( ser.getFieldSerializers()[0], ser.getFieldSerializers()[0].snapshotConfiguration())); mockOriginalFieldToSerializerConfigSnapshot.put( - mockOriginalFieldOrder[4], + mockOriginalFieldOrder[4].getName(), new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( ser.getFieldSerializers()[1], ser.getFieldSerializers()[1].snapshotConfiguration())); mockOriginalFieldToSerializerConfigSnapshot.put( - mockOriginalFieldOrder[5], + mockOriginalFieldOrder[5].getName(), new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>( ser.getFieldSerializers()[4], ser.getFieldSerializers()[4].snapshotConfiguration())); @@ -579,9 +579,9 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te PojoSerializer.PojoSerializerConfigSnapshot<?> original, PojoSerializer.PojoSerializerConfigSnapshot<?> deserializedConfig) { - LinkedHashMap<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs = + LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs = original.getFieldToSerializerConfigSnapshot(); - for (Map.Entry<Field, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry + for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry : deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) { Assert.assertEquals(null, entry.getValue().f0);
