This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d1dd425fcd [IOTDB-3718] Unify retry logic of SyncClientPool in
ConfigNode (#6613)
d1dd425fcd is described below
commit d1dd425fcd9ab426b17459e12acedf017eacdd05
Author: 任宇华 <[email protected]>
AuthorDate: Mon Jul 11 14:19:27 2022 +0800
[IOTDB-3718] Unify retry logic of SyncClientPool in ConfigNode (#6613)
---
.../confignode/client/ConfigNodeRequestType.java | 28 ++++
.../confignode/client/DataNodeRequestType.java | 27 ++++
.../client/SyncConfigNodeClientPool.java | 142 ++++++++-------------
.../confignode/client/SyncDataNodeClientPool.java | 99 +++++++-------
.../confignode/conf/ConfigNodeRemoveCheck.java | 17 ++-
.../confignode/manager/PermissionManager.java | 6 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 24 +++-
.../iotdb/confignode/service/ConfigNode.java | 6 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 9 +-
9 files changed, 204 insertions(+), 154 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
new file mode 100644
index 0000000000..c2592ceb56
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/ConfigNodeRequestType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client;
+
+public enum ConfigNodeRequestType {
+ addConsensusGroup,
+ notifyRegisterSuccess,
+ registerConfigNode,
+ removeConfigNode,
+ stopConfigNode;
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
new file mode 100644
index 0000000000..24199735ff
--- /dev/null
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client;
+
+public enum DataNodeRequestType {
+ deleteRegions,
+ invalidatePartitionCache,
+ invalidatePermissionCache,
+ invalidateSchemaCache;
+}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
index f227d3314e..514cd1eab1 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncConfigNodeClientPool.java
@@ -27,11 +27,14 @@ import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -62,118 +65,75 @@ public class SyncConfigNodeClientPool {
}
}
- /** Only use registerConfigNode when the ConfigNode is first startup. */
- public TConfigNodeRegisterResp registerConfigNode(
- TEndPoint endPoint, TConfigNodeRegisterReq req) {
- // TODO: Unified retry logic
+ public Object sendSyncRequestToConfigNode(
+ TEndPoint endPoint, Object req, ConfigNodeRequestType requestType) {
Throwable lastException = null;
for (int retry = 0; retry < retryNum; retry++) {
try (SyncConfigNodeIServiceClient client =
clientManager.borrowClient(endPoint)) {
- return client.registerConfigNode(req);
- } catch (Exception e) {
- lastException = e;
- LOGGER.warn("Register ConfigNode failed because {}, retrying {}...",
e.getMessage(), retry);
- doRetryWait(retry);
- }
- }
- LOGGER.error("Register ConfigNode failed", lastException);
- return new TConfigNodeRegisterResp()
- .setStatus(
- new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
- .setMessage("All retry failed due to " +
lastException.getMessage()));
- }
-
- public void addConsensusGroup(TEndPoint endPoint, List<TConfigNodeLocation>
configNodeLocation)
- throws Exception {
- // TODO: Unified retry logic
- Exception lastException = null;
- for (int retry = 0; retry < retryNum; retry++) {
- try (SyncConfigNodeIServiceClient client =
clientManager.borrowClient(endPoint)) {
- TConfigNodeRegisterResp registerResp = new TConfigNodeRegisterResp();
- registerResp.setConfigNodeList(configNodeLocation);
- registerResp.setStatus(StatusUtils.OK);
- client.addConsensusGroup(registerResp);
- return;
- } catch (Exception e) {
+ switch (requestType) {
+ case registerConfigNode:
+ // Only use registerConfigNode when the ConfigNode is first
startup.
+ return client.registerConfigNode((TConfigNodeRegisterReq) req);
+ case addConsensusGroup:
+ addConsensusGroup((List<TConfigNodeLocation>) req, client);
+ return null;
+ case notifyRegisterSuccess:
+ client.notifyRegisterSuccess();
+ return null;
+ case removeConfigNode:
+ return removeConfigNode((TConfigNodeLocation) req, client);
+ case stopConfigNode:
+ // Only use stopConfigNode when the ConfigNode is removed.
+ return client.stopConfigNode((TConfigNodeLocation) req);
+ default:
+ return RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: "
+ requestType);
+ }
+ } catch (Throwable e) {
lastException = e;
LOGGER.warn(
- "Add Consensus Group failed because {}, retrying {} ...",
e.getMessage(), retry);
+ "{} failed on ConfigNode {}, because {}, retrying {}...",
+ requestType,
+ endPoint,
+ e.getMessage(),
+ retry);
doRetryWait(retry);
}
}
-
- throw lastException;
+ LOGGER.error("{} failed on ConfigNode {}", requestType, endPoint,
lastException);
+ return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
+ .setMessage("All retry failed due to" + lastException.getMessage());
}
- public void notifyRegisterSuccess(TEndPoint endPoint) {
- // TODO: Unified retry logic
- for (int retry = 0; retry < retryNum; retry++) {
- try (SyncConfigNodeIServiceClient client =
clientManager.borrowClient(endPoint)) {
- client.notifyRegisterSuccess();
- return;
- } catch (Exception e) {
- LOGGER.warn("Notify register failed because {}, retrying {} ...",
e.getMessage(), retry);
- doRetryWait(retry);
- }
- }
+ public void addConsensusGroup(
+ List<TConfigNodeLocation> configNodeLocation,
SyncConfigNodeIServiceClient client)
+ throws TException {
+ TConfigNodeRegisterResp registerResp = new TConfigNodeRegisterResp();
+ registerResp.setConfigNodeList(configNodeLocation);
+ registerResp.setStatus(StatusUtils.OK);
+ client.addConsensusGroup(registerResp);
+ return;
}
/**
* ConfigNode Leader stop any ConfigNode in the cluster
*
- * @param configNodeLocations target_config_nodes of
confignode-system.properties
* @param configNodeLocation To be removed ConfigNode
* @return SUCCESS_STATUS: remove ConfigNode success, other status remove
failed
*/
public TSStatus removeConfigNode(
- List<TConfigNodeLocation> configNodeLocations, TConfigNodeLocation
configNodeLocation) {
- // TODO: Unified retry logic
- Throwable lastException = null;
- for (TConfigNodeLocation nodeLocation : configNodeLocations) {
- for (int retry = 0; retry < retryNum; retry++) {
- try (SyncConfigNodeIServiceClient client =
- clientManager.borrowClient(nodeLocation.getInternalEndPoint())) {
- TSStatus status = client.removeConfigNode(configNodeLocation);
- while (status.getCode() ==
TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
- TimeUnit.MILLISECONDS.sleep(2000);
- updateConfigNodeLeader(status);
- try (SyncConfigNodeIServiceClient clientLeader =
- clientManager.borrowClient(configNodeLeader)) {
- status = clientLeader.removeConfigNode(configNodeLocation);
- }
- }
- return status;
- } catch (Throwable e) {
- lastException = e;
- LOGGER.warn(
- "Remove ConfigNode failed because {}, retrying {} ...",
e.getMessage(), retry);
- doRetryWait(retry);
- }
+ TConfigNodeLocation configNodeLocation, SyncConfigNodeIServiceClient
client)
+ throws TException, IOException, InterruptedException {
+ TSStatus status = client.removeConfigNode(configNodeLocation);
+ while (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+ TimeUnit.MILLISECONDS.sleep(2000);
+ updateConfigNodeLeader(status);
+ try (SyncConfigNodeIServiceClient clientLeader =
+ clientManager.borrowClient(configNodeLeader)) {
+ status = clientLeader.removeConfigNode(configNodeLocation);
}
}
-
- LOGGER.error("Remove ConfigNode failed", lastException);
- return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
- .setMessage("All retry failed due to " + lastException.getMessage());
- }
-
- /** Only use stopConfigNode when the ConfigNode is removed. */
- public TSStatus stopConfigNode(TConfigNodeLocation configNodeLocation) {
- // TODO: Unified retry logic
- Throwable lastException = null;
- for (int retry = 0; retry < retryNum; retry++) {
- try (SyncConfigNodeIServiceClient client =
-
clientManager.borrowClient(configNodeLocation.getInternalEndPoint())) {
- return client.stopConfigNode(configNodeLocation);
- } catch (Exception e) {
- lastException = e;
- LOGGER.warn("Stop ConfigNode failed because {}, retrying {}...",
e.getMessage(), retry);
- doRetryWait(retry);
- }
- }
- LOGGER.error("Stop ConfigNode failed", lastException);
- return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
- .setMessage("All retry failed due to" + lastException.getMessage());
+ return status;
}
private void doRetryWait(int retryNum) {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
index 22f184a345..83b84cfbac 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/client/SyncDataNodeClientPool.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
@@ -39,12 +40,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
/** Synchronously send RPC requests to DataNodes. See mpp.thrift for more
details. */
public class SyncDataNodeClientPool {
private static final Logger LOGGER =
LoggerFactory.getLogger(SyncDataNodeClientPool.class);
+ private static final int retryNum = 6;
+
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
clientManager;
private SyncDataNodeClientPool() {
@@ -54,35 +58,38 @@ public class SyncDataNodeClientPool {
new
ConfigNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
}
- public TSStatus invalidatePartitionCache(
- TEndPoint endPoint, TInvalidateCacheReq invalidateCacheReq) {
- TSStatus status;
- try (SyncDataNodeInternalServiceClient client =
clientManager.borrowClient(endPoint)) {
- status = client.invalidatePartitionCache(invalidateCacheReq);
- LOGGER.info("Invalid Schema Cache {} successfully", invalidateCacheReq);
- } catch (IOException e) {
- LOGGER.error("Can't connect to DataNode {}", endPoint, e);
- status = new TSStatus(TSStatusCode.TIME_OUT.getStatusCode());
- } catch (TException e) {
- LOGGER.error("Invalid Schema Cache on DataNode {} failed", endPoint, e);
- status = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
- }
- return status;
- }
-
- public TSStatus invalidateSchemaCache(
- TEndPoint endPoint, TInvalidateCacheReq invalidateCacheReq) {
- TSStatus status;
- try (SyncDataNodeInternalServiceClient client =
clientManager.borrowClient(endPoint)) {
- status = client.invalidateSchemaCache(invalidateCacheReq);
- } catch (IOException e) {
- LOGGER.error("Can't connect to DataNode {}", endPoint, e);
- status = new TSStatus(TSStatusCode.TIME_OUT.getStatusCode());
- } catch (TException e) {
- LOGGER.error("Invalid Schema Cache on DataNode {} failed", endPoint, e);
- status = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ public TSStatus sendSyncRequestToDataNode(
+ TEndPoint endPoint, Object req, DataNodeRequestType requestType) {
+ Throwable lastException = null;
+ for (int retry = 0; retry < retryNum; retry++) {
+ try (SyncDataNodeInternalServiceClient client =
clientManager.borrowClient(endPoint)) {
+ switch (requestType) {
+ case invalidatePartitionCache:
+ return client.invalidatePartitionCache((TInvalidateCacheReq) req);
+ case invalidateSchemaCache:
+ return client.invalidateSchemaCache((TInvalidateCacheReq) req);
+ case deleteRegions:
+ return client.deleteRegion((TConsensusGroupId) req);
+ case invalidatePermissionCache:
+ return
client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
+ default:
+ return RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: "
+ requestType);
+ }
+ } catch (TException | IOException e) {
+ lastException = e;
+ LOGGER.warn(
+ "{} failed on DataNode {}, because {}, retrying {}...",
+ requestType,
+ endPoint,
+ e.getMessage(),
+ retry);
+ doRetryWait(retry);
+ }
}
- return status;
+ LOGGER.error("{} failed on DataNode {}", requestType, endPoint,
lastException);
+ return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
+ .setMessage("All retry failed due to" + lastException.getMessage());
}
public void deleteRegions(Set<TRegionReplicaSet> deletedRegionSet) {
@@ -105,35 +112,23 @@ public class SyncDataNodeClientPool {
TEndPoint endPoint,
List<TConsensusGroupId> regionIds,
Set<TRegionReplicaSet> deletedRegionSet) {
- try (SyncDataNodeInternalServiceClient client =
clientManager.borrowClient(endPoint)) {
- for (TConsensusGroupId regionId : regionIds) {
- LOGGER.debug("Delete region {} ", regionId);
- final TSStatus status = client.deleteRegion(regionId);
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.info("DELETE Region {} successfully", regionId);
- deletedRegionSet.removeIf(k -> k.getRegionId().equals(regionId));
- }
+ for (TConsensusGroupId regionId : regionIds) {
+ LOGGER.debug("Delete region {} ", regionId);
+ final TSStatus status =
+ sendSyncRequestToDataNode(endPoint, regionId,
DataNodeRequestType.deleteRegions);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.info("DELETE Region {} successfully", regionId);
+ deletedRegionSet.removeIf(k -> k.getRegionId().equals(regionId));
}
- } catch (IOException e) {
- LOGGER.error("Can't connect to DataNode {}", endPoint, e);
- } catch (TException e) {
- LOGGER.error("Delete Region on DataNode {} failed", endPoint, e);
}
}
- public TSStatus invalidatePermissionCache(
- TEndPoint endPoint, TInvalidatePermissionCacheReq
invalidatePermissionCacheReq) {
- TSStatus status;
- try (SyncDataNodeInternalServiceClient client =
clientManager.borrowClient(endPoint)) {
- status = client.invalidatePermissionCache(invalidatePermissionCacheReq);
- } catch (IOException e) {
- LOGGER.error("Can't connect to DataNode {}", endPoint, e);
- status = new TSStatus(TSStatusCode.TIME_OUT.getStatusCode());
- } catch (TException e) {
- LOGGER.error("Invalid Permission Cache on DataNode {} failed", endPoint,
e);
- status = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ private void doRetryWait(int retryNum) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(100L * (long) Math.pow(2, retryNum));
+ } catch (InterruptedException e) {
+ LOGGER.error("Retry wait failed.", e);
}
- return status;
}
// TODO: Is the ClientPool must be a singleton?
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
index ab18779c92..841e224ca7 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeRemoveCheck.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -72,14 +73,26 @@ public class ConfigNodeRemoveCheck {
public void removeConfigNode(TConfigNodeLocation nodeLocation)
throws BadNodeUrlException, IOException {
- TSStatus status =
-
SyncConfigNodeClientPool.getInstance().removeConfigNode(getConfigNodeList(),
nodeLocation);
+ TSStatus status = new TSStatus();
+ for (TConfigNodeLocation configNodeLocation : getConfigNodeList()) {
+ status =
+ (TSStatus)
+ SyncConfigNodeClientPool.getInstance()
+ .sendSyncRequestToConfigNode(
+ configNodeLocation.getInternalEndPoint(),
+ nodeLocation,
+ ConfigNodeRequestType.removeConfigNode);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ break;
+ }
+ }
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error(status.getMessage());
throw new IOException("Remove ConfigNode failed:");
}
}
+ /** target_config_nodes of confignode-system.properties */
public List<TConfigNodeLocation> getConfigNodeList() throws
BadNodeUrlException {
return NodeUrlUtils.parseTConfigNodeUrls(
systemProperties.getProperty(IoTDBConstant.TARGET_CONFIG_NODES));
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
index ad3e361645..d6d5da8bc1 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
@@ -106,7 +107,10 @@ public class PermissionManager {
for (TDataNodeInfo dataNodeInfo : allDataNodes) {
status =
SyncDataNodeClientPool.getInstance()
-
.invalidatePermissionCache(dataNodeInfo.getLocation().getInternalEndPoint(),
req);
+ .sendSyncRequestToDataNode(
+ dataNodeInfo.getLocation().getInternalEndPoint(),
+ req,
+ DataNodeRequestType.invalidatePermissionCache);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 82981f54d6..1a6986813f 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.confignode.procedure.env;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.client.SyncDataNodeClientPool;
import
org.apache.iotdb.confignode.consensus.request.write.DeleteStorageGroupPlan;
@@ -113,12 +115,16 @@ public class ConfigNodeProcedureEnv {
for (TDataNodeInfo dataNodeInfo : allDataNodes) {
final TSStatus invalidateSchemaStatus =
SyncDataNodeClientPool.getInstance()
- .invalidateSchemaCache(
- dataNodeInfo.getLocation().getInternalEndPoint(),
invalidateCacheReq);
+ .sendSyncRequestToDataNode(
+ dataNodeInfo.getLocation().getInternalEndPoint(),
+ invalidateCacheReq,
+ DataNodeRequestType.invalidateSchemaCache);
final TSStatus invalidatePartitionStatus =
SyncDataNodeClientPool.getInstance()
- .invalidatePartitionCache(
- dataNodeInfo.getLocation().getInternalEndPoint(),
invalidateCacheReq);
+ .sendSyncRequestToDataNode(
+ dataNodeInfo.getLocation().getInternalEndPoint(),
+ invalidateCacheReq,
+ DataNodeRequestType.invalidatePartitionCache);
if (!verifySucceed(invalidatePartitionStatus, invalidateSchemaStatus)) {
LOG.error(
"Invalidate cache failed, invalidate partition cache status is {},
invalidate schema cache status is {}",
@@ -145,7 +151,10 @@ public class ConfigNodeProcedureEnv {
new
ArrayList<>(configManager.getNodeManager().getRegisteredConfigNodes());
configNodeLocations.add(tConfigNodeLocation);
SyncConfigNodeClientPool.getInstance()
- .addConsensusGroup(tConfigNodeLocation.getInternalEndPoint(),
configNodeLocations);
+ .sendSyncRequestToConfigNode(
+ tConfigNodeLocation.getInternalEndPoint(),
+ configNodeLocations,
+ ConfigNodeRequestType.addConsensusGroup);
}
/**
@@ -174,7 +183,10 @@ public class ConfigNodeProcedureEnv {
*/
public void notifyRegisterSuccess(TConfigNodeLocation configNodeLocation) {
SyncConfigNodeClientPool.getInstance()
- .notifyRegisterSuccess(configNodeLocation.getInternalEndPoint());
+ .sendSyncRequestToConfigNode(
+ configNodeLocation.getInternalEndPoint(),
+ null,
+ ConfigNodeRequestType.notifyRegisterSuccess);
}
public ReentrantLock getAddConfigNodeLock() {
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 4f239a7ab3..a1e3698103 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
+import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
@@ -187,7 +188,10 @@ public class ConfigNode implements ConfigNodeMBean {
TEndPoint targetConfigNode = conf.getTargetConfigNode();
while (true) {
TConfigNodeRegisterResp resp =
-
SyncConfigNodeClientPool.getInstance().registerConfigNode(targetConfigNode,
req);
+ (TConfigNodeRegisterResp)
+ SyncConfigNodeClientPool.getInstance()
+ .sendSyncRequestToConfigNode(
+ targetConfigNode, req,
ConfigNodeRequestType.registerConfigNode);
if (resp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
conf.setPartitionRegionId(resp.getPartitionRegionId().getId());
break;
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 8667f80c40..705417218a 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.client.SyncConfigNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConstant;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
@@ -431,7 +432,13 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
TSStatus status = configManager.removeConfigNode(removeConfigNodePlan);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- status =
SyncConfigNodeClientPool.getInstance().stopConfigNode(configNodeLocation);
+ status =
+ (TSStatus)
+ SyncConfigNodeClientPool.getInstance()
+ .sendSyncRequestToConfigNode(
+ configNodeLocation.getInternalEndPoint(),
+ configNodeLocation,
+ ConfigNodeRequestType.stopConfigNode);
}
// Print log to record the ConfigNode that performs the
RemoveConfigNodeRequest