This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new b853efde9 [test] Fix FlinkTableSinkITCase#testWalModeWithAutoIncrement 
to correct assert changelogs of WAL image mode (#2568)
b853efde9 is described below

commit b853efde95111fd55157c8996adf285357d48171
Author: Jark Wu <[email protected]>
AuthorDate: Wed Feb 4 10:03:56 2026 +0800

    [test] Fix FlinkTableSinkITCase#testWalModeWithAutoIncrement to correct 
assert changelogs of WAL image mode (#2568)
---
 .../fluss/flink/sink/FlinkTableSinkITCase.java     | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 73324a6ab..08b6b6c9f 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1803,20 +1803,20 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
 
         List<String> expectedResults =
                 Arrays.asList(
-                        "+I[1, 1, 100]",
-                        "+I[2, 2, 200]",
-                        "+I[3, 3, 150]",
-                        "+I[4, 4, 250]",
-                        "-U[1, 1, 100]",
-                        "+U[1, 1, 120]",
-                        "-U[3, 3, 150]",
-                        "+U[3, 3, 180]");
-
-        // Collect results with timeout
+                        "+I[insert, 1, 1, 100]",
+                        "+I[insert, 2, 2, 200]",
+                        "+I[insert, 3, 3, 150]",
+                        "+I[insert, 4, 4, 250]",
+                        "+I[update_after, 1, 1, 120]",
+                        "+I[update_after, 3, 3, 180]");
+
+        // Flink will generate ChangelogNormalize node to auto-complete the -U 
message,
+        // so we should read $changelog table to force read the raw underlying 
changelog
         assertQueryResultExactOrder(
                 tEnv,
                 String.format(
-                        "SELECT id, auto_increment_id, amount FROM %s /*+ 
OPTIONS('scan.startup.mode' = 'earliest') */",
+                        "SELECT _change_type, id, auto_increment_id, amount "
+                                + "FROM %s$changelog /*+ 
OPTIONS('scan.startup.mode' = 'earliest') */",
                         tableName),
                 expectedResults);
     }

Reply via email to