This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.0 by this push:
new 9d2d214cde [flink] Replace per record in ReadOperator to work with
object reuse
9d2d214cde is described below
commit 9d2d214cde7d07379db3bdb9947247e555dfd1e4
Author: JingsongLi <[email protected]>
AuthorDate: Fri Jan 24 15:09:53 2025 +0800
[flink] Replace per record in ReadOperator to work with object reuse
---
.../apache/paimon/flink/source/operator/ReadOperator.java | 13 ++++++-------
1 file changed, 6 insertions(+), 7 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index 1757a859df..c7189f811d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -91,11 +91,7 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
.getSpillingDirectoriesPaths());
this.read = readBuilder.newRead().withIOManager(ioManager);
this.reuseRow = new FlinkRowData(null);
- if (nestedProjectedRowData != null) {
- this.reuseRecord = new StreamRecord<>(nestedProjectedRowData);
- } else {
- this.reuseRecord = new StreamRecord<>(reuseRow);
- }
+ this.reuseRecord = new StreamRecord<>(null);
this.idlingStarted();
}
@@ -126,8 +122,11 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
}
reuseRow.replace(iterator.next());
- if (nestedProjectedRowData != null) {
- nestedProjectedRowData.replaceRow(this.reuseRow);
+ if (nestedProjectedRowData == null) {
+ reuseRecord.replace(reuseRow);
+ } else {
+ nestedProjectedRowData.replaceRow(reuseRow);
+ reuseRecord.replace(nestedProjectedRowData);
}
output.collect(reuseRecord);
}