This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 80ff23ff5d7 Pipe: Fixed the bug that config receivers may not be 
one-to-one match with the sender clients (#12280)
80ff23ff5d7 is described below

commit 80ff23ff5d72566cab26e6a1839587e5b57447fe
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 10 20:15:42 2024 +0800

    Pipe: Fixed the bug that config receivers may not be one-to-one match with 
the sender clients (#12280)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/it/autocreate/IoTDBPipeClusterIT.java     |  8 ++--
 .../statemachine/ConfigRegionStateMachine.java     |  3 ++
 .../iotdb/confignode/manager/ConfigManager.java    | 11 ++++++
 .../apache/iotdb/confignode/manager/IManager.java  |  7 ++++
 .../receiver/IoTDBConfigNodeReceiverAgent.java     | 21 ++++++++++
 .../receiver/protocol/IoTDBConfigNodeReceiver.java |  5 ++-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  8 +++-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 40 ++++++++++++++++++-
 .../thrift/IoTDBDataNodeReceiverAgent.java         | 18 +++++++++
 .../iotdb/db/protocol/client/ConfigNodeClient.java |  7 ++++
 .../config/executor/ClusterConfigTaskExecutor.java | 21 +++++++++-
 .../config/executor/IConfigTaskExecutor.java       |  4 +-
 .../commons/pipe/receiver/IoTDBReceiverAgent.java  | 46 ++++++++++++++--------
 .../src/main/thrift/confignode.thrift              |  8 +++-
 14 files changed, 177 insertions(+), 30 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index 9113e3669ba..37e860906ad 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -725,9 +725,9 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualAutoIT {
       final Thread t =
           new Thread(
               () -> {
-                try (SyncConfigNodeIServiceClient client =
+                try (final SyncConfigNodeIServiceClient client =
                     (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-                  TSStatus status =
+                  final TSStatus status =
                       client.createPipe(
                           new TCreatePipeReq("p1", connectorAttributes)
                               .setExtractorAttributes(extractorAttributes)
@@ -759,9 +759,9 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualAutoIT {
       final Thread t =
           new Thread(
               () -> {
-                try (SyncConfigNodeIServiceClient client =
+                try (final SyncConfigNodeIServiceClient client =
                     (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
-                  TSStatus status = client.dropPipe("p1");
+                  final TSStatus status = client.dropPipe("p1");
                   if (status.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                     successCount.incrementAndGet();
                   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 039624e49ce..b0d48cb6544 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -265,6 +265,9 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
       // Shutdown leader related service for config pipe
       PipeConfigNodeAgent.runtime().notifyLeaderUnavailable();
 
+      // Clean receiver file dir
+      PipeConfigNodeAgent.receiver().cleanPipeReceiverDir();
+
       LOGGER.info(
           "Current node [nodeId:{}, ip:port: {}] is not longer the leader, "
               + "all services on old leader are unavailable now.",
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 3c7ae9e7a7c..e25d26d7e8a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -2009,6 +2009,7 @@ public class ConfigManager implements IManager {
     TPipeTransferResp result =
         PipeConfigNodeAgent.receiver()
             .receive(
+                req.getClientId(),
                 req.isAirGap
                     ? new AirGapPseudoTPipeTransferRequest()
                         .setVersion(req.version)
@@ -2018,6 +2019,16 @@ public class ConfigManager implements IManager {
     return new TPipeConfigTransferResp(result.status).setBody(result.body);
   }
 
+  @Override
+  public TSStatus handleClientExit(String clientId) {
+    TSStatus status = confirmLeader();
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return status;
+    }
+    PipeConfigNodeAgent.receiver().handleClientExit(clientId);
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) {
     TSStatus status = confirmLeader();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 05288fc46a0..1529f436c4b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -639,6 +639,13 @@ public interface IManager {
    */
   TPipeConfigTransferResp handleTransferConfigPlan(TPipeConfigTransferReq req);
 
+  /**
+   * Handle the exit of the pipe client identified by {@code clientId} on the ConfigNode receiver.
+   *
+   * @return The result of handling.
+   */
+  TSStatus handleClientExit(String clientId);
+
   /** Create Topic. */
   TSStatus createTopic(TCreateTopicReq topic);
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
index 2b5413172a7..3b7b5557dca 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/receiver/IoTDBConfigNodeReceiverAgent.java
@@ -20,20 +20,41 @@
 package org.apache.iotdb.confignode.manager.pipe.agent.receiver;
 
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import 
org.apache.iotdb.confignode.manager.pipe.receiver.protocol.IoTDBConfigNodeReceiver;
 
 import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 public class IoTDBConfigNodeReceiverAgent extends IoTDBReceiverAgent {
 
+  private final ConcurrentMap<String, IoTDBReceiver> clientKey2ReceiverMap =
+      new ConcurrentHashMap<>();
+
   @Override
   protected void initConstructors() {
     RECEIVER_CONSTRUCTORS.put(
         IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
IoTDBConfigNodeReceiver::new);
   }
 
+  @Override
+  protected IoTDBReceiver getReceiverWithSpecifiedClient(String key) {
+    return clientKey2ReceiverMap.get(key);
+  }
+
+  @Override
+  protected void setReceiverWithSpecifiedClient(String key, IoTDBReceiver 
receiver) {
+    clientKey2ReceiverMap.put(key, receiver);
+  }
+
+  @Override
+  protected void removeReceiverWithSpecifiedClient(String key) {
+    clientKey2ReceiverMap.remove(key);
+  }
+
   public void cleanPipeReceiverDir() {
     cleanPipeReceiverDir(
         new 
File(ConfigNodeDescriptor.getInstance().getConf().getPipeReceiverFileDir()));
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
index 6f2fb7966db..88fa0e2411d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java
@@ -149,8 +149,9 @@ public class IoTDBConfigNodeReceiver extends 
IoTDBFileReceiver {
     }
   }
 
-  // This indicates that the client from DataNode to ConfigNode is newly 
created and
-  // thus the sender needs to re-handshake to notify its configurations.
+  // This indicates that the client from DataNode to ConfigNode is newly 
created,
+  // mainly because the receiver has changed its leader, and thus the sender 
needs to re-handshake
+  // to notify its configurations.
   // Note that the sender needs not to reconstruct its client because the 
client
   // is directly linked to the preceding DataNode and has not broken.
   private boolean needHandshake(PipeRequestType type) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index d8fe871ebef..caa0afcb75b 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -981,11 +981,15 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
   }
 
   @Override
-  public TPipeConfigTransferResp 
handleTransferConfigPlan(TPipeConfigTransferReq req)
-      throws TException {
+  public TPipeConfigTransferResp 
handleTransferConfigPlan(TPipeConfigTransferReq req) {
     return configManager.handleTransferConfigPlan(req);
   }
 
+  @Override
+  public TSStatus handlePipeConfigClientExit(String clientId) {
+    return configManager.handleClientExit(clientId);
+  }
+
   @Override
   public TSStatus createTopic(TCreateTopicReq req) {
     return configManager.createTopic(req);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 94f288a0f6d..7c1ccf01ec2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
@@ -89,6 +90,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -104,6 +107,14 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private final PipeStatementExceptionVisitor exceptionVisitor =
       new PipeStatementExceptionVisitor();
 
+  // Used for data transfer: confignode (cluster A) -> datanode (cluster B) -> 
confignode (cluster
+  // B).
+  // If connection from confignode (cluster A) to datanode (cluster B) is 
lost, the receiver in
+  // confignode (cluster B) needs to handle the thread exit using 
configReceiverId generated by
+  // datanode (cluster B).
+  private static final AtomicLong CONFIG_RECEIVER_ID_GENERATOR = new 
AtomicLong(0);
+  protected final AtomicReference<String> configReceiverId = new 
AtomicReference<>();
+
   static {
     try {
       folderManager =
@@ -305,7 +316,21 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   }
 
   private TPipeTransferResp handleTransferConfigPlan(TPipeTransferReq req) {
-    return 
ClusterConfigTaskExecutor.getInstance().handleTransferConfigPlan(req);
+    return ClusterConfigTaskExecutor.getInstance()
+        .handleTransferConfigPlan(getConfigReceiverId(), req);
+  }
+
+  /** Used to identify the sender client */
+  private String getConfigReceiverId() {
+    if (Objects.isNull(configReceiverId.get())) {
+      configReceiverId.set(
+          IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
+              + "_"
+              + PipeAgent.runtime().getRebootTimes()
+              + "_"
+              + CONFIG_RECEIVER_ID_GENERATOR.incrementAndGet());
+    }
+    return configReceiverId.get();
   }
 
   private TSStatus executeStatementAndClassifyExceptions(Statement statement) {
@@ -351,4 +376,17 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
                 
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
     return result.status;
   }
+
+  @Override
+  public synchronized void handleExit() {
+    if (Objects.nonNull(configReceiverId.get())) {
+      try {
+        
ClusterConfigTaskExecutor.getInstance().handlePipeConfigClientExit(configReceiverId.get());
+      } catch (Exception e) {
+        LOGGER.warn("Failed to handle config client (id = {}) exit", 
configReceiverId.get(), e);
+      }
+    }
+
+    super.handleExit();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
index d2dfb1309e6..932123a4260 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverAgent.java
@@ -20,13 +20,31 @@
 package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
 
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
+import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiver;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
 
 public class IoTDBDataNodeReceiverAgent extends IoTDBReceiverAgent {
 
+  private final ThreadLocal<IoTDBReceiver> receiverThreadLocal = new 
ThreadLocal<>();
+
   @Override
   protected void initConstructors() {
     RECEIVER_CONSTRUCTORS.put(
         IoTDBConnectorRequestVersion.VERSION_1.getVersion(), 
IoTDBDataNodeReceiver::new);
   }
+
+  @Override
+  protected IoTDBReceiver getReceiverWithSpecifiedClient(final String ignore) {
+    return receiverThreadLocal.get();
+  }
+
+  @Override
+  protected void setReceiverWithSpecifiedClient(final String ignore, final 
IoTDBReceiver receiver) {
+    receiverThreadLocal.set(receiver);
+  }
+
+  @Override
+  protected void removeReceiverWithSpecifiedClient(final String ignore) {
+    receiverThreadLocal.remove();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index ee740c64c59..f62fda39de3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -976,6 +976,13 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
         () -> client.handleTransferConfigPlan(req), resp -> 
!updateConfigNodeLeader(resp.status));
   }
 
+  @Override
+  public TSStatus handlePipeConfigClientExit(String clientId) throws 
TException {
+    return executeRemoteCallWithRetry(
+        () -> client.handlePipeConfigClientExit(clientId),
+        status -> !updateConfigNodeLeader(status));
+  }
+
   @Override
   public TGetRegionIdResp getRegionId(TGetRegionIdReq req) throws TException {
     return executeRemoteCallWithRetry(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index e844a547093..5bd53a23403 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2647,10 +2647,14 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   }
 
   @Override
-  public TPipeTransferResp handleTransferConfigPlan(TPipeTransferReq req) {
+  public TPipeTransferResp handleTransferConfigPlan(String clientId, 
TPipeTransferReq req) {
     final TPipeConfigTransferReq configTransferReq =
         new TPipeConfigTransferReq(
-            req.version, req.type, req.body, req instanceof 
AirGapPseudoTPipeTransferRequest);
+            req.version,
+            req.type,
+            req.body,
+            req instanceof AirGapPseudoTPipeTransferRequest,
+            clientId);
 
     try (final ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
@@ -2668,4 +2672,17 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
               .setMessage(e.toString()));
     }
   }
+
+  @Override
+  public void handlePipeConfigClientExit(String clientId) {
+    try (final ConfigNodeClient configNodeClient =
+        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      final TSStatus status = 
configNodeClient.handlePipeConfigClientExit(clientId);
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
+        LOGGER.warn("Failed to handlePipeConfigClientExit, status is {}.", 
status);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Failed to handlePipeConfigClientExit.", e);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index e39602a941d..df24fec47da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -236,5 +236,7 @@ public interface IConfigTaskExecutor {
 
   TThrottleQuotaResp getThrottleQuota();
 
-  TPipeTransferResp handleTransferConfigPlan(TPipeTransferReq req);
+  TPipeTransferResp handleTransferConfigPlan(String clientId, TPipeTransferReq 
req);
+
+  void handlePipeConfigClientExit(String clientId);
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
index 120e76277a7..97e582f2407 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
@@ -38,8 +38,6 @@ public abstract class IoTDBReceiverAgent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBReceiverAgent.class);
 
-  protected ThreadLocal<IoTDBReceiver> receiverThreadLocal = new 
ThreadLocal<>();
-
   protected static final Map<Byte, Supplier<IoTDBReceiver>> 
RECEIVER_CONSTRUCTORS = new HashMap<>();
 
   protected abstract void initConstructors();
@@ -49,9 +47,13 @@ public abstract class IoTDBReceiverAgent {
   }
 
   public final TPipeTransferResp receive(final TPipeTransferReq req) {
+    return receive(null, req);
+  }
+
+  public final TPipeTransferResp receive(final String key, final 
TPipeTransferReq req) {
     final byte reqVersion = req.getVersion();
     if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
-      return getReceiver(reqVersion).receive(req);
+      return getReceiver(key, reqVersion).receive(req);
     } else {
       return new TPipeTransferResp(
           RpcUtils.getStatus(
@@ -60,41 +62,53 @@ public abstract class IoTDBReceiverAgent {
     }
   }
 
-  protected final IoTDBReceiver getReceiver(final byte reqVersion) {
-    if (receiverThreadLocal.get() == null) {
-      return setAndGetReceiver(reqVersion);
+  protected final IoTDBReceiver getReceiver(final String key, final byte 
reqVersion) {
+    if (getReceiverWithSpecifiedClient(key) == null) {
+      return setAndGetReceiver(key, reqVersion);
     }
 
-    final byte receiverThreadLocalVersion = 
receiverThreadLocal.get().getVersion().getVersion();
+    final byte receiverThreadLocalVersion =
+        getReceiverWithSpecifiedClient(key).getVersion().getVersion();
     if (receiverThreadLocalVersion != reqVersion) {
       LOGGER.warn(
           "The receiver version {} is different from the sender version {},"
               + " the receiver will be reset to the sender version.",
           receiverThreadLocalVersion,
           reqVersion);
-      receiverThreadLocal.get().handleExit();
-      receiverThreadLocal.remove();
-      return setAndGetReceiver(reqVersion);
+      getReceiverWithSpecifiedClient(key).handleExit();
+      removeReceiverWithSpecifiedClient(key);
+      return setAndGetReceiver(key, reqVersion);
     }
 
-    return receiverThreadLocal.get();
+    return getReceiverWithSpecifiedClient(key);
   }
 
-  private IoTDBReceiver setAndGetReceiver(final byte reqVersion) {
+  private IoTDBReceiver setAndGetReceiver(final String key, final byte 
reqVersion) {
     if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
-      receiverThreadLocal.set(RECEIVER_CONSTRUCTORS.get(reqVersion).get());
+      setReceiverWithSpecifiedClient(key, 
RECEIVER_CONSTRUCTORS.get(reqVersion).get());
     } else {
       throw new UnsupportedOperationException(
           String.format("Unsupported pipe version %d", reqVersion));
     }
-    return receiverThreadLocal.get();
+    return getReceiverWithSpecifiedClient(key);
   }
 
+  protected abstract IoTDBReceiver getReceiverWithSpecifiedClient(final String 
key);
+
+  protected abstract void setReceiverWithSpecifiedClient(
+      final String key, final IoTDBReceiver receiver);
+
+  protected abstract void removeReceiverWithSpecifiedClient(final String key);
+
   public final void handleClientExit() {
-    final IoTDBReceiver receiver = receiverThreadLocal.get();
+    handleClientExit(null);
+  }
+
+  public final void handleClientExit(String key) {
+    final IoTDBReceiver receiver = getReceiverWithSpecifiedClient(key);
     if (receiver != null) {
       receiver.handleExit();
-      receiverThreadLocal.remove();
+      removeReceiverWithSpecifiedClient(key);
     }
   }
 
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index ec47654f1ad..28978144ae8 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -730,6 +730,7 @@ struct TPipeConfigTransferReq {
   2: required i16 type
   3: required binary body
   4: required bool isAirGap
+  5: required string clientId
 }
 
 struct TPipeConfigTransferResp {
@@ -1450,12 +1451,15 @@ service IConfigNodeRPCService {
   /** Show Pipe by name, if name is empty, show all Pipe */
   TShowPipeResp showPipe(TShowPipeReq req)
 
-  /** Get all pipe information. It is used for DataNode registration and 
restart*/
+  /** Get all pipe information. It is used for DataNode registration and 
restart */
   TGetAllPipeInfoResp getAllPipeInfo()
 
- /** Execute schema language from external pipes */
+  /** Execute schema language from external pipes */
   TPipeConfigTransferResp handleTransferConfigPlan(TPipeConfigTransferReq req)
 
+  /** Handle client exit for ConfigNode receiver */
+  common.TSStatus handlePipeConfigClientExit(string clientId)
+
   // ======================================================
   // Subscription Topic
   // ======================================================

Reply via email to