This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3798eb8b9 [hotfix] Add SumRetractionAggregation Flink ITCase
3798eb8b9 is described below

commit 3798eb8b9dfeaf941009e152f7859d8c8205eab1
Author: JingsongLi <[email protected]>
AuthorDate: Thu Apr 13 14:23:43 2023 +0800

    [hotfix] Add SumRetractionAggregation Flink ITCase
---
 .../apache/paimon/flink/PreAggregationITCase.java  | 36 ++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index d842b2c16..9268f9530 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -18,7 +18,10 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.utils.BlockingIterator;
+
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
@@ -817,4 +820,37 @@ public class PreAggregationITCase {
                     "Pre-aggregate continuous reading is not supported");
         }
     }
+
+    /** ITCase for sum aggregate function retraction. */
+    public static class SumRetractionAggregation extends CatalogITCaseBase {
+        @Override
+        protected List<String> ddl() {
+            return Collections.singletonList(
+                    "CREATE TABLE T ("
+                            + "k INT,"
+                            + "b Decimal(12, 2),"
+                            + "PRIMARY KEY (k) NOT ENFORCED)"
+                            + " WITH ('merge-engine'='aggregation', "
+                            + "'changelog-producer' = 'full-compaction',"
+                            + "'fields.b.aggregate-function'='sum'"
+                            + ");");
+        }
+
+        @Test
+        public void testRetraction() throws Exception {
+            sql("CREATE TABLE INPUT (" + "k INT," + "b INT," + "PRIMARY KEY 
(k) NOT ENFORCED);");
+            CloseableIterator<Row> insert =
+                    streamSqlIter("INSERT INTO T SELECT k, SUM(b) FROM INPUT 
GROUP BY k;");
+            sql("INSERT INTO INPUT VALUES (1, 1), (2, 2)");
+            sql("INSERT INTO INPUT VALUES (1, 3), (2, 4)");
+            BlockingIterator<Row, Row> select = streamSqlBlockIter("SELECT * 
FROM T");
+            List<Row> result = select.collect(2);
+            assertThat(result)
+                    .containsExactlyInAnyOrder(
+                            Row.of(1, BigDecimal.valueOf(300, 2)),
+                            Row.of(2, BigDecimal.valueOf(400, 2)));
+            select.close();
+            insert.close();
+        }
+    }
 }

Reply via email to