This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new f02bcdf21 [client] Support initializing the cluster with retries to avoid
initialization failure when the bootstrap servers contain only one available address (#2024)
f02bcdf21 is described below
commit f02bcdf21bf7db78420d99d5e30115c10e366768
Author: yunhong <[email protected]>
AuthorDate: Fri Nov 28 19:47:26 2025 +0800
[client] Support initializing the cluster with retries to avoid initialization failure when
the bootstrap servers contain only one available address (#2024)
---
.../fluss/client/metadata/MetadataUpdater.java | 96 +++++++++++---
...UpdaterTest.java => MetadataUpdaterITCase.java} | 4 +-
.../fluss/client/metadata/MetadataUpdaterTest.java | 142 +++++++++------------
3 files changed, 141 insertions(+), 101 deletions(-)
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 92d81bd10..831a82436 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -26,8 +26,10 @@ import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.NetworkException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.RetriableException;
+import org.apache.fluss.exception.StaleMetadataException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
@@ -38,7 +40,6 @@ import org.apache.fluss.rpc.RpcClient;
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
-import org.apache.fluss.utils.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,12 +57,14 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static
org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
+import static org.apache.fluss.utils.ExceptionUtils.stripExecutionException;
/** The updater to initialize and update client metadata. */
public class MetadataUpdater {
private static final Logger LOG =
LoggerFactory.getLogger(MetadataUpdater.class);
- private static final int MAX_RETRY_TIMES = 5;
+ private static final int MAX_RETRY_TIMES = 3;
+ private static final int RETRY_INTERVAL_MS = 100;
private final RpcClient rpcClient;
protected volatile Cluster cluster;
@@ -270,7 +273,7 @@ public class MetadataUpdater {
tablePartitionIds);
}
} catch (Exception e) {
- Throwable t = ExceptionUtils.stripExecutionException(e);
+ Throwable t = stripExecutionException(e);
if (t instanceof RetriableException || t instanceof
TimeoutException) {
LOG.warn("Failed to update metadata, but the exception is
re-triable.", t);
} else if (t instanceof PartitionNotExistException) {
@@ -292,10 +295,33 @@ public class MetadataUpdater {
Cluster cluster = null;
Exception lastException = null;
for (InetSocketAddress address : inetSocketAddresses) {
+ ServerNode serverNode = null;
try {
- cluster = tryToInitializeCluster(rpcClient, address);
- break;
+ serverNode =
+ new ServerNode(
+ -1,
+ address.getHostString(),
+ address.getPort(),
+ ServerType.COORDINATOR);
+ ServerNode finalServerNode = serverNode;
+ AdminReadOnlyGateway adminReadOnlyGateway =
+ GatewayClientProxy.createGatewayProxy(
+ () -> finalServerNode, rpcClient,
AdminReadOnlyGateway.class);
+ if (inetSocketAddresses.size() == 1) {
+ // if there is only one bootstrap server, we can retry to
connect to it.
+ cluster =
+ tryToInitializeClusterWithRetries(
+ rpcClient, serverNode,
adminReadOnlyGateway, MAX_RETRY_TIMES);
+ } else {
+ cluster = tryToInitializeCluster(adminReadOnlyGateway);
+ break;
+ }
} catch (Exception e) {
+ // We should dis-connected with the bootstrap server id to
make sure the next
+ // retry can rebuild the connection.
+ if (serverNode != null) {
+ rpcClient.disconnect(serverNode.uid());
+ }
LOG.error(
"Failed to initialize fluss client connection to
bootstrap server: {}",
address,
@@ -306,9 +332,10 @@ public class MetadataUpdater {
if (cluster == null && lastException != null) {
String errorMsg =
- "Failed to initialize fluss client connection to server
because no "
- + "bootstrap server is validate. bootstrap
servers: "
- + inetSocketAddresses;
+ "Failed to initialize fluss client connection to bootstrap
servers: "
+ + inetSocketAddresses
+ + ". \nReason: "
+ + lastException.getMessage();
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg, lastException);
}
@@ -316,14 +343,53 @@ public class MetadataUpdater {
return cluster;
}
- private static Cluster tryToInitializeCluster(RpcClient rpcClient,
InetSocketAddress address)
+ @VisibleForTesting
+ static @Nullable Cluster tryToInitializeClusterWithRetries(
+ RpcClient rpcClient,
+ ServerNode serverNode,
+ AdminReadOnlyGateway gateway,
+ int maxRetryTimes)
+ throws Exception {
+ int retryCount = 0;
+ while (retryCount <= maxRetryTimes) {
+ try {
+ return tryToInitializeCluster(gateway);
+ } catch (Exception e) {
+ Throwable cause = stripExecutionException(e);
+ // in case of bootstrap is recovering, we should retry to
connect.
+ if (!(cause instanceof StaleMetadataException || cause
instanceof NetworkException)
+ || retryCount >= maxRetryTimes) {
+ throw e;
+ }
+
+ // We should dis-connected with the bootstrap server id to
make sure the next
+ // retry can rebuild the connection.
+ rpcClient.disconnect(serverNode.uid());
+
+ long delayMs = (long) (RETRY_INTERVAL_MS * Math.pow(2,
retryCount));
+ LOG.warn(
+ "Failed to connect to bootstrap server: {} (retry
{}/{}). Retrying in {} ms.",
+ serverNode,
+ retryCount + 1,
+ maxRetryTimes,
+ delayMs,
+ e);
+
+ try {
+ Thread.sleep(delayMs);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during retry
sleep", ex);
+ }
+ retryCount++;
+ }
+ }
+
+ return null;
+ }
+
+ private static Cluster tryToInitializeCluster(AdminReadOnlyGateway
adminReadOnlyGateway)
throws Exception {
- ServerNode serverNode =
- new ServerNode(
- -1, address.getHostString(), address.getPort(),
ServerType.COORDINATOR);
- AdminReadOnlyGateway adminReadOnlyGateway =
- GatewayClientProxy.createGatewayProxy(
- () -> serverNode, rpcClient,
AdminReadOnlyGateway.class);
return sendMetadataRequestAndRebuildCluster(adminReadOnlyGateway,
Collections.emptySet());
}
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
similarity index 98%
copy from fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
copy to fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
index 5471d8b60..f7151dbda 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterITCase.java
@@ -37,8 +37,8 @@ import static
org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAnd
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for update metadata of {@link MetadataUpdater}. */
-class MetadataUpdaterTest {
+/** IT test for update metadata of {@link MetadataUpdater}. */
+class MetadataUpdaterITCase {
@RegisterExtension
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
index 5471d8b60..6717cfe5a 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java
@@ -17,108 +17,82 @@
package org.apache.fluss.client.metadata;
-import org.apache.fluss.client.Connection;
-import org.apache.fluss.client.ConnectionFactory;
-import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.exception.StaleMetadataException;
import org.apache.fluss.rpc.RpcClient;
-import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
+import org.apache.fluss.rpc.messages.MetadataRequest;
+import org.apache.fluss.rpc.messages.MetadataResponse;
+import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
+import org.apache.fluss.server.coordinator.TestCoordinatorGateway;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.CompletableFuture;
-import static
org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
-import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
+import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.buildMetadataResponse;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Test for update metadata of {@link MetadataUpdater}. */
-class MetadataUpdaterTest {
+/** UT Test for update metadata of {@link MetadataUpdater}. */
+public class MetadataUpdaterTest {
- @RegisterExtension
- public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
- FlussClusterExtension.builder().setNumOfTabletServers(2).build();
+ private static final ServerNode CS_NODE =
+ new ServerNode(1, "localhost", 8080, ServerType.COORDINATOR);
+ private static final ServerNode TS_NODE =
+ new ServerNode(1, "localhost", 8080, ServerType.TABLET_SERVER);
@Test
- void testRebuildClusterNTimes() throws Exception {
- Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
- Connection conn = ConnectionFactory.createConnection(clientConf);
- Admin admin = conn.getAdmin();
- TablePath tablePath = TablePath.of("fluss", "test");
- admin.createTable(tablePath, DATA1_TABLE_DESCRIPTOR, true).get();
- admin.close();
- conn.close();
-
- RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
- MetadataUpdater metadataUpdater = new MetadataUpdater(clientConf,
rpcClient);
- // update metadata
- metadataUpdater.updateMetadata(Collections.singleton(tablePath), null,
null);
- Cluster cluster = metadataUpdater.getCluster();
-
- // repeat 20K times to reproduce StackOverflowError if there is
- // any N levels UnmodifiableCollection
- for (int i = 0; i < 20000; i++) {
- cluster =
- sendMetadataRequestAndRebuildCluster(
- FLUSS_CLUSTER_EXTENSION.newCoordinatorClient(),
- true,
- cluster,
- null,
- null,
- null);
- }
+ void testInitializeClusterWithRetries() throws Exception {
+ Configuration configuration = new Configuration();
+ RpcClient rpcClient =
+ RpcClient.create(configuration,
TestingClientMetricGroup.newInstance(), false);
+
+ // retry lower than max retry count.
+ AdminReadOnlyGateway gateway = new TestingAdminReadOnlyGateway(2);
+ Cluster cluster =
+ MetadataUpdater.tryToInitializeClusterWithRetries(rpcClient,
CS_NODE, gateway, 3);
+ assertThat(cluster).isNotNull();
+ assertThat(cluster.getCoordinatorServer()).isEqualTo(CS_NODE);
+
assertThat(cluster.getAliveTabletServerList()).containsExactly(TS_NODE);
+
+ // retry higher than max retry count.
+ AdminReadOnlyGateway gateway2 = new TestingAdminReadOnlyGateway(5);
+ assertThatThrownBy(
+ () ->
+
MetadataUpdater.tryToInitializeClusterWithRetries(
+ rpcClient, CS_NODE, gateway2, 3))
+ .isInstanceOf(StaleMetadataException.class)
+ .hasMessageContaining("The metadata is stale.");
}
- @Test
- void testUpdateWithEmptyMetadataResponse() throws Exception {
- RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
- MetadataUpdater metadataUpdater =
- new MetadataUpdater(FLUSS_CLUSTER_EXTENSION.getClientConfig(),
rpcClient);
-
- // update metadata
- metadataUpdater.updateMetadata(null, null, null);
- Cluster cluster = metadataUpdater.getCluster();
+ private static final class TestingAdminReadOnlyGateway extends
TestCoordinatorGateway {
- List<ServerNode> expectedServerNodes =
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes();
- assertThat(expectedServerNodes).hasSize(2);
-
assertThat(cluster.getAliveTabletServerList()).isEqualTo(expectedServerNodes);
+ private final int maxRetryCount;
+ private int retryCount;
- // then, stop coordinator server, can still update metadata
- FLUSS_CLUSTER_EXTENSION.stopCoordinatorServer();
- metadataUpdater.updateMetadata(null, null, null);
-
assertThat(cluster.getAliveTabletServerList()).isEqualTo(expectedServerNodes);
-
- // start a new tablet server, the tablet server will return empty
metadata
- // response since no coordinator server to send newest metadata to the
tablet server
- int newServerId = 2;
- FLUSS_CLUSTER_EXTENSION.startTabletServer(newServerId);
-
- // we mock a new cluster with only server 1 so that it'll only send
request
- // to server 1, which will return empty resonate
- Cluster newCluster =
- new Cluster(
- Collections.singletonMap(
- newServerId,
-
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes().get(newServerId)),
- null,
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyMap());
-
- metadataUpdater = new MetadataUpdater(rpcClient, newCluster);
- // shouldn't update metadata to empty since the empty metadata will be
ignored
- metadataUpdater.updateMetadata(null, null, null);
- assertThat(metadataUpdater.getCluster().getAliveTabletServers())
- .isEqualTo(newCluster.getAliveTabletServers())
- .hasSize(1);
+ public TestingAdminReadOnlyGateway(int maxRetryCount) {
+ this.maxRetryCount = maxRetryCount;
+ }
- // recover the coordinator
- FLUSS_CLUSTER_EXTENSION.startCoordinatorServer();
+ @Override
+ public CompletableFuture<MetadataResponse> metadata(MetadataRequest
request) {
+ retryCount++;
+ if (retryCount <= maxRetryCount) {
+ throw new StaleMetadataException("The metadata is stale.");
+ } else {
+ MetadataResponse metadataResponse =
+ buildMetadataResponse(
+ CS_NODE,
+ Collections.singleton(TS_NODE),
+ Collections.emptyList(),
+ Collections.emptyList());
+ return CompletableFuture.completedFuture(metadataResponse);
+ }
+ }
}
}