This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 38b36006b2a [IOTDB-6117] Pipe: Optimize RPC requests from CN to DN. CN 
send exactly one pipeMeta to each DN upon create/start/stop/drop pipe (#10875)
38b36006b2a is described below

commit 38b36006b2ae283451d5e2f74324a6fcc5b737d6
Author: 马子坤 <[email protected]>
AuthorDate: Fri Aug 18 13:58:19 2023 +0800

    [IOTDB-6117] Pipe: Optimize RPC requests from CN to DN. CN send exactly one 
pipeMeta to each DN upon create/start/stop/drop pipe (#10875)
    
    Currently, CN sends pipeMeta of all existing pipes to each DN upon 
create/start/stop/drop pipe, which may be time-consuming.
    
    In this commit, CN will send exactly one pipeMeta to each DN upon 
create/start/stop/drop pipe.
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../confignode/client/DataNodeRequestType.java     |   3 +-
 .../client/async/AsyncDataNodeClientPool.java      |   9 +-
 .../client/async/handlers/AsyncClientHandler.java  |   3 +-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |   9 ++
 .../procedure/env/ConfigNodeProcedureEnv.java      |  31 ++++-
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |  31 ++++-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |   9 +-
 .../impl/pipe/task/DropPipeProcedureV2.java        |   5 +-
 .../impl/pipe/task/StartPipeProcedureV2.java       |   3 +-
 .../impl/pipe/task/StopPipeProcedureV2.java        |   3 +-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    | 131 +++++++++++++++------
 .../impl/DataNodeInternalRPCServiceImpl.java       |  27 +++++
 .../commons/pipe/task/meta/PipeMetaKeeper.java     |   4 +
 .../src/main/thrift/datanode.thrift                |  10 ++
 14 files changed, 226 insertions(+), 52 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index 8fc3ffcbe37..6f5743cf826 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -67,7 +67,8 @@ public enum DataNodeRequestType {
   DROP_PIPE_PLUGIN,
 
   // Pipe Task
-  PUSH_PIPE_META,
+  PIPE_PUSH_ALL_META,
+  PIPE_PUSH_SINGLE_META,
   PIPE_HEARTBEAT,
 
   // CQ
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 8ccba5457ac..7e8e52e6ff7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -63,6 +63,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPushSinglePipeMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
@@ -227,12 +228,18 @@ public class AsyncDataNodeClientPool {
               (AsyncTSStatusRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, 
targetDataNode));
           break;
-        case PUSH_PIPE_META:
+        case PIPE_PUSH_ALL_META:
           client.pushPipeMeta(
               (TPushPipeMetaReq) clientHandler.getRequest(requestId),
               (PipePushMetaRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, 
targetDataNode));
           break;
+        case PIPE_PUSH_SINGLE_META:
+          client.pushSinglePipeMeta(
+              (TPushSinglePipeMetaReq) clientHandler.getRequest(requestId),
+              (PipePushMetaRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, 
targetDataNode));
+          break;
         case PIPE_HEARTBEAT:
           client.pipeHeartbeat(
               (TPipeHeartbeatReq) clientHandler.getRequest(requestId),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index 21690788051..7796252199c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -210,7 +210,8 @@ public class AsyncClientHandler<Q, R> {
             dataNodeLocationMap,
             (Map<Integer, TPipeHeartbeatResp>) responseMap,
             countDownLatch);
-      case PUSH_PIPE_META:
+      case PIPE_PUSH_ALL_META:
+      case PIPE_PUSH_SINGLE_META:
         return new PipePushMetaRPCHandler(
             requestType,
             requestId,
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 28ade7c5f5d..7ffeeed3d77 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -273,6 +273,15 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
+  public PipeMeta getPipeMetaByPipeName(String pipeName) {
+    acquireReadLock();
+    try {
+      return pipeMetaKeeper.getPipeMetaByPipeName(pipeName);
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   public boolean isEmpty() {
     acquireReadLock();
     try {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index ee5fcccd681..26a94750e62 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -69,6 +69,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
+import org.apache.iotdb.mpp.rpc.thrift.TPushSinglePipeMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -660,14 +661,40 @@ public class ConfigNodeProcedureEnv {
     return clientHandler.getResponseList();
   }
 
-  public Map<Integer, TPushPipeMetaResp> pushPipeMetaToDataNodes(
+  public Map<Integer, TPushPipeMetaResp> pushAllPipeMetaToDataNodes(
       List<ByteBuffer> pipeMetaBinaryList) {
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
     final TPushPipeMetaReq request = new 
TPushPipeMetaReq().setPipeMetas(pipeMetaBinaryList);
 
     final AsyncClientHandler<TPushPipeMetaReq, TPushPipeMetaResp> 
clientHandler =
-        new AsyncClientHandler<>(DataNodeRequestType.PUSH_PIPE_META, request, 
dataNodeLocationMap);
+        new AsyncClientHandler<>(
+            DataNodeRequestType.PIPE_PUSH_ALL_META, request, 
dataNodeLocationMap);
+    
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseMap();
+  }
+
+  public Map<Integer, TPushPipeMetaResp> 
pushSinglePipeMetaToDataNodes(ByteBuffer pipeMetaBinary) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TPushSinglePipeMetaReq request = new 
TPushSinglePipeMetaReq().setPipeMeta(pipeMetaBinary);
+
+    final AsyncClientHandler<TPushSinglePipeMetaReq, TPushPipeMetaResp> 
clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.PIPE_PUSH_SINGLE_META, request, 
dataNodeLocationMap);
+    
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    return clientHandler.getResponseMap();
+  }
+
+  public Map<Integer, TPushPipeMetaResp> dropSinglePipeOnDataNodes(String 
pipeNameToDrop) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TPushSinglePipeMetaReq request =
+        new TPushSinglePipeMetaReq().setPipeNameToDrop(pipeNameToDrop);
+
+    final AsyncClientHandler<TPushSinglePipeMetaReq, TPushPipeMetaResp> 
clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.PIPE_PUSH_SINGLE_META, request, 
dataNodeLocationMap);
     
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
     return clientHandler.getResponseMap();
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index c0ef6d4bfc4..892859fd57e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -255,8 +255,7 @@ public abstract class AbstractOperatePipeProcedureV2
   }
 
   /**
-   * Pushing all the pipeMeta's to all the dataNodes, forcing an update to the 
the pipe's runtime
-   * state.
+   * Pushing all the pipeMeta's to all the dataNodes, forcing an update to the 
pipe's runtime state.
    *
    * @param env ConfigNodeProcedureEnv
    * @return The responseMap after pushing pipe meta
@@ -269,7 +268,7 @@ public abstract class AbstractOperatePipeProcedureV2
       pipeMetaBinaryList.add(pipeMeta.serialize());
     }
 
-    return env.pushPipeMetaToDataNodes(pipeMetaBinaryList);
+    return env.pushAllPipeMetaToDataNodes(pipeMetaBinaryList);
   }
 
   /**
@@ -334,6 +333,32 @@ public abstract class AbstractOperatePipeProcedureV2
     }
   }
 
+  /**
+   * Pushing one pipeMeta to all the dataNodes, forcing an update to the 
pipe's runtime state.
+   *
+   * @param pipeName pipe name of the pipe to push
+   * @param env ConfigNodeProcedureEnv
+   * @return The responseMap after pushing pipe meta
+   * @throws IOException Exception when Serializing to byte buffer
+   */
+  protected Map<Integer, TPushPipeMetaResp> pushSinglePipeMetaToDataNodes(
+      String pipeName, ConfigNodeProcedureEnv env) throws IOException {
+    return env.pushSinglePipeMetaToDataNodes(
+        pipeTaskInfo.get().getPipeMetaByPipeName(pipeName).serialize());
+  }
+
+  /**
+   * Drop a pipe on all the dataNodes.
+   *
+   * @param pipeName pipe name of the pipe to drop
+   * @param env ConfigNodeProcedureEnv
+   * @return The responseMap after pushing pipe meta
+   */
+  protected Map<Integer, TPushPipeMetaResp> dropSinglePipeOnDataNodes(
+      String pipeName, ConfigNodeProcedureEnv env) {
+    return env.dropSinglePipeOnDataNodes(pipeName);
+  }
+
   @Override
   public void serialize(DataOutputStream stream) throws IOException {
     super.serialize(stream);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 5aefe33c222..599637b8282 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -139,13 +139,11 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   @Override
   protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
       throws PipeException, IOException {
-    LOGGER.info(
-        "CreatePipeProcedureV2: executeFromOperateOnDataNodes({})",
-        createPipeRequest.getPipeName());
+    final String pipeName = createPipeRequest.getPipeName();
+    LOGGER.info("CreatePipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
     String exceptionMessage =
-        parsePushPipeMetaExceptionForPipe(
-            createPipeRequest.getPipeName(), pushPipeMetaToDataNodes(env));
+        parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
       throw new PipeException(
           String.format(
@@ -190,6 +188,7 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         "CreatePipeProcedureV2: rollbackFromOperateOnDataNodes({})",
         createPipeRequest.getPipeName());
 
+    // Push all pipe metas to datanode, may be time-consuming
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(
             createPipeRequest.getPipeName(), pushPipeMetaToDataNodes(env));
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 510d87d2763..ff8cd3e7f3f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -81,12 +81,11 @@ public class DropPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   }
 
   @Override
-  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
-      throws PipeException, IOException {
+  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws PipeException {
     LOGGER.info("DropPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
     String exceptionMessage =
-        parsePushPipeMetaExceptionForPipe(pipeName, 
pushPipeMetaToDataNodes(env));
+        parsePushPipeMetaExceptionForPipe(pipeName, 
dropSinglePipeOnDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
       throw new PipeException(
           String.format("Failed to drop pipe %s, details: %s", pipeName, 
exceptionMessage));
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index eae446df24b..4272def5bc1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -89,7 +89,7 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     LOGGER.info("StartPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
     String exceptionMessage =
-        parsePushPipeMetaExceptionForPipe(pipeName, 
pushPipeMetaToDataNodes(env));
+        parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
       throw new PipeException(
           String.format("Failed to start pipe %s, details: %s", pipeName, 
exceptionMessage));
@@ -130,6 +130,7 @@ public class StartPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       throws PipeException, IOException {
     LOGGER.info("StartPipeProcedureV2: rollbackFromOperateOnDataNodes({})", 
pipeName);
 
+    // Push all pipe metas to datanode, may be time-consuming
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
pushPipeMetaToDataNodes(env));
     if (!exceptionMessage.isEmpty()) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index a3a85c8364e..cf866b1d27d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -89,7 +89,7 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     LOGGER.info("StopPipeProcedureV2: executeFromOperateOnDataNodes({})", 
pipeName);
 
     String exceptionMessage =
-        parsePushPipeMetaExceptionForPipe(pipeName, 
pushPipeMetaToDataNodes(env));
+        parsePushPipeMetaExceptionForPipe(pipeName, 
pushSinglePipeMetaToDataNodes(pipeName, env));
     if (!exceptionMessage.isEmpty()) {
       throw new PipeException(
           String.format("Failed to stop pipe %s, details: %s", pipeName, 
exceptionMessage));
@@ -126,6 +126,7 @@ public class StopPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       throws PipeException, IOException {
     LOGGER.info("StopPipeProcedureV2: rollbackFromOperateOnDataNodes({})", 
pipeName);
 
+    // Push all pipe metas to datanode, may be time-consuming
     String exceptionMessage =
         parsePushPipeMetaExceptionForPipe(pipeName, 
pushPipeMetaToDataNodes(env));
     if (!exceptionMessage.isEmpty()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index da21b757444..e9ecb26d402 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -117,6 +117,65 @@ public class PipeTaskAgent {
 
   ////////////////////////// Pipe Task Management Entry 
//////////////////////////
 
+  public synchronized TPushPipeMetaRespExceptionMessage 
handleSinglePipeMetaChanges(
+      PipeMeta pipeMetaFromConfigNode) {
+    acquireWriteLock();
+    try {
+      return handleSinglePipeMetaChangesInternal(pipeMetaFromConfigNode);
+    } finally {
+      releaseWriteLock();
+    }
+  }
+
+  private TPushPipeMetaRespExceptionMessage 
handleSinglePipeMetaChangesInternal(
+      PipeMeta pipeMetaFromConfigNode) {
+    // Do nothing if data node is removing or removed
+    if (PipeAgent.runtime().isShutdown()) {
+      return null;
+    }
+
+    try {
+      executeSinglePipeMetaChanges(pipeMetaFromConfigNode);
+      return null;
+    } catch (Exception e) {
+      final String pipeName = 
pipeMetaFromConfigNode.getStaticMeta().getPipeName();
+      final String errorMessage =
+          String.format(
+              "Failed to handle single pipe meta changes for %s, because %s",
+              pipeName, e.getMessage());
+      LOGGER.warn("Failed to handle single pipe meta changes for {}", 
pipeName, e);
+      return new TPushPipeMetaRespExceptionMessage(
+          pipeName, errorMessage, System.currentTimeMillis());
+    }
+  }
+
+  public synchronized TPushPipeMetaRespExceptionMessage handleDropPipe(String 
pipeName) {
+    acquireWriteLock();
+    try {
+      return handleDropPipeInternal(pipeName);
+    } finally {
+      releaseWriteLock();
+    }
+  }
+
+  private TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String 
pipeName) {
+    // Do nothing if data node is removing or removed
+    if (PipeAgent.runtime().isShutdown()) {
+      return null;
+    }
+
+    try {
+      dropPipe(pipeName);
+      return null;
+    } catch (Exception e) {
+      final String errorMessage =
+          String.format("Failed to drop pipe %s, because %s", pipeName, 
e.getMessage());
+      LOGGER.warn("Failed to drop pipe {}", pipeName, e);
+      return new TPushPipeMetaRespExceptionMessage(
+          pipeName, errorMessage, System.currentTimeMillis());
+    }
+  }
+
   public synchronized List<TPushPipeMetaRespExceptionMessage> 
handlePipeMetaChanges(
       List<PipeMeta> pipeMetaListFromConfigNode) {
     acquireWriteLock();
@@ -139,41 +198,10 @@ public class PipeTaskAgent {
     // Iterate through pipe meta list from config node, check if pipe meta 
exists on data node
     // or has changed
     for (final PipeMeta metaFromConfigNode : pipeMetaListFromConfigNode) {
-      final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
-
       try {
-        final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
-
-        // If pipe meta does not exist on data node, create a new pipe
-        if (metaOnDataNode == null) {
-          if (createPipe(metaFromConfigNode)) {
-            // If the status recorded in config node is RUNNING, start the pipe
-            startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
-          }
-          // If the status recorded in config node is STOPPED or DROPPED, do 
nothing
-          continue;
-        }
-
-        // If pipe meta exists on data node, check if it has changed
-        final PipeStaticMeta staticMetaOnDataNode = 
metaOnDataNode.getStaticMeta();
-        final PipeStaticMeta staticMetaFromConfigNode = 
metaFromConfigNode.getStaticMeta();
-
-        // First check if pipe static meta has changed, if so, drop the pipe 
and create a new one
-        if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
-          dropPipe(pipeName);
-          if (createPipe(metaFromConfigNode)) {
-            startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
-          }
-          // If the status is STOPPED or DROPPED, do nothing
-          continue;
-        }
-
-        // Then check if pipe runtime meta has changed, if so, update the pipe
-        final PipeRuntimeMeta runtimeMetaOnDataNode = 
metaOnDataNode.getRuntimeMeta();
-        final PipeRuntimeMeta runtimeMetaFromConfigNode = 
metaFromConfigNode.getRuntimeMeta();
-        handlePipeRuntimeMetaChanges(
-            staticMetaFromConfigNode, runtimeMetaFromConfigNode, 
runtimeMetaOnDataNode);
+        executeSinglePipeMetaChanges(metaFromConfigNode);
       } catch (Exception e) {
+        final String pipeName = 
metaFromConfigNode.getStaticMeta().getPipeName();
         final String errorMessage =
             String.format(
                 "Failed to handle pipe meta changes for %s, because %s", 
pipeName, e.getMessage());
@@ -205,7 +233,42 @@ public class PipeTaskAgent {
     return exceptionMessages;
   }
 
-  private void handlePipeRuntimeMetaChanges(
+  private void executeSinglePipeMetaChanges(final PipeMeta metaFromConfigNode) 
{
+    final String pipeName = metaFromConfigNode.getStaticMeta().getPipeName();
+    final PipeMeta metaOnDataNode = pipeMetaKeeper.getPipeMeta(pipeName);
+
+    // If pipe meta does not exist on data node, create a new pipe
+    if (metaOnDataNode == null) {
+      if (createPipe(metaFromConfigNode)) {
+        // If the status recorded in config node is RUNNING, start the pipe
+        startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
+      }
+      // If the status recorded in config node is STOPPED or DROPPED, do 
nothing
+      return;
+    }
+
+    // If pipe meta exists on data node, check if it has changed
+    final PipeStaticMeta staticMetaOnDataNode = metaOnDataNode.getStaticMeta();
+    final PipeStaticMeta staticMetaFromConfigNode = 
metaFromConfigNode.getStaticMeta();
+
+    // First check if pipe static meta has changed, if so, drop the pipe and 
create a new one
+    if (!staticMetaOnDataNode.equals(staticMetaFromConfigNode)) {
+      dropPipe(pipeName);
+      if (createPipe(metaFromConfigNode)) {
+        startPipe(pipeName, 
metaFromConfigNode.getStaticMeta().getCreationTime());
+      }
+      // If the status is STOPPED or DROPPED, do nothing
+      return;
+    }
+
+    // Then check if pipe runtime meta has changed, if so, update the pipe
+    final PipeRuntimeMeta runtimeMetaOnDataNode = 
metaOnDataNode.getRuntimeMeta();
+    final PipeRuntimeMeta runtimeMetaFromConfigNode = 
metaFromConfigNode.getRuntimeMeta();
+    executeSinglePipeRuntimeMetaChanges(
+        staticMetaFromConfigNode, runtimeMetaFromConfigNode, 
runtimeMetaOnDataNode);
+  }
+
+  private void executeSinglePipeRuntimeMetaChanges(
       @NotNull PipeStaticMeta pipeStaticMeta,
       @NotNull PipeRuntimeMeta runtimeMetaFromConfigNode,
       @NotNull PipeRuntimeMeta runtimeMetaOnDataNode) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index ddddd2f783d..86647205bc9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -181,6 +181,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
+import org.apache.iotdb.mpp.rpc.thrift.TPushSinglePipeMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
@@ -218,6 +219,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -956,6 +958,31 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     }
   }
 
+  @Override
+  public TPushPipeMetaResp pushSinglePipeMeta(TPushSinglePipeMetaReq req) {
+    try {
+      TPushPipeMetaRespExceptionMessage exceptionMessage;
+      if (req.isSetPipeNameToDrop()) {
+        exceptionMessage = 
PipeAgent.task().handleDropPipe(req.getPipeNameToDrop());
+      } else if (req.isSetPipeMeta()) {
+        final PipeMeta pipeMeta = 
PipeMeta.deserialize(ByteBuffer.wrap(req.getPipeMeta()));
+        exceptionMessage = 
PipeAgent.task().handleSinglePipeMetaChanges(pipeMeta);
+      } else {
+        throw new Exception("Invalid TPushSinglePipeMetaReq");
+      }
+      return exceptionMessage == null
+          ? new TPushPipeMetaResp()
+              .setStatus(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
+          : new TPushPipeMetaResp()
+              .setStatus(new 
TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
+              
.setExceptionMessages(Collections.singletonList(exceptionMessage));
+    } catch (Exception e) {
+      LOGGER.error("Error occurred when pushing single pipe meta", e);
+      return new TPushPipeMetaResp()
+          .setStatus(new 
TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()));
+    }
+  }
+
   @Override
   public TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) throws 
TException {
     final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
index 85c58ecc873..347ebe78199 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaKeeper.java
@@ -85,6 +85,10 @@ public class PipeMetaKeeper {
     return pipeNameToPipeMetaMap.values();
   }
 
+  public PipeMeta getPipeMetaByPipeName(String pipeName) {
+    return pipeNameToPipeMetaMap.get(pipeName);
+  }
+
   public void clear() {
     this.pipeNameToPipeMetaMap.clear();
   }
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 99f2ed1419d..30e1e0d676f 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -406,6 +406,11 @@ struct TPushPipeMetaRespExceptionMessage {
   3: required i64 timeStamp
 }
 
+struct TPushSinglePipeMetaReq {
+  1: optional binary pipeMeta // Should not set both to null.
+  2: optional string pipeNameToDrop // If it is not null, pipe with indicated 
name on datanode will be dropped.
+}
+
 struct TConstructViewSchemaBlackListReq{
     1: required list<common.TConsensusGroupId> schemaRegionIdList
     2: required binary pathPatternTree
@@ -816,6 +821,11 @@ service IDataNodeRPCService {
   */
   TPushPipeMetaResp pushPipeMeta(TPushPipeMetaReq req)
 
+ /**
+  * Send one pipeMeta to DataNodes, for create/start/stop/drop one pipe
+  */
+  TPushPipeMetaResp pushSinglePipeMeta(TPushSinglePipeMetaReq req)
+
   /**
   * ConfigNode will ask DataNode for pipe meta in every few seconds
   **/

Reply via email to