[FLINK-6801] [core] Relax missing fields check when reading 
PojoSerializerConfigSnapshot

Prior to this commit, when reading the PojoSerializerConfigSnapshot, if
the underlying POJO type has a missing field, then the read would fail.
Failing the deserialization of the config snapshot is too severe,
because that would leave no opportunity to restore the checkpoint at
all, whereas we should be able to restore the config and provide it to
the new PojoSerializer for the chance of getting a convert deserializer.

This commit changes this by only restoring the field names when reading
the PojoSerializerConfigSnapshot. In PojoSerializer.ensureCompatibility,
the field name is used to look up the fields of the new PojoSerializer.
This change does not change the serialization format of the
PojoSerializerConfigSnapshot.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8f239d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8f239d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8f239d7

Branch: refs/heads/release-1.3
Commit: b8f239d723aa4dd145835f6f8de33b89b4bded92
Parents: deaf4bf
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sun Jun 4 12:30:58 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 07:48:32 2017 +0200

----------------------------------------------------------------------
 .../java/typeutils/runtime/PojoSerializer.java  | 65 +++++++-------------
 .../typeutils/runtime/PojoSerializerTest.java   | 18 +++---
 2 files changed, 32 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8f239d7/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 2311158..7818897 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -600,18 +600,18 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                                (TypeSerializer<Object>[]) new 
TypeSerializer<?>[this.numFields];
 
                                        int i = 0;
-                                       for (Map.Entry<Field, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
fieldToConfigSnapshotEntry
+                                       for (Map.Entry<String, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
fieldToConfigSnapshotEntry
                                                        : 
config.getFieldToSerializerConfigSnapshot().entrySet()) {
 
                                                int fieldIndex = 
findField(fieldToConfigSnapshotEntry.getKey());
                                                if (fieldIndex != -1) {
-                                                       reorderedFields[i] = 
fieldToConfigSnapshotEntry.getKey();
+                                                       reorderedFields[i] = 
fields[fieldIndex];
 
                                                        compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
-                                                                       
fieldToConfigSnapshotEntry.getValue().f0,
-                                                                       
UnloadableDummyTypeSerializer.class,
-                                                                       
fieldToConfigSnapshotEntry.getValue().f1,
-                                                                       
fieldSerializers[fieldIndex]);
+                                                               
fieldToConfigSnapshotEntry.getValue().f0,
+                                                               
UnloadableDummyTypeSerializer.class,
+                                                               
fieldToConfigSnapshotEntry.getValue().f1,
+                                                               
fieldSerializers[fieldIndex]);
 
                                                        if 
(compatResult.isRequiresMigration()) {
                                                                
requiresMigration = true;
@@ -745,7 +745,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                 * may reorder the fields in case they are different. The order 
of the fields need to
                 * stay the same for binary compatibility, as the field order 
is part of the serialization format.
                 */
-               private LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot;
+               private LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot;
 
                /**
                 * Ordered map of registered subclasses to their corresponding 
serializers and its configuration snapshots.
@@ -771,7 +771,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
                public PojoSerializerConfigSnapshot(
                                Class<T> pojoType,
-                               LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+                               LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
                                LinkedHashMap<Class<?>, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
registeredSubclassesToSerializerConfigSnapshots,
                                HashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> 
nonRegisteredSubclassesToSerializerConfigSnapshots) {
 
@@ -785,7 +785,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
                public PojoSerializerConfigSnapshot(
                                Class<T> pojoType,
-                               LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
+                               LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshot,
                                LinkedHashMap<Class<?>, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> 
registeredSubclassesToSerializerConfigSnapshots,
                                HashMap<Class<?>, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> 
nonRegisteredSubclassesToSerializerConfigSnapshots,
                                boolean ignoreTypeSerializerSerialization) {
@@ -813,10 +813,10 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                                // --- write fields and their serializers, in 
order
 
                                
out.writeInt(fieldToSerializerConfigSnapshot.size());
-                               for (Map.Entry<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> entry
+                               for (Map.Entry<String, 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> entry
                                                : 
fieldToSerializerConfigSnapshot.entrySet()) {
 
-                                       
outViewWrapper.writeUTF(entry.getKey().getName());
+                                       outViewWrapper.writeUTF(entry.getKey());
 
                                        out.writeInt(outWithPos.getPosition());
                                        if (!ignoreTypeSerializerSerialization) 
{
@@ -904,39 +904,20 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
                                this.fieldToSerializerConfigSnapshot = new 
LinkedHashMap<>(numFields);
                                String fieldName;
-                               Field field;
                                TypeSerializer<?> fieldSerializer;
                                TypeSerializerConfigSnapshot 
fieldSerializerConfigSnapshot;
                                for (int i = 0; i < numFields; i++) {
                                        fieldName = inViewWrapper.readUTF();
 
-                                       // search all superclasses for the field
-                                       Class<?> clazz = getTypeClass();
-                                       field = null;
-                                       while (clazz != null) {
-                                               try {
-                                                       field = 
clazz.getDeclaredField(fieldName);
-                                                       
field.setAccessible(true);
-                                                       break;
-                                               } catch (NoSuchFieldException 
e) {
-                                                       clazz = 
clazz.getSuperclass();
-                                               }
-                                       }
+                                       
inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
+                                       fieldSerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, 
getUserCodeClassLoader());
 
-                                       if (field == null) {
-                                               // the field no longer exists 
in the POJO
-                                               throw new IOException("Can't 
find field " + fieldName + " in POJO class " + getTypeClass().getName());
-                                       } else {
-                                               
inWithPos.setPosition(fieldSerializerOffsets[i * 2]);
-                                               fieldSerializer = 
TypeSerializerSerializationUtil.tryReadSerializer(inViewWrapper, 
getUserCodeClassLoader());
-
-                                               
inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
-                                               fieldSerializerConfigSnapshot = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, 
getUserCodeClassLoader());
+                                       
inWithPos.setPosition(fieldSerializerOffsets[i * 2 + 1]);
+                                       fieldSerializerConfigSnapshot = 
TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inViewWrapper, 
getUserCodeClassLoader());
 
-                                               
fieldToSerializerConfigSnapshot.put(
-                                                       field,
-                                                       new 
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(fieldSerializer, 
fieldSerializerConfigSnapshot));
-                                       }
+                                       fieldToSerializerConfigSnapshot.put(
+                                               fieldName,
+                                               new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(fieldSerializer, fieldSerializerConfigSnapshot));
                                }
 
                                // --- read registered subclasses and their 
serializers, in registration order
@@ -998,7 +979,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                        return VERSION;
                }
 
-               public LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() {
+               public LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> getFieldToSerializerConfigSnapshot() {
                        return fieldToSerializerConfigSnapshot;
                }
 
@@ -1144,10 +1125,10 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
         * Finds and returns the order (0-based) of a POJO field.
         * Returns -1 if the field does not exist for this POJO.
         */
-       private int findField(Field f) {
+       private int findField(String fieldName) {
                int foundIndex = 0;
                for (Field field : fields) {
-                       if (f.equals(field)) {
+                       if (fieldName.equals(field.getName())) {
                                return foundIndex;
                        }
 
@@ -1174,12 +1155,12 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                        TypeSerializer<?>[] fieldSerializers,
                        HashMap<Class<?>, TypeSerializer<?>> 
nonRegisteredSubclassSerializerCache) {
 
-               final LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots =
+               final LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> fieldToSerializerConfigSnapshots =
                        new LinkedHashMap<>(fields.length);
 
                for (int i = 0; i < fields.length; i++) {
                        fieldToSerializerConfigSnapshots.put(
-                               fields[i],
+                               fields[i].getName(),
                                new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(fieldSerializers[i], 
fieldSerializers[i].snapshotConfiguration()));
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8f239d7/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 10f4708..e5315aa 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -484,35 +484,35 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                // creating this serializer just for generating config 
snapshots of the field serializers
                PojoSerializer<TestUserClass> ser = 
(PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
 
-               LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot =
+               LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> mockOriginalFieldToSerializerConfigSnapshot =
                        new LinkedHashMap<>(mockOriginalFieldOrder.length);
                mockOriginalFieldToSerializerConfigSnapshot.put(
-                       mockOriginalFieldOrder[0],
+                       mockOriginalFieldOrder[0].getName(),
                        new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
                                ser.getFieldSerializers()[3],
                                
ser.getFieldSerializers()[3].snapshotConfiguration()));
                mockOriginalFieldToSerializerConfigSnapshot.put(
-                       mockOriginalFieldOrder[1],
+                       mockOriginalFieldOrder[1].getName(),
                        new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
                                ser.getFieldSerializers()[2],
                                
ser.getFieldSerializers()[2].snapshotConfiguration()));
                mockOriginalFieldToSerializerConfigSnapshot.put(
-                       mockOriginalFieldOrder[2],
+                       mockOriginalFieldOrder[2].getName(),
                        new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
                                ser.getFieldSerializers()[5],
                                
ser.getFieldSerializers()[5].snapshotConfiguration()));
                mockOriginalFieldToSerializerConfigSnapshot.put(
-                       mockOriginalFieldOrder[3],
+                       mockOriginalFieldOrder[3].getName(),
                        new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
                                ser.getFieldSerializers()[0],
                                
ser.getFieldSerializers()[0].snapshotConfiguration()));
                mockOriginalFieldToSerializerConfigSnapshot.put(
-                       mockOriginalFieldOrder[4],
+                       mockOriginalFieldOrder[4].getName(),
                        new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
                                ser.getFieldSerializers()[1],
                                
ser.getFieldSerializers()[1].snapshotConfiguration()));
                mockOriginalFieldToSerializerConfigSnapshot.put(
-                       mockOriginalFieldOrder[5],
+                       mockOriginalFieldOrder[5].getName(),
                        new Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>(
                                ser.getFieldSerializers()[4],
                                
ser.getFieldSerializers()[4].snapshotConfiguration()));
@@ -579,9 +579,9 @@ public class PojoSerializerTest extends 
SerializerTestBase<PojoSerializerTest.Te
                        PojoSerializer.PojoSerializerConfigSnapshot<?> original,
                        PojoSerializer.PojoSerializerConfigSnapshot<?> 
deserializedConfig) {
 
-               LinkedHashMap<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs =
+               LinkedHashMap<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> originalFieldSerializersAndConfs =
                                original.getFieldToSerializerConfigSnapshot();
-               for (Map.Entry<Field, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> entry
+               for (Map.Entry<String, Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> entry
                                : 
deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) {
 
                        Assert.assertEquals(null, entry.getValue().f0);

Reply via email to