This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7e08e68a0d4 IoTConsensusV2 X [Region Migration] Each component uses
an independent ClientManager && Filter out the consensus pipe when judging
resource release #14052
7e08e68a0d4 is described below
commit 7e08e68a0d4931b4d57f7acd7ee23d820106aaf4
Author: Peng Junzhi <[email protected]>
AuthorDate: Mon Nov 11 23:52:32 2024 +0800
IoTConsensusV2 X [Region Migration] Each component uses an independent
ClientManager && Filter out the consensus pipe when judging resource release
#14052
---
.../apache/iotdb/consensus/pipe/PipeConsensus.java | 13 ++++++++++---
.../consensus/pipe/PipeConsensusServerImpl.java | 1 +
.../pipeconsensus/PipeConsensusAsyncConnector.java | 2 +-
.../pipeconsensus/PipeConsensusSyncConnector.java | 2 +-
.../container/PipeConsensusClientMgrContainer.java | 22 ++++++++--------------
.../commons/pipe/agent/task/PipeTaskManager.java | 7 ++++---
6 files changed, 25 insertions(+), 22 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index 4c31182b3d9..8a10a3df70a 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -111,8 +111,8 @@ public class PipeConsensus implements IConsensus {
config.getPipeConsensusConfig().getReplicateMode());
this.consensusPipeGuardian =
config.getPipeConsensusConfig().getPipe().getConsensusPipeGuardian();
- this.asyncClientManager =
PipeConsensusClientMgrContainer.getInstance().getAsyncClientManager();
- this.syncClientManager =
PipeConsensusClientMgrContainer.getInstance().getSyncClientManager();
+ this.asyncClientManager =
PipeConsensusClientMgrContainer.getInstance().newAsyncClientManager();
+ this.syncClientManager =
PipeConsensusClientMgrContainer.getInstance().newSyncClientManager();
}
@Override
@@ -373,16 +373,23 @@ public class PipeConsensus implements IConsensus {
try {
// let target peer reject new write
+ LOGGER.info("[{}] inactivate peer {}", CLASS_NAME, peer);
impl.setRemotePeerActive(peer, false);
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);
+
// wait its consensus pipes to complete
+ LOGGER.info("[{}] wait target peer{} complete transfer...", CLASS_NAME,
peer);
impl.waitTargetPeerToPeersTransmissionCompleted(peer);
+
// remove consensus pipes between target peer and other peers
+ LOGGER.info("[{}] notify other peers to drop consensus pipes...",
CLASS_NAME);
impl.notifyPeersToDropConsensusPipe(peer);
+
// wait target peer to release all resource
+ LOGGER.info("[{}] wait {} to release all resource...", CLASS_NAME, peer);
impl.waitReleaseAllRegionRelatedResource(peer);
} catch (ConsensusGroupModifyPeerException e) {
- throw new ConsensusException(e.getMessage());
+ throw new ConsensusException(e);
}
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index 68b50c1c305..aec583e84cc 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -530,6 +530,7 @@ public class PipeConsensusServerImpl {
final List<String> consensusPipeNames =
peerManager.getPeers().stream()
+ .filter(peer -> !peer.equals(targetPeer))
.map(peer -> new ConsensusPipeName(targetPeer,
peer).toString())
.collect(Collectors.toList());
isTransmissionCompleted =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 2ba68560126..4f56d1a847b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -142,7 +142,7 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
nodeUrls, consensusGroupId, thisDataNodeId,
pipeConsensusConnectorMetrics);
retryConnector.customize(parameters, configuration);
asyncTransferClientManager =
- PipeConsensusClientMgrContainer.getInstance().getAsyncClientManager();
+ PipeConsensusClientMgrContainer.getInstance().newAsyncClientManager();
if (isTabletBatchModeEnabled) {
tabletBatchBuilder =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index 96602638010..01b406674c9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -93,7 +93,7 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
this.consensusGroupId = consensusGroupId;
this.thisDataNodeId = thisDataNodeId;
this.syncRetryClientManager =
- PipeConsensusClientMgrContainer.getInstance().getSyncClientManager();
+ PipeConsensusClientMgrContainer.getInstance().newSyncClientManager();
this.pipeConsensusConnectorMetrics = pipeConsensusConnectorMetrics;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
index 6fc04163433..383b995104b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
@@ -39,32 +39,26 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
*/
public class PipeConsensusClientMgrContainer {
private static final CommonConfig CONF =
CommonDescriptor.getInstance().getConfig();
- private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
asyncClientManager;
- private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
syncClientManager;
+ private final PipeConsensusClientProperty config;
private PipeConsensusClientMgrContainer() {
// load rpc client config
- PipeConsensusClientProperty config =
+ this.config =
PipeConsensusClientProperty.newBuilder()
.setIsRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnabled())
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.setSelectorNumOfClientManager(CONF.getSelectorNumOfClientManager())
.build();
-
- this.asyncClientManager =
- new IClientManager.Factory<TEndPoint,
AsyncPipeConsensusServiceClient>()
- .createClientManager(new
AsyncPipeConsensusServiceClientPoolFactory(config));
- this.syncClientManager =
- new IClientManager.Factory<TEndPoint, SyncPipeConsensusServiceClient>()
- .createClientManager(new
SyncPipeConsensusServiceClientPoolFactory(config));
}
- public IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
getAsyncClientManager() {
- return asyncClientManager;
+ public IClientManager<TEndPoint, AsyncPipeConsensusServiceClient>
newAsyncClientManager() {
+ return new IClientManager.Factory<TEndPoint,
AsyncPipeConsensusServiceClient>()
+ .createClientManager(new
AsyncPipeConsensusServiceClientPoolFactory(config));
}
- public IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
getSyncClientManager() {
- return syncClientManager;
+ public IClientManager<TEndPoint, SyncPipeConsensusServiceClient>
newSyncClientManager() {
+ return new IClientManager.Factory<TEndPoint,
SyncPipeConsensusServiceClient>()
+ .createClientManager(new
SyncPipeConsensusServiceClientPoolFactory(config));
}
private static class PipeConsensusClientMgrContainerHolder {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskManager.java
index 45c993f05da..70e959e66f0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskManager.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.agent.task;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
import java.util.HashMap;
import java.util.Map;
@@ -115,9 +116,9 @@ public class PipeTaskManager {
* otherwise
*/
public synchronized boolean hasPipeTaskInConsensusGroup(final int
consensusGroupId) {
- return pipeMap.values().stream()
- .anyMatch(
- consensusGroupId2PipeTask ->
consensusGroupId2PipeTask.containsKey(consensusGroupId));
+ return pipeMap.entrySet().stream()
+ .filter(entry -> entry.getKey().getPipeType() != PipeType.CONSENSUS)
+ .anyMatch(entry -> entry.getValue().containsKey(consensusGroupId));
}
/**