[FLINK-6801] [core] Allow deserialized PojoSerializer to have removed fields
Prior to this commit, deserializing the PojoSerializer would fail upon encountering a missing field that previously existed in the POJO type. It is actually perfectly fine to have a missing field; the deserialized PojoSerializer should simply skip reading the removed field's previously serialized values, much like how Java Object Serialization works. This commit relaxes the deserialization of the PojoSerializer, so that a null will be used as a placeholder value to indicate a removed field that previously existed. De-/serialization and copying methods on the PojoSerializer will respect null Fields and simply skip them. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae285f9b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae285f9b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae285f9b Branch: refs/heads/master Commit: ae285f9bd5398fe4d8d86eb3207bbc5beb8a24c8 Parents: c929eb3 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Sun Jun 4 20:41:59 2017 +0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue Jun 13 06:38:17 2017 +0200 ---------------------------------------------------------------------- .../java/typeutils/runtime/FieldSerializer.java | 4 +- .../java/typeutils/runtime/PojoSerializer.java | 109 +++++++++++-------- 2 files changed, 64 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ae285f9b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java index 56a4445..5d23b91 100644 --- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java @@ -51,7 +51,7 @@ public class FieldSerializer { clazz = clazz.getSuperclass(); } } - throw new IOException("Class resolved at TaskManager is not compatible with class read during Plan setup." - + " (" + fieldName + ")"); + + return null; } } http://git-wip-us.apache.org/repos/asf/flink/blob/ae285f9b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 7818897..6a67428 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -204,10 +204,12 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { protected void initializeFields(T t) { for (int i = 0; i < numFields; i++) { - try { - fields[i].set(t, fieldSerializers[i].createInstance()); - } catch (IllegalAccessException e) { - throw new RuntimeException("Cannot initialize fields.", e); + if (fields[i] != null) { + try { + fields[i].set(t, fieldSerializers[i].createInstance()); + } catch (IllegalAccessException e) { + throw new RuntimeException("Cannot initialize fields.", e); + } } } } @@ -231,13 +233,14 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { // no subclass try { for (int i = 0; i < numFields; i++) { - Object value = fields[i].get(from); - if (value != null) { - Object copy = fieldSerializers[i].copy(value); - fields[i].set(target, copy); - } - else { - fields[i].set(target, null); + if (fields[i] != null) { + Object value = fields[i].get(from); + if 
(value != null) { + Object copy = fieldSerializers[i].copy(value); + fields[i].set(target, copy); + } else { + fields[i].set(target, null); + } } } } catch (IllegalAccessException e) { @@ -268,20 +271,20 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { if (actualType == clazz) { try { for (int i = 0; i < numFields; i++) { - Object value = fields[i].get(from); - if (value != null) { - Object reuseValue = fields[i].get(reuse); - Object copy; - if(reuseValue != null) { - copy = fieldSerializers[i].copy(value, reuseValue); - } - else { - copy = fieldSerializers[i].copy(value); + if (fields[i] != null) { + Object value = fields[i].get(from); + if (value != null) { + Object reuseValue = fields[i].get(reuse); + Object copy; + if (reuseValue != null) { + copy = fieldSerializers[i].copy(value, reuseValue); + } else { + copy = fieldSerializers[i].copy(value); + } + fields[i].set(reuse, copy); + } else { + fields[i].set(reuse, null); } - fields[i].set(reuse, copy); - } - else { - fields[i].set(reuse, null); } } } catch (IllegalAccessException e) { @@ -342,7 +345,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { if ((flags & NO_SUBCLASS) != 0) { try { for (int i = 0; i < numFields; i++) { - Object o = fields[i].get(value); + Object o = (fields[i] != null) ? 
fields[i].get(value) : null; if (o == null) { target.writeBoolean(true); // null field handling } else { @@ -400,11 +403,17 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { try { for (int i = 0; i < numFields; i++) { boolean isNull = source.readBoolean(); - if (isNull) { - fields[i].set(target, null); - } else { - Object field = fieldSerializers[i].deserialize(source); - fields[i].set(target, field); + + if (fields[i] != null) { + if (isNull) { + fields[i].set(target, null); + } else { + Object field = fieldSerializers[i].deserialize(source); + fields[i].set(target, field); + } + } else if (!isNull) { + // read and dump a pre-existing field value + fieldSerializers[i].deserialize(source); } } } catch (IllegalAccessException e) { @@ -465,20 +474,25 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { try { for (int i = 0; i < numFields; i++) { boolean isNull = source.readBoolean(); - if (isNull) { - fields[i].set(reuse, null); - } else { - Object field; - Object reuseField = fields[i].get(reuse); - if(reuseField != null) { - field = fieldSerializers[i].deserialize(reuseField, source); - } - else { - field = fieldSerializers[i].deserialize(source); - } + if (fields[i] != null) { + if (isNull) { + fields[i].set(reuse, null); + } else { + Object field; - fields[i].set(reuse, field); + Object reuseField = fields[i].get(reuse); + if (reuseField != null) { + field = fieldSerializers[i].deserialize(reuseField, source); + } else { + field = fieldSerializers[i].deserialize(source); + } + + fields[i].set(reuse, field); + } + } else if (!isNull) { + // read and dump a pre-existing field value + fieldSerializers[i].deserialize(source); } } } catch (IllegalAccessException e) { @@ -1012,8 +1026,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { // -------------------------------------------------------------------------------------------- - private void writeObject(ObjectOutputStream out) - throws IOException, 
ClassNotFoundException { + private void writeObject(ObjectOutputStream out) throws IOException, ClassNotFoundException { out.defaultWriteObject(); out.writeInt(fields.length); for (Field field: fields) { @@ -1021,12 +1034,14 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { } } - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); int numFields = in.readInt(); fields = new Field[numFields]; for (int i = 0; i < numFields; i++) { + // the deserialized Field may be null if the field no longer exists in the POJO; + // in this case, when de-/serializing and copying instances using this serializer + // instance, the missing fields will simply be skipped fields[i] = FieldSerializer.deserializeField(in); } @@ -1128,7 +1143,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { private int findField(String fieldName) { int foundIndex = 0; for (Field field : fields) { - if (fieldName.equals(field.getName())) { + if (field != null && fieldName.equals(field.getName())) { return foundIndex; }
