This is an automated email from the ASF dual-hosted git repository.
huweihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ae4583209f5 [FLINK-37722] Eliminate redundant field initialization of PojoSerializer
ae4583209f5 is described below
commit ae4583209f5f9b89ee98bc069b7f2fe8c882de2a
Author: Zhanghao Chen <[email protected]>
AuthorDate: Sun Apr 27 14:52:37 2025 +0800
[FLINK-37722] Eliminate redundant field initialization of PojoSerializer
---
.../api/java/typeutils/runtime/PojoSerializer.java | 95 ++++++++++------------
1 file changed, 42 insertions(+), 53 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 7b93088695b..9d7e4254e86 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -272,12 +272,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
"Error during POJO copy, this should not happen since
we check the fields before.");
}
} else if (actualType == clazz) {
- T target;
- try {
- target = instantiateRaw();
- } catch (Throwable t) {
- throw new RuntimeException("Cannot instantiate class.", t);
- }
+ T target = instantiateRaw();
// no subclass
try {
for (int i = 0; i < numFields; i++) {
@@ -442,27 +437,17 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
return null;
}
- T target;
-
- Class<?> actualSubclass = null;
- TypeSerializer subclassSerializer = null;
-
if ((flags & IS_SUBCLASS) != 0) {
String subclassName = source.readUTF();
- actualSubclass = getSubclassByName(subclassName);
- subclassSerializer = getSubclassSerializer(actualSubclass);
- target = (T) subclassSerializer.createInstance();
- // also initialize fields for which the subclass serializer is not
responsible
- initializeFields(target);
- } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+ Class<?> actualSubclass = getSubclassByName(subclassName);
+ TypeSerializer subclassSerializer =
getSubclassSerializer(actualSubclass);
+ return (T) subclassSerializer.deserialize(source);
+ }
+ if ((flags & IS_TAGGED_SUBCLASS) != 0) {
int subclassTag = source.readByte();
- subclassSerializer = registeredSerializers[subclassTag];
- target = (T) subclassSerializer.createInstance();
- // also initialize fields for which the subclass serializer is not
responsible
- initializeFields(target);
- } else {
- target = createInstance();
+ TypeSerializer subclassSerializer =
registeredSerializers[subclassTag];
+ return (T) subclassSerializer.deserialize(source);
}
if (isRecord()) {
@@ -474,8 +459,11 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
builder.setField(i, fieldValue);
}
}
- target = builder.build();
- } else if ((flags & NO_SUBCLASS) != 0) {
+ return builder.build();
+ }
+
+ if ((flags & NO_SUBCLASS) != 0) {
+ T target = instantiateRaw();
try {
for (int i = 0; i < numFields; i++) {
boolean isNull = source.readBoolean();
@@ -489,12 +477,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
"Error during POJO copy, this should not happen since
we check the fields before.",
e);
}
- } else {
- if (subclassSerializer != null) {
- target = (T) subclassSerializer.deserialize(target, source);
- }
+ return target;
}
- return target;
+
+ throw new RuntimeException("Unknown POJO flags, this should not
happen.");
}
@Override
@@ -507,36 +493,36 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
return null;
}
- Class<?> subclass = null;
- TypeSerializer subclassSerializer = null;
if ((flags & IS_SUBCLASS) != 0) {
String subclassName = source.readUTF();
- subclass = getSubclassByName(subclassName);
- subclassSerializer = getSubclassSerializer(subclass);
+ Class<?> subclass = getSubclassByName(subclassName);
+ TypeSerializer subclassSerializer =
getSubclassSerializer(subclass);
if (reuse == null || subclass != reuse.getClass()) {
// cannot reuse
- reuse = (T) subclassSerializer.createInstance();
- // also initialize fields for which the subclass serializer is
not responsible
- initializeFields(reuse);
+ return (T) subclassSerializer.deserialize(source);
+ } else {
+ return (T) subclassSerializer.deserialize(reuse, source);
}
- } else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
+ }
+
+ if ((flags & IS_TAGGED_SUBCLASS) != 0) {
int subclassTag = source.readByte();
- subclassSerializer = registeredSerializers[subclassTag];
+ TypeSerializer subclassSerializer =
registeredSerializers[subclassTag];
if (reuse == null || ((PojoSerializer) subclassSerializer).clazz
!= reuse.getClass()) {
// cannot reuse
- reuse = (T) subclassSerializer.createInstance();
- // also initialize fields for which the subclass serializer is
not responsible
- initializeFields(reuse);
- }
- } else {
- if (reuse == null || clazz != reuse.getClass()) {
- reuse = createInstance();
+ return (T) subclassSerializer.deserialize(source);
+ } else {
+ return (T) subclassSerializer.deserialize(reuse, source);
}
}
if (isRecord()) {
+ if (reuse != null && clazz != reuse.getClass()) {
+ // cannot reuse, and cannot directly instantiate a record
either
+ reuse = null;
+ }
try {
JavaRecordBuilderFactory<T>.JavaRecordBuilder builder =
recordFactory.newBuilder();
for (int i = 0; i < numFields; i++) {
@@ -555,13 +541,19 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
}
- reuse = builder.build();
+ return builder.build();
} catch (IllegalAccessException e) {
throw new RuntimeException(
"Error during POJO copy, this should not happen since
we check the fields before.",
e);
}
- } else if ((flags & NO_SUBCLASS) != 0) {
+ }
+
+ if ((flags & NO_SUBCLASS) != 0) {
+ if (reuse == null || clazz != reuse.getClass()) {
+ // cannot reuse
+ reuse = instantiateRaw();
+ }
try {
for (int i = 0; i < numFields; i++) {
boolean isNull = source.readBoolean();
@@ -582,13 +574,10 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
"Error during POJO copy, this should not happen since
we check the fields before.",
e);
}
- } else {
- if (subclassSerializer != null) {
- reuse = (T) subclassSerializer.deserialize(reuse, source);
- }
+ return reuse;
}
- return reuse;
+ throw new RuntimeException("Unknown POJO flags, this should not
happen.");
}
private Object deserializeField(Object reuseField, int i, DataInputView
source)