This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 40934ddc572 Pipe: Add completion signal to historical events & allow 
all data regions' completion signal to drop the pipe (#12490)
40934ddc572 is described below

commit 40934ddc5726d5b13787450f072951020dcdfa61
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 23 12:01:59 2024 +0800

    Pipe: Add completion signal to historical events & allow all data regions' 
completion signal to drop the pipe (#12490)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/it/autocreate/IoTDBPipeAutoDropIT.java    |  80 +++++++++++++++
 .../iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java |   4 +-
 .../heartbeat/DataNodeHeartbeatHandler.java        |   3 +-
 .../runtime/PipeRuntimeCoordinator.java            |  18 ++--
 .../runtime/heartbeat/PipeHeartbeat.java           |  61 ++++++++++++
 .../{ => heartbeat}/PipeHeartbeatParser.java       |  51 +++++-----
 .../{ => heartbeat}/PipeHeartbeatScheduler.java    |  20 ++--
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  89 ++++++++++-------
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 107 ++++++++++++++++++++-
 .../event/common/terminate/PipeTerminateEvent.java |  94 ++++++++++++++++++
 .../PipeHistoricalDataRegionTsFileExtractor.java   |  56 +++++++----
 .../iotdb/db/pipe/task/PipeDataNodeTask.java       |  20 +++-
 .../config/constant/PipeExtractorConstant.java     |   6 ++
 .../iotdb/commons/pipe/task/meta/PipeMeta.java     |  38 +++++---
 .../commons/pipe/task/meta/PipeTemporaryMeta.java  |  60 ++++++++++++
 .../src/main/thrift/datanode.thrift                |   2 +
 16 files changed, 593 insertions(+), 116 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
new file mode 100644
index 00000000000..b223a245d0a
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IoTDBPipeAutoDropIT extends AbstractPipeDualAutoIT {
+  @Test
+  public void testAutoDropInHistoricalTransfer() throws Exception {
+    final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    final String receiverIp = receiverDataNode.getIp();
+    final int receiverPort = receiverDataNode.getPort();
+
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+
+      if (!TestUtils.tryExecuteNonQueryWithRetry(
+          senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
+        return;
+      }
+
+      final Map<String, String> extractorAttributes = new HashMap<>();
+      final Map<String, String> processorAttributes = new HashMap<>();
+      final Map<String, String> connectorAttributes = new HashMap<>();
+
+      
extractorAttributes.put("extractor.history.terminate-pipe-on-all-consumed", 
"true");
+
+      connectorAttributes.put("connector", "iotdb-thrift-connector");
+      connectorAttributes.put("connector.batch.enable", "false");
+      connectorAttributes.put("connector.ip", receiverIp);
+      connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+
+      final TSStatus status =
+          client.createPipe(
+              new TCreatePipeReq("p1", connectorAttributes)
+                  .setExtractorAttributes(extractorAttributes)
+                  .setProcessorAttributes(processorAttributes));
+
+      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
+      Assert.assertEquals(
+          TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
client.startPipe("p1").getCode());
+
+      TestUtils.assertDataEventuallyOnEnv(
+          senderEnv,
+          "show pipes",
+          
"ID,CreationTime,State,PipeSource,PipeProcessor,PipeSink,ExceptionMessage,",
+          Collections.emptySet());
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java
index 7a8ff6c3f15..4515536c253 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeInclusionIT.java
@@ -229,7 +229,7 @@ public class IoTDBPipeInclusionIT extends 
AbstractPipeDualManualIT {
           senderEnv,
           Arrays.asList(
               "create timeseries root.ln.wf01.wt01.status with 
datatype=BOOLEAN,encoding=PLAIN",
-              "insert into root.ln.wf01.wt01(time, status) values(0, 1)",
+              "insert into root.ln.wf01.wt01(time, status) values(0, true)",
               "flush"))) {
         return;
       }
@@ -240,7 +240,7 @@ public class IoTDBPipeInclusionIT extends 
AbstractPipeDualManualIT {
           receiverEnv,
           Arrays.asList(
               "create timeseries root.ln.wf01.wt01.status1 with 
datatype=BOOLEAN,encoding=PLAIN",
-              "insert into root.ln.wf01.wt01(time, status1) values(0, 1)",
+              "insert into root.ln.wf01.wt01(time, status1) values(0, true)",
               "flush"))) {
         return;
       }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 49981d72834..5829fd07a56 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -130,7 +130,8 @@ public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<TDataNodeHe
       regionDisk.putAll(heartbeatResp.getRegionDisk());
     }
     if (heartbeatResp.getPipeMetaList() != null) {
-      pipeRuntimeCoordinator.parseHeartbeat(nodeId, 
heartbeatResp.getPipeMetaList());
+      pipeRuntimeCoordinator.parseHeartbeat(
+          nodeId, heartbeatResp.getPipeMetaList(), 
heartbeatResp.getPipeCompletedList());
     }
     if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
       loadManager
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
index 8d9644eae5f..97f893f4eae 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
@@ -26,6 +26,8 @@ import 
org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatist
 import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
+import 
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeat;
+import 
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeatScheduler;
 
 import javax.validation.constraints.NotNull;
 
@@ -45,7 +47,7 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
   private final PipeMetaSyncer pipeMetaSyncer;
   private final PipeHeartbeatScheduler pipeHeartbeatScheduler;
 
-  public PipeRuntimeCoordinator(ConfigManager configManager) {
+  public PipeRuntimeCoordinator(final ConfigManager configManager) {
     if (procedureSubmitterHolder.get() == null) {
       synchronized (PipeRuntimeCoordinator.class) {
         if (procedureSubmitterHolder.get() == null) {
@@ -71,18 +73,18 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
   }
 
   @Override
-  public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
+  public void onNodeStatisticsChanged(final NodeStatisticsChangeEvent event) {
     // Do nothing
   }
 
   @Override
-  public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent 
event) {
+  public void onRegionGroupStatisticsChanged(final 
RegionGroupStatisticsChangeEvent event) {
     // Do nothing
   }
 
   @Override
   public synchronized void onConsensusGroupStatisticsChanged(
-      ConsensusGroupStatisticsChangeEvent event) {
+      final ConsensusGroupStatisticsChangeEvent event) {
     pipeLeaderChangeHandler.onConsensusGroupStatisticsChanged(event);
   }
 
@@ -103,7 +105,11 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
   }
 
   public void parseHeartbeat(
-      int dataNodeId, @NotNull List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
-    pipeHeartbeatScheduler.parseHeartbeat(dataNodeId, 
pipeMetaByteBufferListFromDataNode);
+      final int dataNodeId,
+      @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
+      /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
+    pipeHeartbeatScheduler.parseHeartbeat(
+        dataNodeId,
+        new PipeHeartbeat(pipeMetaByteBufferListFromDataNode, 
pipeCompletedListFromAgent));
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
new file mode 100644
index 00000000000..203ba96ed44
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+
+import javax.validation.constraints.NotNull;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class PipeHeartbeat {
+
+  private final Map<PipeStaticMeta, PipeMeta> pipeMetaMap = new HashMap<>();
+  private final Map<PipeStaticMeta, Boolean> isCompletedMap = new HashMap<>();
+
+  public PipeHeartbeat(
+      @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
+      /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
+    for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) {
+      final PipeMeta pipeMeta = 
PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i));
+      pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta);
+      isCompletedMap.put(
+          pipeMeta.getStaticMeta(),
+          Objects.nonNull(pipeCompletedListFromAgent) && 
pipeCompletedListFromAgent.get(i));
+    }
+  }
+
+  public PipeMeta getPipeMeta(PipeStaticMeta pipeStaticMeta) {
+    return pipeMetaMap.get(pipeStaticMeta);
+  }
+
+  public Boolean isCompleted(PipeStaticMeta pipeStaticMeta) {
+    return isCompletedMap.get(pipeStaticMeta);
+  }
+
+  public boolean isEmpty() {
+    return pipeMetaMap.isEmpty();
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
similarity index 88%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 3bda79f0dd5..68ea140dda9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatParser.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -17,16 +17,16 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime;
+package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
 
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
-import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
 import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
@@ -34,12 +34,8 @@ import 
org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.validation.constraints.NotNull;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -55,7 +51,7 @@ public class PipeHeartbeatParser {
   private final AtomicBoolean needWriteConsensusOnConfigNodes;
   private final AtomicBoolean needPushPipeMetaToDataNodes;
 
-  PipeHeartbeatParser(ConfigManager configManager) {
+  PipeHeartbeatParser(final ConfigManager configManager) {
     this.configManager = configManager;
 
     heartbeatCounter = 0;
@@ -65,8 +61,7 @@ public class PipeHeartbeatParser {
     needPushPipeMetaToDataNodes = new AtomicBoolean(false);
   }
 
-  public synchronized void parseHeartbeat(
-      int nodeId, @NotNull List<ByteBuffer> pipeMetaByteBufferListFromAgent) {
+  synchronized void parseHeartbeat(final int nodeId, final PipeHeartbeat 
pipeHeartbeat) {
     final long heartbeatCount = ++heartbeatCounter;
 
     final AtomicBoolean canSubmitHandleMetaChangeProcedure = new 
AtomicBoolean(false);
@@ -87,7 +82,7 @@ public class PipeHeartbeatParser {
       }
     }
 
-    if (pipeMetaByteBufferListFromAgent.isEmpty()
+    if (pipeHeartbeat.isEmpty()
         && !(canSubmitHandleMetaChangeProcedure.get()
             && (needWriteConsensusOnConfigNodes.get() || 
needPushPipeMetaToDataNodes.get()))) {
       return;
@@ -108,9 +103,8 @@ public class PipeHeartbeatParser {
               }
 
               try {
-                if (!pipeMetaByteBufferListFromAgent.isEmpty()) {
-                  parseHeartbeatAndSaveMetaChangeLocally(
-                      pipeTaskInfo, nodeId, pipeMetaByteBufferListFromAgent);
+                if (!pipeHeartbeat.isEmpty()) {
+                  parseHeartbeatAndSaveMetaChangeLocally(pipeTaskInfo, nodeId, 
pipeHeartbeat);
                 }
 
                 if (canSubmitHandleMetaChangeProcedure.get()
@@ -134,16 +128,10 @@ public class PipeHeartbeatParser {
   private void parseHeartbeatAndSaveMetaChangeLocally(
       final AtomicReference<PipeTaskInfo> pipeTaskInfo,
       final int nodeId,
-      @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent) {
-    final Map<PipeStaticMeta, PipeMeta> pipeMetaMapFromAgent = new HashMap<>();
-    for (ByteBuffer byteBuffer : pipeMetaByteBufferListFromAgent) {
-      final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
-      pipeMetaMapFromAgent.put(pipeMeta.getStaticMeta(), pipeMeta);
-    }
-
+      final PipeHeartbeat pipeHeartbeat) {
     for (final PipeMeta pipeMetaFromCoordinator : 
pipeTaskInfo.get().getPipeMetaList()) {
       final PipeMeta pipeMetaFromAgent =
-          pipeMetaMapFromAgent.get(pipeMetaFromCoordinator.getStaticMeta());
+          pipeHeartbeat.getPipeMeta(pipeMetaFromCoordinator.getStaticMeta());
       if (pipeMetaFromAgent == null) {
         LOGGER.info(
             "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
@@ -152,6 +140,25 @@ public class PipeHeartbeatParser {
         continue;
       }
 
+      // Remove completed pipes
+      final Boolean isPipeCompletedFromAgent =
+          pipeHeartbeat.isCompleted(pipeMetaFromCoordinator.getStaticMeta());
+      if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
+        final PipeTemporaryMeta temporaryMeta = 
pipeMetaFromCoordinator.getTemporaryMeta();
+
+        temporaryMeta.markDataNodeCompleted(nodeId);
+
+        final Set<Integer> uncompletedDataNodeIds =
+            
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
+        
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
+        if (uncompletedDataNodeIds.isEmpty()) {
+          
pipeTaskInfo.get().removePipeMeta(pipeMetaFromCoordinator.getStaticMeta().getPipeName());
+          needWriteConsensusOnConfigNodes.set(true);
+          needPushPipeMetaToDataNodes.set(true);
+          continue;
+        }
+      }
+
       final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromCoordinator =
           
pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
       final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromAgent =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
similarity index 90%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 65217818e7d..462b6a017a8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeHeartbeatScheduler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime;
+package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -36,8 +36,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -61,7 +59,7 @@ public class PipeHeartbeatScheduler {
 
   private Future<?> heartbeatFuture;
 
-  PipeHeartbeatScheduler(ConfigManager configManager) {
+  public PipeHeartbeatScheduler(final ConfigManager configManager) {
     this.configManager = configManager;
     this.pipeHeartbeatParser = new PipeHeartbeatParser(configManager);
   }
@@ -90,7 +88,7 @@ public class PipeHeartbeatScheduler {
       return;
     }
 
-    // data node heartbeat
+    // Data node heartbeat
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
     final TPipeHeartbeatReq request = new 
TPipeHeartbeatReq(System.currentTimeMillis());
@@ -109,7 +107,9 @@ public class PipeHeartbeatScheduler {
         .getResponseMap()
         .forEach(
             (dataNodeId, resp) ->
-                pipeHeartbeatParser.parseHeartbeat(dataNodeId, 
resp.getPipeMetaList()));
+                pipeHeartbeatParser.parseHeartbeat(
+                    dataNodeId,
+                    new PipeHeartbeat(resp.getPipeMetaList(), 
resp.getPipeCompletedList())));
 
     // config node heartbeat
     try {
@@ -117,8 +117,8 @@ public class PipeHeartbeatScheduler {
       PipeConfigNodeAgent.task().collectPipeMetaList(request, configNodeResp);
       pipeHeartbeatParser.parseHeartbeat(
           ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
-          configNodeResp.getPipeMetaList());
-    } catch (Exception e) {
+          new PipeHeartbeat(configNodeResp.getPipeMetaList(), null));
+    } catch (final Exception e) {
       LOGGER.warn("Failed to collect pipe meta list from config node task 
agent", e);
     }
   }
@@ -131,7 +131,7 @@ public class PipeHeartbeatScheduler {
     }
   }
 
-  public void parseHeartbeat(int dataNodeId, List<ByteBuffer> 
pipeMetaByteBufferListFromDataNode) {
-    pipeHeartbeatParser.parseHeartbeat(dataNodeId, 
pipeMetaByteBufferListFromDataNode);
+  public void parseHeartbeat(final int dataNodeId, final PipeHeartbeat 
pipeHeartbeat) {
+    pipeHeartbeatParser.parseHeartbeat(dataNodeId, pipeHeartbeat);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index d41493762d2..201f2f049ed 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -143,7 +143,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
   /////////////////////////////// Validator ///////////////////////////////
 
-  public void checkBeforeCreatePipe(TCreatePipeReq createPipeRequest) throws 
PipeException {
+  public void checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest) 
throws PipeException {
     acquireReadLock();
     try {
       checkBeforeCreatePipeInternal(createPipeRequest);
@@ -166,7 +166,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     throw new PipeException(exceptionMessage);
   }
 
-  public void checkAndUpdateRequestBeforeAlterPipe(TAlterPipeReq 
alterPipeRequest)
+  public void checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq 
alterPipeRequest)
       throws PipeException {
     acquireReadLock();
     try {
@@ -176,7 +176,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  private void checkAndUpdateRequestBeforeAlterPipeInternal(TAlterPipeReq 
alterPipeRequest)
+  private void checkAndUpdateRequestBeforeAlterPipeInternal(final 
TAlterPipeReq alterPipeRequest)
       throws PipeException {
     if (!isPipeExisted(alterPipeRequest.getPipeName())) {
       final String exceptionMessage =
@@ -229,7 +229,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  public void checkBeforeStartPipe(String pipeName) throws PipeException {
+  public void checkBeforeStartPipe(final String pipeName) throws PipeException 
{
     acquireReadLock();
     try {
       checkBeforeStartPipeInternal(pipeName);
@@ -238,7 +238,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  private void checkBeforeStartPipeInternal(String pipeName) throws 
PipeException {
+  private void checkBeforeStartPipeInternal(final String pipeName) throws 
PipeException {
     if (!isPipeExisted(pipeName)) {
       final String exceptionMessage =
           String.format("Failed to start pipe %s, the pipe does not exist", 
pipeName);
@@ -255,7 +255,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  public void checkBeforeStopPipe(String pipeName) throws PipeException {
+  public void checkBeforeStopPipe(final String pipeName) throws PipeException {
     acquireReadLock();
     try {
       checkBeforeStopPipeInternal(pipeName);
@@ -264,7 +264,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  private void checkBeforeStopPipeInternal(String pipeName) throws 
PipeException {
+  private void checkBeforeStopPipeInternal(final String pipeName) throws 
PipeException {
     if (!isPipeExisted(pipeName)) {
       final String exceptionMessage =
           String.format("Failed to stop pipe %s, the pipe does not exist", 
pipeName);
@@ -281,7 +281,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  public void checkBeforeDropPipe(String pipeName) {
+  public void checkBeforeDropPipe(final String pipeName) {
     acquireReadLock();
     try {
       checkBeforeDropPipeInternal(pipeName);
@@ -290,7 +290,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  private void checkBeforeDropPipeInternal(String pipeName) {
+  private void checkBeforeDropPipeInternal(final String pipeName) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug(
           "Check before drop pipe {}, pipe exists: {}.", pipeName, 
isPipeExisted(pipeName));
@@ -300,7 +300,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     // DO NOTHING HERE!
   }
 
-  public boolean isPipeExisted(String pipeName) {
+  public boolean isPipeExisted(final String pipeName) {
     acquireReadLock();
     try {
       return pipeMetaKeeper.containsPipeMeta(pipeName);
@@ -309,7 +309,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  private PipeStatus getPipeStatus(String pipeName) {
+  private PipeStatus getPipeStatus(final String pipeName) {
     acquireReadLock();
     try {
       return 
pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().getStatus().get();
@@ -318,7 +318,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  public boolean isPipeRunning(String pipeName) {
+  public boolean isPipeRunning(final String pipeName) {
     acquireReadLock();
     try {
       return pipeMetaKeeper.containsPipeMeta(pipeName)
@@ -328,7 +328,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  public boolean isPipeStoppedByUser(String pipeName) {
+  public boolean isPipeStoppedByUser(final String pipeName) {
     acquireReadLock();
     try {
       return pipeMetaKeeper.containsPipeMeta(pipeName)
@@ -341,7 +341,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
 
   /////////////////////////////// Pipe Task Management 
///////////////////////////////
 
-  public TSStatus createPipe(CreatePipePlanV2 plan) {
+  public TSStatus createPipe(final CreatePipePlanV2 plan) {
     acquireWriteLock();
     try {
       pipeMetaKeeper.addPipeMeta(
@@ -353,7 +353,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  public TSStatus operateMultiplePipes(OperateMultiplePipesPlanV2 plan) {
+  public TSStatus operateMultiplePipes(final OperateMultiplePipesPlanV2 plan) {
     acquireWriteLock();
     try {
       if (plan.getSubPlans() == null || plan.getSubPlans().isEmpty()) {
@@ -364,7 +364,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
       // We use sub-status to record the status of each subPlan
       status.setSubStatus(new ArrayList<>());
 
-      for (ConfigPhysicalPlan subPlan : plan.getSubPlans()) {
+      for (final ConfigPhysicalPlan subPlan : plan.getSubPlans()) {
         try {
           if (subPlan instanceof CreatePipePlanV2) {
             createPipe((CreatePipePlanV2) subPlan);
@@ -379,7 +379,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
                 String.format("Unsupported subPlan type: %s", 
subPlan.getClass().getName()));
           }
           status.getSubStatus().add(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
-        } catch (Exception e) {
+        } catch (final Exception e) {
           // If one of the subPlan fails, we stop operating the rest of the 
pipes
           LOGGER.error("Failed to operate pipe", e);
           status.setCode(TSStatusCode.PIPE_ERROR.getStatusCode());
@@ -399,7 +399,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  public TSStatus alterPipe(AlterPipePlanV2 plan) {
+  public TSStatus alterPipe(final AlterPipePlanV2 plan) {
     acquireWriteLock();
     try {
       pipeMetaKeeper.removePipeMeta(plan.getPipeStaticMeta().getPipeName());
@@ -412,7 +412,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  public TSStatus setPipeStatus(SetPipeStatusPlanV2 plan) {
+  public TSStatus setPipeStatus(final SetPipeStatusPlanV2 plan) {
     acquireWriteLock();
     try {
       pipeMetaKeeper
@@ -426,7 +426,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  public TSStatus dropPipe(DropPipePlanV2 plan) {
+  public TSStatus dropPipe(final DropPipePlanV2 plan) {
     acquireWriteLock();
     try {
       pipeMetaKeeper.removePipeMeta(plan.getPipeName());
@@ -457,7 +457,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  public PipeMeta getPipeMetaByPipeName(String pipeName) {
+  public PipeMeta getPipeMetaByPipeName(final String pipeName) {
     acquireReadLock();
     try {
       return pipeMetaKeeper.getPipeMetaByPipeName(pipeName);
@@ -478,7 +478,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
   /////////////////////////////// Pipe Runtime Management 
///////////////////////////////
 
   /** Handle the region leader change event and update the pipe task meta 
accordingly. */
-  public TSStatus handleLeaderChange(PipeHandleLeaderChangePlan plan) {
+  public TSStatus handleLeaderChange(final PipeHandleLeaderChangePlan plan) {
     acquireWriteLock();
     try {
       return handleLeaderChangeInternal(plan);
@@ -487,7 +487,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  private TSStatus handleLeaderChangeInternal(PipeHandleLeaderChangePlan plan) 
{
+  private TSStatus handleLeaderChangeInternal(final PipeHandleLeaderChangePlan 
plan) {
     plan.getConsensusGroupId2NewLeaderIdMap()
         .forEach(
             (consensusGroupId, newLeader) ->
@@ -532,7 +532,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
    * @param plan The plan containing all the {@link PipeMeta}s from leader 
{@link ConfigNode}
    * @return {@link TSStatusCode#SUCCESS_STATUS}
    */
-  public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) {
+  public TSStatus handleMetaChanges(final PipeHandleMetaChangePlan plan) {
     acquireWriteLock();
     try {
       return handleMetaChangesInternal(plan);
@@ -541,7 +541,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  private TSStatus handleMetaChangesInternal(PipeHandleMetaChangePlan plan) {
+  private TSStatus handleMetaChangesInternal(final PipeHandleMetaChangePlan 
plan) {
     LOGGER.info("Handling pipe meta changes ...");
 
     pipeMetaKeeper.clear();
@@ -556,7 +556,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
-  public boolean isStoppedByRuntimeException(String pipeName) {
+  public boolean isStoppedByRuntimeException(final String pipeName) {
     acquireReadLock();
     try {
       return isStoppedByRuntimeExceptionInternal(pipeName);
@@ -565,7 +565,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  private boolean isStoppedByRuntimeExceptionInternal(String pipeName) {
+  private boolean isStoppedByRuntimeExceptionInternal(final String pipeName) {
     return pipeMetaKeeper.containsPipeMeta(pipeName)
         && 
pipeMetaKeeper.getPipeMeta(pipeName).getRuntimeMeta().getIsStoppedByRuntimeException();
   }
@@ -578,7 +578,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
    *
    * @param pipeName The name of the pipe to be clear exception
    */
-  public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(String 
pipeName) {
+  public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(final 
String pipeName) {
     acquireWriteLock();
     try {
       
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeName);
@@ -587,7 +587,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  private void 
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(String 
pipeName) {
+  private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
+      final String pipeName) {
     if (!pipeMetaKeeper.containsPipeMeta(pipeName)) {
       return;
     }
@@ -616,7 +617,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
             });
   }
 
-  public void setIsStoppedByRuntimeExceptionToFalse(String pipeName) {
+  public void setIsStoppedByRuntimeExceptionToFalse(final String pipeName) {
     acquireWriteLock();
     try {
       setIsStoppedByRuntimeExceptionToFalseInternal(pipeName);
@@ -625,7 +626,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
     }
   }
 
-  private void setIsStoppedByRuntimeExceptionToFalseInternal(String pipeName) {
+  private void setIsStoppedByRuntimeExceptionToFalseInternal(final String 
pipeName) {
     if (!pipeMetaKeeper.containsPipeMeta(pipeName)) {
       return;
     }
@@ -641,9 +642,10 @@ public class PipeTaskInfo implements SnapshotProcessor {
    * messages will then be updated to all the nodes through {@link 
PipeHandleMetaChangeProcedure}.
    *
    * @param respMap The responseMap after pushing pipe meta
-   * @return {@link true} if there are exceptions encountered
+   * @return {@code true} if there are exceptions encountered
    */
-  public boolean recordDataNodePushPipeMetaExceptions(Map<Integer, 
TPushPipeMetaResp> respMap) {
+  public boolean recordDataNodePushPipeMetaExceptions(
+      final Map<Integer, TPushPipeMetaResp> respMap) {
     acquireWriteLock();
     try {
       return recordDataNodePushPipeMetaExceptionsInternal(respMap);
@@ -653,7 +655,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
   }
 
   private boolean recordDataNodePushPipeMetaExceptionsInternal(
-      Map<Integer, TPushPipeMetaResp> respMap) {
+      final Map<Integer, TPushPipeMetaResp> respMap) {
     boolean hasException = false;
 
     for (final Map.Entry<Integer, TPushPipeMetaResp> respEntry : 
respMap.entrySet()) {
@@ -758,10 +760,23 @@ public class PipeTaskInfo implements SnapshotProcessor {
             });
   }
 
+  public void removePipeMeta(final String pipeName) {
+    acquireWriteLock();
+    try {
+      removePipeMetaInternal(pipeName);
+    } finally {
+      releaseWriteLock();
+    }
+  }
+
+  private void removePipeMetaInternal(final String pipeName) {
+    pipeMetaKeeper.removePipeMeta(pipeName);
+  }
+
   /////////////////////////////// Snapshot ///////////////////////////////
 
   @Override
-  public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+  public boolean processTakeSnapshot(final File snapshotDir) throws 
IOException {
     acquireReadLock();
     try {
       final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
@@ -783,7 +798,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
   }
 
   @Override
-  public void processLoadSnapshot(File snapshotDir) throws IOException {
+  public void processLoadSnapshot(final File snapshotDir) throws IOException {
     acquireWriteLock();
     try {
       final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILE_NAME);
@@ -810,7 +825,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 366024d2906..65c7699f398 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -69,6 +69,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -79,6 +80,10 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
+
 public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeDataNodeTaskAgent.class);
@@ -275,7 +280,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       return;
     }
 
+    final Set<Integer> dataRegionIds =
+        StorageEngine.getInstance().getAllDataRegionIds().stream()
+            .map(DataRegionId::getId)
+            .collect(Collectors.toSet());
+
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
+    final List<Boolean> pipeCompletedList = new ArrayList<>();
     try {
       final Optional<Logger> logger =
           PipeResourceManager.log()
@@ -286,13 +297,44 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                   pipeMetaKeeper.getPipeMetaCount());
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
-        logger.ifPresent(l -> l.info("Reporting pipe meta: {}", 
pipeMeta.coreReportMessage()));
+
+        final Map<Integer, PipeTask> pipeTaskMap =
+            pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta());
+        final boolean isAllDataRegionCompleted =
+            pipeTaskMap == null
+                || pipeTaskMap.entrySet().stream()
+                    .filter(entry -> dataRegionIds.contains(entry.getKey()))
+                    .allMatch(entry -> ((PipeDataNodeTask) 
entry.getValue()).isCompleted());
+        // If the "source.history.terminate-pipe-on-all-consumed" is false or 
the pipe does
+        // not include data transfer, we should not terminate the pipe.
+        final boolean includeDataAndNeedDrop =
+            
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
+                        pipeMeta.getStaticMeta().getExtractorParameters())
+                    .getLeft()
+                && pipeMeta
+                    .getStaticMeta()
+                    .getExtractorParameters()
+                    .getBooleanOrDefault(
+                        Arrays.asList(
+                            SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
+                            
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
+                        
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+
+        pipeCompletedList.add(isAllDataRegionCompleted && 
includeDataAndNeedDrop);
+
+        logger.ifPresent(
+            l ->
+                l.info(
+                    "Reporting pipe meta: {}, isCompleted: {}",
+                    pipeMeta.coreReportMessage(),
+                    includeDataAndNeedDrop));
       }
       LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
-    } catch (final IOException e) {
+    } catch (final IOException | IllegalPathException e) {
       throw new TException(e);
     }
     resp.setPipeMetaList(pipeMetaBinaryList);
+    resp.setPipeCompletedList(pipeCompletedList);
   }
 
   @Override
@@ -304,7 +346,13 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
     LOGGER.info("Received pipe heartbeat request {} from config node.", 
req.heartbeatId);
 
+    final Set<Integer> dataRegionIds =
+        StorageEngine.getInstance().getAllDataRegionIds().stream()
+            .map(DataRegionId::getId)
+            .collect(Collectors.toSet());
+
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
+    final List<Boolean> pipeCompletedList = new ArrayList<>();
     try {
       final Optional<Logger> logger =
           PipeResourceManager.log()
@@ -315,14 +363,44 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                   pipeMetaKeeper.getPipeMetaCount());
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
-        logger.ifPresent(l -> l.info("Reporting pipe meta: {}", 
pipeMeta.coreReportMessage()));
+
+        final Map<Integer, PipeTask> pipeTaskMap =
+            pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta());
+        final boolean isAllDataRegionCompleted =
+            pipeTaskMap == null
+                || pipeTaskMap.entrySet().stream()
+                    .filter(entry -> dataRegionIds.contains(entry.getKey()))
+                    .allMatch(entry -> ((PipeDataNodeTask) 
entry.getValue()).isCompleted());
+        // If the "source.history.terminate-pipe-on-all-consumed" is false or 
the pipe does
+        // not include data transfer, we should not terminate the pipe.
+        final boolean includeDataAndNeedDrop =
+            
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(
+                        pipeMeta.getStaticMeta().getExtractorParameters())
+                    .getLeft()
+                && pipeMeta
+                    .getStaticMeta()
+                    .getExtractorParameters()
+                    .getBooleanOrDefault(
+                        Arrays.asList(
+                            SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
+                            
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
+                        
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+
+        pipeCompletedList.add(isAllDataRegionCompleted && 
includeDataAndNeedDrop);
+
+        logger.ifPresent(
+            l ->
+                l.info(
+                    "Reporting pipe meta: {}, isCompleted: {}",
+                    pipeMeta.coreReportMessage(),
+                    includeDataAndNeedDrop));
       }
       LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
-    } catch (final IOException e) {
+    } catch (final IOException | IllegalPathException e) {
       throw new TException(e);
     }
     resp.setPipeMetaList(pipeMetaBinaryList);
-
+    resp.setPipeCompletedList(pipeCompletedList);
     PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
   }
 
@@ -477,6 +555,25 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
   }
 
+  ///////////////////////// Terminate Logic /////////////////////////
+
+  public void markCompleted(final String pipeName, final int regionId) {
+    acquireWriteLock();
+    try {
+      if (pipeMetaKeeper.containsPipeMeta(pipeName)) {
+        final PipeDataNodeTask pipeDataNodeTask =
+            ((PipeDataNodeTask)
+                pipeTaskManager.getPipeTask(
+                    pipeMetaKeeper.getPipeMeta(pipeName).getStaticMeta(), 
regionId));
+        if (Objects.nonNull(pipeDataNodeTask)) {
+          pipeDataNodeTask.markCompleted();
+        }
+      }
+    } finally {
+      releaseWriteLock();
+    }
+  }
+
   ///////////////////////// Utils /////////////////////////
 
   public Set<Integer> getPipeTaskRegionIdSet(final String pipeName, final long 
creationTime) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
new file mode 100644
index 00000000000..c1252fe668f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.terminate;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.pattern.PipePattern;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.task.PipeDataNodeTask;
+
+/**
+ * The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls 
the termination of pipe,
+ * that is, when the historical {@link PipeTsFileInsertionEvent}s are all 
processed, this will be
+ * reported next and mark the {@link PipeDataNodeTask} as completed. WARNING: 
This event shall never
+ * be discarded.
+ */
+public class PipeTerminateEvent extends EnrichedEvent {
+  private final int dataRegionId;
+
+  public PipeTerminateEvent(
+      final String pipeName, final PipeTaskMeta pipeTaskMeta, final int 
dataRegionId) {
+    super(pipeName, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
+    this.dataRegionId = dataRegionId;
+  }
+
+  @Override
+  public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
+    return true;
+  }
+
+  @Override
+  public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
+    return true;
+  }
+
+  @Override
+  public ProgressIndex getProgressIndex() {
+    return MinimumProgressIndex.INSTANCE;
+  }
+
+  @Override
+  public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+      final String pipeName,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime) {
+    // Should record PipeTaskMeta, for the terminateEvent shall report 
progress to
+    // notify the pipeTask it's completed.
+    return new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId);
+  }
+
+  @Override
+  public boolean isGeneratedByPipe() {
+    return false;
+  }
+
+  @Override
+  public boolean mayEventTimeOverlappedWithTimeRange() {
+    return true;
+  }
+
+  @Override
+  public void reportProgress() {
+    PipeAgent.task().markCompleted(pipeName, dataRegionId);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("PipeTerminateEvent{dataRegionId=%s}", dataRegionId)
+        + " - "
+        + super.toString();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 3d1528fe659..880d3f9f7f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
@@ -64,6 +65,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY;
@@ -72,6 +75,7 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY;
 
@@ -93,22 +97,22 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   private boolean isDbNameCoveredByPattern = false;
 
   private boolean isHistoricalExtractorEnabled = false;
-
   private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event 
time
   private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
-
   private long historicalDataExtractionTimeLowerBound; // Arrival time
 
   private boolean sloppyTimeRange; // true to disable time range filter after 
extraction
 
   private boolean shouldExtractInsertion;
-
   private boolean shouldTransferModFile; // Whether to transfer mods
 
+  private boolean shouldTerminatePipeOnAllHistoricalEventsConsumed;
+  private boolean isTerminateSignalSent = false;
+
   private Queue<TsFileResource> pendingQueue;
 
   @Override
-  public void validate(PipeParameterValidator validator) {
+  public void validate(final PipeParameterValidator validator) {
     final PipeParameters parameters = validator.getParameters();
 
     if (parameters.hasAnyAttributes(
@@ -139,7 +143,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                   EXTRACTOR_END_TIME_KEY));
         }
         return;
-      } catch (Exception e) {
+      } catch (final Exception e) {
         // compatible with the current validation framework
         throw new PipeParameterNotValidException(e.getMessage());
       }
@@ -191,14 +195,22 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                   || // Should extract deletion
                   
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters)
                       .getRight());
-    } catch (Exception e) {
+
+      shouldTerminatePipeOnAllHistoricalEventsConsumed =
+          parameters.getBooleanOrDefault(
+              Arrays.asList(
+                  SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY,
+                  EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
+              EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
+    } catch (final Exception e) {
       // Compatible with the current validation framework
       throw new PipeParameterNotValidException(e.getMessage());
     }
   }
 
   @Override
-  public void customize(PipeParameters parameters, 
PipeExtractorRuntimeConfiguration configuration)
+  public void customize(
+      final PipeParameters parameters, final PipeExtractorRuntimeConfiguration 
configuration)
       throws IllegalPathException {
     shouldExtractInsertion =
         
DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters).getLeft();
@@ -394,7 +406,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
               // Will unpin it after the PipeTsFileInsertionEvent is created 
and pinned.
               try {
                 PipeResourceManager.tsfile().pinTsFileResource(resource, 
shouldTransferModFile);
-              } catch (IOException e) {
+              } catch (final IOException e) {
                 LOGGER.warn("Pipe: failed to pin TsFileResource {}", 
resource.getTsFilePath());
               }
             });
@@ -426,7 +438,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     }
   }
 
-  private boolean mayTsFileContainUnprocessedData(TsFileResource resource) {
+  private boolean mayTsFileContainUnprocessedData(final TsFileResource 
resource) {
     if (startIndex instanceof TimeWindowStateProgressIndex) {
       // The resource is closed thus the TsFileResource#getFileEndTime() is 
safe to use
       return ((TimeWindowStateProgressIndex) startIndex).getMinTime() <= 
resource.getFileEndTime();
@@ -446,21 +458,21 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
   }
 
-  private boolean isTsFileResourceOverlappedWithTimeRange(TsFileResource 
resource) {
+  private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource 
resource) {
     return !(resource.getFileEndTime() < historicalDataExtractionStartTime
         || historicalDataExtractionEndTime < resource.getFileStartTime());
   }
 
-  private boolean isTsFileResourceCoveredByTimeRange(TsFileResource resource) {
+  private boolean isTsFileResourceCoveredByTimeRange(final TsFileResource 
resource) {
     return historicalDataExtractionStartTime <= resource.getFileStartTime()
         && historicalDataExtractionEndTime >= resource.getFileEndTime();
   }
 
-  private boolean 
isTsFileGeneratedAfterExtractionTimeLowerBound(TsFileResource resource) {
+  private boolean isTsFileGeneratedAfterExtractionTimeLowerBound(final 
TsFileResource resource) {
     try {
       return historicalDataExtractionTimeLowerBound
           <= 
TsFileNameGenerator.getTsFileName(resource.getTsFile().getName()).getTime();
-    } catch (IOException e) {
+    } catch (final IOException e) {
       LOGGER.warn(
           "Pipe {}@{}: failed to get the generation time of TsFile {}, extract 
it anyway"
               + " (historical data extraction time lower bound: {})",
@@ -483,7 +495,12 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
     final TsFileResource resource = pendingQueue.poll();
     if (resource == null) {
-      return null;
+      isTerminateSignalSent = true;
+      final PipeTerminateEvent terminateEvent =
+          new PipeTerminateEvent(pipeName, pipeTaskMeta, dataRegionId);
+      terminateEvent.increaseReferenceCount(
+          PipeHistoricalDataRegionTsFileExtractor.class.getName());
+      return terminateEvent;
     }
 
     final PipeTsFileInsertionEvent event =
@@ -508,7 +525,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
     try {
       PipeResourceManager.tsfile().unpinTsFileResource(resource);
-    } catch (IOException e) {
+    } catch (final IOException e) {
       LOGGER.warn(
           "Pipe {}@{}: failed to unpin TsFileResource after creating event, 
original path: {}",
           pipeName,
@@ -520,7 +537,12 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   }
 
   public synchronized boolean hasConsumedAll() {
-    return Objects.isNull(pendingQueue) || pendingQueue.isEmpty();
+    // If the pendingQueue is null when the function is called, it
+    // implies that the extractor only extracts deletion thus the
+    // Historical event has nothing to consume
+    return Objects.isNull(pendingQueue)
+        || pendingQueue.isEmpty()
+            && (!shouldTerminatePipeOnAllHistoricalEventsConsumed || 
isTerminateSignalSent);
   }
 
   @Override
@@ -535,7 +557,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
           resource -> {
             try {
               PipeResourceManager.tsfile().unpinTsFileResource(resource);
-            } catch (IOException e) {
+            } catch (final IOException e) {
               LOGGER.warn(
                   "Pipe {}@{}: failed to unpin TsFileResource after dropping 
pipe, original path: {}",
                   pipeName,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
index c5b58c6e9b8..502bdec2c7c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeDataNodeTask.java
@@ -36,12 +36,14 @@ public class PipeDataNodeTask implements PipeTask {
   private final PipeTaskStage processorStage;
   private final PipeTaskStage connectorStage;
 
+  private volatile boolean isCompleted = false;
+
   public PipeDataNodeTask(
-      String pipeName,
-      int regionId,
-      PipeTaskStage extractorStage,
-      PipeTaskStage processorStage,
-      PipeTaskStage connectorStage) {
+      final String pipeName,
+      final int regionId,
+      final PipeTaskStage extractorStage,
+      final PipeTaskStage processorStage,
+      final PipeTaskStage connectorStage) {
     this.pipeName = pipeName;
     this.regionId = regionId;
 
@@ -106,6 +108,14 @@ public class PipeDataNodeTask implements PipeTask {
     return pipeName;
   }
 
+  public boolean isCompleted() {
+    return isCompleted;
+  }
+
+  public void markCompleted() {
+    this.isCompleted = true;
+  }
+
   @Override
   public String toString() {
     return pipeName + "@" + regionId;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
index 05ed098976f..6a60d77cd0e 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java
@@ -61,6 +61,12 @@ public class PipeExtractorConstant {
   public static final String EXTRACTOR_MODS_ENABLE_KEY = 
"extractor.mods.enable";
   public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable";
   public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false;
+  public static final String 
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY =
+      "extractor.history.terminate-pipe-on-all-consumed";
+  public static final String SOURCE_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY 
=
+      "source.history.terminate-pipe-on-all-consumed";
+  public static final boolean 
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE =
+      false;
 
   public static final String EXTRACTOR_REALTIME_ENABLE_KEY = 
"extractor.realtime.enable";
   public static final String SOURCE_REALTIME_ENABLE_KEY = 
"source.realtime.enable";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
index dbbc8fdd961..e30fcf11269 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -33,9 +33,13 @@ public class PipeMeta {
   private final PipeStaticMeta staticMeta;
   private final PipeRuntimeMeta runtimeMeta;
 
-  public PipeMeta(PipeStaticMeta staticMeta, PipeRuntimeMeta runtimeMeta) {
+  // This is temporary information of pipe and will not be serialized.
+  private final PipeTemporaryMeta temporaryMeta;
+
+  public PipeMeta(final PipeStaticMeta staticMeta, final PipeRuntimeMeta 
runtimeMeta) {
     this.staticMeta = staticMeta;
     this.runtimeMeta = runtimeMeta;
+    this.temporaryMeta = new PipeTemporaryMeta();
   }
 
   public PipeStaticMeta getStaticMeta() {
@@ -46,25 +50,29 @@ public class PipeMeta {
     return runtimeMeta;
   }
 
+  public PipeTemporaryMeta getTemporaryMeta() {
+    return temporaryMeta;
+  }
+
   public ByteBuffer serialize() throws IOException {
-    PublicBAOS byteArrayOutputStream = new PublicBAOS();
-    DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
+    final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
     serialize(outputStream);
     return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
   }
 
-  public void serialize(OutputStream outputStream) throws IOException {
+  public void serialize(final OutputStream outputStream) throws IOException {
     staticMeta.serialize(outputStream);
     runtimeMeta.serialize(outputStream);
   }
 
-  public static PipeMeta deserialize(FileInputStream fileInputStream) throws 
IOException {
+  public static PipeMeta deserialize(final FileInputStream fileInputStream) 
throws IOException {
     final PipeStaticMeta staticMeta = 
PipeStaticMeta.deserialize(fileInputStream);
     final PipeRuntimeMeta runtimeMeta = 
PipeRuntimeMeta.deserialize(fileInputStream);
     return new PipeMeta(staticMeta, runtimeMeta);
   }
 
-  public static PipeMeta deserialize(ByteBuffer byteBuffer) {
+  public static PipeMeta deserialize(final ByteBuffer byteBuffer) {
     final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer);
     final PipeRuntimeMeta runtimeMeta = 
PipeRuntimeMeta.deserialize(byteBuffer);
     return new PipeMeta(staticMeta, runtimeMeta);
@@ -104,25 +112,33 @@ public class PipeMeta {
   }
 
   @Override
-  public boolean equals(Object o) {
+  public boolean equals(final Object o) {
     if (this == o) {
       return true;
     }
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    PipeMeta pipeMeta = (PipeMeta) o;
+    final PipeMeta pipeMeta = (PipeMeta) o;
     return Objects.equals(staticMeta, pipeMeta.staticMeta)
-        && Objects.equals(runtimeMeta, pipeMeta.runtimeMeta);
+        && Objects.equals(runtimeMeta, pipeMeta.runtimeMeta)
+        && Objects.equals(temporaryMeta, pipeMeta.temporaryMeta);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(staticMeta, runtimeMeta);
+    return Objects.hash(staticMeta, runtimeMeta, temporaryMeta);
   }
 
   @Override
   public String toString() {
-    return "PipeMeta{" + "staticMeta=" + staticMeta + ", runtimeMeta=" + 
runtimeMeta + '}';
+    return "PipeMeta{"
+        + "staticMeta="
+        + staticMeta
+        + ", runtimeMeta="
+        + runtimeMeta
+        + ", temporaryMeta="
+        + temporaryMeta
+        + '}';
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
new file mode 100644
index 00000000000..6da2be8e81f
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.task.meta;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
/**
 * Transient, per-process bookkeeping attached to a {@code PipeMeta}. Unlike the static and
 * runtime metas, this information is deliberately NOT serialized (see the comment at the
 * field declaration in {@code PipeMeta}); it only tracks which data nodes have reported
 * completion of the pipe's historical data transfer, so the coordinator can decide when
 * every region is done and the pipe may be auto-dropped.
 *
 * <p>Thread-safety: the backing collection is a concurrent set, so {@link
 * #markDataNodeCompleted(int)} may be called from heartbeat-handling threads concurrently
 * with readers of {@link #getCompletedDataNodeIds()}.
 */
public class PipeTemporaryMeta {

  // Ids of the data nodes whose regions have all signalled completion.
  // A concurrent set replaces the original map-as-set (ConcurrentMap<Integer, Integer>
  // with value == key); ConcurrentHashMap.newKeySet() is the idiomatic concurrent set.
  private final Set<Integer> completedDataNodeIds = ConcurrentHashMap.newKeySet();

  /** Records that the given data node has completed historical data transfer. Idempotent. */
  public void markDataNodeCompleted(final int dataNodeId) {
    completedDataNodeIds.add(dataNodeId);
  }

  /**
   * @return the live set of completed data node ids; reflects subsequent completions
   */
  public Set<Integer> getCompletedDataNodeIds() {
    return completedDataNodeIds;
  }

  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final PipeTemporaryMeta that = (PipeTemporaryMeta) o;
    return Objects.equals(this.completedDataNodeIds, that.completedDataNodeIds);
  }

  @Override
  public int hashCode() {
    return Objects.hash(completedDataNodeIds);
  }

  @Override
  public String toString() {
    return "PipeTemporaryMeta{" + "completedDataNodeIds=" + completedDataNodeIds + '}';
  }
}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index f5d6839ce6c..06d0fd8d0e7 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -294,6 +294,7 @@ struct TDataNodeHeartbeatResp {
   11: optional string activateStatus
   12: optional set<common.TEndPoint> confirmedConfigNodeEndPoints
   13: optional map<common.TConsensusGroupId, i64> consensusLogicalTimeMap
+  14: optional list<bool> pipeCompletedList
 }
 
 struct TPipeHeartbeatReq {
@@ -302,6 +303,7 @@ struct TPipeHeartbeatReq {
 
 struct TPipeHeartbeatResp {
   1: required list<binary> pipeMetaList
+  2: optional list<bool> pipeCompletedList
 }
 
 enum TSchemaLimitLevel{

Reply via email to