This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ad56c78138 Pipe: Enabled waiting for pipes to finish & progress index persist to config node in shutdown hook (#15896)
1ad56c78138 is described below

commit 1ad56c78138d0c4b73a608eb4ada56ce61e5aba0
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 10 11:47:25 2025 +0800

    Pipe: Enabled waiting for pipes to finish & progress index persist to config node in shutdown hook (#15896)
    
    * persist in shutdown hook
    
    * Update PipeTaskAgent.java
    
    * Update PipeConfigNodeTaskAgent.java
    
    * Update DataNodeShutdownHook.java
    
    * Fix
    
    * Fix2
    
    * Fix3
    
    * Update PipeHeartbeatScheduler.java
---
 .../rpc/DataNodeAsyncRequestRPCHandler.java        |  2 +-
 .../handlers/rpc/PipeHeartbeatRPCHandler.java      |  2 +-
 .../iotdb/confignode/manager/ConfigManager.java    | 18 ++++++++
 .../apache/iotdb/confignode/manager/IManager.java  |  3 ++
 .../pipe/agent/task/PipeConfigNodeTaskAgent.java   |  2 +-
 .../runtime/heartbeat/PipeHeartbeatScheduler.java  |  2 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  6 +++
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  | 48 +++++++++++++++++-----
 .../subtask/processor/PipeProcessorSubtask.java    |  4 +-
 .../event/common/heartbeat/PipeHeartbeatEvent.java |  6 +--
 .../statement/PipeStatementInsertionEvent.java     |  6 +--
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  6 +--
 .../common/tablet/PipeRawTabletInsertionEvent.java |  6 +--
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  6 +--
 .../dataregion/IoTDBDataRegionExtractor.java       |  4 +-
 .../PipeRealtimeDataRegionHybridExtractor.java     |  5 +--
 .../schemaregion/IoTDBSchemaRegionExtractor.java   |  4 +-
 .../iotdb/db/pipe/metric/PipeDataNodeMetrics.java  |  6 +--
 .../PipeDataNodeRemainingEventAndTimeOperator.java | 18 +++++++-
 ...ics.java => PipeDataNodeSinglePipeMetrics.java} | 15 ++++---
 .../iotdb/db/protocol/client/ConfigNodeClient.java |  8 ++++
 .../impl/DataNodeInternalRPCServiceImpl.java       |  2 +-
 .../InformationSchemaContentSupplierFactory.java   |  4 +-
 .../execution/config/sys/pipe/ShowPipeTask.java    |  4 +-
 .../iotdb/db/service/DataNodeShutdownHook.java     | 36 +++++++++++++++-
 .../apache/iotdb/commons/conf/CommonConfig.java    | 14 +++++++
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  2 +-
 .../iotdb/commons/pipe/config/PipeConfig.java      |  5 +++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  |  5 +++
 .../thrift-commons/src/main/thrift/common.thrift   |  7 ++++
 .../src/main/thrift/confignode.thrift              |  3 ++
 .../src/main/thrift/datanode.thrift                |  9 +---
 32 files changed, 202 insertions(+), 66 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index 85ce1253c13..46b9f44d8cd 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.client.async.handlers.rpc;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
 import org.apache.iotdb.commons.client.request.AsyncRequestContext;
@@ -33,7 +34,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
 import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
 import org.apache.iotdb.mpp.rpc.thrift.TDeviceViewResp;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
index 569424afbff..e5fa157961d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java
@@ -20,8 +20,8 @@
 package org.apache.iotdb.confignode.client.async.handlers.rpc;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index a95b945a73c..fdc0ccaeb31 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
@@ -2917,6 +2918,23 @@ public class ConfigManager implements IManager {
         : new TFetchTableResp(status);
   }
 
+  @Override
+  public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) {
+    final TSStatus status = confirmLeader();
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return status;
+    }
+    pipeManager
+        .getPipeRuntimeCoordinator()
+        .parseHeartbeat(
+            dataNodeId,
+            resp.getPipeMetaList(),
+            resp.getPipeCompletedList(),
+            resp.getPipeRemainingEventCountList(),
+            resp.getPipeRemainingTimeList());
+    return StatusUtils.OK;
+  }
+
   @Override
   public DataSet registerAINode(TAINodeRegisterReq req) {
     TSStatus status = confirmLeader();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index db5cbe11e00..d47d9375c5e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.manager;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
@@ -890,4 +891,6 @@ public interface IManager {
   TDescTable4InformationSchemaResp describeTable4InformationSchema();
 
   TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap);
+
+  TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp);
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index 0e90174a7f2..b115ba25b4a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.confignode.manager.pipe.agent.task;
 
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -35,7 +36,6 @@ import 
org.apache.iotdb.confignode.manager.pipe.metric.overview.PipeConfigNodeRe
 import 
org.apache.iotdb.confignode.manager.pipe.metric.source.PipeConfigRegionExtractorMetrics;
 import 
org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 3533b40158d..120ddb65f86 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -31,7 +32,6 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index af8e7e593b4..e8be7f574c8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
@@ -1415,4 +1416,9 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
   public TSStatus createTableView(final TCreateTableViewReq req) {
     return configManager.createTableView(req);
   }
+
+  @Override
+  public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) {
+    return configManager.pushHeartbeat(dataNodeId, resp);
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 9d00d5a1206..8eddc2987ea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.agent.task;
 
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
@@ -49,10 +51,13 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
@@ -63,10 +68,10 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.SystemMetric;
 import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.thrift.TException;
@@ -322,13 +327,12 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   @Override
   protected void thawRate(final String pipeName, final long creationTime) {
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(pipeName + "_" + creationTime);
+    PipeDataNodeSinglePipeMetrics.getInstance().thawRate(pipeName + "_" + creationTime);
   }
 
   @Override
   protected void freezeRate(final String pipeName, final long creationTime) {
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-        .freezeRate(pipeName + "_" + creationTime);
+    PipeDataNodeSinglePipeMetrics.getInstance().freezeRate(pipeName + "_" + creationTime);
   }
 
   @Override
@@ -339,7 +343,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
     final String taskId = pipeName + "_" + creationTime;
     PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
+    PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
 
     return true;
   }
@@ -367,7 +371,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
       final String taskId = pipeName + "_" + creationTime;
       PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
+      PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
       // When the pipe contains no pipe tasks, there is no corresponding prefetching queue for the
       // subscribed pipe, so the subscription needs to be manually marked as completed.
       if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) {
@@ -461,7 +465,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
         final boolean isCompleted = isAllDataRegionCompleted && 
includeDataAndNeedDrop;
         final Pair<Long, Double> remainingEventAndTime =
-            PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+            PipeDataNodeSinglePipeMetrics.getInstance()
                 .getRemainingEventAndTime(staticMeta.getPipeName(), 
staticMeta.getCreationTime());
         pipeCompletedList.add(isCompleted);
         pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
@@ -491,7 +495,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   protected void collectPipeMetaListInternal(
       final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws 
TException {
     // Do nothing if data node is removing or removed, or request does not 
need pipe meta list
-    if (PipeDataNodeAgent.runtime().isShutdown()) {
+    if (PipeDataNodeAgent.runtime().isShutdown() && req.heartbeatId != Long.MIN_VALUE) {
       return;
     }
     LOGGER.info("Received pipe heartbeat request {} from config node.", 
req.heartbeatId);
@@ -533,7 +537,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
         final boolean isCompleted = isAllDataRegionCompleted && 
includeDataAndNeedDrop;
         final Pair<Long, Double> remainingEventAndTime =
-            PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+            PipeDataNodeSinglePipeMetrics.getInstance()
                 .getRemainingEventAndTime(staticMeta.getPipeName(), 
staticMeta.getCreationTime());
         pipeCompletedList.add(isCompleted);
         pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
@@ -842,7 +846,29 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   ///////////////////////// Shutdown Logic /////////////////////////
 
-  public void persistAllProgressIndexLocally() {
+  public void persistAllProgressIndex() {
+    persistAllProgressIndexLocally();
+    persistAllProgressIndex2ConfigNode();
+  }
+
+  private void persistAllProgressIndex2ConfigNode() {
+    try (final ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      // Send request to some API server
+      final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
+      collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
+      final TSStatus result =
+          configNodeClient.pushHeartbeat(
+              IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
+        LOGGER.warn("Failed to persist progress index to configNode, status: {}", result);
+      }
+    } catch (final Exception e) {
+      LOGGER.warn(e.getMessage());
+    }
+  }
+
+  private void persistAllProgressIndexLocally() {
     if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
       LOGGER.info(
           "Pipe progress index persist disabled. Skipping persist all progress index locally.");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
index 1f7262c0c16..a574712cbbd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java
@@ -33,7 +33,7 @@ import 
org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
 import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -167,7 +167,7 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
             pipeProcessor.process((TsFileInsertionEvent) event, 
outputEventCollector);
           }
           PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
-          PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+          PipeDataNodeSinglePipeMetrics.getInstance()
               .markTsFileCollectInvocationCount(
                   pipeNameWithCreationTime, 
outputEventCollector.getCollectInvocationCount());
         } else if (event instanceof PipeHeartbeatEvent) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index 1339611e7bf..8c3c29c315b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -26,7 +26,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -93,7 +93,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   @Override
   public boolean internallyIncreaseResourceReferenceCount(final String 
holderMessage) {
     if (Objects.nonNull(pipeName)) {
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+      PipeDataNodeSinglePipeMetrics.getInstance()
           .increaseHeartbeatEventCount(pipeName, creationTime);
     }
     return true;
@@ -104,7 +104,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
     // PipeName == null indicates that the event is the raw event at disruptor,
     // not the event copied and passed to the extractor
     if (Objects.nonNull(pipeName)) {
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+      PipeDataNodeSinglePipeMetrics.getInstance()
           .decreaseHeartbeatEventCount(pipeName, creationTime);
       if (shouldPrintMessage && LOGGER.isDebugEnabled()) {
         LOGGER.debug(this.toString());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
index 964f186a308..2befefc2863 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/statement/PipeStatementInsertionEvent.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager;
 import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -89,7 +89,7 @@ public class PipeStatementInsertionEvent extends 
PipeInsertionEvent
     PipeDataNodeResourceManager.memory()
         .forceResize(allocatedMemoryBlock, statement.ramBytesUsed() + 
INSTANCE_SIZE);
     if (Objects.nonNull(pipeName)) {
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+      PipeDataNodeSinglePipeMetrics.getInstance()
           .increaseRawTabletEventCount(pipeName, creationTime);
     }
     return true;
@@ -98,7 +98,7 @@ public class PipeStatementInsertionEvent extends 
PipeInsertionEvent
   @Override
   public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     if (Objects.nonNull(pipeName)) {
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+      PipeDataNodeSinglePipeMetrics.getInstance()
           .decreaseRawTabletEventCount(pipeName, creationTime);
     }
     allocatedMemoryBlock.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index b1518d3f4f1..c28dba1aec6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -33,7 +33,7 @@ import 
org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventParser;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
@@ -204,7 +204,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
     try {
       PipeDataNodeResourceManager.wal().pin(walEntryHandler);
       if (Objects.nonNull(pipeName)) {
-        PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+        PipeDataNodeSinglePipeMetrics.getInstance()
             .increaseInsertNodeEventCount(pipeName, creationTime);
         PipeDataNodeAgent.task().addFloatingMemoryUsageInByte(pipeName, 
ramBytesUsed());
       }
@@ -240,7 +240,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
     } finally {
       if (Objects.nonNull(pipeName)) {
         PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName, 
ramBytesUsed());
-        PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+        PipeDataNodeSinglePipeMetrics.getInstance()
             .decreaseInsertNodeEventCount(pipeName, creationTime, 
System.nanoTime() - extractTime);
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 5f29402bd36..349bfc2f6b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -33,7 +33,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventP
 import 
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock;
@@ -235,7 +235,7 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
             allocatedMemoryBlock,
             PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 
INSTANCE_SIZE);
     if (Objects.nonNull(pipeName)) {
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+      PipeDataNodeSinglePipeMetrics.getInstance()
           .increaseRawTabletEventCount(pipeName, creationTime);
     }
     return true;
@@ -244,7 +244,7 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
   @Override
   public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     if (Objects.nonNull(pipeName)) {
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+      PipeDataNodeSinglePipeMetrics.getInstance()
           .decreaseRawTabletEventCount(pipeName, creationTime);
     }
     allocatedMemoryBlock.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index aa6e90054f8..f216ff51077 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -36,7 +36,7 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.aggregator.TsFileInsertionPo
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParserProvider;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
@@ -306,7 +306,7 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
       return false;
     } finally {
       if (Objects.nonNull(pipeName)) {
-        PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+        PipeDataNodeSinglePipeMetrics.getInstance()
             .increaseTsFileEventCount(pipeName, creationTime);
       }
     }
@@ -330,7 +330,7 @@ public class PipeTsFileInsertionEvent extends 
PipeInsertionEvent
       return false;
     } finally {
       if (Objects.nonNull(pipeName)) {
-        PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+        PipeDataNodeSinglePipeMetrics.getInstance()
             .decreaseTsFileEventCount(pipeName, creationTime, 
System.nanoTime() - extractTime);
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index 3ee2d655ec9..7154d901748 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -37,7 +37,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -586,7 +586,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
     // register metric after generating taskID
     PipeDataRegionExtractorMetrics.getInstance().register(this);
     PipeTsFileToTabletsMetrics.getInstance().register(this);
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
+    PipeDataNodeSinglePipeMetrics.getInstance().register(this);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 92e9ad64f5d..ae632237650 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -31,7 +31,7 @@ import 
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -238,8 +238,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
 
   private boolean mayRemainingInsertNodeEventExceedLimit(final 
PipeRealtimeEvent event) {
     final boolean mayRemainingInsertEventExceedLimit =
-        PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-            .mayRemainingInsertEventExceedLimit(pipeID);
+        
PipeDataNodeSinglePipeMetrics.getInstance().mayRemainingInsertEventExceedLimit(pipeID);
     if (mayRemainingInsertEventExceedLimit && 
event.mayExtractorUseTablets(this)) {
       logByLogManager(
           l ->
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
index 7f5b8df6811..0df7fb21d73 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
@@ -36,7 +36,7 @@ import 
org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.schema.PipeSchemaRegionExtractorMetrics;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -98,7 +98,7 @@ public class IoTDBSchemaRegionExtractor extends 
IoTDBNonDataRegionExtractor {
     listenedTypeSet = 
SchemaRegionListeningFilter.parseListeningPlanTypeSet(parameters);
 
     PipeSchemaRegionExtractorMetrics.getInstance().register(this);
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
+    PipeDataNodeSinglePipeMetrics.getInstance().register(this);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
index 3f03ce580fd..dd3873feaf4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.metric;
 
 import org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
@@ -53,7 +53,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
     PipeSchemaRegionListenerMetrics.getInstance().bindTo(metricService);
     PipeSchemaRegionExtractorMetrics.getInstance().bindTo(metricService);
     PipeSchemaRegionConnectorMetrics.getInstance().bindTo(metricService);
-    
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().bindTo(metricService);
+    PipeDataNodeSinglePipeMetrics.getInstance().bindTo(metricService);
     PipeDataNodeReceiverMetrics.getInstance().bindTo(metricService);
     PipeTsFileToTabletsMetrics.getInstance().bindTo(metricService);
   }
@@ -71,7 +71,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
     PipeSchemaRegionListenerMetrics.getInstance().unbindFrom(metricService);
     PipeSchemaRegionExtractorMetrics.getInstance().unbindFrom(metricService);
     PipeSchemaRegionConnectorMetrics.getInstance().unbindFrom(metricService);
-    
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().unbindFrom(metricService);
+    PipeDataNodeSinglePipeMetrics.getInstance().unbindFrom(metricService);
     PipeDataNodeReceiverMetrics.getInstance().unbindFrom(metricService);
     PipeTsFileToTabletsMetrics.getInstance().unbindFrom(metricService);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 86368acf353..73d58285345 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -40,7 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {
+public class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
 
   // Calculate from schema region extractors directly for it requires less 
computation
   private final Set<IoTDBSchemaRegionExtractor> schemaRegionExtractors =
@@ -107,6 +107,22 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
     return insertNodeEventCountEMA.insertNodeEMAValue;
   }
 
+  /**
+   * Sums the events this pipe still has pending: the tsfile, raw tablet and insert node event
+   * counters plus the untransferred event count of every extractor in {@code
+   * schemaRegionExtractors}.
+   *
+   * @return the pending event count, never negative
+   */
+  public long getRemainingNonHeartbeatEvents() {
+    final long dataRegionEvents =
+        tsfileEventCount.get() + rawTabletEventCount.get() + insertNodeEventCount.get();
+    final long schemaRegionEvents =
+        schemaRegionExtractors.stream()
+            .mapToLong(IoTDBSchemaRegionExtractor::getUnTransferredEventCount)
+            .sum();
+
+    // The raw sum may be negative: after a pipe restart (possibly an internal one) the processor
+    // subtask can still be collecting events, which drives the counters below zero. This cannot
+    // be avoided, so the result is clamped to zero.
+    return Math.max(dataRegionEvents + schemaRegionEvents, 0L);
+  }
+
   long getRemainingEvents() {
     final long remainingEvents =
         tsfileEventCount.get()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
index 354a980edfd..677d758a162 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java
@@ -42,15 +42,14 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
-public class PipeDataNodeRemainingEventAndTimeMetrics implements IMetricSet {
+public class PipeDataNodeSinglePipeMetrics implements IMetricSet {
 
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(PipeDataNodeRemainingEventAndTimeMetrics.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeDataNodeSinglePipeMetrics.class);
 
   @SuppressWarnings("java:S3077")
   private volatile AbstractMetricService metricService;
 
-  private final Map<String, PipeDataNodeRemainingEventAndTimeOperator>
+  public final Map<String, PipeDataNodeRemainingEventAndTimeOperator>
       remainingEventAndTimeOperatorMap = new ConcurrentHashMap<>();
 
   private static Histogram PIPE_DATANODE_INSERTNODE_TRANSFER_TIME_HISTOGRAM =
@@ -381,19 +380,19 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
 
   private static class PipeDataNodeRemainingEventAndTimeMetricsHolder {
 
-    private static final PipeDataNodeRemainingEventAndTimeMetrics INSTANCE =
-        new PipeDataNodeRemainingEventAndTimeMetrics();
+    private static final PipeDataNodeSinglePipeMetrics INSTANCE =
+        new PipeDataNodeSinglePipeMetrics();
 
     private PipeDataNodeRemainingEventAndTimeMetricsHolder() {
       // Empty constructor
     }
   }
 
-  public static PipeDataNodeRemainingEventAndTimeMetrics getInstance() {
+  public static PipeDataNodeSinglePipeMetrics getInstance() {
     return PipeDataNodeRemainingEventAndTimeMetricsHolder.INSTANCE;
   }
 
-  private PipeDataNodeRemainingEventAndTimeMetrics() {
+  private PipeDataNodeSinglePipeMetrics() {
     
PipeEventCommitManager.getInstance().setCommitRateMarker(this::markRegionCommit);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 9d0dcd22f91..8685b380b47 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
@@ -1418,6 +1419,13 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
         () -> client.createTableView(req), status -> 
!updateConfigNodeLeader(status));
   }
 
+  /**
+   * Pushes a pipe heartbeat response to the ConfigNode (the thrift IDL describes this RPC as
+   * "Push heartbeat in shutdown").
+   *
+   * <p>The call is issued as {@code client.pushHeartbeat(dataNodeId, resp)} through {@code
+   * executeRemoteCallWithRetry}, with {@code status -> !updateConfigNodeLeader(status)} as the
+   * second argument.
+   * NOTE(review): presumably that predicate tells the retry helper whether the returned status
+   * is final or the leader changed and the call must be retried - confirm against
+   * executeRemoteCallWithRetry.
+   *
+   * @param dataNodeId id of the DataNode the heartbeat belongs to
+   * @param resp the pipe heartbeat payload to push
+   * @return the status returned by the remote call
+   * @throws TException if the remote call fails
+   */
+  @Override
+  public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp 
resp)
+      throws TException {
+    return executeRemoteCallWithRetry(
+        () -> client.pushHeartbeat(dataNodeId, resp), status -> 
!updateConfigNodeLeader(status));
+  }
+
   public static class Factory extends ThriftClientFactory<ConfigRegionId, 
ConfigNodeClient> {
 
     public Factory(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 82608ecf23a..226b26093ed 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TLoadSample;
 import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSender;
 import org.apache.iotdb.common.rpc.thrift.TServiceType;
@@ -244,7 +245,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
 import 
org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaRespExceptionMessage;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index 85854bdb2dd..b011a49b077 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -59,7 +59,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TTableInfo;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -566,7 +566,7 @@ public class InformationSchemaContentSupplierFactory {
 
       if (remainingEventCount == -1 && remainingTime == -1) {
         final Pair<Long, Double> remainingEventAndTime =
-            PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+            PipeDataNodeSinglePipeMetrics.getInstance()
                 .getRemainingEventAndTime(tPipeInfo.getId(), 
tPipeInfo.getCreationTime());
         remainingEventCount = remainingEventAndTime.getLeft();
         remainingTime = remainingEventAndTime.getRight();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
index 0dfd784e006..76b8680c378 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
@@ -22,7 +22,7 @@ package 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
 import org.apache.iotdb.commons.schema.column.ColumnHeader;
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
-import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
@@ -105,7 +105,7 @@ public class ShowPipeTask implements IConfigTask {
 
       if (remainingEventCount == -1 && remainingTime == -1) {
         final Pair<Long, Double> remainingEventAndTime =
-            PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+            PipeDataNodeSinglePipeMetrics.getInstance()
                 .getRemainingEventAndTime(tPipeInfo.getId(), 
tPipeInfo.getCreationTime());
         remainingEventCount = remainingEventAndTime.getLeft();
         remainingTime = remainingEventAndTime.getRight();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index efa628bbcbf..11faf429732 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -25,12 +25,15 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.agent.runtime.PipePeriodicalJobExecutor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
+import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
 import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
@@ -44,6 +47,8 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+
 public class DataNodeShutdownHook extends Thread {
 
   private static final Logger logger = 
LoggerFactory.getLogger(DataNodeShutdownHook.class);
@@ -118,8 +123,37 @@ public class DataNodeShutdownHook extends Thread {
       triggerSnapshotForAllDataRegion();
     }
 
+    // Give the running pipes a bounded amount of time to drain their remaining events before the
+    // progress index is persisted. The time budget (PipeConfig#getPipeMaxWaitFinishTime, in ms)
+    // is shared by all pipes: it is measured from startTime, not per pipe.
+    long startTime = System.currentTimeMillis();
+    if (PipeDataNodeAgent.task().getPipeCount() != 0) {
+      for (Map.Entry<String, PipeDataNodeRemainingEventAndTimeOperator> entry :
+          PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.entrySet()) {
+        boolean timeout = false;
+        boolean interrupted = false;
+        while (true) {
+          // A pipe has finished only when no non-heartbeat event is left to transfer.
+          if (entry.getValue().getRemainingNonHeartbeatEvents() == 0) {
+            logger.info(
+                "Successfully waited for pipe {} to finish.", entry.getValue().getPipeName());
+            break;
+          }
+          if (System.currentTimeMillis() - startTime
+              > PipeConfig.getInstance().getPipeMaxWaitFinishTime()) {
+            timeout = true;
+            break;
+          }
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            // Restore the flag and stop waiting: with the flag set, every further sleep would
+            // throw immediately and the loop would busy-spin (and spam the log) until timeout.
+            Thread.currentThread().interrupt();
+            logger.info("Interrupted when waiting for pipe to finish");
+            interrupted = true;
+            break;
+          }
+        }
+        if (timeout) {
+          logger.info("Timed out when waiting for pipes to finish, will break");
+          break;
+        }
+        if (interrupted) {
+          break;
+        }
+      }
+    }
     // Persist progress index before shutdown to accurate recovery after 
restart
-    PipeDataNodeAgent.task().persistAllProgressIndexLocally();
+    PipeDataNodeAgent.task().persistAllProgressIndex();
     // Shutdown all consensus pipe's receiver
     PipeDataNodeAgent.receiver().pipeConsensus().closeReceiverExecutor();
     // Shutdown pipe progressIndex background service
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 5d79b138067..b3a55435790 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -228,6 +228,8 @@ public class CommonConfig {
   private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
   private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE;
 
+  private long pipeMaxWaitFinishTime = 10 * 1000;
+
   private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
   private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50; 
// 50B
   private int pipeExtractorMatcherCacheSize = 1024;
@@ -1371,6 +1373,18 @@ public class CommonConfig {
         pipeSubtaskExecutorForcedRestartIntervalMs);
   }
 
+  /**
+   * Returns the maximum time to wait for pipes to finish. DataNodeShutdownHook compares this
+   * value against a {@code System.currentTimeMillis()} delta, so it is expressed in milliseconds
+   * (the field defaults to {@code 10 * 1000}).
+   */
+  public long getPipeMaxWaitFinishTime() {
+    return pipeMaxWaitFinishTime;
+  }
+
+  /**
+   * Updates the maximum time to wait for pipes to finish. Setting the value it already holds is
+   * a no-op and is not logged.
+   *
+   * @param pipeMaxWaitFinishTime the new maximum wait time
+   */
+  public void setPipeMaxWaitFinishTime(long pipeMaxWaitFinishTime) {
+    // Only assign and log when the value actually changes.
+    if (this.pipeMaxWaitFinishTime != pipeMaxWaitFinishTime) {
+      this.pipeMaxWaitFinishTime = pipeMaxWaitFinishTime;
+      logger.info("pipeMaxWaitFinishTime is set to {}.", pipeMaxWaitFinishTime);
+    }
+  }
+
   public int getPipeRealTimeQueuePollTsFileThreshold() {
     return pipeRealTimeQueuePollTsFileThreshold;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index a4b9dfebe93..c61f4dc95fd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.pipe.agent.task;
 
+import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
@@ -35,7 +36,6 @@ import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index dc2b4350960..439dfec55f4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -147,6 +147,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeSubtaskExecutorForcedRestartIntervalMs();
   }
 
+  /**
+   * Returns the maximum time to wait for pipes to finish, read from {@code COMMON_CONFIG} on
+   * every call.
+   */
+  public long getPipeMaxWaitFinishTime() {
+    return COMMON_CONFIG.getPipeMaxWaitFinishTime();
+  }
+
   /////////////////////////////// Extractor ///////////////////////////////
 
   public int getPipeExtractorAssignerDisruptorRingBufferSize() {
@@ -523,6 +527,7 @@ public class PipeConfig {
     LOGGER.info(
         "PipeSubtaskExecutorForcedRestartIntervalMs: {}",
         getPipeSubtaskExecutorForcedRestartIntervalMs());
+    LOGGER.info("PipeMaxWaitFinishTime: {}", getPipeMaxWaitFinishTime());
 
     LOGGER.info(
         "PipeExtractorAssignerDisruptorRingBufferSize: {}",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index b754a8581aa..db2ff49bbb0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -596,6 +596,11 @@ public class PipeDescriptor {
                 "pipe_threshold_allocation_strategy_high_usage_threshold",
                 String.valueOf(
                     
config.getPipeThresholdAllocationStrategyFixedMemoryHighUsageThreshold()))));
+
+    // Optional override of the maximum wait time for pipes to finish. When the property
+    // "pipe_max_wait_finish_time" is absent the value currently held by config is kept.
+    // Long.parseLong throws NumberFormatException if the configured text is not a valid long.
+    config.setPipeMaxWaitFinishTime(
+        Long.parseLong(
+            properties.getProperty(
+                "pipe_max_wait_finish_time", 
String.valueOf(config.getPipeMaxWaitFinishTime()))));
   }
 
   public static void loadPipeExternalConfig(
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift 
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index c6f881f06cb..3287f35b9bb 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -196,6 +196,13 @@ struct TSetThrottleQuotaReq {
   2: required TThrottleQuota throttleQuota
 }
 
+// Pipe heartbeat payload. It is declared in common.thrift (moved out of datanode.thrift) so
+// that both IDataNodeRPCService.pipeHeartbeat (as return type) and
+// IConfigNodeRPCService.pushHeartbeat (as argument) can reference it.
+// NOTE(review): the three optional lists presumably run parallel to pipeMetaList, one entry
+// per pipe - confirm against the code that fills this struct.
+struct TPipeHeartbeatResp {
+  1: required list<binary> pipeMetaList
+  2: optional list<bool> pipeCompletedList
+  3: optional list<i64> pipeRemainingEventCountList
+  4: optional list<double> pipeRemainingTimeList
+}
+
 struct TLicense {
     1: required i64 licenseIssueTimestamp
     2: required i64 expireTimestamp
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 3ce020c731b..bf91b410459 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -2011,6 +2011,9 @@ service IConfigNodeRPCService {
   /** Get throttle quota information */
   TThrottleQuotaResp getThrottleQuota()
 
+  /** Push heartbeat in shutdown */
+  common.TSStatus pushHeartbeat(i32 dataNodeId, common.TPipeHeartbeatResp resp)
+
   // ======================================================
   // Table Or View
   // ======================================================
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 1b8ad67de5c..64406b005d9 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -316,13 +316,6 @@ struct TPipeHeartbeatReq {
   1: required i64 heartbeatId
 }
 
-struct TPipeHeartbeatResp {
-  1: required list<binary> pipeMetaList
-  2: optional list<bool> pipeCompletedList
-  3: optional list<i64> pipeRemainingEventCountList
-  4: optional list<double> pipeRemainingTimeList
-}
-
 enum TSchemaLimitLevel{
     DEVICE,
     TIMESERIES
@@ -1118,7 +1111,7 @@ service IDataNodeRPCService {
   /**
   * ConfigNode will ask DataNode for pipe meta in every few seconds
   **/
-  TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req)
+  common.TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req)
 
  /**
   * Execute CQ on DataNode

Reply via email to