This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1249eb224e [test] Fix unstable tests
1249eb224e is described below
commit 1249eb224ea5e76b872624dbdb452470c9f5f023
Author: JingsongLi <[email protected]>
AuthorDate: Sat May 23 23:29:27 2026 +0800
[test] Fix unstable tests
---
.../action/cdc/mysql/MySqlSyncDatabaseActionITCase.java | 15 ++++++++++++++-
.../apache/paimon/flink/action/CopyFilesActionITCase.java | 13 +++++++++++--
2 files changed, 25 insertions(+), 3 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 10ee548125..a0026a528b 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -759,7 +759,9 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
newTableName = getNewTableName(newTableCount);
createNewTable(statement, newTableName);
- Thread.sleep(5000L);
+ // wait until the Paimon table is created by CDC before inserting
records,
+ // so that the CDC source is ready to capture the INSERT events
+ waitForPaimonTableToExist(newTableName);
// insert records
newTableRecords = getNewTableRecords();
@@ -862,6 +864,17 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
"CREATE TABLE %s (k INT, v1 VARCHAR(10), PRIMARY KEY
(k))", newTableName));
}
+ private void waitForPaimonTableToExist(String tableName) throws Exception {
+ while (true) {
+ try {
+ getFileStoreTable(tableName);
+ return;
+ } catch (Exception e) {
+ Thread.sleep(1000);
+ }
+ }
+ }
+
private JobClient buildSyncDatabaseActionWithNewlyAddedTables(
String databaseName, boolean testSchemaChange) throws Exception {
return buildSyncDatabaseActionWithNewlyAddedTables(null, databaseName,
testSchemaChange);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CopyFilesActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CopyFilesActionITCase.java
index 89ab5ef125..4c945b64e8 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CopyFilesActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CopyFilesActionITCase.java
@@ -876,8 +876,17 @@ public class CopyFilesActionITCase extends
ActionITCaseBase {
IntStream.range(0, numPartitions)
.mapToObj(i -> String.format("+I[%d, %d]", i,
numKeysPerPartition))
.collect(Collectors.toList()));
- assertThat(collect(tEnv, "SELECT COUNT(DISTINCT v) FROM t"))
- .isEqualTo(Collections.singletonList("+I[1]"));
+
+ while (true) {
+ try {
+ result = collect(tEnv, "SELECT COUNT(DISTINCT v) FROM t");
+ } catch (Exception e) {
+ doCopyJob(invoker, sourceWarehouse, targetWarehouse);
+ continue;
+ }
+ break;
+ }
+ assertThat(result).isEqualTo(Collections.singletonList("+I[1]"));
}
private void doCopyJob(String invoker, String sourceWarehouse, String
targetWarehouse)