This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 71d00e40b [test][flink] Fix Unstable test 
FlinkTableSinkITCase.testVersionMergeEngineWithTypeTimestampLTZ9 (#2139)
71d00e40b is described below

commit 71d00e40bdb1b03425a2f686c4f611cd1d75a130
Author: Hongshun Wang <[email protected]>
AuthorDate: Fri Dec 19 16:28:20 2025 +0800

    [test][flink] Fix Unstable test 
FlinkTableSinkITCase.testVersionMergeEngineWithTypeTimestampLTZ9 (#2139)
---
 .../fluss/flink/sink/FlinkTableSinkITCase.java     | 55 +++++++++++++---------
 1 file changed, 32 insertions(+), 23 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index a22316d70..c8aeaaae8 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -1151,32 +1151,35 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
     @Test
     void testVersionMergeEngineWithTypeTimestamp() throws Exception {
         tEnv.executeSql(
-                "create table merge_engine_with_version (a int not null 
primary key not enforced,"
-                        + " b string, ts TIMESTAMP(3)) 
with('table.merge-engine' = 'versioned',"
-                        + "'table.merge-engine.versioned.ver-column' = 'ts')");
-
+                        "create table merge_engine_with_version (a int not 
null primary key not enforced,"
+                                + " b string, ts TIMESTAMP(3)) 
with('table.merge-engine' = 'versioned',"
+                                + "'table.merge-engine.versioned.ver-column' = 
'ts')")
+                .await();
         // insert once
         tEnv.executeSql(
                         "INSERT INTO merge_engine_with_version (a, b, ts) 
VALUES "
                                 + "(1, 'v1', TIMESTAMP '2024-12-27 
12:00:00.123'), "
                                 + "(2, 'v2', TIMESTAMP '2024-12-27 
12:00:00.123'), "
-                                + "(1, 'v11', TIMESTAMP '2024-12-27 
11:59:59.123'), "
-                                + "(3, 'v3', TIMESTAMP '2024-12-27 
12:00:00.123'),"
-                                + "(3, 'v33', TIMESTAMP '2024-12-27 
12:00:00.123');")
+                                + "(3, 'v3', TIMESTAMP '2024-12-27 
12:00:00.123');")
                 .await();
-
         CloseableIterator<Row> rowIter =
                 tEnv.executeSql("select * from 
merge_engine_with_version").collect();
-
-        // id=1 not update, but id=3 updated
         List<String> expectedRows =
                 Arrays.asList(
                         "+I[1, v1, 2024-12-27T12:00:00.123]",
                         "+I[2, v2, 2024-12-27T12:00:00.123]",
-                        "+I[3, v3, 2024-12-27T12:00:00.123]",
+                        "+I[3, v3, 2024-12-27T12:00:00.123]");
+        assertResultsIgnoreOrder(rowIter, expectedRows, false);
+
+        // insert again. id=1 not update, but id=3 updated
+        tEnv.executeSql(
+                "INSERT INTO merge_engine_with_version (a, b, ts) VALUES "
+                        + "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), "
+                        + "(3, 'v33', TIMESTAMP '2024-12-27 12:00:00.123');");
+        expectedRows =
+                Arrays.asList(
                         "-U[3, v3, 2024-12-27T12:00:00.123]",
                         "+U[3, v33, 2024-12-27T12:00:00.123]");
-
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
     }
 
@@ -1185,30 +1188,36 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
 
         tEnv.getConfig().set("table.local-time-zone", "UTC");
         tEnv.executeSql(
-                "create table merge_engine_with_version (a int not null 
primary key not enforced,"
-                        + " b string, ts TIMESTAMP(9) WITH LOCAL TIME ZONE ) 
with("
-                        + "'table.merge-engine' = 'versioned',"
-                        + "'table.merge-engine.versioned.ver-column' = 'ts')");
+                        "create table merge_engine_with_version (a int not 
null primary key not enforced,"
+                                + " b string, ts TIMESTAMP(9) WITH LOCAL TIME 
ZONE ) with("
+                                + "'table.merge-engine' = 'versioned',"
+                                + "'table.merge-engine.versioned.ver-column' = 
'ts')")
+                .await();
 
         // insert once
         tEnv.executeSql(
                         "INSERT INTO merge_engine_with_version (a, b, ts) 
VALUES "
                                 + "(1, 'v1', CAST(TIMESTAMP '2024-12-27 
12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
                                 + "(2, 'v2', CAST(TIMESTAMP '2024-12-27 
12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
-                                + "(1, 'v11', CAST(TIMESTAMP '2024-12-27 
12:00:00.123456788' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
-                                + "(3, 'v3', CAST(TIMESTAMP '2024-12-27 
12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
-                                + "(3, 'v33', CAST(TIMESTAMP '2024-12-27 
12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
+                                + "(3, 'v3', CAST(TIMESTAMP '2024-12-27 
12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
                 .await();
-
         CloseableIterator<Row> rowIter =
                 tEnv.executeSql("select * from 
merge_engine_with_version").collect();
-
-        // id=1 not update, but id=3 updated
         List<String> expectedRows =
                 Arrays.asList(
                         "+I[1, v1, 2024-12-27T12:00:00.123456789Z]",
                         "+I[2, v2, 2024-12-27T12:00:00.123456789Z]",
-                        "+I[3, v3, 2024-12-27T12:00:00.123456789Z]",
+                        "+I[3, v3, 2024-12-27T12:00:00.123456789Z]");
+        assertResultsIgnoreOrder(rowIter, expectedRows, false);
+
+        // insert again. id=1 not update, but id=3 updated
+        tEnv.executeSql(
+                        "INSERT INTO merge_engine_with_version (a, b, ts) 
VALUES "
+                                + "(1, 'v11', CAST(TIMESTAMP '2024-12-27 
12:00:00.123456788' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
+                                + "(3, 'v33', CAST(TIMESTAMP '2024-12-27 
12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
+                .await();
+        expectedRows =
+                Arrays.asList(
                         "-U[3, v3, 2024-12-27T12:00:00.123456789Z]",
                         "+U[3, v33, 2024-12-27T12:00:00.123456789Z]");
 

Reply via email to