This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d2329e9744eb89e31669f2096c4bce7dde7898a
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Feb 3 13:22:37 2022 +0100

    [FLINK-21752] NullPointerException on restore in PojoSerializer
    
    To support POJO schema migration, we added a new constructor to
    PojoSerializer that uses data extracted from a snapshot. However, the
    duplicate() method still used the old constructor, which tries to
    recreate parts of that data from the current context.
    
    duplicate() should use the same constructor that schema migration uses.
    We must make sure, though, that all nested serializers (field serializers,
    registered-subclass serializers and cached subclass serializers) are
    properly duplicated.
---
 .../api/java/typeutils/runtime/PojoSerializer.java | 34 ++++++++++++++++------
 1 file changed, 25 insertions(+), 9 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 7182f68..30f4880 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -50,6 +50,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -163,12 +164,30 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
     @Override
     public PojoSerializer<T> duplicate() {
+        TypeSerializer<Object>[] duplicateFieldSerializers = 
duplicateSerializers(fieldSerializers);
+        TypeSerializer<Object>[] duplicateRegisteredSerializers =
+                duplicateSerializers(registeredSerializers);
+
+        return new PojoSerializer<>(
+                clazz,
+                fields,
+                duplicateFieldSerializers,
+                new LinkedHashMap<>(registeredClasses),
+                duplicateRegisteredSerializers,
+                subclassSerializerCache.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().duplicate())),
+                executionConfig);
+    }
+
+    @SuppressWarnings("unchecked")
+    private TypeSerializer<Object>[] duplicateSerializers(TypeSerializer<?>[] 
serializers) {
         boolean stateful = false;
-        TypeSerializer<?>[] duplicateFieldSerializers = new 
TypeSerializer[fieldSerializers.length];
+        TypeSerializer<?>[] duplicateSerializers = new 
TypeSerializer[serializers.length];
 
-        for (int i = 0; i < fieldSerializers.length; i++) {
-            duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
-            if (duplicateFieldSerializers[i] != fieldSerializers[i]) {
+        for (int i = 0; i < serializers.length; i++) {
+            duplicateSerializers[i] = serializers[i].duplicate();
+            if (duplicateSerializers[i] != serializers[i]) {
                 // at least one of them is stateful
                 stateful = true;
             }
@@ -176,12 +195,9 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 
         if (!stateful) {
             // as a small memory optimization, we can share the same object 
between instances
-            duplicateFieldSerializers = fieldSerializers;
+            duplicateSerializers = serializers;
         }
-
-        // we must create a new instance, otherwise the 
subclassSerializerCache can create
-        // concurrency problems
-        return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, 
executionConfig);
+        return (TypeSerializer<Object>[]) duplicateSerializers;
     }
 
     @Override

Reply via email to