This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 58d2bca9c3 [core] Fix that AggregateMergeFunction handles multiple
sequence fields mistakenly (#5065)
58d2bca9c3 is described below
commit 58d2bca9c38326c341d59deabe14bb1500cf4038
Author: yuzelin <[email protected]>
AuthorDate: Wed Feb 12 17:23:17 2025 +0800
[core] Fix that AggregateMergeFunction handles multiple sequence fields
mistakenly (#5065)
---
.../compact/aggregate/AggregateMergeFunction.java | 5 +++--
.../apache/paimon/flink/BatchFileStoreITCase.java | 20 ++++++++++++++++++++
2 files changed, 23 insertions(+), 2 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index ca380d2778..9af786ae6c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -26,6 +26,7 @@ import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory;
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
@@ -149,8 +150,8 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
private String getAggFuncName(String fieldName, List<String>
sequenceFields) {
if (sequenceFields.contains(fieldName)) {
- // no agg for sequence fields, use last_non_null_value to do
cover
- return FieldLastNonNullValueAggFactory.NAME;
+ // no agg for sequence fields, use last_value to do cover
+ return FieldLastValueAggFactory.NAME;
}
if (primaryKeys.contains(fieldName)) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 486bfcb69b..e92a8a30b4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -682,6 +682,26 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
}
}
+ @Test
+ public void testAggregationWithNullSequenceField() {
+ sql(
+ "CREATE TABLE test ("
+ + " pk INT PRIMARY KEY NOT ENFORCED,"
+ + " v STRING,"
+ + " s0 INT,"
+ + " s1 INT"
+ + ") WITH ("
+ + " 'merge-engine' = 'aggregation',"
+ + " 'sequence.field' = 's0,s1')");
+
+ sql(
+ "INSERT INTO test VALUES (1, 'A1', CAST (NULL AS INT), 1), (1,
'A2', 1, CAST (NULL AS INT))");
+ assertThat(sql("SELECT * FROM test")).containsExactly(Row.of(1, "A2",
1, null));
+
+ sql("INSERT INTO test VALUES (1, 'A3', 1, 0)");
+ assertThat(sql("SELECT * FROM test")).containsExactly(Row.of(1, "A3",
1, 0));
+ }
+
private void validateCount1PushDown(String sql) {
Transformation<?> transformation = AbstractTestBase.translate(tEnv,
sql);
while (!transformation.getInputs().isEmpty()) {