This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 150f768573cb526a2c67123da61f64b199d0ea7a Author: Dawid Wysakowicz <[email protected]> AuthorDate: Thu Feb 3 13:22:37 2022 +0100 [FLINK-21752] NullPointerException on restore in PojoSerializer In order to support Pojo schema migration, we added a new ctor to the PojoSerializer, which uses data extracted from a snapshot. However the duplicate method still used the old ctor which tries to recreate parts of the data from the current context. We should use the same ctor as we use for schema migration in the duplicate methods. We must make sure though all serializers are properly duplicated. --- .../api/java/typeutils/runtime/PojoSerializer.java | 34 ++++++++++++++++------ 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 7182f68..30f4880 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -50,6 +50,7 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -163,12 +164,30 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { @Override public PojoSerializer<T> duplicate() { + TypeSerializer<Object>[] duplicateFieldSerializers = duplicateSerializers(fieldSerializers); + TypeSerializer<Object>[] duplicateRegisteredSerializers = + duplicateSerializers(registeredSerializers); + + return new PojoSerializer<>( + clazz, + fields, + duplicateFieldSerializers, + new LinkedHashMap<>(registeredClasses), + duplicateRegisteredSerializers, + subclassSerializerCache.entrySet().stream() + .collect( + Collectors.toMap(Map.Entry::getKey, e -> e.getValue().duplicate())), + executionConfig); + } + + @SuppressWarnings("unchecked") + private TypeSerializer<Object>[] duplicateSerializers(TypeSerializer<?>[] serializers) { boolean stateful = false; - TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length]; + TypeSerializer<?>[] duplicateSerializers = new TypeSerializer[serializers.length]; - for (int i = 0; i < fieldSerializers.length; i++) { - duplicateFieldSerializers[i] = fieldSerializers[i].duplicate(); - if (duplicateFieldSerializers[i] != fieldSerializers[i]) { + for (int i = 0; i < serializers.length; i++) { + duplicateSerializers[i] = serializers[i].duplicate(); + if (duplicateSerializers[i] != serializers[i]) { // at least one of them is stateful stateful = true; } @@ -176,12 +195,9 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { if (!stateful) { // as a small memory optimization, we can share the same object between instances - duplicateFieldSerializers = fieldSerializers; + duplicateSerializers = serializers; } - - // we must create a new instance, otherwise the subclassSerializerCache can create - // concurrency problems - return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig); + return (TypeSerializer<Object>[]) duplicateSerializers; } @Override
