This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d1dd425fcd [IOTDB-3718] Unify retry logic of SyncClientPool in ConfigNode (#6613)
d1dd425fcd is described below

commit d1dd425fcd9ab426b17459e12acedf017eacdd05
Author: 任宇华 <[email protected]>
AuthorDate: Mon Jul 11 14:19:27 2022 +0800

    [IOTDB-3718] Unify retry logic of SyncClientPool in ConfigNode (#6613)
---
 .../confignode/client/ConfigNodeRequestType.java   |  28 ++++
 .../confignode/client/DataNodeRequestType.java     |  27 ++++
 .../client/SyncConfigNodeClientPool.java           | 142 ++++++++-------------
 .../confignode/client/SyncDataNodeClientPool.java  |  99 +++++++-------
 .../confignode/conf/ConfigNodeRemoveCheck.java     |  17 ++-
 .../confignode/manager/PermissionManager.java      |   6 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |  24 +++-
 .../iotdb/confignode/service/ConfigNode.java       |   6 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   9 +-
 9 files changed, 204 insertions(+), 154 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
new file mode 100644
index 0000000000..c2592ceb56
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client;
+
+public enum ConfigNodeRequestType {
+  addConsensusGroup,
+  notifyRegisterSuccess,
+  registerConfigNode,
+  removeConfigNode,
+  stopConfigNode;
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
new file mode 100644
index 0000000000..24199735ff
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client;
+
+public enum DataNodeRequestType {
+  deleteRegions,
+  invalidatePartitionCache,
+  invalidatePermissionCache,
+  invalidateSchemaCache;
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
index f227d3314e..514cd1eab1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
@@ -27,11 +27,14 @@ import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -62,118 +65,75 @@ public class SyncConfigNodeClientPool {
     }
   }
 
-  /** Only use registerConfigNode when the ConfigNode is first startup. */
-  public TConfigNodeRegisterResp registerConfigNode(
-      TEndPoint endPoint, TConfigNodeRegisterReq req) {
-    // TODO: Unified retry logic
+  public Object sendSyncRequestToConfigNode(
+      TEndPoint endPoint, Object req, ConfigNodeRequestType requestType) {
     Throwable lastException = null;
     for (int retry = 0; retry < retryNum; retry++) {
       try (SyncConfigNodeIServiceClient client = 
clientManager.borrowClient(endPoint)) {
-        return client.registerConfigNode(req);
-      } catch (Exception e) {
-        lastException = e;
-        LOGGER.warn("Register ConfigNode failed because {}, retrying {}...", 
e.getMessage(), retry);
-        doRetryWait(retry);
-      }
-    }
-    LOGGER.error("Register ConfigNode failed", lastException);
-    return new TConfigNodeRegisterResp()
-        .setStatus(
-            new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
-                .setMessage("All retry failed due to " + 
lastException.getMessage()));
-  }
-
-  public void addConsensusGroup(TEndPoint endPoint, List<TConfigNodeLocation> 
configNodeLocation)
-      throws Exception {
-    // TODO: Unified retry logic
-    Exception lastException = null;
-    for (int retry = 0; retry < retryNum; retry++) {
-      try (SyncConfigNodeIServiceClient client = 
clientManager.borrowClient(endPoint)) {
-        TConfigNodeRegisterResp registerResp = new TConfigNodeRegisterResp();
-        registerResp.setConfigNodeList(configNodeLocation);
-        registerResp.setStatus(StatusUtils.OK);
-        client.addConsensusGroup(registerResp);
-        return;
-      } catch (Exception e) {
+        switch (requestType) {
+          case registerConfigNode:
+            // Only use registerConfigNode when the ConfigNode is first 
startup.
+            return client.registerConfigNode((TConfigNodeRegisterReq) req);
+          case addConsensusGroup:
+            addConsensusGroup((List<TConfigNodeLocation>) req, client);
+            return null;
+          case notifyRegisterSuccess:
+            client.notifyRegisterSuccess();
+            return null;
+          case removeConfigNode:
+            return removeConfigNode((TConfigNodeLocation) req, client);
+          case stopConfigNode:
+            // Only use stopConfigNode when the ConfigNode is removed.
+            return client.stopConfigNode((TConfigNodeLocation) req);
+          default:
+            return RpcUtils.getStatus(
+                TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " 
+ requestType);
+        }
+      } catch (Throwable e) {
         lastException = e;
         LOGGER.warn(
-            "Add Consensus Group failed because {}, retrying {} ...", 
e.getMessage(), retry);
+            "{} failed on ConfigNode {}, because {}, retrying {}...",
+            requestType,
+            endPoint,
+            e.getMessage(),
+            retry);
         doRetryWait(retry);
       }
     }
-
-    throw lastException;
+    LOGGER.error("{} failed on ConfigNode {}", requestType, endPoint, 
lastException);
+    return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
+        .setMessage("All retry failed due to" + lastException.getMessage());
   }
 
-  public void notifyRegisterSuccess(TEndPoint endPoint) {
-    // TODO: Unified retry logic
-    for (int retry = 0; retry < retryNum; retry++) {
-      try (SyncConfigNodeIServiceClient client = 
clientManager.borrowClient(endPoint)) {
-        client.notifyRegisterSuccess();
-        return;
-      } catch (Exception e) {
-        LOGGER.warn("Notify register failed because {}, retrying {} ...", 
e.getMessage(), retry);
-        doRetryWait(retry);
-      }
-    }
+  public void addConsensusGroup(
+      List<TConfigNodeLocation> configNodeLocation, 
SyncConfigNodeIServiceClient client)
+      throws TException {
+    TConfigNodeRegisterResp registerResp = new TConfigNodeRegisterResp();
+    registerResp.setConfigNodeList(configNodeLocation);
+    registerResp.setStatus(StatusUtils.OK);
+    client.addConsensusGroup(registerResp);
+    return;
   }
 
   /**
    * ConfigNode Leader stop any ConfigNode in the cluster
    *
-   * @param configNodeLocations target_config_nodes of 
confignode-system.properties
    * @param configNodeLocation To be removed ConfigNode
    * @return SUCCESS_STATUS: remove ConfigNode success, other status remove 
failed
    */
   public TSStatus removeConfigNode(
-      List<TConfigNodeLocation> configNodeLocations, TConfigNodeLocation 
configNodeLocation) {
-    // TODO: Unified retry logic
-    Throwable lastException = null;
-    for (TConfigNodeLocation nodeLocation : configNodeLocations) {
-      for (int retry = 0; retry < retryNum; retry++) {
-        try (SyncConfigNodeIServiceClient client =
-            clientManager.borrowClient(nodeLocation.getInternalEndPoint())) {
-          TSStatus status = client.removeConfigNode(configNodeLocation);
-          while (status.getCode() == 
TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
-            TimeUnit.MILLISECONDS.sleep(2000);
-            updateConfigNodeLeader(status);
-            try (SyncConfigNodeIServiceClient clientLeader =
-                clientManager.borrowClient(configNodeLeader)) {
-              status = clientLeader.removeConfigNode(configNodeLocation);
-            }
-          }
-          return status;
-        } catch (Throwable e) {
-          lastException = e;
-          LOGGER.warn(
-              "Remove ConfigNode failed because {}, retrying {} ...", 
e.getMessage(), retry);
-          doRetryWait(retry);
-        }
+      TConfigNodeLocation configNodeLocation, SyncConfigNodeIServiceClient 
client)
+      throws TException, IOException, InterruptedException {
+    TSStatus status = client.removeConfigNode(configNodeLocation);
+    while (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+      TimeUnit.MILLISECONDS.sleep(2000);
+      updateConfigNodeLeader(status);
+      try (SyncConfigNodeIServiceClient clientLeader =
+          clientManager.borrowClient(configNodeLeader)) {
+        status = clientLeader.removeConfigNode(configNodeLocation);
       }
     }
-
-    LOGGER.error("Remove ConfigNode failed", lastException);
-    return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
-        .setMessage("All retry failed due to " + lastException.getMessage());
-  }
-
-  /** Only use stopConfigNode when the ConfigNode is removed. */
-  public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) {
-    // TODO: Unified retry logic
-    Throwable lastException = null;
-    for (int retry = 0; retry < retryNum; retry++) {
-      try (SyncConfigNodeIServiceClient client =
-          
clientManager.borrowClient(configNodeLocation.getInternalEndPoint())) {
-        return client.stopConfigNode(configNodeLocation);
-      } catch (Exception e) {
-        lastException = e;
-        LOGGER.warn("Stop ConfigNode failed because {}, retrying {}...", 
e.getMessage(), retry);
-        doRetryWait(retry);
-      }
-    }
-    LOGGER.error("Stop ConfigNode failed", lastException);
-    return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
-        .setMessage("All retry failed due to" + lastException.getMessage());
+    return status;
   }
 
   private void doRetryWait(int retryNum) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
index 22f184a345..83b84cfbac 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
@@ -39,12 +40,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /** Synchronously send RPC requests to DataNodes. See mpp.thrift for more 
details. */
 public class SyncDataNodeClientPool {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SyncDataNodeClientPool.class);
 
+  private static final int retryNum = 6;
+
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
clientManager;
 
   private SyncDataNodeClientPool() {
@@ -54,35 +58,38 @@ public class SyncDataNodeClientPool {
                 new 
ConfigNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
   }
 
-  public TSStatus invalidatePartitionCache(
-      TEndPoint endPoint, TInvalidateCacheReq invalidateCacheReq) {
-    TSStatus status;
-    try (SyncDataNodeInternalServiceClient client = 
clientManager.borrowClient(endPoint)) {
-      status = client.invalidatePartitionCache(invalidateCacheReq);
-      LOGGER.info("Invalid Schema Cache {} successfully", invalidateCacheReq);
-    } catch (IOException e) {
-      LOGGER.error("Can't connect to DataNode {}", endPoint, e);
-      status = new TSStatus(TSStatusCode.TIME_OUT.getStatusCode());
-    } catch (TException e) {
-      LOGGER.error("Invalid Schema Cache on DataNode {} failed", endPoint, e);
-      status = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-    }
-    return status;
-  }
-
-  public TSStatus invalidateSchemaCache(
-      TEndPoint endPoint, TInvalidateCacheReq invalidateCacheReq) {
-    TSStatus status;
-    try (SyncDataNodeInternalServiceClient client = 
clientManager.borrowClient(endPoint)) {
-      status = client.invalidateSchemaCache(invalidateCacheReq);
-    } catch (IOException e) {
-      LOGGER.error("Can't connect to DataNode {}", endPoint, e);
-      status = new TSStatus(TSStatusCode.TIME_OUT.getStatusCode());
-    } catch (TException e) {
-      LOGGER.error("Invalid Schema Cache on DataNode {} failed", endPoint, e);
-      status = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+  public TSStatus sendSyncRequestToDataNode(
+      TEndPoint endPoint, Object req, DataNodeRequestType requestType) {
+    Throwable lastException = null;
+    for (int retry = 0; retry < retryNum; retry++) {
+      try (SyncDataNodeInternalServiceClient client = 
clientManager.borrowClient(endPoint)) {
+        switch (requestType) {
+          case invalidatePartitionCache:
+            return client.invalidatePartitionCache((TInvalidateCacheReq) req);
+          case invalidateSchemaCache:
+            return client.invalidateSchemaCache((TInvalidateCacheReq) req);
+          case deleteRegions:
+            return client.deleteRegion((TConsensusGroupId) req);
+          case invalidatePermissionCache:
+            return 
client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
+          default:
+            return RpcUtils.getStatus(
+                TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " 
+ requestType);
+        }
+      } catch (TException | IOException e) {
+        lastException = e;
+        LOGGER.warn(
+            "{} failed on DataNode {}, because {}, retrying {}...",
+            requestType,
+            endPoint,
+            e.getMessage(),
+            retry);
+        doRetryWait(retry);
+      }
     }
-    return status;
+    LOGGER.error("{} failed on DataNode {}", requestType, endPoint, 
lastException);
+    return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
+        .setMessage("All retry failed due to" + lastException.getMessage());
   }
 
   public void deleteRegions(Set<TRegionReplicaSet> deletedRegionSet) {
@@ -105,35 +112,23 @@ public class SyncDataNodeClientPool {
       TEndPoint endPoint,
       List<TConsensusGroupId> regionIds,
       Set<TRegionReplicaSet> deletedRegionSet) {
-    try (SyncDataNodeInternalServiceClient client = 
clientManager.borrowClient(endPoint)) {
-      for (TConsensusGroupId regionId : regionIds) {
-        LOGGER.debug("Delete region {} ", regionId);
-        final TSStatus status = client.deleteRegion(regionId);
-        if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          LOGGER.info("DELETE Region {} successfully", regionId);
-          deletedRegionSet.removeIf(k -> k.getRegionId().equals(regionId));
-        }
+    for (TConsensusGroupId regionId : regionIds) {
+      LOGGER.debug("Delete region {} ", regionId);
+      final TSStatus status =
+          sendSyncRequestToDataNode(endPoint, regionId, 
DataNodeRequestType.deleteRegions);
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOGGER.info("DELETE Region {} successfully", regionId);
+        deletedRegionSet.removeIf(k -> k.getRegionId().equals(regionId));
       }
-    } catch (IOException e) {
-      LOGGER.error("Can't connect to DataNode {}", endPoint, e);
-    } catch (TException e) {
-      LOGGER.error("Delete Region on DataNode {} failed", endPoint, e);
     }
   }
 
-  public TSStatus invalidatePermissionCache(
-      TEndPoint endPoint, TInvalidatePermissionCacheReq 
invalidatePermissionCacheReq) {
-    TSStatus status;
-    try (SyncDataNodeInternalServiceClient client = 
clientManager.borrowClient(endPoint)) {
-      status = client.invalidatePermissionCache(invalidatePermissionCacheReq);
-    } catch (IOException e) {
-      LOGGER.error("Can't connect to DataNode {}", endPoint, e);
-      status = new TSStatus(TSStatusCode.TIME_OUT.getStatusCode());
-    } catch (TException e) {
-      LOGGER.error("Invalid Permission Cache on DataNode {} failed", endPoint, 
e);
-      status = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+  private void doRetryWait(int retryNum) {
+    try {
+      TimeUnit.MILLISECONDS.sleep(100L * (long) Math.pow(2, retryNum));
+    } catch (InterruptedException e) {
+      LOGGER.error("Retry wait failed.", e);
     }
-    return status;
   }
 
   // TODO: Is the ClientPool must be a singleton?
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
index ab18779c92..841e224ca7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -72,14 +73,26 @@ public class ConfigNodeRemoveCheck {
 
   public void removeConfigNode(TConfigNodeLocation nodeLocation)
       throws BadNodeUrlException, IOException {
-    TSStatus status =
-        
SyncConfigNodeClientPool.getInstance().removeConfigNode(getConfigNodeList(), 
nodeLocation);
+    TSStatus status = new TSStatus();
+    for (TConfigNodeLocation configNodeLocation : getConfigNodeList()) {
+      status =
+          (TSStatus)
+              SyncConfigNodeClientPool.getInstance()
+                  .sendSyncRequestToConfigNode(
+                      configNodeLocation.getInternalEndPoint(),
+                      nodeLocation,
+                      ConfigNodeRequestType.removeConfigNode);
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        break;
+      }
+    }
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.error(status.getMessage());
       throw new IOException("Remove ConfigNode failed:");
     }
   }
 
+  /** target_config_nodes of confignode-system.properties */
   public List<TConfigNodeLocation> getConfigNodeList() throws 
BadNodeUrlException {
     return NodeUrlUtils.parseTConfigNodeUrls(
         systemProperties.getProperty(IoTDBConstant.TARGET_CONFIG_NODES));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index ad3e361645..d6d5da8bc1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.manager;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
@@ -106,7 +107,10 @@ public class PermissionManager {
     for (TDataNodeInfo dataNodeInfo : allDataNodes) {
       status =
           SyncDataNodeClientPool.getInstance()
-              
.invalidatePermissionCache(dataNodeInfo.getLocation().getInternalEndPoint(), 
req);
+              .sendSyncRequestToDataNode(
+                  dataNodeInfo.getLocation().getInternalEndPoint(),
+                  req,
+                  DataNodeRequestType.invalidatePermissionCache);
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return status;
       }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 82981f54d6..1a6986813f 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.confignode.procedure.env;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
 import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
 import 
org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
@@ -113,12 +115,16 @@ public class ConfigNodeProcedureEnv {
     for (TDataNodeInfo dataNodeInfo : allDataNodes) {
       final TSStatus invalidateSchemaStatus =
           SyncDataNodeClientPool.getInstance()
-              .invalidateSchemaCache(
-                  dataNodeInfo.getLocation().getInternalEndPoint(), 
invalidateCacheReq);
+              .sendSyncRequestToDataNode(
+                  dataNodeInfo.getLocation().getInternalEndPoint(),
+                  invalidateCacheReq,
+                  DataNodeRequestType.invalidateSchemaCache);
       final TSStatus invalidatePartitionStatus =
           SyncDataNodeClientPool.getInstance()
-              .invalidatePartitionCache(
-                  dataNodeInfo.getLocation().getInternalEndPoint(), 
invalidateCacheReq);
+              .sendSyncRequestToDataNode(
+                  dataNodeInfo.getLocation().getInternalEndPoint(),
+                  invalidateCacheReq,
+                  DataNodeRequestType.invalidatePartitionCache);
       if (!verifySucceed(invalidatePartitionStatus, invalidateSchemaStatus)) {
         LOG.error(
             "Invalidate cache failed, invalidate partition cache status is {}, 
invalidate schema cache status is {}",
@@ -145,7 +151,10 @@ public class ConfigNodeProcedureEnv {
         new 
ArrayList<>(configManager.getNodeManager().getRegisteredConfigNodes());
     configNodeLocations.add(tConfigNodeLocation);
     SyncConfigNodeClientPool.getInstance()
-        .addConsensusGroup(tConfigNodeLocation.getInternalEndPoint(), 
configNodeLocations);
+        .sendSyncRequestToConfigNode(
+            tConfigNodeLocation.getInternalEndPoint(),
+            configNodeLocations,
+            ConfigNodeRequestType.addConsensusGroup);
   }
 
   /**
@@ -174,7 +183,10 @@ public class ConfigNodeProcedureEnv {
    */
   public void notifyRegisterSuccess(TConfigNodeLocation configNodeLocation) {
     SyncConfigNodeClientPool.getInstance()
-        .notifyRegisterSuccess(configNodeLocation.getInternalEndPoint());
+        .sendSyncRequestToConfigNode(
+            configNodeLocation.getInternalEndPoint(),
+            null,
+            ConfigNodeRequestType.notifyRegisterSuccess);
   }
 
   public ReentrantLock getAddConfigNodeLock() {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 4f239a7ab3..a1e3698103 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
@@ -187,7 +188,10 @@ public class ConfigNode implements ConfigNodeMBean {
     TEndPoint targetConfigNode = conf.getTargetConfigNode();
     while (true) {
       TConfigNodeRegisterResp resp =
-          
SyncConfigNodeClientPool.getInstance().registerConfigNode(targetConfigNode, 
req);
+          (TConfigNodeRegisterResp)
+              SyncConfigNodeClientPool.getInstance()
+                  .sendSyncRequestToConfigNode(
+                      targetConfigNode, req, 
ConfigNodeRequestType.registerConfigNode);
       if (resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         conf.setPartitionRegionId(resp.getPartitionRegionId().getId());
         break;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 8667f80c40..705417218a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
 import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
 import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -431,7 +432,13 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
 
     TSStatus status = configManager.removeConfigNode(removeConfigNodePlan);
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      status = 
SyncConfigNodeClientPool.getInstance().stopConfigNode(configNodeLocation);
+      status =
+          (TSStatus)
+              SyncConfigNodeClientPool.getInstance()
+                  .sendSyncRequestToConfigNode(
+                      configNodeLocation.getInternalEndPoint(),
+                      configNodeLocation,
+                      ConfigNodeRequestType.stopConfigNode);
     }
 
     // Print log to record the ConfigNode that performs the 
RemoveConfigNodeRequest

Reply via email to