This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new f02bcdf21 [client] Support initial cluster with retries to avoid initial failed when bootstrap server only contains one available address (#2024)
f02bcdf21 is described below

commit f02bcdf21bf7db78420d99d5e30115c10e366768
Author: yunhong <[email protected]>
AuthorDate: Fri Nov 28 19:47:26 2025 +0800

    [client] Support initial cluster with retries to avoid initial failed when bootstrap server only contains one available address (#2024)
---
 .../fluss/client/metadata/MetadataUpdater.java     |  96 +++++++++++---
 ...UpdaterTest.java => MetadataUpdaterITCase.java} |   4 +-
 .../fluss/client/metadata/MetadataUpdaterTest.java | 142 +++++++++------------
 3 files changed, 141 insertions(+), 101 deletions(-)

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 92d81bd10..831a82436 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -26,8 +26,10 @@ import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.NetworkException;
 import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.exception.RetriableException;
+import org.apache.fluss.exception.StaleMetadataException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
@@ -38,7 +40,6 @@ import org.apache.fluss.rpc.RpcClient;
 import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
-import org.apache.fluss.utils.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,12 +57,14 @@ import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static 
org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
+import static org.apache.fluss.utils.ExceptionUtils.stripExecutionException;
 
 /** The updater to initialize and update client metadata. */
 public class MetadataUpdater {
     private static final Logger LOG = 
LoggerFactory.getLogger(MetadataUpdater.class);
 
-    private static final int MAX_RETRY_TIMES = 5;
+    private static final int MAX_RETRY_TIMES = 3;
+    private static final int RETRY_INTERVAL_MS = 100;
 
     private final RpcClient rpcClient;
     protected volatile Cluster cluster;
@@ -270,7 +273,7 @@ public class MetadataUpdater {
                                 tablePartitionIds);
             }
         } catch (Exception e) {
-            Throwable t = ExceptionUtils.stripExecutionException(e);
+            Throwable t = stripExecutionException(e);
             if (t instanceof RetriableException || t instanceof 
TimeoutException) {
                 LOG.warn("Failed to update metadata, but the exception is 
re-triable.", t);
             } else if (t instanceof PartitionNotExistException) {
@@ -292,10 +295,33 @@ public class MetadataUpdater {
         Cluster cluster = null;
         Exception lastException = null;
         for (InetSocketAddress address : inetSocketAddresses) {
+            ServerNode serverNode = null;
             try {
-                cluster = tryToInitializeCluster(rpcClient, address);
-                break;
+                serverNode =
+                        new ServerNode(
+                                -1,
+                                address.getHostString(),
+                                address.getPort(),
+                                ServerType.COORDINATOR);
+                ServerNode finalServerNode = serverNode;
+                AdminReadOnlyGateway adminReadOnlyGateway =
+                        GatewayClientProxy.createGatewayProxy(
+                                () -> finalServerNode, rpcClient, 
AdminReadOnlyGateway.class);
+                if (inetSocketAddresses.size() == 1) {
+                    // if there is only one bootstrap server, we can retry to 
connect to it.
+                    cluster =
+                            tryToInitializeClusterWithRetries(
+                                    rpcClient, serverNode, 
adminReadOnlyGateway, MAX_RETRY_TIMES);
+                } else {
+                    cluster = tryToInitializeCluster(adminReadOnlyGateway);
+                    break;
+                }
             } catch (Exception e) {
+                // We should dis-connected with the bootstrap server id to 
make sure the next
+                // retry can rebuild the connection.
+                if (serverNode != null) {
+                    rpcClient.disconnect(serverNode.uid());
+                }
                 LOG.error(
                         "Failed to initialize fluss client connection to 
bootstrap server: {}",
                         address,
@@ -306,9 +332,10 @@ public class MetadataUpdater {
 
         if (cluster == null && lastException != null) {
             String errorMsg =
-                    "Failed to initialize fluss client connection to server 
because no "
-                            + "bootstrap server is validate. bootstrap 
servers: "
-                            + inetSocketAddresses;
+                    "Failed to initialize fluss client connection to bootstrap 
servers: "
+                            + inetSocketAddresses
+                            + ". \nReason: "
+                            + lastException.getMessage();
             LOG.error(errorMsg);
             throw new IllegalStateException(errorMsg, lastException);
         }
@@ -316,14 +343,53 @@ public class MetadataUpdater {
         return cluster;
     }
 
-    private static Cluster tryToInitializeCluster(RpcClient rpcClient, 
InetSocketAddress address)
+    @VisibleForTesting
+    static @Nullable Cluster tryToInitializeClusterWithRetries(
+            RpcClient rpcClient,
+            ServerNode serverNode,
+            AdminReadOnlyGateway gateway,
+            int maxRetryTimes)
+            throws Exception {
+        int retryCount = 0;
+        while (retryCount <= maxRetryTimes) {
+            try {
+                return tryToInitializeCluster(gateway);
+            } catch (Exception e) {
+                Throwable cause = stripExecutionException(e);
+                // in case of bootstrap is recovering, we should retry to 
connect.
+                if (!(cause instanceof StaleMetadataException || cause 
instanceof NetworkException)
+                        || retryCount >= maxRetryTimes) {
+                    throw e;
+                }
+
+                // We should dis-connected with the bootstrap server id to 
make sure the next
+                // retry can rebuild the connection.
+                rpcClient.disconnect(serverNode.uid());
+
+                long delayMs = (long) (RETRY_INTERVAL_MS * Math.pow(2, 
retryCount));
+                LOG.warn(
+                        "Failed to connect to bootstrap server: {} (retry 
{}/{}). Retrying in {} ms.",
+                        serverNode,
+                        retryCount + 1,
+                        maxRetryTimes,
+                        delayMs,
+                        e);
+
+                try {
+                    Thread.sleep(delayMs);
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Interrupted during retry 
sleep", ex);
+                }
+                retryCount++;
+            }
+        }
+
+        return null;
+    }
+
+    private static Cluster tryToInitializeCluster(AdminReadOnlyGateway 
adminReadOnlyGateway)
             throws Exception {
-        ServerNode serverNode =
-                new ServerNode(
-                        -1, address.getHostString(), address.getPort(), 
ServerType.COORDINATOR);
-        AdminReadOnlyGateway adminReadOnlyGateway =
-                GatewayClientProxy.createGatewayProxy(
-                        () -> serverNode, rpcClient, 
AdminReadOnlyGateway.class);
         return sendMetadataRequestAndRebuildCluster(adminReadOnlyGateway, 
Collections.emptySet());
     }
 
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
similarity index 98%
copy from fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
copy to fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
index 5471d8b60..f7151dbda 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
@@ -37,8 +37,8 @@ import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAnd
 import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for update metadata of {@link MetadataUpdater}. */
-class MetadataUpdaterTest {
+/** IT test for update metadata of {@link MetadataUpdater}. */
+class MetadataUpdaterITCase {
 
     @RegisterExtension
     public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
index 5471d8b60..6717cfe5a 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
@@ -17,108 +17,82 @@
 
 package org.apache.fluss.client.metadata;
 
-import org.apache.fluss.client.Connection;
-import org.apache.fluss.client.ConnectionFactory;
-import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.cluster.Cluster;
 import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.exception.StaleMetadataException;
 import org.apache.fluss.rpc.RpcClient;
-import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
+import org.apache.fluss.rpc.messages.MetadataRequest;
+import org.apache.fluss.rpc.messages.MetadataResponse;
+import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
+import org.apache.fluss.server.coordinator.TestCoordinatorGateway;
 
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
-import static 
org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
-import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.buildMetadataResponse;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test for update metadata of {@link MetadataUpdater}. */
-class MetadataUpdaterTest {
+/** UT Test for update metadata of {@link MetadataUpdater}. */
+public class MetadataUpdaterTest {
 
-    @RegisterExtension
-    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
-            FlussClusterExtension.builder().setNumOfTabletServers(2).build();
+    private static final ServerNode CS_NODE =
+            new ServerNode(1, "localhost", 8080, ServerType.COORDINATOR);
+    private static final ServerNode TS_NODE =
+            new ServerNode(1, "localhost", 8080, ServerType.TABLET_SERVER);
 
     @Test
-    void testRebuildClusterNTimes() throws Exception {
-        Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
-        Connection conn = ConnectionFactory.createConnection(clientConf);
-        Admin admin = conn.getAdmin();
-        TablePath tablePath = TablePath.of("fluss", "test");
-        admin.createTable(tablePath, DATA1_TABLE_DESCRIPTOR, true).get();
-        admin.close();
-        conn.close();
-
-        RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
-        MetadataUpdater metadataUpdater = new MetadataUpdater(clientConf, 
rpcClient);
-        // update metadata
-        metadataUpdater.updateMetadata(Collections.singleton(tablePath), null, 
null);
-        Cluster cluster = metadataUpdater.getCluster();
-
-        // repeat 20K times to reproduce StackOverflowError if there is
-        // any N levels UnmodifiableCollection
-        for (int i = 0; i < 20000; i++) {
-            cluster =
-                    sendMetadataRequestAndRebuildCluster(
-                            FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(),
-                            true,
-                            cluster,
-                            null,
-                            null,
-                            null);
-        }
+    void testInitializeClusterWithRetries() throws Exception {
+        Configuration configuration = new Configuration();
+        RpcClient rpcClient =
+                RpcClient.create(configuration, 
TestingClientMetricGroup.newInstance(), false);
+
+        // retry lower than max retry count.
+        AdminReadOnlyGateway gateway = new TestingAdminReadOnlyGateway(2);
+        Cluster cluster =
+                MetadataUpdater.tryToInitializeClusterWithRetries(rpcClient, 
CS_NODE, gateway, 3);
+        assertThat(cluster).isNotNull();
+        assertThat(cluster.getCoordinatorServer()).isEqualTo(CS_NODE);
+        
assertThat(cluster.getAliveTabletServerList()).containsExactly(TS_NODE);
+
+        // retry higher than max retry count.
+        AdminReadOnlyGateway gateway2 = new TestingAdminReadOnlyGateway(5);
+        assertThatThrownBy(
+                        () ->
+                                
MetadataUpdater.tryToInitializeClusterWithRetries(
+                                        rpcClient, CS_NODE, gateway2, 3))
+                .isInstanceOf(StaleMetadataException.class)
+                .hasMessageContaining("The metadata is stale.");
     }
 
-    @Test
-    void testUpdateWithEmptyMetadataResponse() throws Exception {
-        RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
-        MetadataUpdater metadataUpdater =
-                new MetadataUpdater(FLUSS_CLUSTER_EXTENSION.getClientConfig(), 
rpcClient);
-
-        // update metadata
-        metadataUpdater.updateMetadata(null, null, null);
-        Cluster cluster = metadataUpdater.getCluster();
+    private static final class TestingAdminReadOnlyGateway extends 
TestCoordinatorGateway {
 
-        List<ServerNode> expectedServerNodes = 
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes();
-        assertThat(expectedServerNodes).hasSize(2);
-        
assertThat(cluster.getAliveTabletServerList()).isEqualTo(expectedServerNodes);
+        private final int maxRetryCount;
+        private int retryCount;
 
-        // then, stop coordinator server, can still update metadata
-        FLUSS_CLUSTER_EXTENSION.stopCoordinatorServer();
-        metadataUpdater.updateMetadata(null, null, null);
-        
assertThat(cluster.getAliveTabletServerList()).isEqualTo(expectedServerNodes);
-
-        // start a new tablet server, the tablet server will return empty 
metadata
-        // response since no coordinator server to send newest metadata to the 
tablet server
-        int newServerId = 2;
-        FLUSS_CLUSTER_EXTENSION.startTabletServer(newServerId);
-
-        // we mock a new cluster with only server 1 so that it'll only send 
request
-        // to server 1, which will return empty resonate
-        Cluster newCluster =
-                new Cluster(
-                        Collections.singletonMap(
-                                newServerId,
-                                
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes().get(newServerId)),
-                        null,
-                        Collections.emptyMap(),
-                        Collections.emptyMap(),
-                        Collections.emptyMap(),
-                        Collections.emptyMap());
-
-        metadataUpdater = new MetadataUpdater(rpcClient, newCluster);
-        // shouldn't update metadata to empty since the empty metadata will be 
ignored
-        metadataUpdater.updateMetadata(null, null, null);
-        assertThat(metadataUpdater.getCluster().getAliveTabletServers())
-                .isEqualTo(newCluster.getAliveTabletServers())
-                .hasSize(1);
+        public TestingAdminReadOnlyGateway(int maxRetryCount) {
+            this.maxRetryCount = maxRetryCount;
+        }
 
-        // recover the coordinator
-        FLUSS_CLUSTER_EXTENSION.startCoordinatorServer();
+        @Override
+        public CompletableFuture<MetadataResponse> metadata(MetadataRequest 
request) {
+            retryCount++;
+            if (retryCount <= maxRetryCount) {
+                throw new StaleMetadataException("The metadata is stale.");
+            } else {
+                MetadataResponse metadataResponse =
+                        buildMetadataResponse(
+                                CS_NODE,
+                                Collections.singleton(TS_NODE),
+                                Collections.emptyList(),
+                                Collections.emptyList());
+                return CompletableFuture.completedFuture(metadataResponse);
+            }
+        }
     }
 }

Reply via email to