This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new abe5b4b496 [core] Improve performance of PartialUpdateMergeFunction with sequence group (#5481)
abe5b4b496 is described below

commit abe5b4b496581b0a37a11965d4a72398e1695eb4
Author: Xuannan <[email protected]>
AuthorDate: Tue Apr 22 10:59:00 2025 +0800

    [core] Improve performance of PartialUpdateMergeFunction with sequence group (#5481)
---
 .../PartialUpdateMergeFunctionBenchmark.java       | 142 +++++++++++++++++++++
 .../compact/PartialUpdateMergeFunction.java        | 110 +++++++++++++---
 2 files changed, 237 insertions(+), 15 deletions(-)

diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/compact/PartialUpdateMergeFunctionBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/compact/PartialUpdateMergeFunctionBenchmark.java
new file mode 100644
index 0000000000..c0aab4b2c8
--- /dev/null
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/compact/PartialUpdateMergeFunctionBenchmark.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.benchmark.compact;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.benchmark.Benchmark;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
+import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.jupiter.api.Test;
+
+/** Benchmark for measure the performance for {@link 
PartialUpdateMergeFunction}. */
+public class PartialUpdateMergeFunctionBenchmark {
+
+    private final int rowCount = 40000000;
+
+    @Test
+    public void testUpdateWithSequenceGroup() {
+        Options options = new Options();
+        options.set("fields.f1.sequence-group", "f2,f3,f4,f5");
+
+        MergeFunctionFactory<KeyValue> factory =
+                PartialUpdateMergeFunction.factory(options, getRowType(6), 
ImmutableList.of("f0"));
+
+        MergeFunction<KeyValue> func = factory.create();
+
+        Benchmark benchmark =
+                new Benchmark("partial-update-benchmark", rowCount)
+                        .setNumWarmupIters(1)
+                        .setOutputPerIteration(true);
+
+        benchmark.addCase(
+                "updateWithSequenceGroup",
+                5,
+                () -> {
+                    func.reset();
+                    for (int i = 0; i < rowCount; i++) {
+                        add(func, i, RowKind.INSERT, 1, i, 1, 1, 1, 1);
+                    }
+                });
+
+        benchmark.run();
+    }
+
+    @Test
+    public void testRetractWithSequenceGroup() {
+        Options options = new Options();
+        options.set("fields.f1.sequence-group", "f2,f3,f4,f5");
+
+        MergeFunctionFactory<KeyValue> factory =
+                PartialUpdateMergeFunction.factory(options, getRowType(6), 
ImmutableList.of("f0"));
+
+        MergeFunction<KeyValue> func = factory.create();
+
+        Benchmark benchmark =
+                new Benchmark("partial-update-benchmark", rowCount)
+                        .setNumWarmupIters(1)
+                        .setOutputPerIteration(true);
+
+        benchmark.addCase(
+                "retractWithSequenceGroup",
+                5,
+                () -> {
+                    func.reset();
+                    for (int i = 0; i < rowCount; i++) {
+                        add(func, i, RowKind.DELETE, 1, i, 1, 1, 1, 1);
+                    }
+                });
+
+        benchmark.run();
+    }
+
+    @Test
+    public void testUpdateWithEmptySequenceGroup() {
+        Options options = new Options();
+        options.set("fields.f1.sequence-group", "f2");
+        options.set("fields.f3.sequence-group", "f4");
+        options.set("fields.f5.sequence-group", "f6,f7,f8");
+
+        MergeFunctionFactory<KeyValue> factory =
+                PartialUpdateMergeFunction.factory(options, getRowType(9), 
ImmutableList.of("f0"));
+
+        MergeFunction<KeyValue> func = factory.create();
+
+        Benchmark benchmark =
+                new Benchmark("partial-update-benchmark", rowCount)
+                        .setNumWarmupIters(1)
+                        .setOutputPerIteration(true);
+
+        benchmark.addCase(
+                "updateWithEmptySequenceGroup",
+                5,
+                () -> {
+                    func.reset();
+                    for (int i = 0; i < rowCount; i++) {
+                        add(func, i, RowKind.INSERT, 1, i, 1, null, null, 
null, null, null, null);
+                    }
+                });
+
+        benchmark.run();
+    }
+
+    private RowType getRowType(int numFields) {
+        DataField[] fields = new DataField[numFields];
+        fields[0] = new DataField(0, "k", DataTypes.INT());
+
+        for (int i = 1; i < numFields; i++) {
+            fields[i] = new DataField(i, "f" + i, DataTypes.INT());
+        }
+        return RowType.of(fields);
+    }
+
+    private void add(
+            MergeFunction<KeyValue> function, int sequence, RowKind rowKind, 
Integer... f) {
+        function.add(new KeyValue().replace(GenericRow.of(1), sequence, 
rowKind, GenericRow.of(f)));
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index b24cf1edeb..f061192979 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -41,8 +41,10 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -69,9 +71,9 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
 
     private final InternalRow.FieldGetter[] getters;
     private final boolean ignoreDelete;
-    private final Map<Integer, FieldsComparator> fieldSeqComparators;
+    private final List<WrapperWithFieldIndex<FieldsComparator>> 
fieldSeqComparators;
     private final boolean fieldSequenceEnabled;
-    private final Map<Integer, FieldAggregator> fieldAggregators;
+    private final List<WrapperWithFieldIndex<FieldAggregator>> 
fieldAggregators;
     private final boolean removeRecordOnDelete;
     private final Set<Integer> sequenceGroupPartialDelete;
     private final boolean[] nullables;
@@ -100,8 +102,8 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
             boolean[] nullables) {
         this.getters = getters;
         this.ignoreDelete = ignoreDelete;
-        this.fieldSeqComparators = fieldSeqComparators;
-        this.fieldAggregators = fieldAggregators;
+        this.fieldSeqComparators = 
getKeySortedListFromMap(fieldSeqComparators);
+        this.fieldAggregators = getKeySortedListFromMap(fieldAggregators);
         this.fieldSequenceEnabled = fieldSequenceEnabled;
         this.removeRecordOnDelete = removeRecordOnDelete;
         this.sequenceGroupPartialDelete = sequenceGroupPartialDelete;
@@ -115,7 +117,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
         this.notNullColumnFilled = false;
         this.row = new GenericRow(getters.length);
         this.latestSequenceNumber = 0;
-        fieldAggregators.values().forEach(FieldAggregator::reset);
+        fieldAggregators.forEach(w -> w.getValue().reset());
     }
 
     @Override
@@ -188,23 +190,43 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
     }
 
     private void updateWithSequenceGroup(KeyValue kv) {
+
+        Iterator<WrapperWithFieldIndex<FieldsComparator>> comparatorIter =
+                fieldSeqComparators.iterator();
+        WrapperWithFieldIndex<FieldsComparator> curComparator =
+                comparatorIter.hasNext() ? comparatorIter.next() : null;
+        Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter = 
fieldAggregators.iterator();
+        WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ? 
aggIter.next() : null;
+
+        boolean[] isEmptySequenceGroup = new boolean[getters.length];
         for (int i = 0; i < getters.length; i++) {
-            Object field = getters[i].getFieldOrNull(kv.value());
-            FieldsComparator seqComparator = fieldSeqComparators.get(i);
-            FieldAggregator aggregator = fieldAggregators.get(i);
-            Object accumulator = getters[i].getFieldOrNull(row);
+            FieldsComparator seqComparator = null;
+            if (curComparator != null && curComparator.fieldIndex == i) {
+                seqComparator = curComparator.getValue();
+                curComparator = comparatorIter.hasNext() ? 
comparatorIter.next() : null;
+            }
+
+            FieldAggregator aggregator = null;
+            if (curAgg != null && curAgg.fieldIndex == i) {
+                aggregator = curAgg.getValue();
+                curAgg = aggIter.hasNext() ? aggIter.next() : null;
+            }
+
+            Object accumulator = row.getField(i);
             if (seqComparator == null) {
+                Object field = getters[i].getFieldOrNull(kv.value());
                 if (aggregator != null) {
                     row.setField(i, aggregator.agg(accumulator, field));
                 } else if (field != null) {
                     row.setField(i, field);
                 }
             } else {
-                if (isEmptySequenceGroup(kv, seqComparator)) {
+                if (isEmptySequenceGroup(kv, seqComparator, 
isEmptySequenceGroup)) {
                     // skip null sequence group
                     continue;
                 }
 
+                Object field = getters[i].getFieldOrNull(kv.value());
                 if (seqComparator.compare(kv.value(), row) >= 0) {
                     int index = i;
 
@@ -226,24 +248,53 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
         }
     }
 
-    private boolean isEmptySequenceGroup(KeyValue kv, FieldsComparator 
comparator) {
+    private boolean isEmptySequenceGroup(
+            KeyValue kv, FieldsComparator comparator, boolean[] 
isEmptySequenceGroup) {
+
+        // If any flag of the sequence fields is set, it means the sequence 
group is empty.
+        if (isEmptySequenceGroup[comparator.compareFields()[0]]) {
+            return true;
+        }
+
         for (int fieldIndex : comparator.compareFields()) {
             if (getters[fieldIndex].getFieldOrNull(kv.value()) != null) {
                 return false;
             }
         }
 
+        // Set the flag of all the sequence fields of the sequence group.
+        for (int fieldIndex : comparator.compareFields()) {
+            isEmptySequenceGroup[fieldIndex] = true;
+        }
+
         return true;
     }
 
     private void retractWithSequenceGroup(KeyValue kv) {
         Set<Integer> updatedSequenceFields = new HashSet<>();
-
+        Iterator<WrapperWithFieldIndex<FieldsComparator>> comparatorIter =
+                fieldSeqComparators.iterator();
+        WrapperWithFieldIndex<FieldsComparator> curComparator =
+                comparatorIter.hasNext() ? comparatorIter.next() : null;
+        Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter = 
fieldAggregators.iterator();
+        WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ? 
aggIter.next() : null;
+
+        boolean[] isEmptySequenceGroup = new boolean[getters.length];
         for (int i = 0; i < getters.length; i++) {
-            FieldsComparator seqComparator = fieldSeqComparators.get(i);
+            FieldsComparator seqComparator = null;
+            if (curComparator != null && curComparator.fieldIndex == i) {
+                seqComparator = curComparator.getValue();
+                curComparator = comparatorIter.hasNext() ? 
comparatorIter.next() : null;
+            }
+
+            FieldAggregator aggregator = null;
+            if (curAgg != null && curAgg.fieldIndex == i) {
+                aggregator = curAgg.getValue();
+                curAgg = aggIter.hasNext() ? aggIter.next() : null;
+            }
+
             if (seqComparator != null) {
-                FieldAggregator aggregator = fieldAggregators.get(i);
-                if (isEmptySequenceGroup(kv, seqComparator)) {
+                if (isEmptySequenceGroup(kv, seqComparator, 
isEmptySequenceGroup)) {
                     // skip null sequence group
                     continue;
                 }
@@ -628,4 +679,33 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
             return aggFunc == null ? options.fieldsDefaultFunc() : aggFunc;
         }
     }
+
+    private <T> List<WrapperWithFieldIndex<T>> 
getKeySortedListFromMap(Map<Integer, T> map) {
+        List<WrapperWithFieldIndex<T>> res = new ArrayList<>();
+        map.forEach(
+                (index, value) -> {
+                    res.add(new WrapperWithFieldIndex<>(value, index));
+                });
+        Collections.sort(res);
+        return res;
+    }
+
+    private static class WrapperWithFieldIndex<T> implements 
Comparable<WrapperWithFieldIndex<T>> {
+        private final T value;
+        private final int fieldIndex;
+
+        WrapperWithFieldIndex(T value, int fieldIndex) {
+            this.value = value;
+            this.fieldIndex = fieldIndex;
+        }
+
+        @Override
+        public int 
compareTo(PartialUpdateMergeFunction.WrapperWithFieldIndex<T> o) {
+            return this.fieldIndex - o.fieldIndex;
+        }
+
+        public T getValue() {
+            return value;
+        }
+    }
 }

Reply via email to