This is an automated email from the ASF dual-hosted git repository.

huweihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ae4583209f5 [FLINK-37722] Eliminate redundant field initialization of PojoSerializer
ae4583209f5 is described below

commit ae4583209f5f9b89ee98bc069b7f2fe8c882de2a
Author: Zhanghao Chen <[email protected]>
AuthorDate: Sun Apr 27 14:52:37 2025 +0800

    [FLINK-37722] Eliminate redundant field initialization of PojoSerializer
---
 .../api/java/typeutils/runtime/PojoSerializer.java | 95 ++++++++++------------
 1 file changed, 42 insertions(+), 53 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 7b93088695b..9d7e4254e86 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -272,12 +272,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
                         "Error during POJO copy, this should not happen since we check the fields before.");
             }
         } else if (actualType == clazz) {
-            T target;
-            try {
-                target = instantiateRaw();
-            } catch (Throwable t) {
-                throw new RuntimeException("Cannot instantiate class.", t);
-            }
+            T target = instantiateRaw();
             // no subclass
             try {
                 for (int i = 0; i < numFields; i++) {
@@ -442,27 +437,17 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
             return null;
         }
 
-        T target;
-
-        Class<?> actualSubclass = null;
-        TypeSerializer subclassSerializer = null;
-
         if ((flags & IS_SUBCLASS) != 0) {
             String subclassName = source.readUTF();
-            actualSubclass = getSubclassByName(subclassName);
-            subclassSerializer = getSubclassSerializer(actualSubclass);
-            target = (T) subclassSerializer.createInstance();
-            // also initialize fields for which the subclass serializer is not responsible
-            initializeFields(target);
-        } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+            Class<?> actualSubclass = getSubclassByName(subclassName);
+            TypeSerializer subclassSerializer = getSubclassSerializer(actualSubclass);
+            return (T) subclassSerializer.deserialize(source);
+        }
 
+        if ((flags & IS_TAGGED_SUBCLASS) != 0) {
             int subclassTag = source.readByte();
-            subclassSerializer = registeredSerializers[subclassTag];
-            target = (T) subclassSerializer.createInstance();
-            // also initialize fields for which the subclass serializer is not responsible
-            initializeFields(target);
-        } else {
-            target = createInstance();
+            TypeSerializer subclassSerializer = registeredSerializers[subclassTag];
+            return (T) subclassSerializer.deserialize(source);
         }
 
         if (isRecord()) {
@@ -474,8 +459,11 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
                     builder.setField(i, fieldValue);
                 }
             }
-            target = builder.build();
-        } else if ((flags & NO_SUBCLASS) != 0) {
+            return builder.build();
+        }
+
+        if ((flags & NO_SUBCLASS) != 0) {
+            T target = instantiateRaw();
             try {
                 for (int i = 0; i < numFields; i++) {
                     boolean isNull = source.readBoolean();
@@ -489,12 +477,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
                         "Error during POJO copy, this should not happen since we check the fields before.",
                         e);
             }
-        } else {
-            if (subclassSerializer != null) {
-                target = (T) subclassSerializer.deserialize(target, source);
-            }
+            return target;
         }
-        return target;
+
+        throw new RuntimeException("Unknown POJO flags, this should not happen.");
     }
 
     @Override
@@ -507,36 +493,36 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
             return null;
         }
 
-        Class<?> subclass = null;
-        TypeSerializer subclassSerializer = null;
         if ((flags & IS_SUBCLASS) != 0) {
             String subclassName = source.readUTF();
-            subclass = getSubclassByName(subclassName);
-            subclassSerializer = getSubclassSerializer(subclass);
+            Class<?> subclass = getSubclassByName(subclassName);
+            TypeSerializer subclassSerializer = getSubclassSerializer(subclass);
 
             if (reuse == null || subclass != reuse.getClass()) {
                 // cannot reuse
-                reuse = (T) subclassSerializer.createInstance();
-                // also initialize fields for which the subclass serializer is not responsible
-                initializeFields(reuse);
+                return (T) subclassSerializer.deserialize(source);
+            } else {
+                return (T) subclassSerializer.deserialize(reuse, source);
             }
-        } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+        }
+
+        if ((flags & IS_TAGGED_SUBCLASS) != 0) {
             int subclassTag = source.readByte();
-            subclassSerializer = registeredSerializers[subclassTag];
+            TypeSerializer subclassSerializer = registeredSerializers[subclassTag];
 
             if (reuse == null || ((PojoSerializer) subclassSerializer).clazz != reuse.getClass()) {
                 // cannot reuse
-                reuse = (T) subclassSerializer.createInstance();
-                // also initialize fields for which the subclass serializer is not responsible
-                initializeFields(reuse);
-            }
-        } else {
-            if (reuse == null || clazz != reuse.getClass()) {
-                reuse = createInstance();
+                return (T) subclassSerializer.deserialize(source);
+            } else {
+                return (T) subclassSerializer.deserialize(reuse, source);
             }
         }
 
         if (isRecord()) {
+            if (reuse != null && clazz != reuse.getClass()) {
+                // cannot reuse, and cannot directly instantiate a record either
+                reuse = null;
+            }
             try {
                 JavaRecordBuilderFactory<T>.JavaRecordBuilder builder = recordFactory.newBuilder();
                 for (int i = 0; i < numFields; i++) {
@@ -555,13 +541,19 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
                     }
                 }
 
-                reuse = builder.build();
+                return builder.build();
             } catch (IllegalAccessException e) {
                 throw new RuntimeException(
                         "Error during POJO copy, this should not happen since we check the fields before.",
                         e);
             }
-        } else if ((flags & NO_SUBCLASS) != 0) {
+        }
+
+        if ((flags & NO_SUBCLASS) != 0) {
+            if (reuse == null || clazz != reuse.getClass()) {
+                // cannot reuse
+                reuse = instantiateRaw();
+            }
             try {
                 for (int i = 0; i < numFields; i++) {
                     boolean isNull = source.readBoolean();
@@ -582,13 +574,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
                         "Error during POJO copy, this should not happen since we check the fields before.",
                         e);
             }
-        } else {
-            if (subclassSerializer != null) {
-                reuse = (T) subclassSerializer.deserialize(reuse, source);
-            }
+            return reuse;
         }
 
-        return reuse;
+        throw new RuntimeException("Unknown POJO flags, this should not happen.");
     }
 
     private Object deserializeField(Object reuseField, int i, DataInputView source)

Reply via email to