This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new ea30fd657 [chore][improve][fluss] Improve flaky test case FlussSinkITCase (#4256)
ea30fd657 is described below
commit ea30fd6570adbb5fd99cafc1ad646d7eb7057ec8
Author: Jia Fan <[email protected]>
AuthorDate: Tue Feb 3 11:13:19 2026 +0800
[chore][improve][fluss] Improve flaky test case FlussSinkITCase (#4256)
---
.../connectors/fluss/sink/v2/FlussSinkITCase.java | 27 +++++++++++++++++++++-
1 file changed, 26 insertions(+), 1 deletion(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
index 535d90244..e65a7d199 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-fluss/src/test/java/org/apache/flink/cdc/connectors/fluss/sink/v2/FlussSinkITCase.java
@@ -40,6 +40,8 @@ import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
import com.alibaba.fluss.server.testutils.FlussClusterExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -74,7 +76,9 @@ public class FlussSinkITCase extends AbstractTestBase {
protected TableEnvironment tBatchEnv;
@BeforeEach
- void before() {
+ void before() throws Exception {
+ waitForFlussClusterReady();
+
// open a catalog so that we can get table from the catalog
String bootstrapServers =
FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
@@ -94,6 +98,27 @@ public class FlussSinkITCase extends AbstractTestBase {
tBatchEnv.useDatabase(DEFAULT_DB);
}
+    /**
+     * Blocks until a client connection to the Fluss test cluster can be opened.
+     *
+     * <p>Makes up to 30 attempts, pausing one second between failed attempts. Each probe
+     * connection is closed again right away by the try-with-resources statement.
+     *
+     * @throws IllegalStateException if every attempt fails; its cause is the last failure seen
+     * @throws InterruptedException if the thread is interrupted while pausing between attempts
+     */
+    private void waitForFlussClusterReady() throws Exception {
+        int maxRetries = 30;
+        int retryIntervalMs = 1000;
+        Exception lastException = null;
+
+        for (int i = 0; i < maxRetries; i++) {
+            try (Connection connection =
+                    ConnectionFactory.createConnection(FLUSS_CLUSTER_EXTENSION.getClientConfig())) {
+                // Connection successful, cluster is ready
+                return;
+            } catch (Exception e) {
+                lastException = e;
+                // Pausing after the final attempt would only delay the failure below.
+                if (i < maxRetries - 1) {
+                    Thread.sleep(retryIntervalMs);
+                }
+            }
+        }
+
+        throw new IllegalStateException(
+                "Failed to connect to Fluss cluster after " + maxRetries + " attempts",
+                lastException);
+    }
+
@AfterEach
void after() {
tBatchEnv.useDatabase(BUILTIN_DATABASE);