This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new b853efde9 [test] Fix FlinkTableSinkITCase#testWalModeWithAutoIncrement
to correct assert changelogs of WAL image mode (#2568)
b853efde9 is described below
commit b853efde95111fd55157c8996adf285357d48171
Author: Jark Wu <[email protected]>
AuthorDate: Wed Feb 4 10:03:56 2026 +0800
[test] Fix FlinkTableSinkITCase#testWalModeWithAutoIncrement to correct
assert changelogs of WAL image mode (#2568)
---
.../fluss/flink/sink/FlinkTableSinkITCase.java | 22 +++++++++++-----------
1 file changed, 11 insertions(+), 11 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 73324a6ab..08b6b6c9f 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1803,20 +1803,20 @@ abstract class FlinkTableSinkITCase extends
AbstractTestBase {
List<String> expectedResults =
Arrays.asList(
- "+I[1, 1, 100]",
- "+I[2, 2, 200]",
- "+I[3, 3, 150]",
- "+I[4, 4, 250]",
- "-U[1, 1, 100]",
- "+U[1, 1, 120]",
- "-U[3, 3, 150]",
- "+U[3, 3, 180]");
-
- // Collect results with timeout
+ "+I[insert, 1, 1, 100]",
+ "+I[insert, 2, 2, 200]",
+ "+I[insert, 3, 3, 150]",
+ "+I[insert, 4, 4, 250]",
+ "+I[update_after, 1, 1, 120]",
+ "+I[update_after, 3, 3, 180]");
+
+ // Flink will generate ChangelogNormalize node to auto-complete the -U
message,
+ // so we should read $changelog table to force read the raw underlying
changelog
assertQueryResultExactOrder(
tEnv,
String.format(
- "SELECT id, auto_increment_id, amount FROM %s /*+
OPTIONS('scan.startup.mode' = 'earliest') */",
+ "SELECT _change_type, id, auto_increment_id, amount "
+ + "FROM %s$changelog /*+
OPTIONS('scan.startup.mode' = 'earliest') */",
tableName),
expectedResults);
}