[FLINK-6801] [core] Allow deserialized PojoSerializer to have removed fields

Prior to this commit, deserializing the PojoSerializer would fail when
we encounter a missing field that existed in the POJO type before. It is
actually perfectly fine to have a missing field; the deserialized
PojoSerializer should simply skip reading the removed field's previously
serialized values, i.e. much like how Java Object Serialization works.

This commit relaxes the deserialization of the PojoSerializer, so that a
null will be used as a placeholder value to indicate a removed field
that previously existed. De-/serialization and copying methods on the
PojoSerializer will respect null Fields and simply skip them.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8a0276af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8a0276af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8a0276af

Branch: refs/heads/release-1.3
Commit: 8a0276af6f65814d945bef1372c199f96f265bb5
Parents: b8f239d
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sun Jun 4 20:41:59 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 07:48:32 2017 +0200

----------------------------------------------------------------------
 .../java/typeutils/runtime/FieldSerializer.java |   4 +-
 .../java/typeutils/runtime/PojoSerializer.java  | 109 +++++++++++--------
 2 files changed, 64 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8a0276af/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
index 56a4445..5d23b91 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
@@ -51,7 +51,7 @@ public class FieldSerializer {
                                clazz = clazz.getSuperclass();
                        }
                }
-               throw new IOException("Class resolved at TaskManager is not 
compatible with class read during Plan setup."
-                               + " (" + fieldName + ")");
+
+               return null;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a0276af/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 7818897..6a67428 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -204,10 +204,12 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
        protected void initializeFields(T t) {
                for (int i = 0; i < numFields; i++) {
-                       try {
-                               fields[i].set(t, 
fieldSerializers[i].createInstance());
-                       } catch (IllegalAccessException e) {
-                               throw new RuntimeException("Cannot initialize 
fields.", e);
+                       if (fields[i] != null) {
+                               try {
+                                       fields[i].set(t, 
fieldSerializers[i].createInstance());
+                               } catch (IllegalAccessException e) {
+                                       throw new RuntimeException("Cannot 
initialize fields.", e);
+                               }
                        }
                }
        }
@@ -231,13 +233,14 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                        // no subclass
                        try {
                                for (int i = 0; i < numFields; i++) {
-                                       Object value = fields[i].get(from);
-                                       if (value != null) {
-                                               Object copy = 
fieldSerializers[i].copy(value);
-                                               fields[i].set(target, copy);
-                                       }
-                                       else {
-                                               fields[i].set(target, null);
+                                       if (fields[i] != null) {
+                                               Object value = 
fields[i].get(from);
+                                               if (value != null) {
+                                                       Object copy = 
fieldSerializers[i].copy(value);
+                                                       fields[i].set(target, 
copy);
+                                               } else {
+                                                       fields[i].set(target, 
null);
+                                               }
                                        }
                                }
                        } catch (IllegalAccessException e) {
@@ -268,20 +271,20 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                if (actualType == clazz) {
                        try {
                                for (int i = 0; i < numFields; i++) {
-                                       Object value = fields[i].get(from);
-                                       if (value != null) {
-                                               Object reuseValue = 
fields[i].get(reuse);
-                                               Object copy;
-                                               if(reuseValue != null) {
-                                                       copy = 
fieldSerializers[i].copy(value, reuseValue);
-                                               }
-                                               else {
-                                                       copy = 
fieldSerializers[i].copy(value);
+                                       if (fields[i] != null) {
+                                               Object value = 
fields[i].get(from);
+                                               if (value != null) {
+                                                       Object reuseValue = 
fields[i].get(reuse);
+                                                       Object copy;
+                                                       if (reuseValue != null) 
{
+                                                               copy = 
fieldSerializers[i].copy(value, reuseValue);
+                                                       } else {
+                                                               copy = 
fieldSerializers[i].copy(value);
+                                                       }
+                                                       fields[i].set(reuse, 
copy);
+                                               } else {
+                                                       fields[i].set(reuse, 
null);
                                                }
-                                               fields[i].set(reuse, copy);
-                                       }
-                                       else {
-                                               fields[i].set(reuse, null);
                                        }
                                }
                        } catch (IllegalAccessException e) {
@@ -342,7 +345,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                if ((flags & NO_SUBCLASS) != 0) {
                        try {
                                for (int i = 0; i < numFields; i++) {
-                                       Object o = fields[i].get(value);
+                                       Object o = (fields[i] != null) ? 
fields[i].get(value) : null;
                                        if (o == null) {
                                                target.writeBoolean(true); // 
null field handling
                                        } else {
@@ -400,11 +403,17 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                        try {
                                for (int i = 0; i < numFields; i++) {
                                        boolean isNull = source.readBoolean();
-                                       if (isNull) {
-                                               fields[i].set(target, null);
-                                       } else {
-                                               Object field = 
fieldSerializers[i].deserialize(source);
-                                               fields[i].set(target, field);
+
+                                       if (fields[i] != null) {
+                                               if (isNull) {
+                                                       fields[i].set(target, 
null);
+                                               } else {
+                                                       Object field = 
fieldSerializers[i].deserialize(source);
+                                                       fields[i].set(target, 
field);
+                                               }
+                                       } else if (!isNull) {
+                                               // read and dump a pre-existing 
field value
+                                               
fieldSerializers[i].deserialize(source);
                                        }
                                }
                        } catch (IllegalAccessException e) {
@@ -465,20 +474,25 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                        try {
                                for (int i = 0; i < numFields; i++) {
                                        boolean isNull = source.readBoolean();
-                                       if (isNull) {
-                                               fields[i].set(reuse, null);
-                                       } else {
-                                               Object field;
 
-                                               Object reuseField = 
fields[i].get(reuse);
-                                               if(reuseField != null) {
-                                                       field = 
fieldSerializers[i].deserialize(reuseField, source);
-                                               }
-                                               else {
-                                                       field = 
fieldSerializers[i].deserialize(source);
-                                               }
+                                       if (fields[i] != null) {
+                                               if (isNull) {
+                                                       fields[i].set(reuse, 
null);
+                                               } else {
+                                                       Object field;
 
-                                               fields[i].set(reuse, field);
+                                                       Object reuseField = 
fields[i].get(reuse);
+                                                       if (reuseField != null) 
{
+                                                               field = 
fieldSerializers[i].deserialize(reuseField, source);
+                                                       } else {
+                                                               field = 
fieldSerializers[i].deserialize(source);
+                                                       }
+
+                                                       fields[i].set(reuse, 
field);
+                                               }
+                                       } else if (!isNull) {
+                                               // read and dump a pre-existing 
field value
+                                               
fieldSerializers[i].deserialize(source);
                                        }
                                }
                        } catch (IllegalAccessException e) {
@@ -1012,8 +1026,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
 
        // 
--------------------------------------------------------------------------------------------
 
-       private void writeObject(ObjectOutputStream out)
-               throws IOException, ClassNotFoundException {
+       private void writeObject(ObjectOutputStream out) throws IOException, 
ClassNotFoundException {
                out.defaultWriteObject();
                out.writeInt(fields.length);
                for (Field field: fields) {
@@ -1021,12 +1034,14 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
                }
        }
 
-       private void readObject(ObjectInputStream in)
-               throws IOException, ClassNotFoundException {
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
                in.defaultReadObject();
                int numFields = in.readInt();
                fields = new Field[numFields];
                for (int i = 0; i < numFields; i++) {
+                       // the deserialized Field may be null if the field no 
longer exists in the POJO;
+                       // in this case, when de-/serializing and copying 
instances using this serializer
+                       // instance, the missing fields will simply be skipped
                        fields[i] = FieldSerializer.deserializeField(in);
                }
 
@@ -1128,7 +1143,7 @@ public final class PojoSerializer<T> extends 
TypeSerializer<T> {
        private int findField(String fieldName) {
                int foundIndex = 0;
                for (Field field : fields) {
-                       if (fieldName.equals(field.getName())) {
+                       if (field != null && fieldName.equals(field.getName())) 
{
                                return foundIndex;
                        }
 

Reply via email to