This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1249eb224e [test] Fix unstable tests
1249eb224e is described below

commit 1249eb224ea5e76b872624dbdb452470c9f5f023
Author: JingsongLi <[email protected]>
AuthorDate: Sat May 23 23:29:27 2026 +0800

    [test] Fix unstable tests
---
 .../action/cdc/mysql/MySqlSyncDatabaseActionITCase.java   | 15 ++++++++++++++-
 .../apache/paimon/flink/action/CopyFilesActionITCase.java | 13 +++++++++++--
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 10ee548125..a0026a528b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -759,7 +759,9 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
             newTableName = getNewTableName(newTableCount);
             createNewTable(statement, newTableName);
 
-            Thread.sleep(5000L);
+            // wait until the Paimon table is created by CDC before inserting 
records,
+            // so that the CDC source is ready to capture the INSERT events
+            waitForPaimonTableToExist(newTableName);
 
             // insert records
             newTableRecords = getNewTableRecords();
@@ -862,6 +864,17 @@ public class MySqlSyncDatabaseActionITCase extends 
MySqlActionITCaseBase {
                         "CREATE TABLE %s (k INT, v1 VARCHAR(10), PRIMARY KEY 
(k))", newTableName));
     }
 
+    private void waitForPaimonTableToExist(String tableName) throws Exception {
+        while (true) {
+            try {
+                getFileStoreTable(tableName);
+                return;
+            } catch (Exception e) {
+                Thread.sleep(1000);
+            }
+        }
+    }
+
     private JobClient buildSyncDatabaseActionWithNewlyAddedTables(
             String databaseName, boolean testSchemaChange) throws Exception {
         return buildSyncDatabaseActionWithNewlyAddedTables(null, databaseName, 
testSchemaChange);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CopyFilesActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CopyFilesActionITCase.java
index 89ab5ef125..4c945b64e8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CopyFilesActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CopyFilesActionITCase.java
@@ -876,8 +876,17 @@ public class CopyFilesActionITCase extends 
ActionITCaseBase {
                         IntStream.range(0, numPartitions)
                                 .mapToObj(i -> String.format("+I[%d, %d]", i, 
numKeysPerPartition))
                                 .collect(Collectors.toList()));
-        assertThat(collect(tEnv, "SELECT COUNT(DISTINCT v) FROM t"))
-                .isEqualTo(Collections.singletonList("+I[1]"));
+
+        while (true) {
+            try {
+                result = collect(tEnv, "SELECT COUNT(DISTINCT v) FROM t");
+            } catch (Exception e) {
+                doCopyJob(invoker, sourceWarehouse, targetWarehouse);
+                continue;
+            }
+            break;
+        }
+        assertThat(result).isEqualTo(Collections.singletonList("+I[1]"));
     }
 
     private void doCopyJob(String invoker, String sourceWarehouse, String 
targetWarehouse)

Reply via email to