This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4a5fba14d [core] Serializer in LookupMergeFunction should be projected
(#1776)
4a5fba14d is described below
commit 4a5fba14d8d7734f37b6e68e0b8d42b8a090e554
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Aug 9 17:01:46 2023 +0800
[core] Serializer in LookupMergeFunction should be projected (#1776)
---
.../mergetree/compact/LookupMergeFunction.java | 5 ++++-
.../paimon/flink/LookupChangelogWithAggITCase.java | 20 ++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 64fc78045..341401da7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Projection;
import javax.annotation.Nullable;
@@ -110,7 +111,9 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- return new LookupMergeFunction(wrapped.create(projection),
keyType, rowType);
+ RowType valueType =
+ projection == null ? rowType :
Projection.of(projection).project(rowType);
+ return new LookupMergeFunction(wrapped.create(projection),
keyType, valueType);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
index 96f06860d..f4c554aa1 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
@@ -26,6 +26,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.concurrent.ThreadLocalRandom;
+
import static org.assertj.core.api.Assertions.assertThat;
/** Test Lookup changelog producer with aggregation tables. */
@@ -93,4 +95,22 @@ public class LookupChangelogWithAggITCase extends
CatalogITCaseBase {
iterator.close();
}
+
+ @Test
+ public void testLookupChangelogProducerWithProjection() {
+ sql(
+ "CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v1 INT, v2
INT) WITH ("
+ + "'bucket'='3', "
+ + "'changelog-producer'='lookup', "
+ + "'merge-engine'='aggregation', "
+ + "'fields.v1.aggregate-function'='sum', "
+ + "'fields.v2.aggregate-function'='sum')");
+
+ int times = 3 + ThreadLocalRandom.current().nextInt(3);
+ for (int i = 0; i < times; i++) {
+ sql("INSERT INTO T VALUES (1, 1, 1), (2, 2, 2)");
+ }
+ assertThat(sql("SELECT v2 FROM T"))
+ .containsExactlyInAnyOrder(Row.of(times), Row.of(times * 2));
+ }
}