This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a5fba14d [core] Serializer in LookupMergeFunction should be projected (#1776)
4a5fba14d is described below

commit 4a5fba14d8d7734f37b6e68e0b8d42b8a090e554
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Aug 9 17:01:46 2023 +0800

    [core] Serializer in LookupMergeFunction should be projected (#1776)
---
 .../mergetree/compact/LookupMergeFunction.java       |  5 ++++-
 .../paimon/flink/LookupChangelogWithAggITCase.java   | 20 ++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 64fc78045..341401da7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Projection;
 
 import javax.annotation.Nullable;
 
@@ -110,7 +111,9 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
 
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            return new LookupMergeFunction(wrapped.create(projection), keyType, rowType);
+            RowType valueType =
+                    projection == null ? rowType : Projection.of(projection).project(rowType);
+            return new LookupMergeFunction(wrapped.create(projection), keyType, valueType);
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
index 96f06860d..f4c554aa1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
@@ -26,6 +26,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.concurrent.ThreadLocalRandom;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test Lookup changelog producer with aggregation tables. */
@@ -93,4 +95,22 @@ public class LookupChangelogWithAggITCase extends CatalogITCaseBase {
 
         iterator.close();
     }
+
+    @Test
+    public void testLookupChangelogProducerWithProjection() {
+        sql(
+                "CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v1 INT, v2 INT) WITH ("
+                        + "'bucket'='3', "
+                        + "'changelog-producer'='lookup', "
+                        + "'merge-engine'='aggregation', "
+                        + "'fields.v1.aggregate-function'='sum', "
+                        + "'fields.v2.aggregate-function'='sum')");
+
+        int times = 3 + ThreadLocalRandom.current().nextInt(3);
+        for (int i = 0; i < times; i++) {
+            sql("INSERT INTO T VALUES (1, 1, 1), (2, 2, 2)");
+        }
+        assertThat(sql("SELECT v2 FROM T"))
+                .containsExactlyInAnyOrder(Row.of(times), Row.of(times * 2));
+    }
 }

Reply via email to