This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f809556e7e Do not wait to retry when configLeader exists (#12075)
9f809556e7e is described below

commit 9f809556e7e5c06d5d59854817f3a0889c5897ff
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Feb 29 14:57:20 2024 +0800

    Do not wait to retry when configLeader exists (#12075)
---
 .../manager/consensus/ConsensusManager.java        | 18 ++++++++++-
 .../iotdb/confignode/manager/node/NodeManager.java |  5 ++-
 .../iotdb/db/protocol/client/ConfigNodeClient.java | 37 ++++++++--------------
 3 files changed, 33 insertions(+), 27 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index f9ce496ea65..0e329fda460 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -65,6 +65,9 @@ public class ConsensusManager {
   private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
   private static final CommonConfig COMMON_CONF = CommonDescriptor.getInstance().getConfig();
   private static final int SEED_CONFIG_NODE_ID = 0;
+  private static final long MAX_WAIT_READY_TIME_MS =
+      CommonDescriptor.getInstance().getConfig().getConnectionTimeoutInMS() / 2;
+  private static final long RETRY_WAIT_TIME_MS = 100;
   /** There is only one ConfigNodeGroup */
   public static final ConsensusGroupId DEFAULT_CONSENSUS_GROUP_ID =
       new ConfigRegionId(CONF.getConfigRegionId());
@@ -345,7 +348,7 @@ public class ConsensusManager {
         return leaderPeer;
       }
       try {
-        TimeUnit.MILLISECONDS.sleep(100);
+        TimeUnit.MILLISECONDS.sleep(RETRY_WAIT_TIME_MS);
       } catch (InterruptedException e) {
         LOGGER.warn("ConsensusManager getLeaderPeer been interrupted, ", e);
         Thread.currentThread().interrupt();
@@ -384,6 +387,19 @@ public class ConsensusManager {
     } else {
       result.setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
       if (isLeader()) {
+        long startTime = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTime < MAX_WAIT_READY_TIME_MS) {
+          if (isLeaderReady()) {
+            result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+            return result;
+          }
+          try {
+            Thread.sleep(RETRY_WAIT_TIME_MS);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOGGER.warn("Unexpected interruption during waiting for configNode leader ready.");
+          }
+        }
         result.setMessage(
             "The current ConfigNode is leader but not ready yet, please try again later.");
       } else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index 307da61e91c..e212e2cc90d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -79,7 +79,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.exception.ConsensusException;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -263,7 +262,7 @@ public class NodeManager {
         configManager
             .getClusterManager()
             .getClusterIdWithRetry(
-                IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS() / 2);
+                CommonDescriptor.getInstance().getConfig().getConnectionTimeoutInMS() / 2);
     if (clusterId == null) {
       resp.setStatus(
           new TSStatus(TSStatusCode.GET_CLUSTER_ID_ERROR.getStatusCode())
@@ -318,7 +317,7 @@ public class NodeManager {
         configManager
             .getClusterManager()
             .getClusterIdWithRetry(
-                IoTDBDescriptor.getInstance().getConfig().getConnectionTimeoutInMS() / 2);
+                CommonDescriptor.getInstance().getConfig().getConnectionTimeoutInMS() / 2);
     TDataNodeRestartResp resp = new TDataNodeRestartResp();
     resp.setConfigNodeList(getRegisteredConfigNodes());
     if (clusterId == null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 1387e5f338f..c17af251d90 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -184,18 +184,10 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
     this.configNodes = configNodes;
     this.property = property;
     this.clientManager = clientManager;
+    // Set the first configNode as configLeader for a tentative connection
+    this.configLeader = this.configNodes.get(0);
 
-    init();
-  }
-
-  public void init() throws TException {
-    try {
-      tryToConnect();
-    } catch (TException e) {
-      // Can not connect to each config node
-      syncLatestConfigNodeList();
-      tryToConnect();
-    }
+    connectAndSync();
   }
 
   public void connect(TEndPoint endpoint) throws TException {
@@ -215,16 +207,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
     client = new IConfigNodeRPCService.Client(property.getProtocolFactory().getProtocol(transport));
   }
 
-  private void waitAndReconnect() throws TException {
-    try {
-      // Wait to start the next try
-      Thread.sleep(RETRY_INTERVAL_MS);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new TException(
-          "Unexpected interruption when waiting to retry to connect to ConfigNode");
-    }
-
+  private void connectAndSync() throws TException {
     try {
       tryToConnect();
     } catch (TException e) {
@@ -243,6 +226,14 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
         logger.warn("The current node may have been down {},try next node", configLeader);
         configLeader = null;
       }
+    } else {
+      try {
+        // Wait to start the next try
+        Thread.sleep(RETRY_INTERVAL_MS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.warn("Unexpected interruption when waiting to try to connect to ConfigNode");
+      }
     }
 
     if (transport != null) {
@@ -338,7 +329,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
         logger.warn(message, e);
         configLeader = null;
       }
-      waitAndReconnect();
+      connectAndSync();
     }
     throw new TException(MSG_RECONNECTION_FAIL);
   }
@@ -386,7 +377,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
         logger.warn(message, e);
         configLeader = null;
       }
-      waitAndReconnect();
+      connectAndSync();
     }
     throw new TException(MSG_RECONNECTION_FAIL);
   }

Reply via email to