This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9349579d9b [core] Fix aggregate delete bug and refactor 
SortBufferWriteBufferTestBase (#5414)
9349579d9b is described below

commit 9349579d9bc787492ee62723b596b1208c33dbd4
Author: yuzelin <[email protected]>
AuthorDate: Tue Apr 8 17:16:09 2025 +0800

    [core] Fix aggregate delete bug and refactor SortBufferWriteBufferTestBase 
(#5414)
---
 .../compact/aggregate/AggregateMergeFunction.java  | 40 +++++-------
 .../mergetree/SortBufferWriteBufferTestBase.java   | 75 ++++++++++++++++------
 .../mergetree/compact/MergeFunctionTestUtils.java  | 52 +++++++++++++--
 .../paimon/table/PrimaryKeySimpleTableTest.java    |  4 +-
 .../ValueContentRowDataRecordIteratorTest.java     |  7 +-
 .../org/apache/paimon/utils/ReusingKeyValue.java   |  9 ++-
 .../org/apache/paimon/utils/ReusingTestData.java   | 17 ++++-
 7 files changed, 144 insertions(+), 60 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index f138afb6a9..01e8706168 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -57,7 +57,6 @@ public class AggregateMergeFunction implements 
MergeFunction<KeyValue> {
     private KeyValue reused;
     private boolean currentDeleteRow;
     private final boolean removeRecordOnDelete;
-    private boolean notNullColumnFilled;
 
     public AggregateMergeFunction(
             InternalRow.FieldGetter[] getters,
@@ -73,7 +72,6 @@ public class AggregateMergeFunction implements 
MergeFunction<KeyValue> {
     @Override
     public void reset() {
         this.latestKv = null;
-        this.notNullColumnFilled = false;
         this.row = new GenericRow(getters.length);
         Arrays.stream(aggregators).forEach(FieldAggregator::reset);
         this.currentDeleteRow = false;
@@ -82,18 +80,15 @@ public class AggregateMergeFunction implements 
MergeFunction<KeyValue> {
     @Override
     public void add(KeyValue kv) {
         latestKv = kv;
-        boolean isRetract =
-                kv.valueKind() != RowKind.INSERT && kv.valueKind() != 
RowKind.UPDATE_AFTER;
 
-        currentDeleteRow = removeRecordOnDelete && isRetract;
+        currentDeleteRow = removeRecordOnDelete && kv.valueKind() == 
RowKind.DELETE;
         if (currentDeleteRow) {
-            if (!notNullColumnFilled) {
-                initRow(row, kv.value());
-                notNullColumnFilled = true;
-            }
+            row = new GenericRow(getters.length);
+            initRow(row, kv.value());
             return;
         }
 
+        boolean isRetract = kv.valueKind().isRetract();
         for (int i = 0; i < getters.length; i++) {
             FieldAggregator fieldAggregator = aggregators[i];
             Object accumulator = getters[i].getFieldOrNull(row);
@@ -104,7 +99,6 @@ public class AggregateMergeFunction implements 
MergeFunction<KeyValue> {
                             : fieldAggregator.agg(accumulator, inputField);
             row.setField(i, mergedField);
         }
-        notNullColumnFilled = true;
     }
 
     private void initRow(GenericRow row, InternalRow value) {
@@ -140,10 +134,10 @@ public class AggregateMergeFunction implements 
MergeFunction<KeyValue> {
 
     public static MergeFunctionFactory<KeyValue> factory(
             Options conf,
-            List<String> tableNames,
-            List<DataType> tableTypes,
+            List<String> fieldNames,
+            List<DataType> fieldTypes,
             List<String> primaryKeys) {
-        return new Factory(conf, tableNames, tableTypes, primaryKeys);
+        return new Factory(conf, fieldNames, fieldTypes, primaryKeys);
     }
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
@@ -151,31 +145,31 @@ public class AggregateMergeFunction implements 
MergeFunction<KeyValue> {
         private static final long serialVersionUID = 1L;
 
         private final CoreOptions options;
-        private final List<String> tableNames;
-        private final List<DataType> tableTypes;
+        private final List<String> fieldNames;
+        private final List<DataType> fieldTypes;
         private final List<String> primaryKeys;
         private final boolean removeRecordOnDelete;
 
         private Factory(
                 Options conf,
-                List<String> tableNames,
-                List<DataType> tableTypes,
+                List<String> fieldNames,
+                List<DataType> fieldTypes,
                 List<String> primaryKeys) {
             this.options = new CoreOptions(conf);
-            this.tableNames = tableNames;
-            this.tableTypes = tableTypes;
+            this.fieldNames = fieldNames;
+            this.fieldTypes = fieldTypes;
             this.primaryKeys = primaryKeys;
             this.removeRecordOnDelete = 
options.aggregationRemoveRecordOnDelete();
         }
 
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            List<String> fieldNames = tableNames;
-            List<DataType> fieldTypes = tableTypes;
+            List<String> fieldNames = this.fieldNames;
+            List<DataType> fieldTypes = this.fieldTypes;
             if (projection != null) {
                 Projection project = Projection.of(projection);
-                fieldNames = project.project(tableNames);
-                fieldTypes = project.project(tableTypes);
+                fieldNames = project.project(fieldNames);
+                fieldTypes = project.project(fieldTypes);
             }
 
             FieldAggregator[] fieldAggregators = new 
FieldAggregator[fieldNames.size()];
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 8169d6a840..ee34ac2e71 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.mergetree;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.compression.CompressOptions;
@@ -33,10 +34,7 @@ import 
org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.sort.BinaryInMemorySortBuffer;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ReusingKeyValue;
 import org.apache.paimon.utils.ReusingTestData;
@@ -47,10 +45,12 @@ import org.junit.jupiter.api.Test;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.utils.Preconditions.checkState;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -63,9 +63,11 @@ public abstract class SortBufferWriteBufferTestBase {
 
     protected final SortBufferWriteBuffer table =
             new SortBufferWriteBuffer(
-                    new RowType(Collections.singletonList(new DataField(0, 
"key", new IntType()))),
-                    new RowType(
-                            Collections.singletonList(new DataField(1, 
"value", new BigIntType()))),
+                    RowType.builder().field("key_f0", DataTypes.INT()).build(),
+                    RowType.builder()
+                            .field("f0", DataTypes.INT())
+                            .field("f1", DataTypes.BIGINT())
+                            .build(),
                     null,
                     new HeapMemorySegmentPool(32 * 1024 * 3L, 32 * 1024),
                     false,
@@ -165,21 +167,30 @@ public abstract class SortBufferWriteBufferTestBase {
     /** Test for {@link SortBufferWriteBuffer} with {@link 
PartialUpdateMergeFunction}. */
     public static class WithPartialUpdateMergeFunctionTest extends 
SortBufferWriteBufferTestBase {
 
+        private final boolean addOnly;
+
+        private WithPartialUpdateMergeFunctionTest() {
+            this.addOnly = ThreadLocalRandom.current().nextBoolean();
+        }
+
         @Override
         protected boolean addOnly() {
-            return true;
+            return addOnly;
         }
 
         @Override
         protected List<ReusingTestData> getExpected(List<ReusingTestData> 
input) {
-            return MergeFunctionTestUtils.getExpectedForPartialUpdate(input);
+            return MergeFunctionTestUtils.getExpectedForPartialUpdate(input, 
addOnly);
         }
 
         @Override
         protected MergeFunction<KeyValue> createMergeFunction() {
             Options options = new Options();
+            options.set(CoreOptions.IGNORE_DELETE, !addOnly);
             return PartialUpdateMergeFunction.factory(
-                            options, RowType.of(DataTypes.BIGINT()), 
ImmutableList.of("f0"))
+                            options,
+                            RowType.of(DataTypes.INT().notNull(), 
DataTypes.BIGINT()),
+                            ImmutableList.of("f0"))
                     .create();
         }
     }
@@ -187,25 +198,36 @@ public abstract class SortBufferWriteBufferTestBase {
     /** Test for {@link SortBufferWriteBuffer} with {@link 
AggregateMergeFunction}. */
     public static class WithAggMergeFunctionTest extends 
SortBufferWriteBufferTestBase {
 
+        private final boolean addOnly;
+        private final boolean removeRecordOnDelete;
+
+        private WithAggMergeFunctionTest() {
+            ThreadLocalRandom random = ThreadLocalRandom.current();
+            this.addOnly = random.nextBoolean();
+            this.removeRecordOnDelete = !addOnly && random.nextBoolean();
+        }
+
         @Override
         protected boolean addOnly() {
-            return false;
+            return addOnly;
         }
 
         @Override
         protected List<ReusingTestData> getExpected(List<ReusingTestData> 
input) {
-            return MergeFunctionTestUtils.getExpectedForAggSum(input);
+            return MergeFunctionTestUtils.getExpectedForAggSum(
+                    input, addOnly, removeRecordOnDelete);
         }
 
         @Override
         protected MergeFunction<KeyValue> createMergeFunction() {
             Options options = new Options();
-            options.set("fields.value.aggregate-function", "sum");
+            options.set("fields.f1.aggregate-function", "sum");
+            options.set(CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE, 
removeRecordOnDelete);
             return AggregateMergeFunction.factory(
                             options,
-                            Collections.singletonList("value"),
-                            Collections.singletonList(DataTypes.BIGINT()),
-                            Collections.emptyList())
+                            Arrays.asList("f0", "f1"),
+                            Arrays.asList(DataTypes.INT().notNull(), 
DataTypes.BIGINT()),
+                            Collections.singletonList("f0"))
                     .create();
         }
     }
@@ -213,26 +235,37 @@ public abstract class SortBufferWriteBufferTestBase {
     /** Test for {@link SortBufferWriteBuffer} with {@link 
LookupMergeFunction}. */
     public static class WithLookupFunctionTest extends 
SortBufferWriteBufferTestBase {
 
+        private final boolean addOnly;
+        private final boolean removeRecordOnDelete;
+
+        private WithLookupFunctionTest() {
+            ThreadLocalRandom random = ThreadLocalRandom.current();
+            this.addOnly = random.nextBoolean();
+            this.removeRecordOnDelete = !addOnly && random.nextBoolean();
+        }
+
         @Override
         protected boolean addOnly() {
-            return false;
+            return addOnly;
         }
 
         @Override
         protected List<ReusingTestData> getExpected(List<ReusingTestData> 
input) {
-            return MergeFunctionTestUtils.getExpectedForAggSum(input);
+            return MergeFunctionTestUtils.getExpectedForAggSum(
+                    input, addOnly, removeRecordOnDelete);
         }
 
         @Override
         protected MergeFunction<KeyValue> createMergeFunction() {
             Options options = new Options();
-            options.set("fields.value.aggregate-function", "sum");
+            options.set("fields.f1.aggregate-function", "sum");
+            options.set(CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE, 
removeRecordOnDelete);
             MergeFunctionFactory<KeyValue> aggMergeFunction =
                     AggregateMergeFunction.factory(
                             options,
-                            Collections.singletonList("value"),
-                            Collections.singletonList(DataTypes.BIGINT()),
-                            Collections.emptyList());
+                            Arrays.asList("f0", "f1"),
+                            Arrays.asList(DataTypes.INT().notNull(), 
DataTypes.BIGINT()),
+                            Collections.singletonList("f0"));
             return LookupMergeFunction.wrap(aggMergeFunction).create();
         }
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
index 97410c7171..d52d1fa1f8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
@@ -46,7 +46,8 @@ public class MergeFunctionTestUtils {
         return expected;
     }
 
-    public static List<ReusingTestData> 
getExpectedForPartialUpdate(List<ReusingTestData> input) {
+    public static List<ReusingTestData> getExpectedForPartialUpdate(
+            List<ReusingTestData> input, boolean addOnly) {
         input = new ArrayList<>(input);
         Collections.sort(input);
 
@@ -60,17 +61,31 @@ public class MergeFunctionTestUtils {
             if (group.size() == 1) {
                 // due to ReducerMergeFunctionWrapper
                 expected.add(group.get(0));
+            } else if (addOnly) {
+                // get the final value
+                expected.add(group.get(group.size() - 1));
             } else {
-                group.stream()
-                        .filter(d -> d.valueKind.isAdd())
-                        .reduce((first, second) -> second)
-                        .ifPresent(expected::add);
+                if (group.stream().noneMatch(data -> data.valueKind == 
RowKind.INSERT)) {
+                    // No insert: only the pk is filled and the remaining nullable fields are
+                    // left null; sequenceNumber = latest
+                    ReusingTestData last = group.get(group.size() - 1);
+                    expected.add(
+                            new ReusingTestData(
+                                    last.key, last.sequenceNumber, 
RowKind.DELETE, null));
+                } else {
+                    // get the last INSERT data because later DELETE data are 
ignored
+                    group.stream()
+                            .filter(d -> d.valueKind.isAdd())
+                            .reduce((first, second) -> second)
+                            .ifPresent(expected::add);
+                }
             }
         }
         return expected;
     }
 
-    public static List<ReusingTestData> 
getExpectedForAggSum(List<ReusingTestData> input) {
+    public static List<ReusingTestData> getExpectedForAggSum(
+            List<ReusingTestData> input, boolean addOnly, boolean 
removeRecordOndelete) {
         input = new ArrayList<>(input);
         Collections.sort(input);
 
@@ -84,7 +99,7 @@ public class MergeFunctionTestUtils {
             if (group.size() == 1) {
                 // due to ReducerMergeFunctionWrapper
                 expected.add(group.get(0));
-            } else {
+            } else if (addOnly || !removeRecordOndelete) {
                 long sum =
                         group.stream()
                                 .mapToLong(d -> d.valueKind.isAdd() ? d.value 
: -d.value)
@@ -92,6 +107,29 @@ public class MergeFunctionTestUtils {
                 ReusingTestData last = group.get(group.size() - 1);
                 expected.add(
                         new ReusingTestData(last.key, last.sequenceNumber, 
RowKind.INSERT, sum));
+            } else {
+                if (group.stream().noneMatch(data -> data.valueKind == 
RowKind.INSERT)) {
+                    // No insert: only the pk is filled and the remaining nullable fields are
+                    // left null; sequenceNumber = latest
+                    ReusingTestData last = group.get(group.size() - 1);
+                    expected.add(
+                            new ReusingTestData(
+                                    last.key, last.sequenceNumber, 
RowKind.DELETE, null));
+                } else {
+                    RowKind rowKind = null;
+                    Long sum = null;
+                    for (ReusingTestData data : group) {
+                        if (data.valueKind == RowKind.INSERT) {
+                            rowKind = RowKind.INSERT;
+                            sum = sum == null ? data.value : sum + data.value;
+                        } else {
+                            rowKind = RowKind.DELETE;
+                            sum = null;
+                        }
+                    }
+                    ReusingTestData last = group.get(group.size() - 1);
+                    expected.add(new ReusingTestData(last.key, 
last.sequenceNumber, rowKind, sum));
+                }
             }
         }
         return expected;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index c04b20cc6e..cb0767704f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -1298,11 +1298,11 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
                 getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
         assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");
 
-        // 2. Update Before
+        // 2. Update Before: retract
         write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
         commit.commit(1, write.prepareCommit(true, 1));
         result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
-        assertThat(result).isEmpty();
+        assertThat(result).containsExactly("+I[1, 1, NULL, NULL]");
 
         // 3. Update After
         write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/ValueContentRowDataRecordIteratorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/ValueContentRowDataRecordIteratorTest.java
index 6b5f3a7419..e1fbafcd15 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/ValueContentRowDataRecordIteratorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/ValueContentRowDataRecordIteratorTest.java
@@ -35,6 +35,8 @@ public class ValueContentRowDataRecordIteratorTest extends 
RowDataRecordIterator
     public void testIterator() throws Exception {
         List<ReusingTestData> input =
                 ReusingTestData.parse("1, 1, +, 100 | 2, 2, +, 200 | 1, 3, -, 
100 | 2, 4, +, 300");
+        // to check ReusingTestData.key
+        List<Integer> expectedKeys = Arrays.asList(1, 2, 1, 2);
         List<Long> expectedValues = Arrays.asList(100L, 200L, 100L, 300L);
         List<RowKind> expectedRowKinds =
                 Arrays.asList(RowKind.INSERT, RowKind.INSERT, RowKind.DELETE, 
RowKind.INSERT);
@@ -43,8 +45,9 @@ public class ValueContentRowDataRecordIteratorTest extends 
RowDataRecordIterator
                 input,
                 ValueContentRowDataRecordIterator::new,
                 (rowData, idx) -> {
-                    assertThat(rowData.getFieldCount()).isEqualTo(1);
-                    
assertThat(rowData.getLong(0)).isEqualTo(expectedValues.get(idx));
+                    assertThat(rowData.getFieldCount()).isEqualTo(2);
+                    
assertThat(rowData.getInt(0)).isEqualTo(expectedKeys.get(idx));
+                    
assertThat(rowData.getLong(1)).isEqualTo(expectedValues.get(idx));
                     
assertThat(rowData.getRowKind()).isEqualTo(expectedRowKinds.get(idx));
                 });
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/ReusingKeyValue.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/ReusingKeyValue.java
index 2e37c6b9a7..33423cf4c8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ReusingKeyValue.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ReusingKeyValue.java
@@ -37,7 +37,7 @@ public class ReusingKeyValue {
     public ReusingKeyValue() {
         this.key = new BinaryRow(1);
         this.keyWriter = new BinaryRowWriter(key);
-        this.value = new BinaryRow(1);
+        this.value = new BinaryRow(2);
         this.valueWriter = new BinaryRowWriter(value);
         this.kv = new KeyValue();
     }
@@ -45,7 +45,12 @@ public class ReusingKeyValue {
     public KeyValue update(ReusingTestData data) {
         keyWriter.writeInt(0, data.key);
         keyWriter.complete();
-        valueWriter.writeLong(0, data.value);
+        valueWriter.writeInt(0, data.key);
+        if (data.value == null) {
+            valueWriter.setNullAt(1);
+        } else {
+            valueWriter.writeLong(1, data.value);
+        }
         valueWriter.complete();
         return kv.replace(key, data.sequenceNumber, data.valueKind, value);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/ReusingTestData.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/ReusingTestData.java
index b96eef5bae..2c4b159ebd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ReusingTestData.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ReusingTestData.java
@@ -21,6 +21,8 @@ package org.apache.paimon.utils;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.types.RowKind;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -39,9 +41,9 @@ public class ReusingTestData implements 
Comparable<ReusingTestData> {
     public final int key;
     public final long sequenceNumber;
     public final RowKind valueKind;
-    public final long value;
+    @Nullable public final Long value;
 
-    public ReusingTestData(int key, long sequenceNumber, RowKind valueKind, 
long value) {
+    public ReusingTestData(int key, long sequenceNumber, RowKind valueKind, 
@Nullable Long value) {
         this.key = key;
         this.sequenceNumber = sequenceNumber;
         this.valueKind = valueKind;
@@ -60,11 +62,20 @@ public class ReusingTestData implements 
Comparable<ReusingTestData> {
         return result;
     }
 
+    public String toString() {
+        return String.format("%d,%d,%s,%d", key, sequenceNumber, valueKind, 
value);
+    }
+
     public void assertEquals(KeyValue kv) {
         assertThat(kv.key().getInt(0)).isEqualTo(key);
         assertThat(kv.sequenceNumber()).isEqualTo(sequenceNumber);
         assertThat(kv.valueKind()).isEqualTo(valueKind);
-        assertThat(kv.value().getLong(0)).isEqualTo(value);
+        assertThat(kv.value().getInt(0)).isEqualTo(key);
+        if (kv.value().isNullAt(1)) {
+            assertThat(value).isNull();
+        } else {
+            assertThat(kv.value().getLong(1)).isEqualTo(value);
+        }
     }
 
     /**

Reply via email to