This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b20b78e020 [core] Avoid the unnecessary key value copy (#5116)
b20b78e020 is described below

commit b20b78e02076bdc27a85570d8997435322d58919
Author: WenjunMin <[email protected]>
AuthorDate: Thu Feb 20 10:59:56 2025 +0800

    [core] Avoid the unnecessary key value copy (#5116)
---
 .../paimon/benchmark/TableWriterBenchmark.java     | 18 ++++++++++
 .../java/org/apache/paimon/KeyValueSerializer.java | 13 ++++++-
 .../paimon/mergetree/SortBufferWriteBuffer.java    |  8 +++--
 .../compact/DeduplicateMergeFunction.java          |  5 +++
 .../mergetree/compact/FirstRowMergeFunction.java   | 29 ++++++----------
 .../LookupChangelogMergeFunctionWrapper.java       |  8 +----
 .../mergetree/compact/LookupMergeFunction.java     | 40 ++++++----------------
 .../paimon/mergetree/compact/MergeFunction.java    |  2 ++
 .../compact/PartialUpdateMergeFunction.java        |  5 +++
 .../compact/aggregate/AggregateMergeFunction.java  |  5 +++
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  4 +--
 .../apache/paimon/table/PrimaryKeyTableUtils.java  |  3 +-
 .../java/org/apache/paimon/utils/OffsetRow.java    |  4 +++
 .../mergetree/SortBufferWriteBufferTestBase.java   | 13 ++-----
 .../LookupChangelogMergeFunctionWrapperTest.java   | 30 +++-------------
 .../mergetree/compact/SortMergeReaderTestBase.java | 10 +-----
 .../paimon/operation/MergeFileSplitReadTest.java   |  5 +++
 .../paimon/flink/LookupChangelogWithAggITCase.java | 10 +++++-
 18 files changed, 104 insertions(+), 108 deletions(-)

diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
index d3624f9330..19349d4eee 100644
--- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.benchmark;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
@@ -80,6 +81,7 @@ public class TableWriterBenchmark extends TableBenchmark {
     public void testParquet() throws Exception {
         Options options = new Options();
         options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET);
+        options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.LOOKUP);
         innerTest("parquet", options);
         /*
          * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
@@ -104,6 +106,21 @@ public class TableWriterBenchmark extends TableBenchmark {
          */
     }
 
+    @Test
+    public void testParquetLookupCompaction() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET);
+        options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.LOOKUP);
+        innerTest("parquet", options);
+        /*
+         * OpenJDK 64-Bit Server VM 11.0.24+0 on Mac OS X 14.5
+         * Apple M3 Pro
+         * parquet:         Best/Avg Time(ms)    Row Rate(K/s)      Per Row(ns)   Relative
+         * -------------------------------------------------------------------------------
+         * parquet_write     31918 / 32666             94.0          10639.3       1.0X
+         */
+    }
+
     public void innerTest(String name, Options options) throws Exception {
         options.set(CoreOptions.BUCKET, 1);
         Table table = createTable(options, "T");
@@ -119,6 +136,7 @@ public class TableWriterBenchmark extends TableBenchmark {
                 () -> {
                     BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
                     BatchTableWrite write = writeBuilder.newWrite();
+                    write.withIOManager(new IOManagerImpl(tempFile.toString()));
                     BatchTableCommit commit = writeBuilder.newCommit();
                     for (int i = 0; i < valuesPerIteration; i++) {
                         try {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueSerializer.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueSerializer.java
index 34fd9b1405..13a172e49a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueSerializer.java
@@ -36,6 +36,7 @@ public class KeyValueSerializer extends ObjectSerializer<KeyValue> {
     private static final long serialVersionUID = 1L;
 
     private final int keyArity;
+    private final int valueArity;
 
     private final GenericRow reusedMeta;
     private final JoinedRow reusedKeyWithMeta;
@@ -49,7 +50,7 @@ public class KeyValueSerializer extends ObjectSerializer<KeyValue> {
         super(KeyValue.schema(keyType, valueType));
 
         this.keyArity = keyType.getFieldCount();
-        int valueArity = valueType.getFieldCount();
+        this.valueArity = valueType.getFieldCount();
 
         this.reusedMeta = new GenericRow(2);
         this.reusedKeyWithMeta = new JoinedRow();
@@ -85,4 +86,14 @@ public class KeyValueSerializer extends 
ObjectSerializer<KeyValue> {
     public KeyValue getReusedKv() {
         return reusedKv;
     }
+
+    public KeyValue getCopiedKv() {
+        InternalRow row = rowSerializer.copy(reusedKey.getOriginalRow());
+        return new KeyValue()
+                .replace(
+                        new OffsetRow(keyArity, 0).replace(row),
+                        reusedKv.sequenceNumber(),
+                        reusedKv.valueKind(),
+                        new OffsetRow(valueArity, keyArity + 2).replace(row));
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
index 433ff2158b..3c60c10bb9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
@@ -178,6 +178,7 @@ public class SortBufferWriteBuffer implements WriteBuffer {
         private final MutableObjectIterator<BinaryRow> kvIter;
         private final Comparator<InternalRow> keyComparator;
         private final ReducerMergeFunctionWrapper mergeFunctionWrapper;
+        private final boolean requireCopy;
 
         // previously read kv
         private KeyValueSerializer previous;
@@ -199,6 +200,7 @@ public class SortBufferWriteBuffer implements WriteBuffer {
             this.kvIter = kvIter;
             this.keyComparator = keyComparator;
             this.mergeFunctionWrapper = new ReducerMergeFunctionWrapper(mergeFunction);
+            this.requireCopy = mergeFunction.requireCopy();
 
             int totalFieldCount = keyType.getFieldCount() + 2 + valueType.getFieldCount();
             this.previous = new KeyValueSerializer(keyType, valueType);
@@ -235,7 +237,8 @@ public class SortBufferWriteBuffer implements WriteBuffer {
                     return;
                 }
                 mergeFunctionWrapper.reset();
-                mergeFunctionWrapper.add(previous.getReusedKv());
+                mergeFunctionWrapper.add(
+                        requireCopy ? previous.getCopiedKv() : previous.getReusedKv());
 
                 while (readOnce()) {
                     if (keyComparator.compare(
@@ -243,7 +246,8 @@ public class SortBufferWriteBuffer implements WriteBuffer {
                             != 0) {
                         break;
                     }
-                    mergeFunctionWrapper.add(current.getReusedKv());
+                    mergeFunctionWrapper.add(
+                            requireCopy ? current.getCopiedKv() : current.getReusedKv());
                     swapSerializers();
                 }
                 result = mergeFunctionWrapper.getResult();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
index bad9b29160..5422f5469e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
@@ -58,6 +58,11 @@ public class DeduplicateMergeFunction implements 
MergeFunction<KeyValue> {
         return latestKv;
     }
 
+    @Override
+    public boolean requireCopy() {
+        return false;
+    }
+
     public static MergeFunctionFactory<KeyValue> factory() {
         return new Factory(false);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
index b3d9b8661e..d795f16bbd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -20,9 +20,7 @@ package org.apache.paimon.mergetree.compact;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
@@ -32,15 +30,11 @@ import javax.annotation.Nullable;
  */
 public class FirstRowMergeFunction implements MergeFunction<KeyValue> {
 
-    private final InternalRowSerializer keySerializer;
-    private final InternalRowSerializer valueSerializer;
     private KeyValue first;
     public boolean containsHighLevel;
     private final boolean ignoreDelete;
 
-    protected FirstRowMergeFunction(RowType keyType, RowType valueType, 
boolean ignoreDelete) {
-        this.keySerializer = new InternalRowSerializer(keyType);
-        this.valueSerializer = new InternalRowSerializer(valueType);
+    protected FirstRowMergeFunction(boolean ignoreDelete) {
         this.ignoreDelete = ignoreDelete;
     }
 
@@ -65,7 +59,7 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
         }
 
         if (first == null) {
-            this.first = kv.copy(keySerializer, valueSerializer);
+            this.first = kv;
         }
         if (kv.level() > 0) {
             containsHighLevel = true;
@@ -77,28 +71,27 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
         return first;
     }
 
-    public static MergeFunctionFactory<KeyValue> factory(
-            Options options, RowType keyType, RowType valueType) {
-        return new FirstRowMergeFunction.Factory(
-                keyType, valueType, options.get(CoreOptions.IGNORE_DELETE));
+    @Override
+    public boolean requireCopy() {
+        return true;
+    }
+
+    public static MergeFunctionFactory<KeyValue> factory(Options options) {
+        return new 
FirstRowMergeFunction.Factory(options.get(CoreOptions.IGNORE_DELETE));
     }
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
 
         private static final long serialVersionUID = 1L;
-        private final RowType keyType;
-        private final RowType valueType;
         private final boolean ignoreDelete;
 
-        public Factory(RowType keyType, RowType valueType, boolean 
ignoreDelete) {
-            this.keyType = keyType;
-            this.valueType = valueType;
+        public Factory(boolean ignoreDelete) {
             this.ignoreDelete = ignoreDelete;
         }
 
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            return new FirstRowMergeFunction(keyType, valueType, ignoreDelete);
+            return new FirstRowMergeFunction(ignoreDelete);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index 450df52314..c327474fa8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -21,7 +21,6 @@ package org.apache.paimon.mergetree.compact;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValue;
@@ -69,8 +68,6 @@ public class LookupChangelogMergeFunctionWrapper<T>
     private final Comparator<KeyValue> comparator;
 
     private final LinkedList<KeyValue> candidates = new LinkedList<>();
-    private final InternalRowSerializer keySerializer;
-    private final InternalRowSerializer valueSerializer;
 
     public LookupChangelogMergeFunctionWrapper(
             MergeFunctionFactory<KeyValue> mergeFunctionFactory,
@@ -89,9 +86,6 @@ public class LookupChangelogMergeFunctionWrapper<T>
                     deletionVectorsMaintainer != null,
                     "deletionVectorsMaintainer should not be null, there is a 
bug.");
         }
-        LookupMergeFunction lookupMergeFunction = (LookupMergeFunction) 
mergeFunction;
-        this.keySerializer = lookupMergeFunction.getKeySerializer();
-        this.valueSerializer = lookupMergeFunction.getValueSerializer();
         this.mergeFunction = mergeFunctionFactory.create();
         this.lookup = lookup;
         this.valueEqualiser = valueEqualiser;
@@ -107,7 +101,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
 
     @Override
     public void add(KeyValue kv) {
-        candidates.add(kv.copy(keySerializer, valueSerializer));
+        candidates.add(kv);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index a3a6af23cb..6f999297aa 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -19,9 +19,6 @@
 package org.apache.paimon.mergetree.compact;
 
 import org.apache.paimon.KeyValue;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Projection;
 
 import javax.annotation.Nullable;
 
@@ -37,14 +34,9 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
 
     private final MergeFunction<KeyValue> mergeFunction;
     private final LinkedList<KeyValue> candidates = new LinkedList<>();
-    private final InternalRowSerializer keySerializer;
-    private final InternalRowSerializer valueSerializer;
 
-    public LookupMergeFunction(
-            MergeFunction<KeyValue> mergeFunction, RowType keyType, RowType 
valueType) {
+    public LookupMergeFunction(MergeFunction<KeyValue> mergeFunction) {
         this.mergeFunction = mergeFunction;
-        this.keySerializer = new InternalRowSerializer(keyType);
-        this.valueSerializer = new InternalRowSerializer(valueType);
     }
 
     @Override
@@ -54,15 +46,7 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
 
     @Override
     public void add(KeyValue kv) {
-        candidates.add(kv.copy(keySerializer, valueSerializer));
-    }
-
-    public InternalRowSerializer getKeySerializer() {
-        return keySerializer;
-    }
-
-    public InternalRowSerializer getValueSerializer() {
-        return valueSerializer;
+        candidates.add(kv);
     }
 
     @Override
@@ -87,14 +71,18 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
         return mergeFunction.getResult();
     }
 
-    public static MergeFunctionFactory<KeyValue> wrap(
-            MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType 
valueType) {
+    @Override
+    public boolean requireCopy() {
+        return true;
+    }
+
+    public static MergeFunctionFactory<KeyValue> 
wrap(MergeFunctionFactory<KeyValue> wrapped) {
         if (wrapped.create() instanceof FirstRowMergeFunction) {
             // don't wrap first row, it is already OK
             return wrapped;
         }
 
-        return new Factory(wrapped, keyType, valueType);
+        return new Factory(wrapped);
     }
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
@@ -102,20 +90,14 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
         private static final long serialVersionUID = 1L;
 
         private final MergeFunctionFactory<KeyValue> wrapped;
-        private final RowType keyType;
-        private final RowType rowType;
 
-        private Factory(MergeFunctionFactory<KeyValue> wrapped, RowType 
keyType, RowType rowType) {
+        private Factory(MergeFunctionFactory<KeyValue> wrapped) {
             this.wrapped = wrapped;
-            this.keyType = keyType;
-            this.rowType = rowType;
         }
 
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            RowType valueType =
-                    projection == null ? rowType : 
Projection.of(projection).project(rowType);
-            return new LookupMergeFunction(wrapped.create(projection), 
keyType, valueType);
+            return new LookupMergeFunction(wrapped.create(projection));
         }
 
         @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java
index dc2c9e4580..46240b038a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java
@@ -44,4 +44,6 @@ public interface MergeFunction<T> {
 
     /** Get current merged value. */
     T getResult();
+
+    boolean requireCopy();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 3ce51127b1..2497de0893 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -273,6 +273,11 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         return reused.replace(currentKey, latestSequenceNumber, rowKind, row);
     }
 
+    @Override
+    public boolean requireCopy() {
+        return false;
+    }
+
     public static MergeFunctionFactory<KeyValue> factory(
             Options options, RowType rowType, List<String> primaryKeys) {
         return new Factory(options, rowType, primaryKeys);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index 9af786ae6c..7b18ac77f3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -96,6 +96,11 @@ public class AggregateMergeFunction implements 
MergeFunction<KeyValue> {
         return reused.replace(latestKv.key(), latestKv.sequenceNumber(), 
RowKind.INSERT, row);
     }
 
+    @Override
+    public boolean requireCopy() {
+        return false;
+    }
+
     public static MergeFunctionFactory<KeyValue> factory(
             Options conf,
             List<String> tableNames,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 516ae766ce..ea71204dc9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -79,9 +79,7 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable 
{
             MergeFunctionFactory<KeyValue> mfFactory =
                     
PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);
             if (options.needLookup()) {
-                mfFactory =
-                        LookupMergeFunction.wrap(
-                                mfFactory, new 
RowType(extractor.keyFields(tableSchema)), rowType);
+                mfFactory = LookupMergeFunction.wrap(mfFactory);
             }
 
             lazyStore =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index d156d23a91..a47e44718e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -69,8 +69,7 @@ public class PrimaryKeyTableUtils {
                         rowType.getFieldTypes(),
                         tableSchema.primaryKeys());
             case FIRST_ROW:
-                return FirstRowMergeFunction.factory(
-                        conf, new RowType(extractor.keyFields(tableSchema)), 
rowType);
+                return FirstRowMergeFunction.factory(conf);
             default:
                 throw new UnsupportedOperationException("Unsupported merge 
engine: " + mergeEngine);
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java
index 822488f271..02da5a15be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java
@@ -41,6 +41,10 @@ public class OffsetRow implements InternalRow {
         this.offset = offset;
     }
 
+    public InternalRow getOriginalRow() {
+        return row;
+    }
+
     public OffsetRow replace(InternalRow row) {
         this.row = row;
         return this;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 27606ee33f..8169d6a840 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -43,7 +43,6 @@ import org.apache.paimon.utils.ReusingTestData;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
-import org.assertj.core.util.Lists;
 import org.junit.jupiter.api.Test;
 
 import java.io.EOFException;
@@ -234,11 +233,7 @@ public abstract class SortBufferWriteBufferTestBase {
                             Collections.singletonList("value"),
                             Collections.singletonList(DataTypes.BIGINT()),
                             Collections.emptyList());
-            return LookupMergeFunction.wrap(
-                            aggMergeFunction,
-                            RowType.of(DataTypes.INT()),
-                            RowType.of(DataTypes.BIGINT()))
-                    .create();
+            return LookupMergeFunction.wrap(aggMergeFunction).create();
         }
     }
 
@@ -257,11 +252,7 @@ public abstract class SortBufferWriteBufferTestBase {
 
         @Override
         protected MergeFunction<KeyValue> createMergeFunction() {
-            return FirstRowMergeFunction.factory(
-                            new Options(),
-                            new RowType(Lists.list(new DataField(0, "f0", new 
IntType()))),
-                            new RowType(Lists.list(new DataField(1, "f1", new 
BigIntType()))))
-                    .create();
+            return FirstRowMergeFunction.factory(new Options()).create();
         }
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 28cb4c099a..d44264c6f9 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -28,17 +28,14 @@ import 
org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldSumAggFactory;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.UserDefinedSeqComparator;
 import org.apache.paimon.utils.ValueEqualiserSupplier;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
-import org.assertj.core.util.Lists;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -69,10 +66,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
         Map<InternalRow, KeyValue> highLevel = new HashMap<>();
         LookupChangelogMergeFunctionWrapper function =
                 new LookupChangelogMergeFunctionWrapper(
-                        LookupMergeFunction.wrap(
-                                DeduplicateMergeFunction.factory(),
-                                RowType.of(DataTypes.INT()),
-                                RowType.of(DataTypes.INT())),
+                        
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
                         highLevel::get,
                         changelogRowDeduplicate ? EQUALISER : null,
                         LookupStrategy.from(false, true, false, false),
@@ -234,10 +228,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                 ValueEqualiserSupplier.fromIgnoreFields(valueType, 
ignoreFields);
         LookupChangelogMergeFunctionWrapper function =
                 new LookupChangelogMergeFunctionWrapper(
-                        LookupMergeFunction.wrap(
-                                DeduplicateMergeFunction.factory(),
-                                RowType.of(DataTypes.INT()),
-                                valueType),
+                        
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
                         highLevel::get,
                         logDedupEqualSupplier.get(),
                         LookupStrategy.from(false, true, false, false),
@@ -295,9 +286,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                                 new FieldAggregator[] {
                                                     new FieldSumAggFactory()
                                                             
.create(DataTypes.INT(), null, null)
-                                                }),
-                                RowType.of(DataTypes.INT()),
-                                RowType.of(DataTypes.INT())),
+                                                })),
                         key -> null,
                         changelogRowDeduplicate ? EQUALISER : null,
                         LookupStrategy.from(false, true, false, false),
@@ -384,9 +373,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                                 new FieldAggregator[] {
                                                     new 
FieldLastValueAggFactory()
                                                             
.create(DataTypes.INT(), null, null)
-                                                }),
-                                RowType.of(DataTypes.INT()),
-                                RowType.of(DataTypes.INT())),
+                                                })),
                         highLevel::get,
                         null,
                         LookupStrategy.from(false, true, false, false),
@@ -454,14 +441,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
         Set<InternalRow> highLevel = new HashSet<>();
         FirstRowMergeFunctionWrapper function =
                 new FirstRowMergeFunctionWrapper(
-                        projection ->
-                                new FirstRowMergeFunction(
-                                        new RowType(
-                                                Lists.list(new DataField(0, 
"f0", new IntType()))),
-                                        new RowType(
-                                                Lists.list(new DataField(1, 
"f1", new IntType()))),
-                                        false),
-                        highLevel::contains);
+                        projection -> new FirstRowMergeFunction(false), 
highLevel::contains);
 
         // Without level-0
         function.reset();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index 81e49b81d2..50eebd5007 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -22,14 +22,9 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ReusingTestData;
 import org.apache.paimon.utils.TestReusingRecordReader;
 
-import org.assertj.core.util.Lists;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
@@ -127,10 +122,7 @@ public abstract class SortMergeReaderTestBase extends 
CombiningRecordReaderTestB
 
         @Override
         protected MergeFunction<KeyValue> createMergeFunction() {
-            RowType keyType = new RowType(Lists.list(new DataField(0, "f0", 
new IntType())));
-            RowType valueType = new RowType(Lists.list(new DataField(1, "f1", 
new BigIntType())));
-            return new LookupMergeFunction(
-                    new FirstRowMergeFunction(keyType, valueType, false), 
keyType, valueType);
+            return new LookupMergeFunction(new FirstRowMergeFunction(false));
         }
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 59f848a296..44a02699e4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -344,6 +344,11 @@ public class MergeFileSplitReadTest {
                     GenericRow.of(total));
         }
 
+        @Override
+        public boolean requireCopy() {
+            return false;
+        }
+
         private long count(InternalRow value) {
             checkArgument(!value.isNullAt(0), "Value count should not be 
null.");
             return value.getLong(0);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
index f4c554aa1c..e288f8bcbf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
@@ -90,10 +90,18 @@ public class LookupChangelogWithAggITCase extends CatalogITCaseBase {
                         + "'fields.v.aggregate-function'='sum')");
         BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM T");
 
+        // merge by sort buffer
         sql("INSERT INTO T VALUES (1, 1), (2, 2), (1, 3), (1, 4), (1, 5)");
         assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1, 13), Row.of(2, 2));
-
         iterator.close();
+
+        // merge by compaction
+        sql("INSERT INTO T VALUES (1, 3), (2, 2), (3, 1)");
+        sql("INSERT INTO T VALUES (1, 2), (2, 2), (3, 2)");
+        sql("INSERT INTO T VALUES (1, 1), (2, 2), (3, 3)");
+
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(Row.of(1, 19), Row.of(2, 8), Row.of(3, 6));
     }
 
     @Test

Reply via email to