This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 80ff23ff5d7 Pipe: Fixed the bug that config receivers may not be
one-to-one match with the sender clients (#12280)
80ff23ff5d7 is described below
commit 80ff23ff5d72566cab26e6a1839587e5b57447fe
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 10 20:15:42 2024 +0800
Pipe: Fixed the bug that config receivers may not be one-to-one match with
the sender clients (#12280)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/it/autocreate/IoTDBPipeClusterIT.java | 8 ++--
.../statemachine/ConfigRegionStateMachine.java | 3 ++
.../iotdb/confignode/manager/ConfigManager.java | 11 ++++++
.../apache/iotdb/confignode/manager/IManager.java | 7 ++++
.../receiver/IoTDBConfigNodeReceiverAgent.java | 21 ++++++++++
.../receiver/protocol/IoTDBConfigNodeReceiver.java | 5 ++-
.../thrift/ConfigNodeRPCServiceProcessor.java | 8 +++-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 40 ++++++++++++++++++-
.../thrift/IoTDBDataNodeReceiverAgent.java | 18 +++++++++
.../iotdb/db/protocol/client/ConfigNodeClient.java | 7 ++++
.../config/executor/ClusterConfigTaskExecutor.java | 21 +++++++++-
.../config/executor/IConfigTaskExecutor.java | 4 +-
.../commons/pipe/receiver/IoTDBReceiverAgent.java | 46 ++++++++++++++--------
.../src/main/thrift/confignode.thrift | 8 +++-
14 files changed, 177 insertions(+), 30 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index 9113e3669ba..37e860906ad 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -725,9 +725,9 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualAutoIT {
final Thread t =
new Thread(
() -> {
- try (SyncConfigNodeIServiceClient client =
+ try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TSStatus status =
+ final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
@@ -759,9 +759,9 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualAutoIT {
final Thread t =
new Thread(
() -> {
- try (SyncConfigNodeIServiceClient client =
+ try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- TSStatus status = client.dropPipe("p1");
+ final TSStatus status = client.dropPipe("p1");
if (status.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
successCount.incrementAndGet();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 039624e49ce..b0d48cb6544 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -265,6 +265,9 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
// Shutdown leader related service for config pipe
PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
+ // Clean receiver file dir
+ PipeConfigNodeAgent.receiver().cleanPipeReceiverDir();
+
LOGGER.info(
"Current node [nodeId:{}, ip:port: {}] is not longer the leader, "
+ "all services on old leader are unavailable now.",
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 3c7ae9e7a7c..e25d26d7e8a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -2009,6 +2009,7 @@ public class ConfigManager implements IManager {
TPipeTransferResp result =
PipeConfigNodeAgent.receiver()
.receive(
+ req.getClientId(),
req.isAirGap
? new AirGapPseudoTPipeTransferRequest()
.setVersion(req.version)
@@ -2018,6 +2019,16 @@ public class ConfigManager implements IManager {
return new TPipeConfigTransferResp(result.status).setBody(result.body);
}
+ @Override
+ public TSStatus handleClientExit(String clientId) {
+ TSStatus status = confirmLeader();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+ PipeConfigNodeAgent.receiver().handleClientExit(clientId);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
@Override
public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
TSStatus status = confirmLeader();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 05288fc46a0..1529f436c4b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -639,6 +639,13 @@ public interface IManager {
*/
TPipeConfigTransferResp handleTransferConfigPlan(TPipeConfigTransferReq req);
+ /**
+ * Execute the config req received from pipe.
+ *
+ * @return The result of handling.
+ */
+ TSStatus handleClientExit(String clientId);
+
/** Create Topic. */
TSStatus createTopic(TCreateTopicReq topic);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
index 2b5413172a7..3b7b5557dca 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
@@ -20,20 +20,41 @@
package org.apache.iotdb.confignode.manager.pipe.agent.receiver;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import
org.apache.iotdb.confignode.manager.pipe.receiver.protocol.IoTDBConfigNodeReceiver;
import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
public class IoTDBConfigNodeReceiverAgent extends IoTDBReceiverAgent {
+ private final ConcurrentMap<String, IoTDBReceiver> clientKey2ReceiverMap =
+ new ConcurrentHashMap<>();
+
@Override
protected void initConstructors() {
RECEIVER_CONSTRUCTORS.put(
IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
IoTDBConfigNodeReceiver::new);
}
+ @Override
+ protected IoTDBReceiver getReceiverWithSpecifiedClient(String key) {
+ return clientKey2ReceiverMap.get(key);
+ }
+
+ @Override
+ protected void setReceiverWithSpecifiedClient(String key, IoTDBReceiver
receiver) {
+ clientKey2ReceiverMap.put(key, receiver);
+ }
+
+ @Override
+ protected void removeReceiverWithSpecifiedClient(String key) {
+ clientKey2ReceiverMap.remove(key);
+ }
+
public void cleanPipeReceiverDir() {
cleanPipeReceiverDir(
new
File(ConfigNodeDescriptor.getInstance().getConf().getPipeReceiverFileDir()));
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 6f2fb7966db..88fa0e2411d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -149,8 +149,9 @@ public class IoTDBConfigNodeReceiver extends
IoTDBFileReceiver {
}
}
- // This indicates that the client from DataNode to ConfigNode is newly
created and
- // thus the sender needs to re-handshake to notify its configurations.
+ // This indicates that the client from DataNode to ConfigNode is newly
created,
+ // mainly because the receiver has changed its leader, and thus the sender
needs to re-handshake
+ // to notify its configurations.
// Note that the sender needs not to reconstruct its client because the
client
// is directly linked to the preceding DataNode and has not broken.
private boolean needHandshake(PipeRequestType type) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index d8fe871ebef..caa0afcb75b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -981,11 +981,15 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
}
@Override
- public TPipeConfigTransferResp
handleTransferConfigPlan(TPipeConfigTransferReq req)
- throws TException {
+ public TPipeConfigTransferResp
handleTransferConfigPlan(TPipeConfigTransferReq req) {
return configManager.handleTransferConfigPlan(req);
}
+ @Override
+ public TSStatus handlePipeConfigClientExit(String clientId) {
+ return configManager.handleClientExit(clientId);
+ }
+
@Override
public TSStatus createTopic(TCreateTopicReq req) {
return configManager.createTopic(req);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 94f288a0f6d..7c1ccf01ec2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
@@ -89,6 +90,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -104,6 +107,14 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private final PipeStatementExceptionVisitor exceptionVisitor =
new PipeStatementExceptionVisitor();
+ // Used for data transfer: confignode (cluster A) -> datanode (cluster B) ->
confignode (cluster
+ // B).
+ // If connection from confignode (cluster A) to datanode (cluster B) is
lost, the receiver in
+ // confignode (cluster B) needs to handle the thread exit using
configReceiverId generated by
+ // datanode (cluster B).
+ private static final AtomicLong CONFIG_RECEIVER_ID_GENERATOR = new
AtomicLong(0);
+ protected final AtomicReference<String> configReceiverId = new
AtomicReference<>();
+
static {
try {
folderManager =
@@ -305,7 +316,21 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
}
private TPipeTransferResp handleTransferConfigPlan(TPipeTransferReq req) {
- return
ClusterConfigTaskExecutor.getInstance().handleTransferConfigPlan(req);
+ return ClusterConfigTaskExecutor.getInstance()
+ .handleTransferConfigPlan(getConfigReceiverId(), req);
+ }
+
+ /** Used to identify the sender client */
+ private String getConfigReceiverId() {
+ if (Objects.isNull(configReceiverId.get())) {
+ configReceiverId.set(
+ IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
+ + "_"
+ + PipeAgent.runtime().getRebootTimes()
+ + "_"
+ + CONFIG_RECEIVER_ID_GENERATOR.incrementAndGet());
+ }
+ return configReceiverId.get();
}
private TSStatus executeStatementAndClassifyExceptions(Statement statement) {
@@ -351,4 +376,17 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
return result.status;
}
+
+ @Override
+ public synchronized void handleExit() {
+ if (Objects.nonNull(configReceiverId.get())) {
+ try {
+
ClusterConfigTaskExecutor.getInstance().handlePipeConfigClientExit(configReceiverId.get());
+ } catch (Exception e) {
+ LOGGER.warn("Failed to handle config client (id = {}) exit",
configReceiverId.get(), e);
+ }
+ }
+
+ super.handleExit();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
index d2dfb1309e6..932123a4260 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
@@ -20,13 +20,31 @@
package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
public class IoTDBDataNodeReceiverAgent extends IoTDBReceiverAgent {
+ private final ThreadLocal<IoTDBReceiver> receiverThreadLocal = new
ThreadLocal<>();
+
@Override
protected void initConstructors() {
RECEIVER_CONSTRUCTORS.put(
IoTDBConnectorRequestVersion.VERSION_1.getVersion(),
IoTDBDataNodeReceiver::new);
}
+
+ @Override
+ protected IoTDBReceiver getReceiverWithSpecifiedClient(final String ignore) {
+ return receiverThreadLocal.get();
+ }
+
+ @Override
+ protected void setReceiverWithSpecifiedClient(final String ignore, final
IoTDBReceiver receiver) {
+ receiverThreadLocal.set(receiver);
+ }
+
+ @Override
+ protected void removeReceiverWithSpecifiedClient(final String ignore) {
+ receiverThreadLocal.remove();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index ee740c64c59..f62fda39de3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -976,6 +976,13 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
() -> client.handleTransferConfigPlan(req), resp ->
!updateConfigNodeLeader(resp.status));
}
+ @Override
+ public TSStatus handlePipeConfigClientExit(String clientId) throws
TException {
+ return executeRemoteCallWithRetry(
+ () -> client.handlePipeConfigClientExit(clientId),
+ status -> !updateConfigNodeLeader(status));
+ }
+
@Override
public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
return executeRemoteCallWithRetry(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index e844a547093..5bd53a23403 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2647,10 +2647,14 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public TPipeTransferResp handleTransferConfigPlan(TPipeTransferReq req) {
+ public TPipeTransferResp handleTransferConfigPlan(String clientId,
TPipeTransferReq req) {
final TPipeConfigTransferReq configTransferReq =
new TPipeConfigTransferReq(
- req.version, req.type, req.body, req instanceof
AirGapPseudoTPipeTransferRequest);
+ req.version,
+ req.type,
+ req.body,
+ req instanceof AirGapPseudoTPipeTransferRequest,
+ clientId);
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
@@ -2668,4 +2672,17 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
.setMessage(e.toString()));
}
}
+
+ @Override
+ public void handlePipeConfigClientExit(String clientId) {
+ try (final ConfigNodeClient configNodeClient =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ final TSStatus status =
configNodeClient.handlePipeConfigClientExit(clientId);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
+ LOGGER.warn("Failed to handlePipeConfigClientExit, status is {}.",
status);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to handlePipeConfigClientExit.", e);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index e39602a941d..df24fec47da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -236,5 +236,7 @@ public interface IConfigTaskExecutor {
TThrottleQuotaResp getThrottleQuota();
- TPipeTransferResp handleTransferConfigPlan(TPipeTransferReq req);
+ TPipeTransferResp handleTransferConfigPlan(String clientId, TPipeTransferReq
req);
+
+ void handlePipeConfigClientExit(String clientId);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
index 120e76277a7..97e582f2407 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
@@ -38,8 +38,6 @@ public abstract class IoTDBReceiverAgent {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBReceiverAgent.class);
- protected ThreadLocal<IoTDBReceiver> receiverThreadLocal = new
ThreadLocal<>();
-
protected static final Map<Byte, Supplier<IoTDBReceiver>>
RECEIVER_CONSTRUCTORS = new HashMap<>();
protected abstract void initConstructors();
@@ -49,9 +47,13 @@ public abstract class IoTDBReceiverAgent {
}
public final TPipeTransferResp receive(final TPipeTransferReq req) {
+ return receive(null, req);
+ }
+
+ public final TPipeTransferResp receive(final String key, final
TPipeTransferReq req) {
final byte reqVersion = req.getVersion();
if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
- return getReceiver(reqVersion).receive(req);
+ return getReceiver(key, reqVersion).receive(req);
} else {
return new TPipeTransferResp(
RpcUtils.getStatus(
@@ -60,41 +62,53 @@ public abstract class IoTDBReceiverAgent {
}
}
- protected final IoTDBReceiver getReceiver(final byte reqVersion) {
- if (receiverThreadLocal.get() == null) {
- return setAndGetReceiver(reqVersion);
+ protected final IoTDBReceiver getReceiver(final String key, final byte
reqVersion) {
+ if (getReceiverWithSpecifiedClient(key) == null) {
+ return setAndGetReceiver(key, reqVersion);
}
- final byte receiverThreadLocalVersion =
receiverThreadLocal.get().getVersion().getVersion();
+ final byte receiverThreadLocalVersion =
+ getReceiverWithSpecifiedClient(key).getVersion().getVersion();
if (receiverThreadLocalVersion != reqVersion) {
LOGGER.warn(
"The receiver version {} is different from the sender version {},"
+ " the receiver will be reset to the sender version.",
receiverThreadLocalVersion,
reqVersion);
- receiverThreadLocal.get().handleExit();
- receiverThreadLocal.remove();
- return setAndGetReceiver(reqVersion);
+ getReceiverWithSpecifiedClient(key).handleExit();
+ removeReceiverWithSpecifiedClient(key);
+ return setAndGetReceiver(key, reqVersion);
}
- return receiverThreadLocal.get();
+ return getReceiverWithSpecifiedClient(key);
}
- private IoTDBReceiver setAndGetReceiver(final byte reqVersion) {
+ private IoTDBReceiver setAndGetReceiver(final String key, final byte
reqVersion) {
if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
- receiverThreadLocal.set(RECEIVER_CONSTRUCTORS.get(reqVersion).get());
+ setReceiverWithSpecifiedClient(key,
RECEIVER_CONSTRUCTORS.get(reqVersion).get());
} else {
throw new UnsupportedOperationException(
String.format("Unsupported pipe version %d", reqVersion));
}
- return receiverThreadLocal.get();
+ return getReceiverWithSpecifiedClient(key);
}
+ protected abstract IoTDBReceiver getReceiverWithSpecifiedClient(final String
key);
+
+ protected abstract void setReceiverWithSpecifiedClient(
+ final String key, final IoTDBReceiver receiver);
+
+ protected abstract void removeReceiverWithSpecifiedClient(final String key);
+
public final void handleClientExit() {
- final IoTDBReceiver receiver = receiverThreadLocal.get();
+ handleClientExit(null);
+ }
+
+ public final void handleClientExit(String key) {
+ final IoTDBReceiver receiver = getReceiverWithSpecifiedClient(key);
if (receiver != null) {
receiver.handleExit();
- receiverThreadLocal.remove();
+ removeReceiverWithSpecifiedClient(key);
}
}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index ec47654f1ad..28978144ae8 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -730,6 +730,7 @@ struct TPipeConfigTransferReq {
2: required i16 type
3: required binary body
4: required bool isAirGap
+ 5: required string clientId
}
struct TPipeConfigTransferResp {
@@ -1450,12 +1451,15 @@ service IConfigNodeRPCService {
/** Show Pipe by name, if name is empty, show all Pipe */
TShowPipeResp showPipe(TShowPipeReq req)
- /** Get all pipe information. It is used for DataNode registration and
restart*/
+ /** Get all pipe information. It is used for DataNode registration and
restart */
TGetAllPipeInfoResp getAllPipeInfo()
- /** Execute schema language from external pipes */
+ /** Execute schema language from external pipes */
TPipeConfigTransferResp handleTransferConfigPlan(TPipeConfigTransferReq req)
+ /** Handle client exit for ConfigNode receiver */
+ common.TSStatus handlePipeConfigClientExit(string clientId)
+
// ======================================================
// Subscription Topic
// ======================================================