wuchong commented on code in PR #2024:
URL: https://github.com/apache/fluss/pull/2024#discussion_r2567705180
##########
fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java:
##########
@@ -316,14 +335,61 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
         return cluster;
     }
-    private static Cluster tryToInitializeCluster(RpcClient rpcClient, InetSocketAddress address)
+    @VisibleForTesting
+    static @Nullable Cluster tryToInitializeClusterWithRetries(
+            RpcClient rpcClient,
+            ServerNode serverNode,
+            AdminReadOnlyGateway gateway,
+            int maxRetryTimes)
+            throws Exception {
+        int retryCount = 0;
+        while (retryCount <= maxRetryTimes) {
+            try {
+                return tryToInitializeCluster(gateway);
+            } catch (Exception e) {
+                Throwable cause = stripExecutionException(e);
+                if (!(cause instanceof StaleMetadataException
+                        || cause instanceof NetworkException)) {
+                    throw e;
+                }
+
+                if (retryCount >= maxRetryTimes) {
+                    LOG.warn(
+                            "Max retries ({}) exceeded to connect to {}. ",
+                            maxRetryTimes,
+                            serverNode,
+                            e);
+                    return null;
Review Comment:
We don't need logging here; we should throw the exception and let the parent
method catch it. Otherwise, the root exception is swallowed and never appears
in the final exception message.
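A minimal sketch of the suggested behavior, assuming the retry loop is pulled into a small generic helper: retriable failures are retried, non-retriable ones are rethrown at once, and once retries run out the last failure is rethrown instead of being logged and replaced by `null`. `RethrowingRetry`, `callWithRetries`, and the two exception classes are hypothetical stand-ins, not Fluss's actual types, and the `ExecutionException` unwrapping from the diff is omitted for brevity.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-ins for the Fluss exception types named in the diff.
class StaleMetadataException extends RuntimeException {
    StaleMetadataException(String message) {
        super(message);
    }
}

class NetworkException extends RuntimeException {
    NetworkException(String message) {
        super(message);
    }
}

final class RethrowingRetry {
    private RethrowingRetry() {}

    // Simplified: the code under review strips ExecutionException wrappers before this check.
    static boolean isRetriable(Exception e) {
        return e instanceof StaleMetadataException || e instanceof NetworkException;
    }

    /**
     * Runs {@code attempt} up to maxRetryTimes + 1 times. A non-retriable failure is rethrown
     * immediately; when retries run out, the last retriable failure is rethrown (not logged and
     * swallowed), so the caller can report its message.
     */
    static <T> T callWithRetries(Callable<T> attempt, int maxRetryTimes) throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return attempt.call();
            } catch (Exception e) {
                if (!isRetriable(e) || retryCount >= maxRetryTimes) {
                    throw e;
                }
                retryCount++;
            }
        }
    }
}
```

With this shape, the parent loop over bootstrap servers can record the rethrown exception as its `lastException`, which is exactly what a more informative top-level error message needs.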
##########
fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java:
##########
@@ -316,14 +335,61 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
         return cluster;
     }
-    private static Cluster tryToInitializeCluster(RpcClient rpcClient, InetSocketAddress address)
+    @VisibleForTesting
+    static @Nullable Cluster tryToInitializeClusterWithRetries(
+            RpcClient rpcClient,
+            ServerNode serverNode,
+            AdminReadOnlyGateway gateway,
+            int maxRetryTimes)
+            throws Exception {
+        int retryCount = 0;
+        while (retryCount <= maxRetryTimes) {
+            try {
+                return tryToInitializeCluster(gateway);
+            } catch (Exception e) {
+                Throwable cause = stripExecutionException(e);
+                if (!(cause instanceof StaleMetadataException
+                        || cause instanceof NetworkException)) {
+                    throw e;
+                }
+
+                if (retryCount >= maxRetryTimes) {
Review Comment:
Add a comment above: `// in case the bootstrap server is recovering, we should
retry connecting`
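The retry condition only works once the future's `ExecutionException` wrapper is removed, because a failed RPC surfaces from `future.get()` wrapped in one. A minimal sketch, assuming a `CompletableFuture`-based call; `RecoveryRetryCheck` and `unwrap` are hypothetical stand-ins for the check in the diff and for `ExceptionUtils.stripExecutionException`, and the suggested comment (lightly reworded) sits above the check.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-ins for the Fluss exception types named in the diff.
class StaleMetadataException extends RuntimeException {
    StaleMetadataException(String message) {
        super(message);
    }
}

class NetworkException extends RuntimeException {
    NetworkException(String message) {
        super(message);
    }
}

final class RecoveryRetryCheck {
    private RecoveryRetryCheck() {}

    /** Strips ExecutionException/CompletionException wrappers to reach the root failure. */
    static Throwable unwrap(Throwable t) {
        while ((t instanceof ExecutionException || t instanceof CompletionException)
                && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    static boolean shouldRetry(Throwable t) {
        Throwable cause = unwrap(t);
        // in case the bootstrap server is recovering, we should retry connecting
        return cause instanceof StaleMetadataException || cause instanceof NetworkException;
    }

    /** Blocks on the future and reports whether its failure would be retried. */
    static boolean failureIsRetriable(CompletableFuture<?> future) {
        try {
            future.get();
            return false; // completed normally, nothing to retry
        } catch (ExecutionException e) {
            // e itself is never a NetworkException; only its unwrapped cause can be.
            return shouldRetry(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```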
##########
fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java:
##########
@@ -56,12 +57,14 @@
 import java.util.stream.Collectors;
 import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
+import static org.apache.fluss.utils.ExceptionUtils.stripExecutionException;
 /** The updater to initialize and update client metadata. */
 public class MetadataUpdater {
     private static final Logger LOG = LoggerFactory.getLogger(MetadataUpdater.class);
     private static final int MAX_RETRY_TIMES = 5;
Review Comment:
Do we need this many retries? Wouldn't 3 be enough?
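For sizing this constant, note that the loop in the diff makes `maxRetryTimes + 1` connection attempts: `retryCount` starts at 0, the loop runs while `retryCount <= maxRetryTimes`, and it gives up once `retryCount >= maxRetryTimes`. This assumes `retryCount` is incremented after each failed attempt, which the quoted hunk does not show. A small sketch that counts attempts against an always-failing call; `AttemptCounter` is a hypothetical name.

```java
final class AttemptCounter {
    private AttemptCounter() {}

    /** Mirrors the quoted loop with an always-failing attempt and returns how many attempts ran. */
    static int attemptsUntilGiveUp(int maxRetryTimes) {
        int attempts = 0;
        int retryCount = 0;
        while (retryCount <= maxRetryTimes) {
            attempts++; // one call to tryToInitializeCluster, assumed to fail
            if (retryCount >= maxRetryTimes) {
                break; // retries exhausted
            }
            retryCount++; // assumption: the hidden part of the loop increments here
        }
        return attempts;
    }
}
```

Under that assumption, `MAX_RETRY_TIMES = 5` means 6 attempts and 3 would mean 4; any backoff between attempts multiplies accordingly.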
##########
fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java:
##########
@@ -316,14 +335,61 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
         return cluster;
Review Comment:
We should improve this exception message. It doesn't carry the underlying
exception's message into the top-level error message, which may confuse users.
```
String errorMsg =
        "Failed to initialize fluss client connection to bootstrap servers: "
                + inetSocketAddresses
                + ". \nReason: "
                + lastException.getMessage();
```
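A minimal sketch of how the suggested message could be assembled, assuming the last failure is also attached as the cause so the full stack trace survives alongside its message. `BootstrapErrors` and `initializationFailure` are hypothetical names, `IllegalStateException` is used only for illustration, and a null guard is added in case no attempt recorded an exception.

```java
import java.net.InetSocketAddress;
import java.util.List;

final class BootstrapErrors {
    private BootstrapErrors() {}

    /** Builds the top-level failure, carrying the root cause's message and the cause itself. */
    static IllegalStateException initializationFailure(
            List<InetSocketAddress> inetSocketAddresses, Exception lastException) {
        String errorMsg =
                "Failed to initialize fluss client connection to bootstrap servers: "
                        + inetSocketAddresses
                        + ". \nReason: "
                        + (lastException == null ? "unknown" : lastException.getMessage());
        // Passing the cause keeps the original stack trace, not just its message.
        return new IllegalStateException(errorMsg, lastException);
    }
}
```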
##########
fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java:
##########
@@ -293,8 +296,24 @@ private static Cluster initializeCluster(Configuration conf, RpcClient rpcClient
         Exception lastException = null;
         for (InetSocketAddress address : inetSocketAddresses) {
             try {
-                cluster = tryToInitializeCluster(rpcClient, address);
-                break;
+                ServerNode serverNode =
+                        new ServerNode(
+                                -1,
+                                address.getHostString(),
+                                address.getPort(),
+                                ServerType.COORDINATOR);
+                AdminReadOnlyGateway adminReadOnlyGateway =
+                        GatewayClientProxy.createGatewayProxy(
+                                () -> serverNode, rpcClient, AdminReadOnlyGateway.class);
+                if (inetSocketAddresses.size() == 1) {
+                    // if there is only one bootstrap server, we can retry to connect to it.
+                    cluster =
+                            tryToInitializeClusterWithRetries(
+                                    rpcClient, serverNode, adminReadOnlyGateway, MAX_RETRY_TIMES);
+                } else {
+                    cluster = tryToInitializeCluster(adminReadOnlyGateway);
+                    break;
+                }
             } catch (Exception e) {
                 LOG.error(
Review Comment:
Should we call `.disconnect()` here as well, for the case where
`tryToInitializeCluster` fails?
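A minimal sketch of the per-address loop with the suggested cleanup, assuming a connection handle that exposes `disconnect()`: when fetching metadata from a bootstrap server fails, the client disconnects from that server before trying the next address, so no half-open connection is left behind. `BootstrapConnection` and `BootstrapLoop` are hypothetical; the real code would go through `RpcClient` and the gateway proxy, and this sketch models only one attempt per address.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical handle to one bootstrap server. */
interface BootstrapConnection {
    String fetchClusterMetadata() throws Exception;

    void disconnect();
}

final class BootstrapLoop {
    private BootstrapLoop() {}

    /**
     * Tries each bootstrap address in order and returns the first metadata obtained. A server
     * whose attempt fails is disconnected before the next address is tried.
     */
    static String initialize(
            List<String> addresses, Function<String, BootstrapConnection> connector) {
        Exception lastException = null;
        for (String address : addresses) {
            BootstrapConnection connection = connector.apply(address);
            try {
                return connection.fetchClusterMetadata();
            } catch (Exception e) {
                lastException = e;
                // Release the connection to the failed server instead of leaking it.
                connection.disconnect();
            }
        }
        throw new IllegalStateException(
                "Failed to initialize client connection to bootstrap servers: " + addresses,
                lastException);
    }
}
```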
--
This is an automated message from the Apache Git Service.