This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new a07ba5c7229 [To rel/1.2][IOTDB-5931] Pipe: async execute 
pipeHandleLeaderChange and pipeHandleMetaChange to avoid causing heartbeats to 
timeout (#10204)
a07ba5c7229 is described below

commit a07ba5c7229d819fe0f9f17e9e79dd43c3c4df40
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jun 19 09:03:27 2023 +0800

    [To rel/1.2][IOTDB-5931] Pipe: async execute pipeHandleLeaderChange and 
pipeHandleMetaChange to avoid causing heartbeats to timeout (#10204)
---
 .../iotdb/confignode/manager/ProcedureManager.java |  48 ++--
 .../pipe/runtime/PipeHeartbeatParser.java}         | 246 ++++++---------------
 ...ordinator.java => PipeLeaderChangeHandler.java} |  58 +----
 .../manager/pipe/runtime/PipeMetaSyncer.java       |   4 +-
 .../pipe/runtime/PipeRuntimeCoordinator.java       |  91 ++------
 .../impl/pipe/AbstractOperatePipeProcedureV2.java  |   2 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     | 174 +--------------
 .../runtime/PipeHandleMetaChangeProcedureTest.java |   7 +-
 .../iotdb/commons/concurrent/ThreadName.java       |  18 +-
 .../pipe/resource/wal/PipeWALResourceManager.java  |   2 +-
 10 files changed, 131 insertions(+), 519 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index fc84b840245..a0cf05716a1 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -94,7 +94,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -834,51 +833,36 @@ public class ProcedureManager {
     }
   }
 
-  public TSStatus pipeHandleLeaderChange(
+  public void pipeHandleLeaderChange(
       Map<TConsensusGroupId, Pair<Integer, Integer>> 
dataRegionGroupToOldAndNewLeaderPairMap) {
     try {
-      long procedureId =
+      final long procedureId =
           executor.submitProcedure(
               new 
PipeHandleLeaderChangeProcedure(dataRegionGroupToOldAndNewLeaderPairMap));
-      List<TSStatus> statusList = new ArrayList<>();
-      boolean isSucceed =
-          waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
-      if (isSucceed) {
-        return RpcUtils.SUCCESS_STATUS;
-      } else {
-        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
-            .setMessage(statusList.get(0).getMessage());
-      }
+      LOGGER.info("PipeHandleLeaderChangeProcedure was submitted, procedureId: 
{}.", procedureId);
     } catch (Exception e) {
-      return new 
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+      LOGGER.warn("PipeHandleLeaderChangeProcedure was failed to submit.", e);
     }
   }
 
-  public TSStatus pipeMetaSync() {
+  public void pipeHandleMetaChange(
+      boolean needWriteConsensusOnConfigNodes, boolean 
needPushPipeMetaToDataNodes) {
     try {
-      long procedureId = executor.submitProcedure(new PipeMetaSyncProcedure());
-      List<TSStatus> statusList = new ArrayList<>();
-      boolean isSucceed =
-          waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
-      if (isSucceed) {
-        return RpcUtils.SUCCESS_STATUS;
-      } else {
-        return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
-            .setMessage(statusList.get(0).getMessage());
-      }
+      final long procedureId =
+          executor.submitProcedure(
+              new PipeHandleMetaChangeProcedure(
+                  needWriteConsensusOnConfigNodes, 
needPushPipeMetaToDataNodes));
+      LOGGER.info("PipeHandleMetaChangeProcedure was submitted, procedureId: 
{}.", procedureId);
     } catch (Exception e) {
-      return new 
TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
+      LOGGER.warn("PipeHandleMetaChangeProcedure was failed to submit.", e);
     }
   }
 
-  public TSStatus pipeHandleMetaChange(
-      int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
+  public TSStatus pipeMetaSync() {
     try {
-      long procedureId =
-          executor.submitProcedure(
-              new PipeHandleMetaChangeProcedure(dataNodeId, 
pipeMetaByteBufferListFromDataNode));
-      List<TSStatus> statusList = new ArrayList<>();
-      boolean isSucceed =
+      final long procedureId = executor.submitProcedure(new 
PipeMetaSyncProcedure());
+      final List<TSStatus> statusList = new ArrayList<>();
+      final boolean isSucceed =
           waitingProcedureFinished(Collections.singletonList(procedureId), 
statusList);
       if (isSucceed) {
         return RpcUtils.SUCCESS_STATUS;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
similarity index 51%
copy from 
confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
copy to 
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
index 435b3e0b41f..537a5e7d1cc 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
+package org.apache.iotdb.confignode.manager.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
@@ -27,70 +27,91 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
 import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
-import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
-import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.confignode.manager.ConfigManager;
 
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV2 {
+public class PipeHeartbeatParser {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeHandleMetaChangeProcedure.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeHeartbeatParser.class);
 
-  private int dataNodeId;
-  private final List<ByteBuffer> pipeMetaByteBufferListFromDataNode;
+  private final ConfigManager configManager;
 
-  private boolean needWriteConsensusOnConfigNodes = false;
-  private boolean needPushPipeMetaToDataNodes = false;
+  private long heartbeatCounter;
+  private int registeredDataNodeNumber;
 
-  public PipeHandleMetaChangeProcedure() {
-    super();
-    pipeMetaByteBufferListFromDataNode = new ArrayList<>();
+  private final AtomicBoolean needWriteConsensusOnConfigNodes;
+  private final AtomicBoolean needPushPipeMetaToDataNodes;
+
+  PipeHeartbeatParser(ConfigManager configManager) {
+    this.configManager = configManager;
+
+    heartbeatCounter = 0;
+    registeredDataNodeNumber = 1;
+
+    needWriteConsensusOnConfigNodes = new AtomicBoolean(false);
+    needPushPipeMetaToDataNodes = new AtomicBoolean(false);
   }
 
-  public PipeHandleMetaChangeProcedure(
+  public synchronized void parseHeartbeat(
       int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
-    super();
-    this.dataNodeId = dataNodeId;
-    this.pipeMetaByteBufferListFromDataNode = 
pipeMetaByteBufferListFromDataNode;
-    needWriteConsensusOnConfigNodes = false;
-    needPushPipeMetaToDataNodes = false;
-  }
+    final long heartbeatCount = ++heartbeatCounter;
+
+    final AtomicBoolean canSubmitHandleMetaChangeProcedure = new 
AtomicBoolean(false);
+    // registeredDataNodeNumber can not be 0 when the method is called
+    if (heartbeatCount % registeredDataNodeNumber == 0) {
+      canSubmitHandleMetaChangeProcedure.set(true);
+
+      // registeredDataNodeNumber may be changed, update it here when we can 
submit procedure
+      registeredDataNodeNumber = 
configManager.getNodeManager().getRegisteredDataNodeCount();
+      if (registeredDataNodeNumber <= 0) {
+        LOGGER.warn(
+            "registeredDataNodeNumber is {} when parseHeartbeat from data node 
(id={}).",
+            registeredDataNodeNumber,
+            dataNodeId);
+        // registeredDataNodeNumber can not be set to 0 in this class, 
otherwise may cause
+        // DivideByZeroException
+        registeredDataNodeNumber = 1;
+      }
+    }
 
-  @Override
-  protected PipeTaskOperation getOperation() {
-    return PipeTaskOperation.HANDLE_PIPE_META_CHANGE;
-  }
+    if (pipeMetaByteBufferListFromDataNode.isEmpty()
+        && !(canSubmitHandleMetaChangeProcedure.get()
+            && (needWriteConsensusOnConfigNodes.get() || 
needPushPipeMetaToDataNodes.get()))) {
+      return;
+    }
 
-  @Override
-  protected void executeFromValidateTask(ConfigNodeProcedureEnv env) {
-    LOGGER.info("PipeHandleMetaChangeProcedure: executeFromValidateTask");
+    PipeRuntimeCoordinator.PROCEDURE_SUBMITTER.submit(
+        () -> {
+          if (!pipeMetaByteBufferListFromDataNode.isEmpty()) {
+            parseHeartbeatAndSaveMetaChangeLocally(dataNodeId, 
pipeMetaByteBufferListFromDataNode);
+          }
 
-    // do nothing
-  }
+          if (canSubmitHandleMetaChangeProcedure.get()
+              && (needWriteConsensusOnConfigNodes.get() || 
needPushPipeMetaToDataNodes.get())) {
+            configManager
+                .getProcedureManager()
+                .pipeHandleMetaChange(
+                    needWriteConsensusOnConfigNodes.get(), 
needPushPipeMetaToDataNodes.get());
 
-  @Override
-  protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
-    LOGGER.info("PipeHandleMetaChangeProcedure: 
executeFromCalculateInfoForTask");
+            // reset flags after procedure is submitted
+            needWriteConsensusOnConfigNodes.set(false);
+            needPushPipeMetaToDataNodes.set(false);
+          }
+        });
+  }
 
+  private void parseHeartbeatAndSaveMetaChangeLocally(
+      int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
     final Map<PipeStaticMeta, PipeMeta> pipeMetaMapFromDataNode = new 
HashMap<>();
     for (ByteBuffer byteBuffer : pipeMetaByteBufferListFromDataNode) {
       final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
@@ -98,7 +119,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
     }
 
     for (final PipeMeta pipeMetaOnConfigNode :
-        env.getConfigManager()
+        configManager
             .getPipeManager()
             .getPipeTaskCoordinator()
             .getPipeTaskInfo()
@@ -152,7 +173,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
                   .getValue()
                   
.updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex()));
 
-          needWriteConsensusOnConfigNodes = true;
+          needWriteConsensusOnConfigNodes.set(true);
         }
 
         // update runtime exception
@@ -172,8 +193,8 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
                 .equals(PipeStatus.STOPPED)) {
               
pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
 
-              needWriteConsensusOnConfigNodes = true;
-              needPushPipeMetaToDataNodes = true;
+              needWriteConsensusOnConfigNodes.set(true);
+              needPushPipeMetaToDataNodes.set(true);
 
               LOGGER.warn(
                   String.format(
@@ -183,7 +204,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
 
             if (exception instanceof PipeRuntimeConnectorCriticalException) {
               ((PipeTableResp)
-                      env.getConfigManager()
+                      configManager
                           .getPipeManager()
                           .getPipeTaskCoordinator()
                           .getPipeTaskInfo()
@@ -195,8 +216,8 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
                           status -> {
                             status.set(PipeStatus.STOPPED);
 
-                            needWriteConsensusOnConfigNodes = true;
-                            needPushPipeMetaToDataNodes = true;
+                            needWriteConsensusOnConfigNodes.set(true);
+                            needPushPipeMetaToDataNodes.set(true);
 
                             LOGGER.warn(
                                 String.format(
@@ -209,131 +230,4 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
       }
     }
   }
-
-  @Override
-  protected void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv 
env) {
-    LOGGER.info("PipeHandleMetaChangeProcedure: 
executeFromWriteConfigNodeConsensus");
-
-    if (!needWriteConsensusOnConfigNodes) {
-      return;
-    }
-
-    final List<PipeMeta> pipeMetaList = new ArrayList<>();
-    for (final PipeMeta pipeMeta :
-        env.getConfigManager()
-            .getPipeManager()
-            .getPipeTaskCoordinator()
-            .getPipeTaskInfo()
-            .getPipeMetaList()) {
-      pipeMetaList.add(pipeMeta);
-    }
-
-    final ConsensusWriteResponse response =
-        env.getConfigManager()
-            .getConsensusManager()
-            .write(new PipeHandleMetaChangePlan(pipeMetaList));
-    if (!response.isSuccessful()) {
-      throw new PipeException(response.getErrorMessage());
-    }
-  }
-
-  @Override
-  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
-    LOGGER.info("PipeHandleMetaChangeProcedure: executeFromHandleOnDataNodes");
-
-    if (!needPushPipeMetaToDataNodes) {
-      return;
-    }
-
-    pushPipeMetaToDataNodesIgnoreException(env);
-  }
-
-  @Override
-  protected void rollbackFromValidateTask(ConfigNodeProcedureEnv env) {
-    LOGGER.info("PipeHandleMetaChangeProcedure: rollbackFromValidateTask");
-
-    // do nothing
-  }
-
-  @Override
-  protected void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
-    LOGGER.info("PipeHandleMetaChangeProcedure: 
rollbackFromCalculateInfoForTask");
-
-    // do nothing
-  }
-
-  @Override
-  protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv 
env) {
-    LOGGER.info("PipeHandleMetaChangeProcedure: 
rollbackFromWriteConfigNodeConsensus");
-
-    // do nothing
-  }
-
-  @Override
-  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
-    LOGGER.info("PipeHandleMetaChangeProcedure: 
rollbackFromOperateOnDataNodes");
-
-    // do nothing
-  }
-
-  @Override
-  public void serialize(DataOutputStream stream) throws IOException {
-    
stream.writeShort(ProcedureType.PIPE_HANDLE_META_CHANGE_PROCEDURE.getTypeCode());
-    super.serialize(stream);
-
-    ReadWriteIOUtils.write(dataNodeId, stream);
-
-    ReadWriteIOUtils.write(pipeMetaByteBufferListFromDataNode.size(), stream);
-    for (ByteBuffer pipeMetaByteBuffer : pipeMetaByteBufferListFromDataNode) {
-      ReadWriteIOUtils.write(pipeMetaByteBuffer.limit(), stream);
-      ReadWriteIOUtils.write(new Binary(pipeMetaByteBuffer.array()), stream);
-    }
-
-    ReadWriteIOUtils.write(needWriteConsensusOnConfigNodes, stream);
-    ReadWriteIOUtils.write(needPushPipeMetaToDataNodes, stream);
-  }
-
-  @Override
-  public void deserialize(ByteBuffer byteBuffer) {
-    super.deserialize(byteBuffer);
-
-    dataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
-
-    final int size = ReadWriteIOUtils.readInt(byteBuffer);
-    for (int i = 0; i < size; ++i) {
-      final int limit = ReadWriteIOUtils.readInt(byteBuffer);
-      final ByteBuffer pipeMetaByteBuffer =
-          ByteBuffer.wrap(ReadWriteIOUtils.readBinary(byteBuffer).getValues());
-      pipeMetaByteBuffer.limit(limit);
-      pipeMetaByteBufferListFromDataNode.add(pipeMetaByteBuffer);
-    }
-
-    needWriteConsensusOnConfigNodes = ReadWriteIOUtils.readBool(byteBuffer);
-    needPushPipeMetaToDataNodes = ReadWriteIOUtils.readBool(byteBuffer);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof PipeHandleMetaChangeProcedure)) {
-      return false;
-    }
-    PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o;
-    return dataNodeId == that.dataNodeId
-        && needWriteConsensusOnConfigNodes == 
that.needWriteConsensusOnConfigNodes
-        && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes
-        && Objects.equals(
-            pipeMetaByteBufferListFromDataNode, 
that.pipeMetaByteBufferListFromDataNode);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(
-        dataNodeId,
-        pipeMetaByteBufferListFromDataNode,
-        needWriteConsensusOnConfigNodes,
-        needPushPipeMetaToDataNodes);
-  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
similarity index 63%
copy from 
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
copy to 
confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
index 00182aac69d..3f93fb022b9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
@@ -21,35 +21,22 @@ package org.apache.iotdb.confignode.manager.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
 import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent;
 import org.apache.iotdb.metrics.utils.IoTDBMetricsUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeRuntimeCoordinator.class);
+public class PipeLeaderChangeHandler implements IClusterStatusSubscriber {
 
   private final ConfigManager configManager;
 
-  private final PipeMetaSyncer pipeMetaSyncer;
-
-  public PipeRuntimeCoordinator(ConfigManager configManager) {
+  PipeLeaderChangeHandler(ConfigManager configManager) {
     this.configManager = configManager;
-    this.pipeMetaSyncer = new PipeMetaSyncer(configManager);
   }
 
   @Override
@@ -90,40 +77,11 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
       return;
     }
 
-    final TSStatus result =
-        configManager
-            .getProcedureManager()
-            .pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap);
-    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.warn(
-          "PipeRuntimeCoordinator meets error in handling data region leader 
change, status: ({})",
-          result);
-    }
-  }
-
-  public void startPipeMetaSync() {
-    pipeMetaSyncer.start();
-  }
-
-  public void stopPipeMetaSync() {
-    pipeMetaSyncer.stop();
-  }
-
-  /**
-   * parse heartbeat from data node.
-   *
-   * @param dataNodeId data node id
-   * @param pipeMetaByteBufferListFromDataNode pipe meta byte buffer list 
collected from data node
-   */
-  public void parseHeartbeat(
-      int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
-    final TSStatus result =
-        configManager
-            .getProcedureManager()
-            .pipeHandleMetaChange(dataNodeId, 
pipeMetaByteBufferListFromDataNode);
-    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.warn(
-          "PipeTaskCoordinator meets error in handling pipe meta change, 
status: ({})", result);
-    }
+    // submit procedure in an async way to avoid blocking the caller
+    PipeRuntimeCoordinator.PROCEDURE_SUBMITTER.submit(
+        () ->
+            configManager
+                .getProcedureManager()
+                
.pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap));
   }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index 2251d0d7892..715dabe2a46 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -40,7 +40,7 @@ public class PipeMetaSyncer {
 
   private static final ScheduledExecutorService SYNC_EXECUTOR =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-          ThreadName.PIPE_META_SYNC_SERVICE.getName());
+          ThreadName.PIPE_RUNTIME_META_SYNCER.getName());
   private static final long INITIAL_SYNC_DELAY_MINUTES =
       PipeConfig.getInstance().getPipeMetaSyncerInitialSyncDelayMinutes();
   private static final long SYNC_INTERVAL_MINUTES =
@@ -50,7 +50,7 @@ public class PipeMetaSyncer {
 
   private Future<?> metaSyncFuture;
 
-  public PipeMetaSyncer(ConfigManager configManager) {
+  PipeMetaSyncer(ConfigManager configManager) {
     this.configManager = configManager;
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 00182aac69d..23398038b55 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -19,86 +19,49 @@
 
 package org.apache.iotdb.confignode.manager.pipe.runtime;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
 import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent;
-import org.apache.iotdb.metrics.utils.IoTDBMetricsUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeRuntimeCoordinator.class);
-
-  private final ConfigManager configManager;
+  // shared thread pool in the runtime package
+  static final ExecutorService PROCEDURE_SUBMITTER =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.PIPE_RUNTIME_PROCEDURE_SUBMITTER.getName());
 
+  private final PipeLeaderChangeHandler pipeLeaderChangeHandler;
+  private final PipeHeartbeatParser pipeHeartbeatParser;
   private final PipeMetaSyncer pipeMetaSyncer;
 
   public PipeRuntimeCoordinator(ConfigManager configManager) {
-    this.configManager = configManager;
-    this.pipeMetaSyncer = new PipeMetaSyncer(configManager);
+    pipeLeaderChangeHandler = new PipeLeaderChangeHandler(configManager);
+    pipeHeartbeatParser = new PipeHeartbeatParser(configManager);
+    pipeMetaSyncer = new PipeMetaSyncer(configManager);
   }
 
   @Override
   public void onClusterStatisticsChanged(StatisticsChangeEvent event) {
-    // do nothing, because pipe task is not related to statistics
+    pipeLeaderChangeHandler.onClusterStatisticsChanged(event);
   }
 
   @Override
   public void onRegionGroupLeaderChanged(RouteChangeEvent event) {
-    // if no pipe task, return
-    if 
(configManager.getPipeManager().getPipeTaskCoordinator().getPipeTaskInfo().isEmpty())
 {
-      return;
-    }
-
-    // we only care about data region leader change
-    final Map<TConsensusGroupId, Pair<Integer, Integer>> 
dataRegionGroupToOldAndNewLeaderPairMap =
-        new HashMap<>();
-    event
-        .getLeaderMap()
-        .forEach(
-            (regionGroupId, pair) -> {
-              if 
(regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
-                final String databaseName =
-                    
configManager.getPartitionManager().getRegionStorageGroup(regionGroupId);
-                if (databaseName != null && 
!databaseName.equals(IoTDBMetricsUtils.DATABASE)) {
-                  // pipe only collect user's data, filter metric database 
here.
-                  dataRegionGroupToOldAndNewLeaderPairMap.put(
-                      regionGroupId,
-                      new Pair<>( // null or -1 means empty origin leader
-                          pair.left == null ? -1 : pair.left,
-                          pair.right == null ? -1 : pair.right));
-                }
-              }
-            });
-
-    // if no data region leader change, return
-    if (dataRegionGroupToOldAndNewLeaderPairMap.isEmpty()) {
-      return;
-    }
+    pipeLeaderChangeHandler.onRegionGroupLeaderChanged(event);
+  }
 
-    final TSStatus result =
-        configManager
-            .getProcedureManager()
-            .pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap);
-    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.warn(
-          "PipeRuntimeCoordinator meets error in handling data region leader 
change, status: ({})",
-          result);
-    }
+  public void parseHeartbeat(
+      int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
+    pipeHeartbeatParser.parseHeartbeat(dataNodeId, 
pipeMetaByteBufferListFromDataNode);
   }
 
   public void startPipeMetaSync() {
@@ -108,22 +71,4 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
   public void stopPipeMetaSync() {
     pipeMetaSyncer.stop();
   }
-
-  /**
-   * parse heartbeat from data node.
-   *
-   * @param dataNodeId data node id
-   * @param pipeMetaByteBufferListFromDataNode pipe meta byte buffer list 
collected from data node
-   */
-  public void parseHeartbeat(
-      int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
-    final TSStatus result =
-        configManager
-            .getProcedureManager()
-            .pipeHandleMetaChange(dataNodeId, 
pipeMetaByteBufferListFromDataNode);
-    if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      LOGGER.warn(
-          "PipeTaskCoordinator meets error in handling pipe meta change, 
status: ({})", result);
-    }
-  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index e48901b7824..27a9fd1de25 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -51,7 +51,7 @@ public abstract class AbstractOperatePipeProcedureV2
   private static final Logger LOGGER =
       LoggerFactory.getLogger(AbstractOperatePipeProcedureV2.class);
 
-  private static final int RETRY_THRESHOLD = 3;
+  private static final int RETRY_THRESHOLD = 1;
 
   // only used in rollback to reduce the number of network calls
   protected boolean isRollbackFromOperateOnDataNodesSuccessful = false;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 435b3e0b41f..d7e9971dbb9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -19,26 +19,16 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
-import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
-import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
-import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
-import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
-import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
-import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,33 +36,25 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV2 {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeHandleMetaChangeProcedure.class);
 
-  private int dataNodeId;
-  private final List<ByteBuffer> pipeMetaByteBufferListFromDataNode;
-
   private boolean needWriteConsensusOnConfigNodes = false;
   private boolean needPushPipeMetaToDataNodes = false;
 
   public PipeHandleMetaChangeProcedure() {
     super();
-    pipeMetaByteBufferListFromDataNode = new ArrayList<>();
   }
 
   public PipeHandleMetaChangeProcedure(
-      int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
+      boolean needWriteConsensusOnConfigNodes, boolean 
needPushPipeMetaToDataNodes) {
     super();
-    this.dataNodeId = dataNodeId;
-    this.pipeMetaByteBufferListFromDataNode = 
pipeMetaByteBufferListFromDataNode;
-    needWriteConsensusOnConfigNodes = false;
-    needPushPipeMetaToDataNodes = false;
+    this.needWriteConsensusOnConfigNodes = needWriteConsensusOnConfigNodes;
+    this.needPushPipeMetaToDataNodes = needPushPipeMetaToDataNodes;
   }
 
   @Override
@@ -91,123 +73,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
   protected void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeHandleMetaChangeProcedure: 
executeFromCalculateInfoForTask");
 
-    final Map<PipeStaticMeta, PipeMeta> pipeMetaMapFromDataNode = new 
HashMap<>();
-    for (ByteBuffer byteBuffer : pipeMetaByteBufferListFromDataNode) {
-      final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
-      pipeMetaMapFromDataNode.put(pipeMeta.getStaticMeta(), pipeMeta);
-    }
-
-    for (final PipeMeta pipeMetaOnConfigNode :
-        env.getConfigManager()
-            .getPipeManager()
-            .getPipeTaskCoordinator()
-            .getPipeTaskInfo()
-            .getPipeMetaList()) {
-      final PipeMeta pipeMetaFromDataNode =
-          pipeMetaMapFromDataNode.get(pipeMetaOnConfigNode.getStaticMeta());
-      if (pipeMetaFromDataNode == null) {
-        LOGGER.info(
-            "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
-                + "pipeMetaFromDataNode is null, pipeMetaOnConfigNode: {}",
-            pipeMetaOnConfigNode);
-        continue;
-      }
-
-      final Map<TConsensusGroupId, PipeTaskMeta> pipeTaskMetaMapOnConfigNode =
-          
pipeMetaOnConfigNode.getRuntimeMeta().getConsensusGroupIdToTaskMetaMap();
-      final Map<TConsensusGroupId, PipeTaskMeta> pipeTaskMetaMapFromDataNode =
-          
pipeMetaFromDataNode.getRuntimeMeta().getConsensusGroupIdToTaskMetaMap();
-      for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> 
runtimeMetaOnConfigNode :
-          pipeTaskMetaMapOnConfigNode.entrySet()) {
-        if (runtimeMetaOnConfigNode.getValue().getLeaderDataNodeId() != 
dataNodeId) {
-          continue;
-        }
-
-        final PipeTaskMeta runtimeMetaFromDataNode =
-            pipeTaskMetaMapFromDataNode.get(runtimeMetaOnConfigNode.getKey());
-        if (runtimeMetaFromDataNode == null) {
-          LOGGER.warn(
-              "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
-                  + "runtimeMetaFromDataNode is null, runtimeMetaOnConfigNode: 
{}",
-              runtimeMetaOnConfigNode);
-          continue;
-        }
-
-        // update progress index
-        if (!runtimeMetaOnConfigNode
-            .getValue()
-            .getProgressIndex()
-            .isAfter(runtimeMetaFromDataNode.getProgressIndex())) {
-          LOGGER.info(
-              "Updating progress index for (pipe name: {}, consensus group id: 
{}) ... Progress index on config node: {}, progress index from data node: {}",
-              pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
-              runtimeMetaOnConfigNode.getKey(),
-              runtimeMetaOnConfigNode.getValue().getProgressIndex(),
-              runtimeMetaFromDataNode.getProgressIndex());
-          LOGGER.info(
-              "Progress index for (pipe name: {}, consensus group id: {}) is 
updated to {}",
-              pipeMetaOnConfigNode.getStaticMeta().getPipeName(),
-              runtimeMetaOnConfigNode.getKey(),
-              runtimeMetaOnConfigNode
-                  .getValue()
-                  
.updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex()));
-
-          needWriteConsensusOnConfigNodes = true;
-        }
-
-        // update runtime exception
-        final PipeTaskMeta pipeTaskMetaOnConfigNode = 
runtimeMetaOnConfigNode.getValue();
-        pipeTaskMetaOnConfigNode.clearExceptionMessages();
-        for (final PipeRuntimeException exception :
-            runtimeMetaFromDataNode.getExceptionMessages()) {
-
-          pipeTaskMetaOnConfigNode.trackExceptionMessage(exception);
-
-          if (exception instanceof PipeRuntimeCriticalException) {
-            final String pipeName = 
pipeMetaOnConfigNode.getStaticMeta().getPipeName();
-            if (!pipeMetaOnConfigNode
-                .getRuntimeMeta()
-                .getStatus()
-                .get()
-                .equals(PipeStatus.STOPPED)) {
-              
pipeMetaOnConfigNode.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
-
-              needWriteConsensusOnConfigNodes = true;
-              needPushPipeMetaToDataNodes = true;
-
-              LOGGER.warn(
-                  String.format(
-                      "Detect PipeRuntimeCriticalException %s from DataNode, 
stop pipe %s.",
-                      exception, pipeName));
-            }
-
-            if (exception instanceof PipeRuntimeConnectorCriticalException) {
-              ((PipeTableResp)
-                      env.getConfigManager()
-                          .getPipeManager()
-                          .getPipeTaskCoordinator()
-                          .getPipeTaskInfo()
-                          .showPipes())
-                  .filter(true, pipeName).getAllPipeMeta().stream()
-                      .map(pipeMeta -> pipeMeta.getRuntimeMeta().getStatus())
-                      .filter(status -> 
!status.get().equals(PipeStatus.STOPPED))
-                      .forEach(
-                          status -> {
-                            status.set(PipeStatus.STOPPED);
-
-                            needWriteConsensusOnConfigNodes = true;
-                            needPushPipeMetaToDataNodes = true;
-
-                            LOGGER.warn(
-                                String.format(
-                                    "Detect 
PipeRuntimeConnectorCriticalException %s from DataNode, stop pipe %s.",
-                                    exception, pipeName));
-                          });
-            }
-          }
-        }
-      }
-    }
+    // do nothing
   }
 
   @Override
@@ -281,14 +147,6 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
     
stream.writeShort(ProcedureType.PIPE_HANDLE_META_CHANGE_PROCEDURE.getTypeCode());
     super.serialize(stream);
 
-    ReadWriteIOUtils.write(dataNodeId, stream);
-
-    ReadWriteIOUtils.write(pipeMetaByteBufferListFromDataNode.size(), stream);
-    for (ByteBuffer pipeMetaByteBuffer : pipeMetaByteBufferListFromDataNode) {
-      ReadWriteIOUtils.write(pipeMetaByteBuffer.limit(), stream);
-      ReadWriteIOUtils.write(new Binary(pipeMetaByteBuffer.array()), stream);
-    }
-
     ReadWriteIOUtils.write(needWriteConsensusOnConfigNodes, stream);
     ReadWriteIOUtils.write(needPushPipeMetaToDataNodes, stream);
   }
@@ -297,17 +155,6 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
 
-    dataNodeId = ReadWriteIOUtils.readInt(byteBuffer);
-
-    final int size = ReadWriteIOUtils.readInt(byteBuffer);
-    for (int i = 0; i < size; ++i) {
-      final int limit = ReadWriteIOUtils.readInt(byteBuffer);
-      final ByteBuffer pipeMetaByteBuffer =
-          ByteBuffer.wrap(ReadWriteIOUtils.readBinary(byteBuffer).getValues());
-      pipeMetaByteBuffer.limit(limit);
-      pipeMetaByteBufferListFromDataNode.add(pipeMetaByteBuffer);
-    }
-
     needWriteConsensusOnConfigNodes = ReadWriteIOUtils.readBool(byteBuffer);
     needPushPipeMetaToDataNodes = ReadWriteIOUtils.readBool(byteBuffer);
   }
@@ -321,19 +168,12 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
       return false;
     }
     PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o;
-    return dataNodeId == that.dataNodeId
-        && needWriteConsensusOnConfigNodes == 
that.needWriteConsensusOnConfigNodes
-        && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes
-        && Objects.equals(
-            pipeMetaByteBufferListFromDataNode, 
that.pipeMetaByteBufferListFromDataNode);
+    return needWriteConsensusOnConfigNodes == 
that.needWriteConsensusOnConfigNodes
+        && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(
-        dataNodeId,
-        pipeMetaByteBufferListFromDataNode,
-        needWriteConsensusOnConfigNodes,
-        needPushPipeMetaToDataNodes);
+    return Objects.hash(needWriteConsensusOnConfigNodes, 
needPushPipeMetaToDataNodes);
   }
 }
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
index 38783ea2e53..47a0dca1abd 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
-import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -34,7 +33,6 @@ import org.junit.Test;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
@@ -78,10 +76,7 @@ public class PipeHandleMetaChangeProcedureTest {
               }
             });
 
-    PipeHandleMetaChangeProcedure proc =
-        new PipeHandleMetaChangeProcedure(
-            123,
-            Collections.singletonList(new PipeMeta(pipeStaticMeta, 
pipeRuntimeMeta).serialize()));
+    PipeHandleMetaChangeProcedure proc = new 
PipeHandleMetaChangeProcedure(true, false);
 
     try {
       proc.serialize(outputStream);
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 094a722f75f..7170d4a4e4c 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -99,17 +99,15 @@ public enum ThreadName {
   GRPC_DEFAULT_EXECUTOR("grpc-default-executor"),
   GPRC_DEFAULT_WORKER_ELG("grpc-default-worker-ELG"),
   GROUP_MANAGEMENT("groupManagement"),
-
   // -------------------------- Compute --------------------------
   PIPE_ASSIGNER_EXECUTOR_POOL("Pipe-Assigner-Executor-Pool"),
   PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
   PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
   PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
-  EXT_PIPE_PLUGIN_WORKER("ExtPipePlugin-Worker"),
+  PIPE_RUNTIME_META_SYNCER("Pipe-Runtime-Meta-Syncer"),
+  PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
+  PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
   WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
-  // -------------------------- Sync --------------------------
-  SYNC_SENDER_PIPE("Sync-Pipe"),
-  SYNC_SENDER_HEARTBEAT("Sync-Heartbeat"),
   // -------------------------- JVM --------------------------
   // NOTICE: The thread name of jvm cannot be edited here!
   // We list the thread name here just for distinguishing what module the 
thread belongs to.
@@ -144,8 +142,6 @@ public enum ThreadName {
   MLNODE_RPC_SERVICE("MLNodeRpc-Service"),
   IOTDB_SHUTDOWN_HOOK("IoTDB-Shutdown-Hook"),
   STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"),
-  PIPE_META_SYNC_SERVICE("Pipe-Meta-Sync-Service"),
-  
PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE("Pipe-WAL-Resource-TTL-Checker-Service"),
   // the unknown thread name is used for metrics
   UNKOWN("UNKNOWN");
 
@@ -231,10 +227,11 @@ public enum ThreadName {
               PIPE_PROCESSOR_EXECUTOR_POOL,
               PIPE_CONNECTOR_EXECUTOR_POOL,
               PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL,
-              EXT_PIPE_PLUGIN_WORKER,
+              PIPE_RUNTIME_META_SYNCER,
+              PIPE_RUNTIME_PROCEDURE_SUBMITTER,
+              PIPE_WAL_RESOURCE_TTL_CHECKER,
               WINDOW_EVALUATION_SERVICE));
-  private static Set<ThreadName> syncThreadNames =
-      new HashSet<>(Arrays.asList(SYNC_SENDER_PIPE, SYNC_SENDER_HEARTBEAT));
+
   private static Set<ThreadName> jvmThreadNames =
       new HashSet<>(
           Arrays.asList(
@@ -292,7 +289,6 @@ public enum ThreadName {
           iotConsensusThrreadNames,
           ratisThreadNames,
           computeThreadNames,
-          syncThreadNames,
           jvmThreadNames,
           metricsThreadNames,
           otherThreadNames
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index cadb4746984..de67bb8543f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -22,7 +22,7 @@ public class PipeWALResourceManager implements AutoCloseable {
 
   private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-          ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE.getName());
+          ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER.getName());
   private final ScheduledFuture<?> ttlCheckerFuture;
 
   public PipeWALResourceManager() {


Reply via email to