This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new b14a387df6d [IOTDB-6017] Pipe: separate pipe heartbeat from cluster 
heartbeat (#10285) (#10315)
b14a387df6d is described below

commit b14a387df6d0bda6d9b72aae09eda3c608d3c1ec
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jun 26 01:27:38 2023 +0800

    [IOTDB-6017] Pipe: separate pipe heartbeat from cluster heartbeat (#10285) 
(#10315)
    
    * separate pipe heartbeat from cluster heartbeat
    
    * fix: IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor
    
    * remove rollback logic in PipeHandleLeaderChangeProcedure to avoid meta 
lost when restarting cluster
    
    * do not submit leader change procedure when leader is not changed
    
    * change lock in abstract pipe procedure and procedure to fair lock
    
    ---------
    
    Co-authored-by: yschengzi <[email protected]>
    Co-authored-by: Steve Yurong Su <[email protected]>
    
    (cherry picked from commit 268c6b88f92e0099cb9c907bd84dc0c5b28d3d1e)
---
 .../confignode/client/DataNodeRequestType.java     |   1 +
 .../client/async/AsyncDataNodeClientPool.java      |   8 ++
 .../client/async/handlers/AsyncClientHandler.java  |  10 ++
 .../handlers/rpc/PipeHeartbeatRPCHandler.java      |  70 +++++++++++++
 .../statemachine/ConfigRegionStateMachine.java     |   3 +
 .../manager/load/service/HeartbeatService.java     |   8 +-
 .../pipe/runtime/PipeHeartbeatScheduler.java       | 111 +++++++++++++++++++++
 .../pipe/runtime/PipeLeaderChangeHandler.java      |  15 +--
 .../pipe/runtime/PipeRuntimeCoordinator.java       |  24 +++--
 .../confignode/persistence/pipe/PipeTaskInfo.java  |   2 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |   2 +-
 .../runtime/PipeHandleLeaderChangeProcedure.java   |  18 +---
 .../thrift/src/main/thrift/datanode.thrift         |  13 +++
 .../resources/conf/iotdb-common.properties         |   8 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |  23 +++--
 .../iotdb/commons/conf/CommonDescriptor.java       |  11 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  13 ++-
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  22 ++++
 .../impl/DataNodeInternalRPCServiceImpl.java       |   9 ++
 20 files changed, 321 insertions(+), 51 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
index f4e839add37..2282a6db166 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/DataNodeRequestType.java
@@ -68,6 +68,7 @@ public enum DataNodeRequestType {
 
   /** Pipe Task */
   PUSH_PIPE_META,
+  PIPE_HEARTBEAT,
 
   /** CQ */
   EXECUTE_CQ,
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index e410e095289..05efc1bdbd3 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHan
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.CheckTimeSeriesExistenceRPCHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
+import 
org.apache.iotdb.confignode.client.async.handlers.rpc.PipeHeartbeatRPCHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler;
 import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq;
@@ -59,6 +60,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
@@ -230,6 +232,12 @@ public class AsyncDataNodeClientPool {
               (AsyncTSStatusRPCHandler)
                   clientHandler.createAsyncRPCHandler(requestId, 
targetDataNode));
           break;
+        case PIPE_HEARTBEAT:
+          client.pipeHeartbeat(
+              (TPipeHeartbeatReq) clientHandler.getRequest(requestId),
+              (PipeHeartbeatRPCHandler)
+                  clientHandler.createAsyncRPCHandler(requestId, 
targetDataNode));
+          break;
         case MERGE:
         case FULL_MERGE:
           client.merge(
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index efd4b9f8f55..c2d2989ba06 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -26,10 +26,12 @@ import 
org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHan
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.CheckTimeSeriesExistenceRPCHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackListRPCHandler;
+import 
org.apache.iotdb.confignode.client.async.handlers.rpc.PipeHeartbeatRPCHandler;
 import 
org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler;
 import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
 import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -197,6 +199,14 @@ public class AsyncClientHandler<Q, R> {
             dataNodeLocationMap,
             (Map<Integer, TCheckTimeSeriesExistenceResp>) responseMap,
             countDownLatch);
+      case PIPE_HEARTBEAT:
+        return new PipeHeartbeatRPCHandler(
+            requestType,
+            requestId,
+            targetDataNode,
+            dataNodeLocationMap,
+            (Map<Integer, TPipeHeartbeatResp>) responseMap,
+            countDownLatch);
       case SET_TTL:
       case CREATE_DATA_REGION:
       case CREATE_SCHEMA_REGION:
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
new file mode 100644
index 00000000000..222cbe36a13
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.client.async.handlers.rpc;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+public class PipeHeartbeatRPCHandler extends 
AbstractAsyncRPCHandler<TPipeHeartbeatResp> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeHeartbeatRPCHandler.class);
+
+  public PipeHeartbeatRPCHandler(
+      DataNodeRequestType requestType,
+      int requestId,
+      TDataNodeLocation targetDataNode,
+      Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      Map<Integer, TPipeHeartbeatResp> responseMap,
+      CountDownLatch countDownLatch) {
+    super(requestType, requestId, targetDataNode, dataNodeLocationMap, 
responseMap, countDownLatch);
+  }
+
+  @Override
+  public void onComplete(TPipeHeartbeatResp response) {
+    // Put response
+    responseMap.put(requestId, response);
+    dataNodeLocationMap.remove(requestId);
+    LOGGER.info("Successfully {} on DataNode: {}", requestType, 
formattedTargetLocation);
+
+    // Always CountDown
+    countDownLatch.countDown();
+  }
+
+  @Override
+  public void onError(Exception e) {
+    LOGGER.error(
+        "Failed to "
+            + requestType
+            + " on DataNode: "
+            + formattedTargetLocation
+            + ", exception: "
+            + e.getMessage());
+
+    // Always CountDown
+    countDownLatch.countDown();
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index 4964ba79774..93033b4d9d9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -221,6 +221,8 @@ public class ConfigRegionStateMachine
 
       threadPool.submit(
           () -> 
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeMetaSync());
+      threadPool.submit(
+          () -> 
configManager.getPipeManager().getPipeRuntimeCoordinator().startPipeHeartbeat());
     } else {
       LOGGER.info(
           "Current node [nodeId:{}, ip:port: {}] is not longer the leader, the 
new leader is [nodeId:{}]",
@@ -230,6 +232,7 @@ public class ConfigRegionStateMachine
 
       // Stop leader scheduling services
       
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
+      
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
       configManager.getLoadManager().stopLoadServices();
       configManager.getProcedureManager().shiftExecutor(false);
       configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 3e91b2b4013..ea42eb3f49c 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -127,9 +127,11 @@ public class HeartbeatService {
     
heartbeatReq.setSchemaQuotaCount(configManager.getClusterSchemaManager().getSchemaQuotaCount());
     // We collect pipe meta in every 100 heartbeat loop
     heartbeatReq.setNeedPipeMetaList(
-        heartbeatCounter.get()
-                % 
PipeConfig.getInstance().getHeartbeatLoopCyclesForCollectingPipeMeta()
-            == 0);
+        !PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled()
+            && heartbeatCounter.get()
+                    % PipeConfig.getInstance()
+                        .getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()
+                == 0);
     if (!configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
       
heartbeatReq.setSchemaRegionIds(configManager.getClusterQuotaManager().getSchemaRegionIds());
       
heartbeatReq.setDataRegionIds(configManager.getClusterQuotaManager().getDataRegionIds());
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
new file mode 100644
index 00000000000..86347785381
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatScheduler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.runtime;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.confignode.client.DataNodeRequestType;
+import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
+import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class PipeHeartbeatScheduler {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeHeartbeatScheduler.class);
+
+  private static final boolean IS_SEPERATED_PIPE_HEARTBEAT_ENABLED =
+      PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled();
+  private static final long HEARTBEAT_INTERVAL_SECONDS =
+      
PipeConfig.getInstance().getPipeHeartbeatIntervalSecondsForCollectingPipeMeta();
+
+  private static final ScheduledExecutorService HEARTBEAT_EXECUTOR =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.PIPE_RUNTIME_HEARTBEAT.getName());
+
+  private final ConfigManager configManager;
+  private final PipeHeartbeatParser pipeHeartbeatParser;
+
+  private Future<?> heartbeatFuture;
+
+  PipeHeartbeatScheduler(ConfigManager configManager) {
+    this.configManager = configManager;
+    this.pipeHeartbeatParser = new PipeHeartbeatParser(configManager);
+  }
+
+  public synchronized void start() {
+    if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture == null) {
+      heartbeatFuture =
+          ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+              HEARTBEAT_EXECUTOR,
+              this::heartbeat,
+              HEARTBEAT_INTERVAL_SECONDS,
+              HEARTBEAT_INTERVAL_SECONDS,
+              TimeUnit.SECONDS);
+      LOGGER.info("PipeHeartbeat is started successfully.");
+    }
+  }
+
+  private synchronized void heartbeat() {
+    if 
(configManager.getPipeManager().getPipeTaskCoordinator().getPipeTaskInfo().isEmpty())
 {
+      return;
+    }
+
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TPipeHeartbeatReq request = new 
TPipeHeartbeatReq(System.currentTimeMillis());
+    LOGGER.info(String.format("Collecting pipe heartbeat %s from data nodes", 
request.heartbeatId));
+
+    final AsyncClientHandler<TPipeHeartbeatReq, TPipeHeartbeatResp> 
clientHandler =
+        new AsyncClientHandler<>(DataNodeRequestType.PIPE_HEARTBEAT, request, 
dataNodeLocationMap);
+    
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    clientHandler
+        .getResponseMap()
+        .forEach(
+            (dataNodeId, resp) ->
+                pipeHeartbeatParser.parseHeartbeat(dataNodeId, 
resp.getPipeMetaList()));
+  }
+
+  public synchronized void stop() {
+    if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture != null) {
+      heartbeatFuture.cancel(false);
+      heartbeatFuture = null;
+      LOGGER.info("PipeHeartbeat is stopped successfully.");
+    }
+  }
+
+  public void parseHeartbeat(int dataNodeId, List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
+    pipeHeartbeatParser.parseHeartbeat(dataNodeId, 
pipeMetaByteBufferListFromDataNode);
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
index 1ea3cb00c2e..4d1f728714d 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
@@ -61,13 +61,16 @@ public class PipeLeaderChangeHandler implements 
IClusterStatusSubscriber {
               if 
(regionGroupId.getType().equals(TConsensusGroupType.DataRegion)) {
                 final String databaseName =
                     
configManager.getPartitionManager().getRegionStorageGroup(regionGroupId);
+                // pipe only collect user's data, filter metric database here.
                 if (databaseName != null && 
!databaseName.equals(IoTDBConfig.SYSTEM_DATABASE)) {
-                  // pipe only collect user's data, filter metric database 
here.
-                  dataRegionGroupToOldAndNewLeaderPairMap.put(
-                      regionGroupId,
-                      new Pair<>( // null or -1 means empty origin leader
-                          pair.left == null ? -1 : pair.left,
-                          pair.right == null ? -1 : pair.right));
+                  // null or -1 means empty origin leader
+                  final int oldLeaderDataNodeId = (pair.left == null ? -1 : 
pair.left);
+                  final int newLeaderDataNodeId = (pair.right == null ? -1 : 
pair.right);
+
+                  if (oldLeaderDataNodeId != newLeaderDataNodeId) {
+                    dataRegionGroupToOldAndNewLeaderPairMap.put(
+                        regionGroupId, new Pair<>(oldLeaderDataNodeId, 
newLeaderDataNodeId));
+                  }
                 }
               }
             });
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 59ed8e5056c..87a51eebe8f 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -41,15 +41,15 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
   private final ExecutorService procedureSubmitter;
 
   private final PipeLeaderChangeHandler pipeLeaderChangeHandler;
-  private final PipeHeartbeatParser pipeHeartbeatParser;
   private final PipeMetaSyncer pipeMetaSyncer;
+  private final PipeHeartbeatScheduler pipeHeartbeatScheduler;
 
   public PipeRuntimeCoordinator(ConfigManager configManager) {
     if (procedureSubmitterHolder.get() == null) {
       synchronized (PipeRuntimeCoordinator.class) {
         if (procedureSubmitterHolder.get() == null) {
           procedureSubmitterHolder.set(
-              IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+              IoTDBThreadPoolFactory.newSingleThreadExecutor(
                   ThreadName.PIPE_RUNTIME_PROCEDURE_SUBMITTER.getName()));
         }
       }
@@ -57,8 +57,8 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
     procedureSubmitter = procedureSubmitterHolder.get();
 
     pipeLeaderChangeHandler = new PipeLeaderChangeHandler(configManager);
-    pipeHeartbeatParser = new PipeHeartbeatParser(configManager);
     pipeMetaSyncer = new PipeMetaSyncer(configManager);
+    pipeHeartbeatScheduler = new PipeHeartbeatScheduler(configManager);
   }
 
   public ExecutorService getProcedureSubmitter() {
@@ -75,11 +75,6 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
     pipeLeaderChangeHandler.onRegionGroupLeaderChanged(event);
   }
 
-  public void parseHeartbeat(
-      int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
-    pipeHeartbeatParser.parseHeartbeat(dataNodeId, 
pipeMetaByteBufferListFromDataNode);
-  }
-
   public void startPipeMetaSync() {
     pipeMetaSyncer.start();
   }
@@ -87,4 +82,17 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
   public void stopPipeMetaSync() {
     pipeMetaSyncer.stop();
   }
+
+  public void startPipeHeartbeat() {
+    pipeHeartbeatScheduler.start();
+  }
+
+  public void stopPipeHeartbeat() {
+    pipeHeartbeatScheduler.stop();
+  }
+
+  public void parseHeartbeat(
+      int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
+    pipeHeartbeatScheduler.parseHeartbeat(dataNodeId, 
pipeMetaByteBufferListFromDataNode);
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 46940fc239b..728e3f30792 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -54,7 +54,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTaskInfo.class);
   private static final String SNAPSHOT_FILE_NAME = "pipe_task_info.bin";
 
-  private final ReentrantLock pipeTaskInfoLock = new ReentrantLock();
+  private final ReentrantLock pipeTaskInfoLock = new ReentrantLock(true);
 
   private final PipeMetaKeeper pipeMetaKeeper;
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 76b2cd2149e..4cab4b62642 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -96,7 +96,7 @@ public class ConfigNodeProcedureEnv {
   /** pipe operation lock */
   private final LockQueue pipeLock = new LockQueue();
 
-  private final ReentrantLock schedulerLock = new ReentrantLock();
+  private final ReentrantLock schedulerLock = new ReentrantLock(true);
 
   private final ConfigManager configManager;
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index c07982381cd..33185518dbf 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -124,28 +124,14 @@ public class PipeHandleLeaderChangeProcedure extends 
AbstractOperatePipeProcedur
   protected void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv 
env) {
     LOGGER.info("PipeHandleLeaderChangeProcedure: 
rollbackFromHandleOnConfigNodes");
 
-    final Map<TConsensusGroupId, Integer> 
oldDataRegionGroupIdToLeaderDataRegionIdMap =
-        new HashMap<>();
-    dataRegionGroupToOldAndNewLeaderPairMap.forEach(
-        (regionGroupId, oldNewLeaderPair) ->
-            oldDataRegionGroupIdToLeaderDataRegionIdMap.put(
-                regionGroupId, oldNewLeaderPair.getLeft()));
-
-    final PipeHandleLeaderChangePlan pipeHandleLeaderChangePlan =
-        new 
PipeHandleLeaderChangePlan(oldDataRegionGroupIdToLeaderDataRegionIdMap);
-
-    final ConsensusWriteResponse response =
-        
env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan);
-    if (!response.isSuccessful()) {
-      throw new PipeException(response.getErrorMessage());
-    }
+    // nothing to do
   }
 
   @Override
   protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeHandleLeaderChangeProcedure: 
rollbackFromCreateOnDataNodes");
 
-    pushPipeMetaToDataNodesIgnoreException(env);
+    // nothing to do
   }
 
   @Override
diff --git a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
index 43d26a03835..0deb7b78362 100644
--- a/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift/src/main/thrift/datanode.thrift
@@ -265,6 +265,14 @@ struct THeartbeatResp {
   10: optional list<binary> pipeMetaList
 }
 
+struct TPipeHeartbeatReq {
+  1: required i64 heartbeatId
+}
+
+struct TPipeHeartbeatResp {
+  1: required list<binary> pipeMetaList
+}
+
 enum TSchemaLimitLevel{
     DEVICE,
     TIMESERIES
@@ -797,6 +805,11 @@ service IDataNodeRPCService {
   */
   common.TSStatus pushPipeMeta(TPushPipeMetaReq req)
 
+  /**
+  * ConfigNode will ask DataNode for pipe meta in every few seconds
+  **/
+  TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req)
+
  /**
   * Execute CQ on DataNode
   */
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 0bd89a4a617..f314a1c3579 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -977,8 +977,12 @@ cluster_name=defaultCluster
 # The size of the pending queue for the PipeConnector to store the events.
 # pipe_connector_pending_queue_size=1024
 
-# The number of heartbeat loop cycles before collecting pipe meta once
-# pipe_heartbeat_loop_cycles_for_collecting_pipe_meta=100
+# True if the pipe heartbeat is seperated from the cluster's heartbeat, false 
the pipe heartbeat is
+# merged with the cluster's heartbeat.
+# pipe_heartbeat_seperated_mode_enabled=true
+
+# The interval time between the heartbeat that collecting pipe meta (in 
seconds).
+# pipe_heartbeat_interval_seconds_for_collecting_pipe_meta=100
 
 # The initial delay before starting the PipeMetaSyncer service.
 # pipe_meta_syncer_initial_sync_delay_minutes=3
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index e9a136a7b08..40eca81c898 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -108,6 +108,7 @@ public enum ThreadName {
   PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
   PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
   PIPE_RUNTIME_META_SYNCER("Pipe-Runtime-Meta-Syncer"),
+  PIPE_RUNTIME_HEARTBEAT("Pipe-Runtime-Heartbeat"),
   PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
   PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
   WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 7177c043878..f84931263ba 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -160,7 +160,8 @@ public class CommonConfig {
   private long pipeConnectorRetryIntervalMs = 1000L;
   private int pipeConnectorPendingQueueSize = 1024;
 
-  private int pipeHeartbeatLoopCyclesForCollectingPipeMeta = 100;
+  private boolean isSeperatedPipeHeartbeatEnabled = true;
+  private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
   private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
   private long pipeMetaSyncerSyncIntervalMinutes = 3;
 
@@ -499,14 +500,22 @@ public class CommonConfig {
     this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
   }
 
-  public int getPipeHeartbeatLoopCyclesForCollectingPipeMeta() {
-    return pipeHeartbeatLoopCyclesForCollectingPipeMeta;
+  public boolean isSeperatedPipeHeartbeatEnabled() {
+    return isSeperatedPipeHeartbeatEnabled;
   }
 
-  public void setPipeHeartbeatLoopCyclesForCollectingPipeMeta(
-      int pipeHeartbeatLoopCyclesForCollectingPipeMeta) {
-    this.pipeHeartbeatLoopCyclesForCollectingPipeMeta =
-        pipeHeartbeatLoopCyclesForCollectingPipeMeta;
+  public void setSeperatedPipeHeartbeatEnabled(boolean 
isSeperatedPipeHeartbeatEnabled) {
+    this.isSeperatedPipeHeartbeatEnabled = isSeperatedPipeHeartbeatEnabled;
+  }
+
+  public int getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() {
+    return pipeHeartbeatIntervalSecondsForCollectingPipeMeta;
+  }
+
+  public void setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+      int pipeHeartbeatIntervalSecondsForCollectingPipeMeta) {
+    this.pipeHeartbeatIntervalSecondsForCollectingPipeMeta =
+        pipeHeartbeatIntervalSecondsForCollectingPipeMeta;
   }
 
   public long getPipeMetaSyncerInitialSyncDelayMinutes() {
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index a8058a549b7..be03167bf1d 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -304,11 +304,16 @@ public class CommonDescriptor {
                 "pipe_connector_pending_queue_size",
                 String.valueOf(config.getPipeConnectorPendingQueueSize()))));
 
-    config.setPipeHeartbeatLoopCyclesForCollectingPipeMeta(
+    config.setSeperatedPipeHeartbeatEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "pipe_heartbeat_seperated_mode_enabled",
+                String.valueOf(config.isSeperatedPipeHeartbeatEnabled()))));
+    config.setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
         Integer.parseInt(
             properties.getProperty(
-                "pipe_heartbeat_loop_cycles_for_collecting_pipe_meta",
-                
String.valueOf(config.getPipeHeartbeatLoopCyclesForCollectingPipeMeta()))));
+                "pipe_heartbeat_interval_seconds_for_collecting_pipe_meta",
+                
String.valueOf(config.getPipeHeartbeatIntervalSecondsForCollectingPipeMeta()))));
     config.setPipeMetaSyncerInitialSyncDelayMinutes(
         Long.parseLong(
             properties.getProperty(
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 077048ac11e..b6ffc2238c6 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -87,8 +87,12 @@ public class PipeConfig {
 
   /////////////////////////////// Meta Consistency 
///////////////////////////////
 
-  public int getHeartbeatLoopCyclesForCollectingPipeMeta() {
-    return COMMON_CONFIG.getPipeHeartbeatLoopCyclesForCollectingPipeMeta();
+  public boolean isSeperatedPipeHeartbeatEnabled() {
+    return COMMON_CONFIG.isSeperatedPipeHeartbeatEnabled();
+  }
+
+  public int getPipeHeartbeatIntervalSecondsForCollectingPipeMeta() {
+    return 
COMMON_CONFIG.getPipeHeartbeatIntervalSecondsForCollectingPipeMeta();
   }
 
   public long getPipeMetaSyncerInitialSyncDelayMinutes() {
@@ -129,9 +133,10 @@ public class PipeConfig {
     LOGGER.info("PipeConnectorRetryIntervalMs: {}", 
getPipeConnectorRetryIntervalMs());
     LOGGER.info("PipeConnectorPendingQueueSize: {}", 
getPipeConnectorPendingQueueSize());
 
+    LOGGER.info("SeperatedPipeHeartbeatEnabled: {}", 
isSeperatedPipeHeartbeatEnabled());
     LOGGER.info(
-        "HeartbeatLoopCyclesForCollectingPipeMeta: {}",
-        getHeartbeatLoopCyclesForCollectingPipeMeta());
+        "PipeHeartbeatIntervalSecondsForCollectingPipeMeta: {}",
+        getPipeHeartbeatIntervalSecondsForCollectingPipeMeta());
     LOGGER.info(
         "PipeMetaSyncerInitialSyncDelayMinutes: {}", 
getPipeMetaSyncerInitialSyncDelayMinutes());
     LOGGER.info("PipeMetaSyncerSyncIntervalMinutes: {}", 
getPipeMetaSyncerSyncIntervalMinutes());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index bbbbd3a7d23..69e7425c00b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTaskManager;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.thrift.TException;
@@ -633,4 +635,24 @@ public class PipeTaskAgent {
     }
     resp.setPipeMetaList(pipeMetaBinaryList);
   }
+
+  public synchronized void collectPipeMetaList(TPipeHeartbeatReq req, 
TPipeHeartbeatResp resp)
+      throws TException {
+    // do nothing if data node is removing or removed, or request does not 
need pipe meta list
+    if (PipeAgent.runtime().isShutdown()) {
+      return;
+    }
+    LOGGER.info("Received pipe heartbeat request {} from config node.", 
req.heartbeatId);
+
+    final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
+    try {
+      for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+        pipeMetaBinaryList.add(pipeMeta.serialize());
+        LOGGER.info("Reporting pipe meta: {}", pipeMeta);
+      }
+    } catch (IOException e) {
+      throw new TException(e);
+    }
+    resp.setPipeMetaList(pipeMetaBinaryList);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b31fd026a11..66d6f235910 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -174,6 +174,8 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
@@ -943,6 +945,13 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     }
   }
 
+  @Override
+  public TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) throws 
TException {
+    final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
+    PipeAgent.task().collectPipeMetaList(req, resp);
+    return resp;
+  }
+
   private TSStatus executeInternalSchemaTask(
       List<TConsensusGroupId> consensusGroupIdList,
       Function<TConsensusGroupId, TSStatus> executeOnOneRegion) {


Reply via email to