This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 1a3c51eb2 [flink] Revert unstable test case
FlinkTableSinkITCase.testWalModeWithDefaultMergeEngineAndAggregation (#2215)
1a3c51eb2 is described below
commit 1a3c51eb257d697efeb85a886a81f0b04a11ca7f
Author: Yang Wang <[email protected]>
AuthorDate: Sat Dec 20 15:50:43 2025 +0800
[flink] Revert unstable test case
FlinkTableSinkITCase.testWalModeWithDefaultMergeEngineAndAggregation (#2215)
---
.../fluss/flink/sink/FlinkTableSinkITCase.java | 70 ----------------------
1 file changed, 70 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index c373dd998..c8aeaaae8 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1384,74 +1384,4 @@ abstract class FlinkTableSinkITCase extends
AbstractTestBase {
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}
}
-
- @Test
- void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception {
- String tableName = "wal_mode_pk_table";
- // Create a table with WAL mode and default merge engine
- tEnv.executeSql(
- String.format(
- "create table %s ("
- + " id int not null,"
- + " category string,"
- + " amount bigint,"
- + " primary key (id) not enforced"
- + ") with ('table.changelog.image' = 'wal')",
- tableName));
-
- // Insert initial data
- tEnv.executeSql(
- String.format(
- "INSERT INTO %s VALUES "
- + "(1, 'A', 100), "
- + "(2, 'B', 200), "
- + "(3, 'A', 150), "
- + "(4, 'B', 250)",
- tableName))
- .await();
-
- // Use batch mode to update and delete records
- tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 120 WHERE
id = 1").await();
- tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 180 WHERE
id = 3").await();
- tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE id =
4").await();
-
- // Do aggregation on the table and verify ChangelogNormalize node is
generated
- String aggQuery =
- String.format(
- "SELECT category, SUM(amount) as total_amount FROM %s
/*+ OPTIONS('scan.startup.mode' = 'earliest') */ GROUP BY category",
- tableName);
-
- // Explain the aggregation query to check for ChangelogNormalize
- String aggPlan = tEnv.explainSql(aggQuery);
- // ChangelogNormalize should be present to normalize the changelog for
aggregation
- // In Flink, when the source produces changelog with primary key
semantics (I, UA, D),
- // a ChangelogNormalize operator is inserted before aggregation
- assertThat(aggPlan).contains("ChangelogNormalize");
-
- // Execute the aggregation and verify the result
- CloseableIterator<Row> aggIter = tEnv.executeSql(aggQuery).collect();
-
- // Expected aggregation results:
- // Category A: 120 (id=1) + 180 (id=3) = 300
- // Category B: 200 (id=2) = 200 (id=4 was deleted)
- List<String> expectedAggResults =
- Arrays.asList(
- "+I[A, 100]",
- "-U[A, 100]",
- "+U[A, 250]",
- "-U[A, 250]",
- "+U[A, 150]",
- "-U[A, 150]",
- "+U[A, 270]",
- "-U[A, 270]",
- "+U[A, 120]",
- "-U[A, 120]",
- "+U[A, 300]",
- "+I[B, 250]",
- "-D[B, 250]",
- "+I[B, 200]");
-
- // Collect results with timeout
- assertResultsIgnoreOrder(aggIter, expectedAggResults, true);
- }
}