This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new ea30fd657 [chore][improve][fluss] Improve flaky test case 
FlussSinkITCase (#4256)
ea30fd657 is described below

commit ea30fd6570adbb5fd99cafc1ad646d7eb7057ec8
Author: Jia Fan <[email protected]>
AuthorDate: Tue Feb 3 11:13:19 2026 +0800

    [chore][improve][fluss] Improve flaky test case FlussSinkITCase (#4256)
---
 .../connectors/fluss/sink/v2/FlussSinkITCase.java  | 27 +++++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
index 535d90244..e65a7d199 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
@@ -40,6 +40,8 @@ import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
 import com.alibaba.fluss.server.testutils.FlussClusterExtension;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -74,7 +76,9 @@ public class FlussSinkITCase extends AbstractTestBase {
     protected TableEnvironment tBatchEnv;
 
     @BeforeEach
-    void before() {
+    void before() throws Exception {
+        waitForFlussClusterReady();
+
         // open a catalog so that we can get table from the catalog
         String bootstrapServers = 
FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
 
@@ -94,6 +98,27 @@ public class FlussSinkITCase extends AbstractTestBase {
         tBatchEnv.useDatabase(DEFAULT_DB);
     }
 
+    private void waitForFlussClusterReady() throws Exception {
+        int maxRetries = 30;
+        int retryIntervalMs = 1000;
+        Exception lastException = null;
+
+        for (int i = 0; i < maxRetries; i++) {
+            try (Connection connection =
+                    
ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
+                // Connection successful, cluster is ready
+                return;
+            } catch (Exception e) {
+                lastException = e;
+                Thread.sleep(retryIntervalMs);
+            }
+        }
+
+        throw new IllegalStateException(
+                "Failed to connect to Fluss cluster after " + maxRetries + " 
attempts",
+                lastException);
+    }
+
     @AfterEach
     void after() {
         tBatchEnv.useDatabase(BUILTIN_DATABASE);

Reply via email to