wuchong commented on code in PR #2217:
URL: https://github.com/apache/fluss/pull/2217#discussion_r2637526323


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java:
##########
@@ -1384,4 +1384,78 @@ void testDeleteBehaviorForInsertStmt(String deleteBehavior) throws Exception {
             assertResultsIgnoreOrder(rowIter, expectedRows, true);
         }
     }
+
+    @Test
+    void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception {
+        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+        String tableName = "wal_mode_pk_table";
+        // Create a table with WAL mode and default merge engine
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " id int not null,"
+                                + " category string,"
+                                + " amount bigint,"
+                                + " primary key (id) not enforced"
+                                + ") with ('table.changelog.image' = 'wal')",
+                        tableName));
+
+        // Insert initial data
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s VALUES "
+                                        + "(1, 'A', 100), "
+                                        + "(2, 'B', 200), "
+                                        + "(3, 'A', 150), "
+                                        + "(4, 'B', 250)",
+                                tableName))
+                .await();
+
+        // Use batch mode to update and delete records
+        tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 120 WHERE id = 1").await();
+        tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 180 WHERE id = 3").await();
+        tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE id = 4").await();
+
+        // Do aggregation on the table and verify ChangelogNormalize node is generated
+        String aggQuery =
+                String.format(
+                        "SELECT category, SUM(amount) as total_amount FROM %s /*+ OPTIONS('scan.startup.mode' = 'earliest') */ GROUP BY category",
+                        tableName);
+
+        // Explain the aggregation query to check for ChangelogNormalize
+        String aggPlan = tEnv.explainSql(aggQuery);
+        // ChangelogNormalize should be present to normalize the changelog for aggregation
+        // In Flink, when the source produces changelog with primary key semantics (I, UA, D),
+        // a ChangelogNormalize operator is inserted before aggregation
+        assertThat(aggPlan).contains("ChangelogNormalize");
+
+        // Execute the aggregation and verify the result
+        CloseableIterator<Row> aggIter = tEnv.executeSql(aggQuery).collect();
+
+        // Expected aggregation results:
+        // Category A: 120 (id=1) + 180 (id=3) = 300
+        // Category B: 200 (id=2) = 200 (id=4 was deleted)
+        List<String> expectedAggResults =
+                Arrays.asList(
+                        "+I[A, 100]",
+                        "-U[A, 100]",
+                        "+U[A, 250]",
+                        "-U[A, 250]",
+                        "+U[A, 150]",
+                        "-U[A, 150]",
+                        "+U[A, 270]",
+                        "-U[A, 270]",
+                        "+U[A, 120]",
+                        "-U[A, 120]",
+                        "+U[A, 300]",
+                        "+I[B, 200]",
+                        "-U[B, 200]",
+                        "+U[B, 450]",
+                        "-U[B, 450]",
+                        "+U[B, 200]");
+
+        // Collect results with timeout
+        assertResultsIgnoreOrder(aggIter, expectedAggResults, true);

Review Comment:
   Since the test already sets the parallelism to 1 to guarantee source event
   ordering, we can also assert the results in order.
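
   A minimal sketch of what an order-sensitive check could look like. It is
   written in plain Java so that it does not assume any particular helper exists
   in the Fluss test utilities; `OrderedResultCheck`, `collectInOrder`, and
   `assertExactOrder` are hypothetical names, and the rows in `main` are sample
   input taken from the category B entries of the expected list, not captured
   test output. With an exact-order assertion, the expected list would likely
   need to be arranged in emission order rather than grouped by category.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of Fluss or Flink.
class OrderedResultCheck {

    // Reads at most expectedCount rows, preserving arrival order.
    // Stopping at expectedCount avoids blocking on an unbounded streaming iterator.
    public static List<String> collectInOrder(Iterator<?> rows, int expectedCount) {
        List<String> actual = new ArrayList<>(expectedCount);
        while (actual.size() < expectedCount && rows.hasNext()) {
            actual.add(String.valueOf(rows.next()));
        }
        return actual;
    }

    // Fails if the collected rows differ from the expected rows in content or order.
    public static void assertExactOrder(Iterator<?> rows, List<String> expected) {
        List<String> actual = collectInOrder(rows, expected.size());
        if (!actual.equals(expected)) {
            throw new AssertionError("Expected " + expected + " but got " + actual);
        }
    }

    public static void main(String[] args) {
        // Sample changelog rows for a single key, in the order they would be emitted.
        List<String> expected =
                Arrays.asList(
                        "+I[B, 200]",
                        "-U[B, 200]",
                        "+U[B, 450]",
                        "-U[B, 450]",
                        "+U[B, 200]");
        assertExactOrder(expected.iterator(), expected);
        System.out.println("ordered assertion passed");
    }
}
```

   In the test itself, the iterator would be `aggIter` and the rows would be
   compared through their string form, as the existing expected list already
   does.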



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.