[FLINK-6801] [core] Relax missing fields check when reading 
PojoSerializerConfigSnapshot

Prior to this commit, when reading the PojoSerializerConfigSnapshot, if
the underlying POJO type was missing a field, the read would fail.
Failing the deserialization of the config snapshot is too severe,
because that would leave no opportunity to restore the checkpoint at
all, whereas we should be able to restore the config and provide it to
the new PojoSerializer for the chance of getting a convert deserializer.

This commit changes this by only restoring the field names when reading
the PojoSerializerConfigSnapshot. In PojoSerializer.ensureCompatibility,
the field name is used to look up the fields of the new PojoSerializer.
This change does not change the serialization format of the
PojoSerializerConfigSnapshot.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c929eb30
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c929eb30
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c929eb30

Branch: refs/heads/master
Commit: c929eb30867bb1f539c98fe9e47f91790bd85764
Parents: 7c157d6
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sun Jun 4 12:30:58 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 06:38:17 2017 +0200

----------------------------------------------------------------------
 .../java/typeutils/runtime/PojoSerializer.java  | 65 +++++++-------------
 .../typeutils/runtime/PojoSerializerTest.java   | 18 +++---
 2 files changed, 32 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c929eb30/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 2311158..7818897 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -600,18 +600,18 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                                (TypeSerializer<Object>[]) new 
TypeSerializer<?>[this.numFields];
 
                                        int i = 0;
-                                       for (Map.Entry<Field, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
fieldToConfigSnapshotEntry
+                                       for (Map.Entry<String, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
fieldToConfigSnapshotEntry
                                                        : 
config.getFieldToSerializerConfigSnapshot().entrySet()) {
 
                                                int fieldIndex = 
findField(fieldToConfigSnapshotEntry.getKey());
                                                if (fieldIndex != -1) {
-                                                       reorderedFields[i] = 
fieldToConfigSnapshotEntry.getKey();
+                                                       reorderedFields[i] = 
fields[fieldIndex];
 
                                                        compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
-                                                                       
fieldToConfigSnapshotEntry.getValue().f0,
-                                                                       
UnloadableDummyTypeSerializer.class,
-                                                                       
fieldToConfigSnapshotEntry.getValue().f1,
-                                                                       
fieldSerializers[fieldIndex]);
+                                                               
fieldToConfigSnapshotEntry.getValue().f0,
+                                                               
UnloadableDummyTypeSerializer.class,
+                                                               
fieldToConfigSnapshotEntry.getValue().f1,
+                                                               
fieldSerializers[fieldIndex]);
 
                                                        if 
(compatResult.isRequiresMigration()) {
                                                                
requiresMigration = true;
@@ -745,7 +745,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                 * may reorder the fields in case they are different. The order 
of the fields need to
                 * stay the same for binary compatibility, as the field order 
is part of the serialization format.
                 */
-               private LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot;
+               private LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot;
 
                /**
                 * Ordered map of registered subclasses to their corresponding 
serializers and its configuration snapshots.
@@ -771,7 +771,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
                public PojoSerializerConfigSnapshot(
                                Class<T> pojoType,
-                               LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+                               LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
                                LinkedHashMap<Class<?>, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
registeredSubclassesToSerializerConfigSnapshots,
                                HashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> 
nonRegisteredSubclassesToSerializerConfigSnapshots) {
 
@@ -785,7 +785,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
                public PojoSerializerConfigSnapshot(
                                Class<T> pojoType,
-                               LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+                               LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
                                LinkedHashMap<Class<?>, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
registeredSubclassesToSerializerConfigSnapshots,
                                HashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> 
nonRegisteredSubclassesToSerializerConfigSnapshots,
                                boolean ignoreTypeSerializerSerialization) {
@@ -813,10 +813,10 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                // --- write fields and their serializers, in 
order
 
                                
out.writeInt(fieldToSerializerConfigSnapshot.size());
-                               for (Map.Entry<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> entry
+                               for (Map.Entry<String, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
                                                : 
fieldToSerializerConfigSnapshot.entrySet()) {
 
-                                       
outViewWrapper.writeUTF(entry.getKey().getName());
+                                       outViewWrapper.writeUTF(entry.getKey());
 
                                        out.writeInt(outWithPos.getPosition());
                                        if (!ignoreTypeSerializerSerialization) 
{
@@ -904,39 +904,20 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
                                this.fieldToSerializerConfigSnapshot = new 
LinkedHashMap<>(numFields);
                                String fieldName;
-                               Field field;
                                TypeSerializer<?> fieldSerializer;
                                TypeSerializerConfigSnapshot 
fieldSerializerConfigSnapshot;
                                for (int i = 0; i < numFields; i++) {
                                        fieldName = inViewWrapper.readUTF();
 
-                                       // search all superclasses for the field
-                                       Class<?> clazz = getTypeClass();
-                                       field = null;
-                                       while (clazz != null) {
-                                               try {
-                                                       field = 
clazz.getDeclaredField(fieldName);
-                                                       
field.setAccessible(true);
-                                                       break;
-                                               } catch (NoSuchFieldException 
e) {
-                                                       clazz = 
clazz.getSuperclass();
-                                               }
-                                       }
+                                       
inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
+                                       fieldSerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, 
getUserCodeClassLoader());
 
-                                       if (field == null) {
-                                               // the field no longer exists 
in the POJO
-                                               throw new IOException("Can't 
find field " + fieldName + " in POJO class " + getTypeClass().getName());
-                                       } else {
-                                               
inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
-                                               fieldSerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, 
getUserCodeClassLoader());
-
-                                               
inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
-                                               fieldSerializerConfigSnapshot = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, 
getUserCodeClassLoader());
+                                       
inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
+                                       fieldSerializerConfigSnapshot = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, 
getUserCodeClassLoader());
 
-                                               
fieldToSerializerConfigSnapshot.put(
-                                                       field,
-                                                       new 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, 
fieldSerializerConfigSnapshot));
-                                       }
+                                       fieldToSerializerConfigSnapshot.put(
+                                               fieldName,
+                                               new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(fieldSerializer, fieldSerializerConfigSnapshot));
                                }
 
                                // --- read registered subclasses and their 
serializers, in registration order
@@ -998,7 +979,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                        return VERSION;
                }
 
-               public LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() {
+               public LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() {
                        return fieldToSerializerConfigSnapshot;
                }
 
@@ -1144,10 +1125,10 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
         * Finds and returns the order (0-based) of a POJO field.
         * Returns -1 if the field does not exist for this POJO.
         */
-       private int findField(Field f) {
+       private int findField(String fieldName) {
                int foundIndex = 0;
                for (Field field : fields) {
-                       if (f.equals(field)) {
+                       if (fieldName.equals(field.getName())) {
                                return foundIndex;
                        }
 
@@ -1174,12 +1155,12 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                        TypeSerializer<?>[] fieldSerializers,
                        HashMap<Class<?>, TypeSerializer<?>> 
nonRegisteredSubclassSerializerCache) {
 
-               final LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots =
+               final LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots =
                        new LinkedHashMap<>(fields.length);
 
                for (int i = 0; i < fields.length; i++) {
                        fieldToSerializerConfigSnapshots.put(
-                               fields[i],
+                               fields[i].getName(),
                                new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(fieldSerializers[i], 
fieldSerializers[i].snapshotConfiguration()));
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c929eb30/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 10f4708..e5315aa 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -484,35 +484,35 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                // creating this serializer just for generating config 
snapshots of the field serializers
                PojoSerializer<TestUserClass> ser = 
(PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
 
-               LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot =
+               LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot =
                        new LinkedHashMap<>(mockOriginalFieldOrder.length);
                mockOriginalFieldToSerializerConfigSnapshot.put(
-                       mockOriginalFieldOrder[0],
+                       mockOriginalFieldOrder[0].getName(),
                        new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
                                ser.getFieldSerializers()[3],
                                
ser.getFieldSerializers()[3].snapshotConfiguration()));
                mockOriginalFieldToSerializerConfigSnapshot.put(
-                       mockOriginalFieldOrder[1],
+                       mockOriginalFieldOrder[1].getName(),
                        new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
                                ser.getFieldSerializers()[2],
                                
ser.getFieldSerializers()[2].snapshotConfiguration()));
                mockOriginalFieldToSerializerConfigSnapshot.put(
-                       mockOriginalFieldOrder[2],
+                       mockOriginalFieldOrder[2].getName(),
                        new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
                                ser.getFieldSerializers()[5],
                                
ser.getFieldSerializers()[5].snapshotConfiguration()));
                mockOriginalFieldToSerializerConfigSnapshot.put(
-                       mockOriginalFieldOrder[3],
+                       mockOriginalFieldOrder[3].getName(),
                        new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
                                ser.getFieldSerializers()[0],
                                
ser.getFieldSerializers()[0].snapshotConfiguration()));
                mockOriginalFieldToSerializerConfigSnapshot.put(
-                       mockOriginalFieldOrder[4],
+                       mockOriginalFieldOrder[4].getName(),
                        new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
                                ser.getFieldSerializers()[1],
                                
ser.getFieldSerializers()[1].snapshotConfiguration()));
                mockOriginalFieldToSerializerConfigSnapshot.put(
-                       mockOriginalFieldOrder[5],
+                       mockOriginalFieldOrder[5].getName(),
                        new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
                                ser.getFieldSerializers()[4],
                                
ser.getFieldSerializers()[4].snapshotConfiguration()));
@@ -579,9 +579,9 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                        PojoSerializer.PojoSerializerConfigSnapshot<?> original,
                        PojoSerializer.PojoSerializerConfigSnapshot<?> 
deserializedConfig) {
 
-               LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs =
+               LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs =
                                original.getFieldToSerializerConfigSnapshot();
-               for (Map.Entry<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> entry
+               for (Map.Entry<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> entry
                                : 
deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) {
 
                        Assert.assertEquals(null, entry.getValue().f0);

Reply via email to