This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 272f11818 [Bug] Fix the lookup changelog producer may produce wrong result (#1555)
272f11818 is described below

commit 272f118189d09bb6a1de521d0cfa0d4938087886
Author: Aitozi <[email protected]>
AuthorDate: Thu Jul 13 11:24:19 2023 +0800

    [Bug] Fix the lookup changelog producer may produce wrong result (#1555)
---
 .../mergetree/compact/LookupMergeFunction.java     | 24 ++++++++++++++++------
 .../table/ChangelogWithKeyFileStoreTable.java      |  7 +++++--
 .../LookupChangelogMergeFunctionWrapperTest.java   | 10 +++++++--
 .../paimon/flink/LookupChangelogWithAggITCase.java | 17 +++++++++++++++
 4 files changed, 48 insertions(+), 10 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 254e5cb25..7d3d8c050 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.mergetree.compact;
 
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
@@ -34,12 +36,17 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
 
     private final MergeFunction<KeyValue> mergeFunction;
     private final LinkedList<KeyValue> candidates = new LinkedList<>();
+    private final InternalRowSerializer keySerializer;
+    private final InternalRowSerializer valueSerializer;
 
     KeyValue highLevel;
     boolean containLevel0;
 
-    public LookupMergeFunction(MergeFunction<KeyValue> mergeFunction) {
+    public LookupMergeFunction(
+            MergeFunction<KeyValue> mergeFunction, RowType keyType, RowType 
valueType) {
         this.mergeFunction = mergeFunction;
+        this.keySerializer = new InternalRowSerializer(keyType);
+        this.valueSerializer = new InternalRowSerializer(valueType);
     }
 
     @Override
@@ -51,7 +58,7 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
 
     @Override
     public void add(KeyValue kv) {
-        candidates.add(kv);
+        candidates.add(kv.copy(keySerializer, valueSerializer));
     }
 
     @Override
@@ -77,8 +84,9 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
         return mergeFunction.getResult();
     }
 
-    public static MergeFunctionFactory<KeyValue> 
wrap(MergeFunctionFactory<KeyValue> wrapped) {
-        return new Factory(wrapped);
+    public static MergeFunctionFactory<KeyValue> wrap(
+            MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType 
valueType) {
+        return new Factory(wrapped, keyType, valueType);
     }
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
@@ -86,14 +94,18 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
         private static final long serialVersionUID = 1L;
 
         private final MergeFunctionFactory<KeyValue> wrapped;
+        private final RowType keyType;
+        private final RowType rowType;
 
-        private Factory(MergeFunctionFactory<KeyValue> wrapped) {
+        private Factory(MergeFunctionFactory<KeyValue> wrapped, RowType 
keyType, RowType rowType) {
             this.wrapped = wrapped;
+            this.keyType = keyType;
+            this.rowType = rowType;
         }
 
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            return new LookupMergeFunction(wrapped.create(projection));
+            return new LookupMergeFunction(wrapped.create(projection), 
keyType, rowType);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index 705e5c2a9..512354f0f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -96,6 +96,8 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
             CoreOptions options = new CoreOptions(conf);
             CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
             MergeFunctionFactory<KeyValue> mfFactory;
+            KeyValueFieldsExtractor extractor = 
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
+
             switch (mergeEngine) {
                 case DEDUPLICATE:
                     mfFactory = DeduplicateMergeFunction.factory();
@@ -117,10 +119,11 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
             }
 
             if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
-                mfFactory = LookupMergeFunction.wrap(mfFactory);
+                mfFactory =
+                        LookupMergeFunction.wrap(
+                                mfFactory, new 
RowType(extractor.keyFields(tableSchema)), rowType);
             }
 
-            KeyValueFieldsExtractor extractor = 
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
             lazyStore =
                     new KeyValueFileStore(
                             fileIO(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 56c62bf49..a468bbef5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -26,6 +26,7 @@ import 
org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -53,7 +54,10 @@ public class LookupChangelogMergeFunctionWrapperTest {
         Map<InternalRow, KeyValue> highLevel = new HashMap<>();
         LookupChangelogMergeFunctionWrapper function =
                 new LookupChangelogMergeFunctionWrapper(
-                        
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
+                        LookupMergeFunction.wrap(
+                                DeduplicateMergeFunction.factory(),
+                                RowType.of(DataTypes.INT()),
+                                RowType.of(DataTypes.INT())),
                         highLevel::get,
                         EQUALISER,
                         changelogRowDeduplicate);
@@ -207,7 +211,9 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                                 },
                                                 new FieldAggregator[] {
                                                     new 
FieldSumAgg(DataTypes.INT())
-                                                })),
+                                                }),
+                                RowType.of(DataTypes.INT()),
+                                RowType.of(DataTypes.INT())),
                         key -> null,
                         EQUALISER,
                         changlogRowDeduplicate);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
index a0481eb9d..96f06860d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -76,4 +77,20 @@ public class LookupChangelogWithAggITCase extends 
CatalogITCaseBase {
 
         iterator.close();
     }
+
+    @Test
+    public void testLookupChangelogProducerWithValueSwitch() throws Exception {
+        sql(
+                "CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ("
+                        + "'bucket'='3', "
+                        + "'changelog-producer'='lookup', "
+                        + "'merge-engine'='aggregation', "
+                        + "'fields.v.aggregate-function'='sum')");
+        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * 
FROM T");
+
+        sql("INSERT INTO T VALUES (1, 1), (2, 2), (1, 3), (1, 4), (1, 5)");
+        assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1, 
13), Row.of(2, 2));
+
+        iterator.close();
+    }
 }

Reply via email to