This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f1a033067e [core] Skip processed sequence group fields to improve
performance of PartialUpdateMergeFunction (#6345)
f1a033067e is described below
commit f1a033067e73a7e86fd48503725892d569a3c355
Author: Yubin Li <[email protected]>
AuthorDate: Sun May 24 10:53:51 2026 +0800
[core] Skip processed sequence group fields to improve performance of
PartialUpdateMergeFunction (#6345)
---
.../compact/PartialUpdateMergeFunction.java | 48 +++++++++++++---------
1 file changed, 28 insertions(+), 20 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 96c3371692..5d04440c99 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -196,7 +196,7 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter =
fieldAggregators.iterator();
WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ?
aggIter.next() : null;
- boolean[] isEmptySequenceGroup = new boolean[getters.length];
+ boolean[] isProcessedSequenceField = new boolean[getters.length];
for (int i = 0; i < getters.length; i++) {
FieldsComparator seqComparator = null;
if (curComparator != null && curComparator.fieldIndex == i) {
@@ -211,15 +211,13 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
}
Object accumulator = row.getField(i);
- if (seqComparator == null) {
- Object field = getters[i].getFieldOrNull(kv.value());
- if (aggregator != null) {
- row.setField(i, aggregator.agg(accumulator, field));
- } else if (field != null) {
- row.setField(i, field);
+ if (seqComparator != null) {
+ // Skip if this field has already been processed as part of a sequence group
+ if (isProcessedSequenceField[i]) {
+ continue;
}
- } else {
- if (isEmptySequenceGroup(kv, seqComparator, isEmptySequenceGroup)) {
+
+ if (isEmptySequenceGroup(kv, seqComparator, isProcessedSequenceField)) {
// skip null sequence group
continue;
}
@@ -234,6 +232,8 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
for (int fieldIndex : seqComparator.compareFields()) {
row.setField(
fieldIndex,
getters[fieldIndex].getFieldOrNull(kv.value()));
+ // Mark these sequence fields as processed
+ isProcessedSequenceField[fieldIndex] = true;
}
continue;
}
@@ -242,27 +242,28 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
} else if (aggregator != null) {
row.setField(i, aggregator.aggReversed(accumulator,
field));
}
+ } else {
+ Object field = getters[i].getFieldOrNull(kv.value());
+ if (aggregator != null) {
+ row.setField(i, aggregator.agg(accumulator, field));
+ } else if (field != null) {
+ row.setField(i, field);
+ }
}
}
}
private boolean isEmptySequenceGroup(
- KeyValue kv, FieldsComparator comparator, boolean[] isEmptySequenceGroup) {
-
- // If any flag of the sequence fields is set, it means the sequence group is empty.
- if (isEmptySequenceGroup[comparator.compareFields()[0]]) {
- return true;
- }
-
+ KeyValue kv, FieldsComparator comparator, boolean[] isProcessedSequenceField) {
for (int fieldIndex : comparator.compareFields()) {
if (getters[fieldIndex].getFieldOrNull(kv.value()) != null) {
return false;
}
}
- // Set the flag of all the sequence fields of the sequence group.
+ // Mark these sequence fields as processed
for (int fieldIndex : comparator.compareFields()) {
- isEmptySequenceGroup[fieldIndex] = true;
+ isProcessedSequenceField[fieldIndex] = true;
}
return true;
@@ -277,7 +278,7 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter =
fieldAggregators.iterator();
WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ?
aggIter.next() : null;
- boolean[] isEmptySequenceGroup = new boolean[getters.length];
+ boolean[] isProcessedSequenceField = new boolean[getters.length];
for (int i = 0; i < getters.length; i++) {
FieldsComparator seqComparator = null;
if (curComparator != null && curComparator.fieldIndex == i) {
@@ -292,7 +293,12 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
}
if (seqComparator != null) {
- if (isEmptySequenceGroup(kv, seqComparator, isEmptySequenceGroup)) {
+ // Skip if this field has already been processed as part of a sequence group
+ if (isProcessedSequenceField[i]) {
+ continue;
+ }
+
+ if (isEmptySequenceGroup(kv, seqComparator, isProcessedSequenceField)) {
// skip null sequence group
continue;
}
@@ -316,6 +322,8 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
updatedSequenceFields.add(field);
}
}
+ // Mark these sequence fields as processed
+ isProcessedSequenceField[field] = true;
}
} else {
// retract normal field