This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 87582a723 [Bug] fix the sequence field missing when projection pushed down (#2219)
87582a723 is described below
commit 87582a723ab660402c93ff231564f75e315e8349
Author: Aitozi <[email protected]>
AuthorDate: Wed Nov 1 08:24:35 2023 +0800
[Bug] fix the sequence field missing when projection pushed down (#2219)
---
.../apache/paimon/mergetree/compact/LookupMergeFunction.java | 5 +++++
.../java/org/apache/paimon/flink/PartialUpdateITCase.java | 11 +++++++++++
2 files changed, 16 insertions(+)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 341401da7..c525b4676 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -115,5 +115,10 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
projection == null ? rowType : Projection.of(projection).project(rowType);
return new LookupMergeFunction(wrapped.create(projection), keyType, valueType);
}
+
+ @Override
+ public AdjustedProjection adjustProjection(@Nullable int[][] projection) {
+ return wrapped.adjustProjection(projection);
+ }
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 96796ff07..4fd718eec 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -240,4 +240,15 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
.hasRootCauseMessage(
"Field a is defined repeatedly by multiple groups: [fields.g_1.sequence-group, fields.g_2.sequence-group].");
}
+
+ @Test
+ public void testProjectPushDownWithLookupChangelogProducer() {
+ sql(
+ "CREATE TABLE IF NOT EXISTS T_P ("
+ + "j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY (j,k) NOT ENFORCED)"
+ + " WITH ('merge-engine'='partial-update', 'changelog-producer' = 'lookup', "
+ + "'fields.a.sequence-group'='j', 'fields.b.sequence-group'='c');");
+ batchSql("INSERT INTO T_P VALUES (1, 1, 1, 1, '1')");
+ assertThat(sql("SELECT k, c FROM T_P")).containsExactlyInAnyOrder(Row.of(1, "1"));
+ }
}