This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3798eb8b9 [hotfix] Add SumRetractionAggregation Flink ITCase
3798eb8b9 is described below
commit 3798eb8b9dfeaf941009e152f7859d8c8205eab1
Author: JingsongLi <[email protected]>
AuthorDate: Thu Apr 13 14:23:43 2023 +0800
[hotfix] Add SumRetractionAggregation Flink ITCase
---
.../apache/paimon/flink/PreAggregationITCase.java | 36 ++++++++++++++++++++++
1 file changed, 36 insertions(+)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index d842b2c16..9268f9530 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -18,7 +18,10 @@
package org.apache.paimon.flink;
+import org.apache.paimon.utils.BlockingIterator;
+
import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import java.math.BigDecimal;
@@ -817,4 +820,37 @@ public class PreAggregationITCase {
"Pre-aggregate continuous reading is not supported");
}
}
+
+ /** ITCase for sum aggregate function retraction. */
+ public static class SumRetractionAggregation extends CatalogITCaseBase {
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE T ("
+ + "k INT,"
+ + "b Decimal(12, 2),"
+ + "PRIMARY KEY (k) NOT ENFORCED)"
+ + " WITH ('merge-engine'='aggregation', "
+ + "'changelog-producer' = 'full-compaction',"
+ + "'fields.b.aggregate-function'='sum'"
+ + ");");
+ }
+
+ @Test
+ public void testRetraction() throws Exception {
+ sql("CREATE TABLE INPUT (" + "k INT," + "b INT," + "PRIMARY KEY
(k) NOT ENFORCED);");
+ CloseableIterator<Row> insert =
+ streamSqlIter("INSERT INTO T SELECT k, SUM(b) FROM INPUT
GROUP BY k;");
+ sql("INSERT INTO INPUT VALUES (1, 1), (2, 2)");
+ sql("INSERT INTO INPUT VALUES (1, 3), (2, 4)");
+ BlockingIterator<Row, Row> select = streamSqlBlockIter("SELECT *
FROM T");
+ List<Row> result = select.collect(2);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, BigDecimal.valueOf(300, 2)),
+ Row.of(2, BigDecimal.valueOf(400, 2)));
+ select.close();
+ insert.close();
+ }
+ }
}