This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f1a033067e [core] Skip processed sequence group fields to improve 
performance of PartialUpdateMergeFunction (#6345)
f1a033067e is described below

commit f1a033067e73a7e86fd48503725892d569a3c355
Author: Yubin Li <[email protected]>
AuthorDate: Sun May 24 10:53:51 2026 +0800

    [core] Skip processed sequence group fields to improve performance of 
PartialUpdateMergeFunction (#6345)
---
 .../compact/PartialUpdateMergeFunction.java        | 48 +++++++++++++---------
 1 file changed, 28 insertions(+), 20 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 96c3371692..5d04440c99 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -196,7 +196,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter = 
fieldAggregators.iterator();
         WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ? 
aggIter.next() : null;
 
-        boolean[] isEmptySequenceGroup = new boolean[getters.length];
+        boolean[] isProcessedSequenceField = new boolean[getters.length];
         for (int i = 0; i < getters.length; i++) {
             FieldsComparator seqComparator = null;
             if (curComparator != null && curComparator.fieldIndex == i) {
@@ -211,15 +211,13 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
             }
 
             Object accumulator = row.getField(i);
-            if (seqComparator == null) {
-                Object field = getters[i].getFieldOrNull(kv.value());
-                if (aggregator != null) {
-                    row.setField(i, aggregator.agg(accumulator, field));
-                } else if (field != null) {
-                    row.setField(i, field);
+            if (seqComparator != null) {
+                // Skip if this field has already been processed as part of a 
sequence group
+                if (isProcessedSequenceField[i]) {
+                    continue;
                 }
-            } else {
-                if (isEmptySequenceGroup(kv, seqComparator, 
isEmptySequenceGroup)) {
+
+                if (isEmptySequenceGroup(kv, seqComparator, 
isProcessedSequenceField)) {
                     // skip null sequence group
                     continue;
                 }
@@ -234,6 +232,8 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                         for (int fieldIndex : seqComparator.compareFields()) {
                             row.setField(
                                     fieldIndex, 
getters[fieldIndex].getFieldOrNull(kv.value()));
+                            // Mark these sequence fields as processed
+                            isProcessedSequenceField[fieldIndex] = true;
                         }
                         continue;
                     }
@@ -242,27 +242,28 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                 } else if (aggregator != null) {
                     row.setField(i, aggregator.aggReversed(accumulator, 
field));
                 }
+            } else {
+                Object field = getters[i].getFieldOrNull(kv.value());
+                if (aggregator != null) {
+                    row.setField(i, aggregator.agg(accumulator, field));
+                } else if (field != null) {
+                    row.setField(i, field);
+                }
             }
         }
     }
 
     private boolean isEmptySequenceGroup(
-            KeyValue kv, FieldsComparator comparator, boolean[] 
isEmptySequenceGroup) {
-
-        // If any flag of the sequence fields is set, it means the sequence 
group is empty.
-        if (isEmptySequenceGroup[comparator.compareFields()[0]]) {
-            return true;
-        }
-
+            KeyValue kv, FieldsComparator comparator, boolean[] 
isProcessedSequenceField) {
         for (int fieldIndex : comparator.compareFields()) {
             if (getters[fieldIndex].getFieldOrNull(kv.value()) != null) {
                 return false;
             }
         }
 
-        // Set the flag of all the sequence fields of the sequence group.
+        // Mark these sequence fields as processed
         for (int fieldIndex : comparator.compareFields()) {
-            isEmptySequenceGroup[fieldIndex] = true;
+            isProcessedSequenceField[fieldIndex] = true;
         }
 
         return true;
@@ -277,7 +278,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter = 
fieldAggregators.iterator();
         WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ? 
aggIter.next() : null;
 
-        boolean[] isEmptySequenceGroup = new boolean[getters.length];
+        boolean[] isProcessedSequenceField = new boolean[getters.length];
         for (int i = 0; i < getters.length; i++) {
             FieldsComparator seqComparator = null;
             if (curComparator != null && curComparator.fieldIndex == i) {
@@ -292,7 +293,12 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
             }
 
             if (seqComparator != null) {
-                if (isEmptySequenceGroup(kv, seqComparator, 
isEmptySequenceGroup)) {
+                // Skip if this field has already been processed as part of a 
sequence group
+                if (isProcessedSequenceField[i]) {
+                    continue;
+                }
+
+                if (isEmptySequenceGroup(kv, seqComparator, 
isProcessedSequenceField)) {
                     // skip null sequence group
                     continue;
                 }
@@ -316,6 +322,8 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                                     updatedSequenceFields.add(field);
                                 }
                             }
+                            // Mark these sequence fields as processed
+                            isProcessedSequenceField[field] = true;
                         }
                     } else {
                         // retract normal field

Reply via email to