This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bb728f04cc8 Pipe: Added RemainingEventCount/EstimatedRemainingSeconds 
in configNode metrics/show pipes response (#12578)
bb728f04cc8 is described below

commit bb728f04cc890f5c756506dc74a2b93bf137bfae
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 29 11:06:45 2024 +0800

    Pipe: Added RemainingEventCount/EstimatedRemainingSeconds in configNode 
metrics/show pipes response (#12578)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../heartbeat/DataNodeHeartbeatHandler.java        |   6 +-
 .../response/pipe/task/PipeTableResp.java          |  95 ++++++-----
 .../pipe/agent/task/PipeConfigNodeTaskAgent.java   |  48 ++++--
 .../runtime/PipeRuntimeCoordinator.java            |  10 +-
 .../runtime/heartbeat/PipeHeartbeat.java           |  32 +++-
 .../runtime/heartbeat/PipeHeartbeatParser.java     |  15 +-
 .../runtime/heartbeat/PipeHeartbeatScheduler.java  |  12 +-
 .../extractor/ConfigRegionListeningFilter.java     |   2 +-
 .../manager/pipe/metric/PipeConfigNodeMetrics.java |   2 +
 .../metric/PipeConfigNodeRemainingTimeMetrics.java |   9 ++
 .../PipeConfigNodeRemainingTimeOperator.java       |   8 +-
 .../metric/PipeConfigRegionExtractorMetrics.java   |  14 ++
 .../pipe/metric/PipeTemporaryMetaMetrics.java      | 175 +++++++++++++++++++++
 .../confignode/persistence/pipe/PipeInfo.java      |  75 ++++++---
 .../confignode/persistence/pipe/PipeTaskInfo.java  |   5 +-
 .../consensus/response/pipe/PipeTableRespTest.java |   4 +-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  48 ++++--
 .../schemaregion/SchemaRegionListeningFilter.java  |   2 +-
 .../PipeDataNodeRemainingEventAndTimeMetrics.java  |  11 ++
 .../common/header/ColumnHeaderConstant.java        |   6 +-
 .../execution/config/sys/pipe/ShowPipeTask.java    |  45 +++++-
 .../iotdb/commons/pipe/task/meta/PipeMeta.java     |   9 +-
 .../commons/pipe/task/meta/PipeTemporaryMeta.java  |  33 +++-
 .../iotdb/commons/service/metric/enums/Metric.java |   2 +
 .../src/main/thrift/confignode.thrift              |   2 +
 .../src/main/thrift/datanode.thrift                |   4 +
 26 files changed, 555 insertions(+), 119 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 5829fd07a56..9d42c319a44 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -131,7 +131,11 @@ public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<TDataNodeHe
     }
     if (heartbeatResp.getPipeMetaList() != null) {
       pipeRuntimeCoordinator.parseHeartbeat(
-          nodeId, heartbeatResp.getPipeMetaList(), 
heartbeatResp.getPipeCompletedList());
+          nodeId,
+          heartbeatResp.getPipeMetaList(),
+          heartbeatResp.getPipeCompletedList(),
+          heartbeatResp.getPipeRemainingEventCountList(),
+          heartbeatResp.getPipeRemainingTimeList());
     }
     if (heartbeatResp.isSetConfirmedConfigNodeEndPoints()) {
       loadManager
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index 9385f43f9a8..221da8308eb 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -20,14 +20,18 @@
 package org.apache.iotdb.confignode.consensus.response.pipe.task;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
+import 
org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
+import org.apache.iotdb.confignode.service.ConfigNode;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 
@@ -36,13 +40,14 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class PipeTableResp implements DataSet {
 
   private final TSStatus status;
   private final List<PipeMeta> allPipeMeta;
 
-  public PipeTableResp(TSStatus status, List<PipeMeta> allPipeMeta) {
+  public PipeTableResp(final TSStatus status, final List<PipeMeta> 
allPipeMeta) {
     this.status = status;
     this.allPipeMeta = allPipeMeta;
   }
@@ -56,46 +61,41 @@ public class PipeTableResp implements DataSet {
       if (pipeName == null) {
         return this;
       } else {
-        final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
-        for (final PipeMeta pipeMeta : allPipeMeta) {
-          if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
-            filteredPipeMeta.add(pipeMeta);
-            break;
-          }
-        }
-        return new PipeTableResp(status, filteredPipeMeta);
+        return new PipeTableResp(
+            status,
+            allPipeMeta.stream()
+                .filter(pipeMeta -> 
pipeMeta.getStaticMeta().getPipeName().equals(pipeName))
+                .collect(Collectors.toList()));
       }
     } else {
       if (pipeName == null) {
         return this;
       } else {
-        String sortedConnectorParametersString = null;
-        for (final PipeMeta pipeMeta : allPipeMeta) {
-          if (pipeMeta.getStaticMeta().getPipeName().equals(pipeName)) {
-            sortedConnectorParametersString =
-                pipeMeta.getStaticMeta().getConnectorParameters().toString();
-            break;
-          }
-        }
+        final String sortedConnectorParametersString =
+            allPipeMeta.stream()
+                .filter(pipeMeta -> 
pipeMeta.getStaticMeta().getPipeName().equals(pipeName))
+                .findFirst()
+                .map(pipeMeta -> 
pipeMeta.getStaticMeta().getConnectorParameters().toString())
+                .orElse(null);
 
-        final List<PipeMeta> filteredPipeMeta = new ArrayList<>();
-        for (final PipeMeta pipeMeta : allPipeMeta) {
-          if (pipeMeta
-              .getStaticMeta()
-              .getConnectorParameters()
-              .toString()
-              .equals(sortedConnectorParametersString)) {
-            filteredPipeMeta.add(pipeMeta);
-          }
-        }
-        return new PipeTableResp(status, filteredPipeMeta);
+        return new PipeTableResp(
+            status,
+            allPipeMeta.stream()
+                .filter(
+                    pipeMeta ->
+                        pipeMeta
+                            .getStaticMeta()
+                            .getConnectorParameters()
+                            .toString()
+                            .equals(sortedConnectorParametersString))
+                .collect(Collectors.toList()));
       }
     }
   }
 
   public TGetAllPipeInfoResp convertToTGetAllPipeInfoResp() throws IOException 
{
     final List<ByteBuffer> pipeInformationByteBuffers = new ArrayList<>();
-    for (PipeMeta pipeMeta : allPipeMeta) {
+    for (final PipeMeta pipeMeta : allPipeMeta) {
       pipeInformationByteBuffers.add(pipeMeta.serialize());
     }
     return new TGetAllPipeInfoResp(status, pipeInformationByteBuffers);
@@ -104,19 +104,21 @@ public class PipeTableResp implements DataSet {
   public TShowPipeResp convertToTShowPipeResp() {
     final List<TShowPipeInfo> showPipeInfoList = new ArrayList<>();
 
-    for (PipeMeta pipeMeta : allPipeMeta) {
+    for (final PipeMeta pipeMeta : allPipeMeta) {
       final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
       final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
       final StringBuilder exceptionMessageBuilder = new StringBuilder();
-      for (PipeRuntimeException e : 
runtimeMeta.getNodeId2PipeRuntimeExceptionMap().values()) {
+      for (final PipeRuntimeException e :
+          runtimeMeta.getNodeId2PipeRuntimeExceptionMap().values()) {
         exceptionMessageBuilder
             .append(DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms"))
             .append(", ")
             .append(e.getMessage())
             .append("\n");
       }
-      for (PipeTaskMeta pipeTaskMeta : 
runtimeMeta.getConsensusGroupId2TaskMetaMap().values()) {
-        for (PipeRuntimeException e : pipeTaskMeta.getExceptionMessages()) {
+      for (final PipeTaskMeta pipeTaskMeta :
+          runtimeMeta.getConsensusGroupId2TaskMetaMap().values()) {
+        for (final PipeRuntimeException e : 
pipeTaskMeta.getExceptionMessages()) {
           exceptionMessageBuilder
               .append(DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms"))
               .append(", ")
@@ -125,7 +127,7 @@ public class PipeTableResp implements DataSet {
         }
       }
 
-      showPipeInfoList.add(
+      final TShowPipeInfo showPipeInfo =
           new TShowPipeInfo(
               staticMeta.getPipeName(),
               staticMeta.getCreationTime(),
@@ -133,11 +135,34 @@ public class PipeTableResp implements DataSet {
               staticMeta.getExtractorParameters().toString(),
               staticMeta.getProcessorParameters().toString(),
               staticMeta.getConnectorParameters().toString(),
-              exceptionMessageBuilder.toString()));
+              exceptionMessageBuilder.toString());
+      final PipeTemporaryMeta temporaryMeta = pipeMeta.getTemporaryMeta();
+      final boolean canCalculateOnLocal = canCalculateOnLocal(pipeMeta);
+
+      showPipeInfo.setRemainingEventCount(
+          canCalculateOnLocal ? -1 : temporaryMeta.getGlobalRemainingEvents());
+      showPipeInfo.setEstimatedRemainingTime(
+          canCalculateOnLocal ? -1 : temporaryMeta.getGlobalRemainingTime());
+      showPipeInfoList.add(showPipeInfo);
     }
 
     // sorted by pipe name
     showPipeInfoList.sort(Comparator.comparing(pipeInfo -> pipeInfo.id));
     return new 
TShowPipeResp().setStatus(status).setPipeInfoList(showPipeInfoList);
   }
+
+  private boolean canCalculateOnLocal(final PipeMeta pipeMeta) {
+    try {
+      return ConfigNode.getInstance()
+                  .getConfigManager()
+                  .getNodeManager()
+                  .getRegisteredDataNodeCount()
+              == 1
+          && ConfigRegionListeningFilter.parseListeningPlanTypeSet(
+                  pipeMeta.getStaticMeta().getExtractorParameters())
+              .isEmpty();
+    } catch (final IllegalPathException e) {
+      return false;
+    }
+  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index 5f9614eceb2..fb8507be9d4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import 
org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
+import 
org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeRemainingTimeMetrics;
+import 
org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigRegionExtractorMetrics;
 import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTask;
 import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTaskBuilder;
 import org.apache.iotdb.confignode.manager.pipe.task.PipeConfigNodeTaskStage;
@@ -63,14 +65,16 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
   }
 
   @Override
-  protected Map<Integer, PipeTask> buildPipeTasks(PipeMeta 
pipeMetaFromConfigNode)
+  protected Map<Integer, PipeTask> buildPipeTasks(final PipeMeta 
pipeMetaFromConfigNode)
       throws IllegalPathException {
     return new PipeConfigNodeTaskBuilder(pipeMetaFromConfigNode).build();
   }
 
   @Override
   protected void createPipeTask(
-      int consensusGroupId, PipeStaticMeta pipeStaticMeta, PipeTaskMeta 
pipeTaskMeta)
+      final int consensusGroupId,
+      final PipeStaticMeta pipeStaticMeta,
+      final PipeTaskMeta pipeTaskMeta)
       throws IllegalPathException {
     // Advance the extractor parameters parsing logic to avoid creating 
un-relevant pipeTasks
     if (consensusGroupId == Integer.MIN_VALUE
@@ -106,12 +110,12 @@ public class PipeConfigNodeTaskAgent extends 
PipeTaskAgent {
 
   @Override
   protected TPushPipeMetaRespExceptionMessage 
handleSinglePipeMetaChangesInternal(
-      PipeMeta pipeMetaFromCoordinator) {
+      final PipeMeta pipeMetaFromCoordinator) {
     try {
       return PipeConfigNodeAgent.runtime().isLeaderReady()
           ? 
super.handleSinglePipeMetaChangesInternal(pipeMetaFromCoordinator.deepCopy())
           : null;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       return new TPushPipeMetaRespExceptionMessage(
           pipeMetaFromCoordinator.getStaticMeta().getPipeName(),
           e.getMessage(),
@@ -120,7 +124,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
   }
 
   @Override
-  protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(String 
pipeName) {
+  protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(final 
String pipeName) {
     return PipeConfigNodeAgent.runtime().isLeaderReady()
         ? super.handleDropPipeInternal(pipeName)
         : null;
@@ -128,7 +132,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
 
   @Override
   protected List<TPushPipeMetaRespExceptionMessage> 
handlePipeMetaChangesInternal(
-      List<PipeMeta> pipeMetaListFromCoordinator) {
+      final List<PipeMeta> pipeMetaListFromCoordinator) {
     if (isShutdown() || !PipeConfigNodeAgent.runtime().isLeaderReady()) {
       return Collections.emptyList();
     }
@@ -148,13 +152,13 @@ public class PipeConfigNodeTaskAgent extends 
PipeTaskAgent {
                   .collect(Collectors.toList()));
       clearConfigRegionListeningQueueIfNecessary(pipeMetaListFromCoordinator);
       return exceptionMessages;
-    } catch (Exception e) {
+    } catch (final Exception e) {
       throw new PipeException("failed to handle pipe meta changes", e);
     }
   }
 
   private void clearConfigRegionListeningQueueIfNecessary(
-      List<PipeMeta> pipeMetaListFromCoordinator) {
+      final List<PipeMeta> pipeMetaListFromCoordinator) {
     final AtomicLong listeningQueueNewFirstIndex = new 
AtomicLong(Long.MAX_VALUE);
 
     // Check each pipe
@@ -187,8 +191,8 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
   }
 
   @Override
-  protected void collectPipeMetaListInternal(TPipeHeartbeatReq req, 
TPipeHeartbeatResp resp)
-      throws TException {
+  protected void collectPipeMetaListInternal(
+      final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws 
TException {
     // Do nothing if data node is removing or removed, or request does not 
need pipe meta list
     if (isShutdown() || !PipeConfigNodeAgent.runtime().isLeaderReady()) {
       return;
@@ -197,6 +201,8 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
     LOGGER.info("Received pipe heartbeat request {} from config coordinator.", 
req.heartbeatId);
 
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
+    final List<Long> pipeRemainingEventCountList = new ArrayList<>();
+    final List<Double> pipeRemainingTimeList = new ArrayList<>();
     try {
       final boolean shouldPrintLog =
           System.currentTimeMillis() - lastLogPrintedTime.get() > 1000 * 60 * 
10; // 10 minutes
@@ -206,14 +212,32 @@ public class PipeConfigNodeTaskAgent extends 
PipeTaskAgent {
 
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
+
+        final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
+        final long remainingEventCount =
+            PipeConfigRegionExtractorMetrics.getInstance()
+                .getRemainingEventCount(staticMeta.getPipeName(), 
staticMeta.getCreationTime());
+        final double remainingTime =
+            PipeConfigNodeRemainingTimeMetrics.getInstance()
+                .getRemainingTime(staticMeta.getPipeName(), 
staticMeta.getCreationTime());
+
+        pipeRemainingEventCountList.add(remainingEventCount);
+        pipeRemainingTimeList.add(remainingTime);
+
         if (shouldPrintLog) {
-          LOGGER.info("Reporting pipe meta: {}", pipeMeta.coreReportMessage());
+          LOGGER.info(
+              "Reporting pipe meta: {}, remainingEventCount: {}, 
remainingTime: {}",
+              pipeMeta.coreReportMessage(),
+              remainingEventCount,
+              remainingTime);
         }
       }
       LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
-    } catch (IOException e) {
+    } catch (final IOException e) {
       throw new TException(e);
     }
     resp.setPipeMetaList(pipeMetaBinaryList);
+    resp.setPipeRemainingEventCountList(pipeRemainingEventCountList);
+    resp.setPipeRemainingTimeList(pipeRemainingTimeList);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
index 97f893f4eae..d960c3fe88a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
@@ -107,9 +107,15 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
   public void parseHeartbeat(
       final int dataNodeId,
       @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromDataNode,
-      /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
+      /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
+      /* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
+      /* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
     pipeHeartbeatScheduler.parseHeartbeat(
         dataNodeId,
-        new PipeHeartbeat(pipeMetaByteBufferListFromDataNode, 
pipeCompletedListFromAgent));
+        new PipeHeartbeat(
+            pipeMetaByteBufferListFromDataNode,
+            pipeCompletedListFromAgent,
+            pipeRemainingEventCountListFromAgent,
+            pipeRemainingTimeListFromAgent));
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 203ba96ed44..4489bf5b957 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -31,30 +31,54 @@ import java.util.Map;
 import java.util.Objects;
 
 public class PipeHeartbeat {
-
   private final Map<PipeStaticMeta, PipeMeta> pipeMetaMap = new HashMap<>();
   private final Map<PipeStaticMeta, Boolean> isCompletedMap = new HashMap<>();
+  private final Map<PipeStaticMeta, Long> remainingEventCountMap = new 
HashMap<>();
+  private final Map<PipeStaticMeta, Double> remainingTimeMap = new HashMap<>();
 
   public PipeHeartbeat(
       @NotNull final List<ByteBuffer> pipeMetaByteBufferListFromAgent,
-      /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent) {
+      /* @Nullable */ final List<Boolean> pipeCompletedListFromAgent,
+      /* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
+      /* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
     for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) {
       final PipeMeta pipeMeta = 
PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i));
       pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta);
       isCompletedMap.put(
           pipeMeta.getStaticMeta(),
           Objects.nonNull(pipeCompletedListFromAgent) && 
pipeCompletedListFromAgent.get(i));
+      // A null remaining-event-count / remaining-time list implies the heartbeat comes from an
+      // older DataNode that does not report it. Each list is null-checked on its own, and the
+      // fallback "0" keeps such nodes from affecting the aggregated results.
+      remainingEventCountMap.put(
+          pipeMeta.getStaticMeta(),
+          Objects.nonNull(pipeRemainingEventCountListFromAgent)
+              ? pipeRemainingEventCountListFromAgent.get(i)
+              : 0L);
+      remainingTimeMap.put(
+          pipeMeta.getStaticMeta(),
+          Objects.nonNull(pipeRemainingTimeListFromAgent)
+              ? pipeRemainingTimeListFromAgent.get(i)
+              : 0d);
     }
   }
 
-  public PipeMeta getPipeMeta(PipeStaticMeta pipeStaticMeta) {
+  public PipeMeta getPipeMeta(final PipeStaticMeta pipeStaticMeta) {
     return pipeMetaMap.get(pipeStaticMeta);
   }
 
-  public Boolean isCompleted(PipeStaticMeta pipeStaticMeta) {
+  public Boolean isCompleted(final PipeStaticMeta pipeStaticMeta) {
     return isCompletedMap.get(pipeStaticMeta);
   }
 
+  public Long getRemainingEventCount(final PipeStaticMeta pipeStaticMeta) {
+    return remainingEventCountMap.get(pipeStaticMeta);
+  }
+
+  public Double getRemainingTime(final PipeStaticMeta pipeStaticMeta) {
+    return remainingTimeMap.get(pipeStaticMeta);
+  }
+
   public boolean isEmpty() {
     return pipeMetaMap.isEmpty();
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 68ea140dda9..790799cae72 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
@@ -130,8 +131,8 @@ public class PipeHeartbeatParser {
       final int nodeId,
       final PipeHeartbeat pipeHeartbeat) {
     for (final PipeMeta pipeMetaFromCoordinator : 
pipeTaskInfo.get().getPipeMetaList()) {
-      final PipeMeta pipeMetaFromAgent =
-          pipeHeartbeat.getPipeMeta(pipeMetaFromCoordinator.getStaticMeta());
+      final PipeStaticMeta staticMeta = 
pipeMetaFromCoordinator.getStaticMeta();
+      final PipeMeta pipeMetaFromAgent = pipeHeartbeat.getPipeMeta(staticMeta);
       if (pipeMetaFromAgent == null) {
         LOGGER.info(
             "PipeRuntimeCoordinator meets error in updating pipeMetaKeeper, "
@@ -140,11 +141,11 @@ public class PipeHeartbeatParser {
         continue;
       }
 
+      final PipeTemporaryMeta temporaryMeta = 
pipeMetaFromCoordinator.getTemporaryMeta();
+
       // Remove completed pipes
-      final Boolean isPipeCompletedFromAgent =
-          pipeHeartbeat.isCompleted(pipeMetaFromCoordinator.getStaticMeta());
+      final Boolean isPipeCompletedFromAgent = 
pipeHeartbeat.isCompleted(staticMeta);
       if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {
-        final PipeTemporaryMeta temporaryMeta = 
pipeMetaFromCoordinator.getTemporaryMeta();
 
         temporaryMeta.markDataNodeCompleted(nodeId);
 
@@ -159,6 +160,10 @@ public class PipeHeartbeatParser {
         }
       }
 
+      // Record statistics
+      temporaryMeta.setRemainingEvent(nodeId, 
pipeHeartbeat.getRemainingEventCount(staticMeta));
+      temporaryMeta.setRemainingTime(nodeId, 
pipeHeartbeat.getRemainingTime(staticMeta));
+
       final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromCoordinator =
           
pipeMetaFromCoordinator.getRuntimeMeta().getConsensusGroupId2TaskMetaMap();
       final Map<Integer, PipeTaskMeta> pipeTaskMetaMapFromAgent =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
index 462b6a017a8..ad691ea8565 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java
@@ -109,7 +109,11 @@ public class PipeHeartbeatScheduler {
             (dataNodeId, resp) ->
                 pipeHeartbeatParser.parseHeartbeat(
                     dataNodeId,
-                    new PipeHeartbeat(resp.getPipeMetaList(), 
resp.getPipeCompletedList())));
+                    new PipeHeartbeat(
+                        resp.getPipeMetaList(),
+                        resp.getPipeCompletedList(),
+                        resp.getPipeRemainingEventCountList(),
+                        resp.getPipeRemainingTimeList())));
 
     // config node heartbeat
     try {
@@ -117,7 +121,11 @@ public class PipeHeartbeatScheduler {
       PipeConfigNodeAgent.task().collectPipeMetaList(request, configNodeResp);
       pipeHeartbeatParser.parseHeartbeat(
           ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
-          new PipeHeartbeat(configNodeResp.getPipeMetaList(), null));
+          new PipeHeartbeat(
+              configNodeResp.getPipeMetaList(),
+              null,
+              configNodeResp.getPipeRemainingEventCountList(),
+              configNodeResp.getPipeRemainingTimeList()));
     } catch (final Exception e) {
       LOGGER.warn("Failed to collect pipe meta list from config node task 
agent", e);
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
index f782f3e0617..0e06bdba1c2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/ConfigRegionListeningFilter.java
@@ -159,7 +159,7 @@ public class ConfigRegionListeningFilter {
   }
 
   public static Set<ConfigPhysicalPlanType> 
parseListeningPlanTypeSet(PipeParameters parameters)
-      throws IllegalPathException, IllegalArgumentException {
+      throws IllegalPathException {
     Set<ConfigPhysicalPlanType> planTypes = new HashSet<>();
     Set<PartialPath> inclusionOptions =
         parseOptions(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
index d819ccc075e..b02679f147e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
@@ -41,6 +41,7 @@ public class PipeConfigNodeMetrics implements IMetricSet {
     PipeConfigRegionExtractorMetrics.getInstance().bindTo(metricService);
     PipeConfigRegionConnectorMetrics.getInstance().bindTo(metricService);
     PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService);
+    PipeTemporaryMetaMetrics.getInstance().bindTo(metricService);
   }
 
   @Override
@@ -51,5 +52,6 @@ public class PipeConfigNodeMetrics implements IMetricSet {
     PipeConfigRegionExtractorMetrics.getInstance().unbindFrom(metricService);
     PipeConfigRegionConnectorMetrics.getInstance().unbindFrom(metricService);
     PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService);
+    PipeTemporaryMetaMetrics.getInstance().unbindFrom(metricService);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
index 0ef7b48d1c7..378251eb45c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeMetrics.java
@@ -147,6 +147,15 @@ public class PipeConfigNodeRemainingTimeMetrics implements 
IMetricSet {
     operator.markConfigRegionCommit();
   }
 
+  //////////////////////////// Show pipes ////////////////////////////
+
+  public double getRemainingTime(final String pipeName, final long 
creationTime) {
+    return remainingTimeOperatorMap
+        .computeIfAbsent(
+            pipeName + "_" + creationTime, k -> new 
PipeConfigNodeRemainingTimeOperator())
+        .getRemainingTime();
+  }
+
   //////////////////////////// singleton ////////////////////////////
 
   private static class PipeConfigNodeRemainingTimeMetricsHolder {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
index cbc23f2e9be..201aa775de5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeRemainingTimeOperator.java
@@ -56,13 +56,7 @@ class PipeConfigNodeRemainingTimeOperator {
   //////////////////////////// Remaining time calculation 
////////////////////////////
 
   /**
-   * This will calculate the estimated remaining time of pipe.
-   *
-   * <p>Notes:
-   *
-   * <p>1. The events in pipe assigner are omitted.
-   *
-   * <p>2. Other pipes' events sharing the same connectorSubtasks may be 
over-calculated.
+   * This will calculate the estimated remaining time of the given pipe's config region subTask.
    *
    * @return The estimated remaining time
    */
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionExtractorMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionExtractorMetrics.java
index 879c31999d1..403fb8a2b6c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionExtractorMetrics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigRegionExtractorMetrics.java
@@ -117,6 +117,20 @@ public class PipeConfigRegionExtractorMetrics implements 
IMetricSet {
     extractorMap.remove(taskID);
   }
 
+  //////////////////////////// Show pipes ////////////////////////////
+
+  /**
+   * Returns the un-transferred event count of the extractor stored under the task ID {@code
+   * pipeName + "_" + creationTime}.
+   *
+   * @param pipeName the name of the pipe
+   * @param creationTime the creation time of the pipe
+   * @return the extractor's un-transferred event count, or 0 (after logging a warning) when no
+   *     extractor is stored under that task ID
+   */
+  public long getRemainingEventCount(final String pipeName, final long creationTime) {
+    final String taskID = pipeName + "_" + creationTime;
+    final IoTDBConfigRegionExtractor extractor = extractorMap.get(taskID);
+    if (Objects.nonNull(extractor)) {
+      return extractor.getUnTransferredEventCount();
+    }
+    LOGGER.warn(
+        "Failed to get remaining event count, IoTDBConfigRegionExtractor({}) does not exist",
+        taskID);
+    return 0;
+  }
+
   //////////////////////////// singleton ////////////////////////////
 
   private static class PipeConfigRegionExtractorMetricsHolder {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java
new file mode 100644
index 00000000000..676b4cd154e
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.pipe.metric;
+
+import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The {@link PipeTemporaryMetaMetrics} is to calculate the pipe-statistics from the {@link
+ * PipeTemporaryMeta}. The class is lock-free and can only read from the thread-safe variables from
+ * the {@link PipeTemporaryMeta}.
+ *
+ * <p>Every pipe is tracked under a pipe ID of the form {@code <pipeName>_<creationTime>}. Two auto
+ * gauges are exposed per pipe, tagged with the pipe name and the creation time: the global
+ * remaining event count and the global remaining time.
+ */
+public class PipeTemporaryMetaMetrics implements IMetricSet {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTemporaryMetaMetrics.class);
+
+  /** Separator between the pipe name and the creation time inside a pipe ID. */
+  private static final char PIPE_ID_SEPARATOR = '_';
+
+  private volatile AbstractMetricService metricService;
+
+  private final Map<String, PipeTemporaryMeta> pipeTemporaryMetaMap = new ConcurrentHashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////
+
+  /**
+   * Remembers the metric service and creates the gauges of every pipe registered so far.
+   *
+   * @param metricService the metric service the gauges are created on
+   */
+  @Override
+  public void bindTo(final AbstractMetricService metricService) {
+    this.metricService = metricService;
+    // Iterate over a snapshot so that concurrent (de)registrations cannot disturb the traversal.
+    ImmutableSet.copyOf(pipeTemporaryMetaMap.keySet()).forEach(this::createMetrics);
+  }
+
+  private void createMetrics(final String pipeID) {
+    createAutoGauge(pipeID);
+  }
+
+  private void createAutoGauge(final String pipeID) {
+    final PipeTemporaryMeta pipeTemporaryMeta = pipeTemporaryMetaMap.get(pipeID);
+    if (Objects.isNull(pipeTemporaryMeta)) {
+      // The pipe has been deregistered in the meantime, there is nothing left to expose.
+      return;
+    }
+    final String[] pipeNameAndCreationTime = splitPipeID(pipeID);
+    metricService.createAutoGauge(
+        Metric.PIPE_GLOBAL_REMAINING_EVENT_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        pipeTemporaryMeta,
+        PipeTemporaryMeta::getGlobalRemainingEvents,
+        Tag.NAME.toString(),
+        pipeNameAndCreationTime[0],
+        Tag.CREATION_TIME.toString(),
+        pipeNameAndCreationTime[1]);
+    metricService.createAutoGauge(
+        Metric.PIPE_GLOBAL_REMAINING_TIME.toString(),
+        MetricLevel.IMPORTANT,
+        pipeTemporaryMeta,
+        PipeTemporaryMeta::getGlobalRemainingTime,
+        Tag.NAME.toString(),
+        pipeNameAndCreationTime[0],
+        Tag.CREATION_TIME.toString(),
+        pipeNameAndCreationTime[1]);
+  }
+
+  /**
+   * Deregisters every pipe that is currently registered and warns if the map is still non-empty
+   * afterwards.
+   *
+   * @param metricService unused, the gauges are removed from the service given to {@link
+   *     #bindTo(AbstractMetricService)}
+   */
+  @Override
+  public void unbindFrom(final AbstractMetricService metricService) {
+    ImmutableSet.copyOf(pipeTemporaryMetaMap.keySet()).forEach(this::deregister);
+    if (!pipeTemporaryMetaMap.isEmpty()) {
+      LOGGER.warn(
+          "Failed to unbind from pipe temporary meta metrics, PipeTemporaryMeta map not empty");
+    }
+  }
+
+  private void removeMetrics(final String pipeID) {
+    removeAutoGauge(pipeID);
+  }
+
+  private void removeAutoGauge(final String pipeID) {
+    final String[] pipeNameAndCreationTime = splitPipeID(pipeID);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_GLOBAL_REMAINING_EVENT_COUNT.toString(),
+        Tag.NAME.toString(),
+        pipeNameAndCreationTime[0],
+        Tag.CREATION_TIME.toString(),
+        pipeNameAndCreationTime[1]);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_GLOBAL_REMAINING_TIME.toString(),
+        Tag.NAME.toString(),
+        pipeNameAndCreationTime[0],
+        Tag.CREATION_TIME.toString(),
+        pipeNameAndCreationTime[1]);
+  }
+
+  //////////////////////////// register & deregister (pipe integration) ////////////////////////////
+
+  /**
+   * Starts tracking the temporary meta of the given pipe, unless a pipe with the same ID is
+   * already tracked, and creates its gauges when a metric service is bound.
+   *
+   * @param pipeMeta the meta of the pipe to track
+   */
+  public void register(final PipeMeta pipeMeta) {
+    final String pipeID = generatePipeID(pipeMeta);
+    pipeTemporaryMetaMap.putIfAbsent(pipeID, pipeMeta.getTemporaryMeta());
+    if (Objects.nonNull(metricService)) {
+      createMetrics(pipeID);
+    }
+  }
+
+  /**
+   * Stops tracking the pipe with the given ID and removes its gauges when a metric service is
+   * bound. Logs a warning and does nothing when the pipe is not tracked.
+   *
+   * @param pipeID the pipe ID, of the form {@code <pipeName>_<creationTime>}
+   */
+  public void deregister(final String pipeID) {
+    if (!pipeTemporaryMetaMap.containsKey(pipeID)) {
+      LOGGER.warn(
+          "Failed to deregister pipe temporary meta metrics, PipeTemporaryMeta({}) does not exist",
+          pipeID);
+      return;
+    }
+    if (Objects.nonNull(metricService)) {
+      removeMetrics(pipeID);
+    }
+    // Forget the pipe even when no metric service is bound. Otherwise the stale entry would
+    // survive and get gauges created for it by a later bindTo.
+    pipeTemporaryMetaMap.remove(pipeID);
+  }
+
+  /**
+   * Synchronizes the tracked pipes with the given pipe metas: pipes that are not tracked yet are
+   * registered, tracked pipes that no longer appear in the given metas are deregistered.
+   *
+   * @param pipeMetaList the pipe metas that should be tracked after the call
+   */
+  public void handleTemporaryMetaChanges(final Iterable<PipeMeta> pipeMetaList) {
+    final Set<String> pipeTaskIDs = new HashSet<>();
+    pipeMetaList.forEach(
+        pipeMeta -> {
+          final String pipeTaskID = generatePipeID(pipeMeta);
+          if (!pipeTemporaryMetaMap.containsKey(pipeTaskID)) {
+            register(pipeMeta);
+          }
+          pipeTaskIDs.add(pipeTaskID);
+        });
+    ImmutableSet.copyOf(pipeTemporaryMetaMap.keySet()).stream()
+        .filter(pipeTaskID -> !pipeTaskIDs.contains(pipeTaskID))
+        .forEach(this::deregister);
+  }
+
+  //////////////////////////// pipe ID helpers ////////////////////////////
+
+  /** Builds the pipe ID {@code <pipeName>_<creationTime>} of the given pipe. */
+  private static String generatePipeID(final PipeMeta pipeMeta) {
+    return pipeMeta.getStaticMeta().getPipeName()
+        + PIPE_ID_SEPARATOR
+        + pipeMeta.getStaticMeta().getCreationTime();
+  }
+
+  /**
+   * Splits a pipe ID built by {@link #generatePipeID(PipeMeta)} into the pipe name (index 0) and
+   * the creation time (index 1).
+   *
+   * <p>The ID is split at the LAST separator because the pipe name itself may contain
+   * underscores. Splitting at every underscore would truncate such a name and report a fragment
+   * of it as the creation time.
+   */
+  private static String[] splitPipeID(final String pipeID) {
+    final int separatorIndex = pipeID.lastIndexOf(PIPE_ID_SEPARATOR);
+    return new String[] {
+      pipeID.substring(0, separatorIndex), pipeID.substring(separatorIndex + 1)
+    };
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeTemporaryMetaMetricsHolder {
+
+    private static final PipeTemporaryMetaMetrics INSTANCE = new PipeTemporaryMetaMetrics();
+
+    private PipeTemporaryMetaMetricsHolder() {
+      // Empty constructor
+    }
+  }
+
+  public static PipeTemporaryMetaMetrics getInstance() {
+    return PipeTemporaryMetaMetricsHolder.INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index ba3981b67f1..ea1664c27a0 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -30,6 +30,10 @@ import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePla
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
 import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
+import 
org.apache.iotdb.confignode.manager.pipe.agent.runtime.PipeConfigRegionListener;
+import 
org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeTaskAgent;
+import 
org.apache.iotdb.confignode.manager.pipe.execution.PipeConfigNodeSubtask;
+import 
org.apache.iotdb.confignode.manager.pipe.metric.PipeTemporaryMetaMetrics;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -66,7 +70,7 @@ public class PipeInfo implements SnapshotProcessor {
 
   /////////////////////////////////  Non-query  
/////////////////////////////////
 
-  public TSStatus createPipe(CreatePipePlanV2 plan) {
+  public TSStatus createPipe(final CreatePipePlanV2 plan) {
     try {
       final Optional<PipeMeta> pipeMetaBeforeCreation =
           Optional.ofNullable(
@@ -89,33 +93,37 @@ public class PipeInfo implements SnapshotProcessor {
                 throw new PipeException("Failed to increase listener 
reference", e);
               }
             });
+        PipeTemporaryMetaMetrics.getInstance()
+            .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
         return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       } else {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             .setMessage(message.getMessage());
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.error("Failed to create pipe", e);
       return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
           .setMessage("Failed to create pipe, because " + e.getMessage());
     }
   }
 
-  public TSStatus setPipeStatus(SetPipeStatusPlanV2 plan) {
+  public TSStatus setPipeStatus(final SetPipeStatusPlanV2 plan) {
     try {
       pipeTaskInfo.setPipeStatus(plan);
 
       PipeConfigNodeAgent.task()
           
.handleSinglePipeMetaChanges(pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeName()));
+      PipeTemporaryMetaMetrics.getInstance()
+          .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.error("Failed to set pipe status", e);
       return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
           .setMessage("Failed to set pipe status, because " + e.getMessage());
     }
   }
 
-  public TSStatus dropPipe(DropPipePlanV2 plan) {
+  public TSStatus dropPipe(final DropPipePlanV2 plan) {
     try {
       final Optional<PipeMeta> pipeMetaBeforeDrop =
           
Optional.ofNullable(pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeName()));
@@ -130,48 +138,63 @@ public class PipeInfo implements SnapshotProcessor {
               try {
                 PipeConfigNodeAgent.runtime()
                     
.decreaseListenerReference(meta.getStaticMeta().getExtractorParameters());
-              } catch (Exception e) {
+              } catch (final Exception e) {
                 throw new PipeException("Failed to decrease listener 
reference", e);
               }
             });
+        PipeTemporaryMetaMetrics.getInstance()
+            .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
         return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       } else {
         return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
             .setMessage(message.getMessage());
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.error("Failed to drop pipe", e);
       return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
           .setMessage("Failed to drop pipe, because " + e.getMessage());
     }
   }
 
-  public TSStatus alterPipe(AlterPipePlanV2 plan) {
+  public TSStatus alterPipe(final AlterPipePlanV2 plan) {
     try {
       pipeTaskInfo.alterPipe(plan);
 
       PipeConfigNodeAgent.task()
           .handleSinglePipeMetaChanges(
               
pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName()));
+      PipeTemporaryMetaMetrics.getInstance()
+          .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.error("Failed to alter pipe", e);
       return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
           .setMessage("Failed to alter pipe, because " + e.getMessage());
     }
   }
 
-  public TSStatus operateMultiplePipes(OperateMultiplePipesPlanV2 plans) {
+  /**
+   * Note: This interface is only used for subscription and thus irrelevant to the {@link
+   * PipeConfigNodeSubtask}. Hence, we can skip the operation of {@link PipeConfigNodeTaskAgent} and
+   * {@link PipeConfigRegionListener} here.
+   *
+   * @param plans An {@link OperateMultiplePipesPlanV2} consisting of many subPlans
+   * @return result {@link TSStatus}
+   */
+  public TSStatus operateMultiplePipes(final OperateMultiplePipesPlanV2 plans) 
{
     try {
-      return pipeTaskInfo.operateMultiplePipes(plans);
-    } catch (Exception e) {
+      final TSStatus status = pipeTaskInfo.operateMultiplePipes(plans);
+      PipeTemporaryMetaMetrics.getInstance()
+          .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
+      return status;
+    } catch (final Exception e) {
       LOGGER.error("Failed to create multiple pipes", e);
       return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
           .setMessage("Failed to create multiple pipes, because " + 
e.getMessage());
     }
   }
 
-  public TSStatus handleLeaderChange(PipeHandleLeaderChangePlan plan) {
+  public TSStatus handleLeaderChange(final PipeHandleLeaderChangePlan plan) {
     try {
       pipeTaskInfo.handleLeaderChange(plan);
 
@@ -180,15 +203,17 @@ public class PipeInfo implements SnapshotProcessor {
         pipeMetaListFromCoordinator.add(pipeMeta);
       }
       
PipeConfigNodeAgent.task().handlePipeMetaChanges(pipeMetaListFromCoordinator);
+      PipeTemporaryMetaMetrics.getInstance()
+          .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.error("Failed to handle leader change", e);
       return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
           .setMessage("Failed to handle leader change, because " + 
e.getMessage());
     }
   }
 
-  public TSStatus handleMetaChanges(PipeHandleMetaChangePlan plan) {
+  public TSStatus handleMetaChanges(final PipeHandleMetaChangePlan plan) {
     try {
       pipeTaskInfo.handleMetaChanges(plan);
 
@@ -198,8 +223,10 @@ public class PipeInfo implements SnapshotProcessor {
             
pipeTaskInfo.getPipeMetaByPipeName(pipeMeta.getStaticMeta().getPipeName()));
       }
       
PipeConfigNodeAgent.task().handlePipeMetaChanges(pipeMetaListFromCoordinator);
+      PipeTemporaryMetaMetrics.getInstance()
+          .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.error("Failed to handle meta changes", e);
       return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
           .setMessage("Failed to handle meta changes, because " + 
e.getMessage());
@@ -209,13 +236,13 @@ public class PipeInfo implements SnapshotProcessor {
   /////////////////////////////////  SnapshotProcessor  
/////////////////////////////////
 
   @Override
-  public boolean processTakeSnapshot(File snapshotDir) throws IOException {
+  public boolean processTakeSnapshot(final File snapshotDir) throws 
IOException {
     return pipeTaskInfo.processTakeSnapshot(snapshotDir)
         && pipePluginInfo.processTakeSnapshot(snapshotDir);
   }
 
   @Override
-  public void processLoadSnapshot(File snapshotDir) throws IOException {
+  public void processLoadSnapshot(final File snapshotDir) throws IOException {
     Exception loadPipeTaskInfoException = null;
     Exception loadPipePluginInfoException = null;
 
@@ -226,14 +253,14 @@ public class PipeInfo implements SnapshotProcessor {
         PipeConfigNodeAgent.runtime()
             
.increaseListenerReference(pipeMeta.getStaticMeta().getExtractorParameters());
       }
-    } catch (Exception ex) {
+    } catch (final Exception ex) {
       LOGGER.error("Failed to load pipe task info from snapshot", ex);
       loadPipeTaskInfoException = ex;
     }
 
     try {
       pipePluginInfo.processLoadSnapshot(snapshotDir);
-    } catch (Exception ex) {
+    } catch (final Exception ex) {
       LOGGER.error("Failed to load pipe plugin info from snapshot", ex);
       loadPipePluginInfoException = ex;
     }
@@ -251,16 +278,16 @@ public class PipeInfo implements SnapshotProcessor {
   /////////////////////////////////  equals & hashCode  
/////////////////////////////////
 
   @Override
-  public boolean equals(Object o) {
+  public boolean equals(final Object o) {
     if (this == o) {
       return true;
     }
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    PipeInfo pipeInfo = (PipeInfo) o;
-    return Objects.equals(pipePluginInfo, pipeInfo.pipePluginInfo)
-        && Objects.equals(pipeTaskInfo, pipeInfo.pipeTaskInfo);
+    final PipeInfo that = (PipeInfo) o;
+    return Objects.equals(pipePluginInfo, that.pipePluginInfo)
+        && Objects.equals(pipeTaskInfo, that.pipeTaskInfo);
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 201f2f049ed..037b3b4fb95 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTemporaryMeta;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan;
@@ -402,10 +403,12 @@ public class PipeTaskInfo implements SnapshotProcessor {
   public TSStatus alterPipe(final AlterPipePlanV2 plan) {
     acquireWriteLock();
     try {
+      final PipeTemporaryMeta temporaryMeta =
+          
pipeMetaKeeper.getPipeMeta(plan.getPipeStaticMeta().getPipeName()).getTemporaryMeta();
       pipeMetaKeeper.removePipeMeta(plan.getPipeStaticMeta().getPipeName());
       pipeMetaKeeper.addPipeMeta(
           plan.getPipeStaticMeta().getPipeName(),
-          new PipeMeta(plan.getPipeStaticMeta(), plan.getPipeRuntimeMeta()));
+          new PipeMeta(plan.getPipeStaticMeta(), plan.getPipeRuntimeMeta(), 
temporaryMeta));
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } finally {
       releaseWriteLock();
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
index d974ceeab0a..144b988df1e 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java
@@ -79,7 +79,7 @@ public class PipeTableRespTest {
     pipeTasks1.put(1, pipeTaskMeta1);
     PipeStaticMeta pipeStaticMeta1 =
         new PipeStaticMeta(
-            "testPipe", 121, extractorAttributes1, processorAttributes1, 
connectorAttributes1);
+            "testPipe1", 122, extractorAttributes1, processorAttributes1, 
connectorAttributes1);
     PipeRuntimeMeta pipeRuntimeMeta1 = new PipeRuntimeMeta(pipeTasks1);
     pipeMetaList.add(new PipeMeta(pipeStaticMeta1, pipeRuntimeMeta1));
 
@@ -99,7 +99,7 @@ public class PipeTableRespTest {
     pipeTasks2.put(1, pipeTaskMeta2);
     PipeStaticMeta pipeStaticMeta2 =
         new PipeStaticMeta(
-            "testPipe", 121, extractorAttributes2, processorAttributes2, 
connectorAttributes2);
+            "testPipe2", 123, extractorAttributes2, processorAttributes2, 
connectorAttributes2);
     PipeRuntimeMeta pipeRuntimeMeta2 = new PipeRuntimeMeta(pipeTasks2);
     pipeMetaList.add(new PipeMeta(pipeStaticMeta2, pipeRuntimeMeta2));
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 65c7699f398..9f3b7fe66ee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -63,6 +63,7 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.apache.thrift.TException;
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -287,6 +288,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
     final List<Boolean> pipeCompletedList = new ArrayList<>();
+    final List<Long> pipeRemainingEventCountList = new ArrayList<>();
+    final List<Double> pipeRemainingTimeList = new ArrayList<>();
     try {
       final Optional<Logger> logger =
           PipeResourceManager.log()
@@ -298,8 +301,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
 
-        final Map<Integer, PipeTask> pipeTaskMap =
-            pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta());
+        final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
+
+        final Map<Integer, PipeTask> pipeTaskMap = 
pipeTaskManager.getPipeTasks(staticMeta);
         final boolean isAllDataRegionCompleted =
             pipeTaskMap == null
                 || pipeTaskMap.entrySet().stream()
@@ -320,14 +324,22 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                             
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
                         
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
 
-        pipeCompletedList.add(isAllDataRegionCompleted && 
includeDataAndNeedDrop);
+        final boolean isCompleted = isAllDataRegionCompleted && 
includeDataAndNeedDrop;
+        final Pair<Long, Double> remainingEventAndTime =
+            PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+                .getRemainingEventAndTime(staticMeta.getPipeName(), 
staticMeta.getCreationTime());
+        pipeCompletedList.add(isCompleted);
+        pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
+        pipeRemainingTimeList.add(remainingEventAndTime.getRight());
 
         logger.ifPresent(
             l ->
                 l.info(
-                    "Reporting pipe meta: {}, isCompleted: {}",
+                    "Reporting pipe meta: {}, isCompleted: {}, 
remainingEventCount: {}, estimatedRemainingTime: {}",
                     pipeMeta.coreReportMessage(),
-                    includeDataAndNeedDrop));
+                    isCompleted,
+                    remainingEventAndTime.getLeft(),
+                    remainingEventAndTime.getRight()));
       }
       LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
     } catch (final IOException | IllegalPathException e) {
@@ -335,6 +347,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
     resp.setPipeMetaList(pipeMetaBinaryList);
     resp.setPipeCompletedList(pipeCompletedList);
+    resp.setPipeRemainingEventCountList(pipeRemainingEventCountList);
+    resp.setPipeRemainingTimeList(pipeRemainingTimeList);
+    PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
   }
 
   @Override
@@ -353,6 +368,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
     final List<ByteBuffer> pipeMetaBinaryList = new ArrayList<>();
     final List<Boolean> pipeCompletedList = new ArrayList<>();
+    final List<Long> pipeRemainingEventCountList = new ArrayList<>();
+    final List<Double> pipeRemainingTimeList = new ArrayList<>();
     try {
       final Optional<Logger> logger =
           PipeResourceManager.log()
@@ -364,8 +381,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
         pipeMetaBinaryList.add(pipeMeta.serialize());
 
-        final Map<Integer, PipeTask> pipeTaskMap =
-            pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta());
+        final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta();
+
+        final Map<Integer, PipeTask> pipeTaskMap = 
pipeTaskManager.getPipeTasks(staticMeta);
         final boolean isAllDataRegionCompleted =
             pipeTaskMap == null
                 || pipeTaskMap.entrySet().stream()
@@ -386,14 +404,22 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
                             
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_KEY),
                         
EXTRACTOR_HISTORY_TERMINATE_PIPE_ON_ALL_CONSUMED_DEFAULT_VALUE);
 
-        pipeCompletedList.add(isAllDataRegionCompleted && 
includeDataAndNeedDrop);
+        final boolean isCompleted = isAllDataRegionCompleted && 
includeDataAndNeedDrop;
+        final Pair<Long, Double> remainingEventAndTime =
+            PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+                .getRemainingEventAndTime(staticMeta.getPipeName(), 
staticMeta.getCreationTime());
+        pipeCompletedList.add(isCompleted);
+        pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
+        pipeRemainingTimeList.add(remainingEventAndTime.getRight());
 
         logger.ifPresent(
             l ->
                 l.info(
-                    "Reporting pipe meta: {}, isCompleted: {}",
+                    "Reporting pipe meta: {}, isCompleted: {}, 
remainingEventCount: {}, estimatedRemainingTime: {}",
                     pipeMeta.coreReportMessage(),
-                    includeDataAndNeedDrop));
+                    isCompleted,
+                    remainingEventAndTime.getLeft(),
+                    remainingEventAndTime.getRight()));
       }
       LOGGER.info("Reported {} pipe metas.", pipeMetaBinaryList.size());
     } catch (final IOException | IllegalPathException e) {
@@ -401,6 +427,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     }
     resp.setPipeMetaList(pipeMetaBinaryList);
     resp.setPipeCompletedList(pipeCompletedList);
+    resp.setPipeRemainingEventCountList(pipeRemainingEventCountList);
+    resp.setPipeRemainingTimeList(pipeRemainingTimeList);
     PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
index 63c42cc98c2..4b6233b9aaf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/SchemaRegionListeningFilter.java
@@ -97,7 +97,7 @@ public class SchemaRegionListeningFilter {
   }
 
   public static Set<PlanNodeType> parseListeningPlanTypeSet(PipeParameters 
parameters)
-      throws IllegalPathException, IllegalArgumentException {
+      throws IllegalPathException {
     Set<PlanNodeType> planTypes = new HashSet<>();
     Set<PartialPath> inclusionOptions =
         parseOptions(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index 5173d64e04f..c1ecb8460b4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -197,6 +198,16 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
     }
   }
 
+  //////////////////////////// Show pipes ////////////////////////////
+
+  /**
+   * Returns the remaining event count and the estimated remaining time reported by the operator
+   * stored under the task ID {@code pipeName + "_" + creationTime}. When no operator is stored
+   * under that ID yet, a new one is created and stored before it is queried.
+   *
+   * @param pipeName the name of the pipe
+   * @param creationTime the creation time of the pipe
+   * @return a pair of the operator's remaining events (left) and remaining time (right)
+   */
+  public Pair<Long, Double> getRemainingEventAndTime(
+      final String pipeName, final long creationTime) {
+    final String taskID = pipeName + "_" + creationTime;
+    final PipeDataNodeRemainingEventAndTimeOperator operator =
+        remainingEventAndTimeOperatorMap.computeIfAbsent(
+            taskID, ignored -> new PipeDataNodeRemainingEventAndTimeOperator());
+    return new Pair<>(operator.getRemainingEvents(), operator.getRemainingTime());
+  }
+
   //////////////////////////// singleton ////////////////////////////
 
   private static class PipeDataNodeRemainingEventAndTimeMetricsHolder {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
index cb76c13aa0b..78e8a32630d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
@@ -168,6 +168,8 @@ public class ColumnHeaderConstant {
   public static final String PIPE_PROCESSOR = "PipeProcessor";
   public static final String PIPE_CONNECTOR = "PipeSink";
   public static final String EXCEPTION_MESSAGE = "ExceptionMessage";
+  public static final String REMAINING_EVENT_COUNT = "RemainingEventCount";
+  public static final String ESTIMATED_REMAINING_SECONDS = 
"EstimatedRemainingSeconds";
 
   // column names for select into
   public static final String SOURCE_DEVICE = "SourceDevice";
@@ -414,7 +416,9 @@ public class ColumnHeaderConstant {
           new ColumnHeader(PIPE_EXTRACTOR, TSDataType.TEXT),
           new ColumnHeader(PIPE_PROCESSOR, TSDataType.TEXT),
           new ColumnHeader(PIPE_CONNECTOR, TSDataType.TEXT),
-          new ColumnHeader(EXCEPTION_MESSAGE, TSDataType.TEXT));
+          new ColumnHeader(EXCEPTION_MESSAGE, TSDataType.TEXT),
+          new ColumnHeader(REMAINING_EVENT_COUNT, TSDataType.TEXT),
+          new ColumnHeader(ESTIMATED_REMAINING_SECONDS, TSDataType.TEXT));
 
   public static final List<ColumnHeader> showTopicColumnHeaders =
       ImmutableList.of(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
index 55e2e284e05..a4d0a9505b4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/ShowPipeTask.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe;
 
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import 
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
@@ -37,6 +38,7 @@ import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -45,24 +47,24 @@ public class ShowPipeTask implements IConfigTask {
 
   private final ShowPipesStatement showPipesStatement;
 
-  public ShowPipeTask(ShowPipesStatement showPipesStatement) {
+  public ShowPipeTask(final ShowPipesStatement showPipesStatement) {
     this.showPipesStatement = showPipesStatement;
   }
 
   @Override
-  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor 
configTaskExecutor)
+  public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor 
configTaskExecutor)
       throws InterruptedException {
     return configTaskExecutor.showPipes(showPipesStatement);
   }
 
   public static void buildTSBlock(
-      List<TShowPipeInfo> pipeInfoList, SettableFuture<ConfigTaskResult> 
future) {
-    List<TSDataType> outputDataTypes =
+      final List<TShowPipeInfo> pipeInfoList, final 
SettableFuture<ConfigTaskResult> future) {
+    final List<TSDataType> outputDataTypes =
         ColumnHeaderConstant.showPipeColumnHeaders.stream()
             .map(ColumnHeader::getColumnType)
             .collect(Collectors.toList());
-    TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
-    for (TShowPipeInfo tPipeInfo : pipeInfoList) {
+    final TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+    for (final TShowPipeInfo tPipeInfo : pipeInfoList) {
       builder.getTimeColumnBuilder().writeLong(0L);
       builder
           .getColumnBuilder(0)
@@ -88,9 +90,38 @@ public class ShowPipeTask implements IConfigTask {
       builder
           .getColumnBuilder(6)
           .writeBinary(new Binary(tPipeInfo.getExceptionMessage(), 
TSFileConfig.STRING_CHARSET));
+
+      // Optional, default 0/0.0
+      long remainingEventCount = tPipeInfo.getRemainingEventCount();
+      double remainingTime = tPipeInfo.getEstimatedRemainingTime();
+
+      if (remainingEventCount == -1 && remainingTime == -1) {
+        final Pair<Long, Double> remainingEventAndTime =
+            PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
+                .getRemainingEventAndTime(tPipeInfo.getId(), 
tPipeInfo.getCreationTime());
+        remainingEventCount = remainingEventAndTime.getLeft();
+        remainingTime = remainingEventAndTime.getRight();
+      }
+
+      builder
+          .getColumnBuilder(7)
+          .writeBinary(
+              new Binary(
+                  tPipeInfo.isSetRemainingEventCount()
+                      ? String.valueOf(remainingEventCount)
+                      : "Unknown",
+                  TSFileConfig.STRING_CHARSET));
+      builder
+          .getColumnBuilder(8)
+          .writeBinary(
+              new Binary(
+                  tPipeInfo.isSetEstimatedRemainingTime()
+                      ? String.format("%.2f", remainingTime)
+                      : "Unknown",
+                  TSFileConfig.STRING_CHARSET));
       builder.declarePosition();
     }
-    DatasetHeader datasetHeader = DatasetHeaderFactory.getShowPipeHeader();
+    final DatasetHeader datasetHeader = 
DatasetHeaderFactory.getShowPipeHeader();
     future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, 
builder.build(), datasetHeader));
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
index e30fcf11269..2cdb6659a9a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeMeta.java
@@ -37,9 +37,16 @@ public class PipeMeta {
   private final PipeTemporaryMeta temporaryMeta;
 
   public PipeMeta(final PipeStaticMeta staticMeta, final PipeRuntimeMeta 
runtimeMeta) {
+    this(staticMeta, runtimeMeta, new PipeTemporaryMeta());
+  }
+
+  public PipeMeta(
+      final PipeStaticMeta staticMeta,
+      final PipeRuntimeMeta runtimeMeta,
+      final PipeTemporaryMeta temporaryMeta) {
     this.staticMeta = staticMeta;
     this.runtimeMeta = runtimeMeta;
-    this.temporaryMeta = new PipeTemporaryMeta();
+    this.temporaryMeta = temporaryMeta;
   }
 
   public PipeStaticMeta getStaticMeta() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
index 6da2be8e81f..14ef390e12b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTemporaryMeta.java
@@ -27,15 +27,33 @@ import java.util.concurrent.ConcurrentMap;
 public class PipeTemporaryMeta {
 
   private final ConcurrentMap<Integer, Integer> completedDataNodeIds = new 
ConcurrentHashMap<>();
+  private final ConcurrentMap<Integer, Long> nodeId2RemainingEventMap = new 
ConcurrentHashMap<>();
+  private final ConcurrentMap<Integer, Double> nodeId2RemainingTimeMap = new 
ConcurrentHashMap<>();
 
   public void markDataNodeCompleted(final int dataNodeId) {
     completedDataNodeIds.put(dataNodeId, dataNodeId);
   }
 
+  public void setRemainingEvent(final int dataNodeId, final long 
remainingEventCount) {
+    nodeId2RemainingEventMap.put(dataNodeId, remainingEventCount);
+  }
+
+  public void setRemainingTime(final int dataNodeId, final double 
remainingTime) {
+    nodeId2RemainingTimeMap.put(dataNodeId, remainingTime);
+  }
+
   public Set<Integer> getCompletedDataNodeIds() {
     return completedDataNodeIds.keySet();
   }
 
+  public long getGlobalRemainingEvents() {
+    return 
nodeId2RemainingEventMap.values().stream().reduce(Long::sum).orElse(0L);
+  }
+
+  public double getGlobalRemainingTime() {
+    return 
nodeId2RemainingTimeMap.values().stream().reduce(Math::max).orElse(0d);
+  }
+
   @Override
   public boolean equals(final Object o) {
     if (this == o) {
@@ -45,16 +63,25 @@ public class PipeTemporaryMeta {
       return false;
     }
     final PipeTemporaryMeta that = (PipeTemporaryMeta) o;
-    return Objects.equals(this.completedDataNodeIds, 
that.completedDataNodeIds);
+    return Objects.equals(this.completedDataNodeIds, that.completedDataNodeIds)
+        && Objects.equals(this.nodeId2RemainingEventMap, 
that.nodeId2RemainingEventMap)
+        && Objects.equals(this.nodeId2RemainingTimeMap, 
that.nodeId2RemainingTimeMap);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(completedDataNodeIds);
+    return Objects.hash(completedDataNodeIds, nodeId2RemainingEventMap, 
nodeId2RemainingTimeMap);
   }
 
   @Override
   public String toString() {
-    return "PipeTemporaryMeta{" + "completedDataNodeIds=" + 
completedDataNodeIds + '}';
+    return "PipeTemporaryMeta{"
+        + "completedDataNodeIds="
+        + completedDataNodeIds
+        + ", nodeId2RemainingEventMap="
+        + nodeId2RemainingEventMap
+        + ", nodeId2RemainingTimeMap="
+        + nodeId2RemainingTimeMap
+        + '}';
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 4a8c0fe691d..49f1e057625 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -163,6 +163,8 @@ public enum Metric {
   UNTRANSFERRED_CONFIG_COUNT("untransferred_config_count"),
   PIPE_CONNECTOR_CONFIG_TRANSFER("pipe_connector_config_transfer"),
   PIPE_CONFIGNODE_REMAINING_TIME("pipe_confignode_remaining_time"),
+  PIPE_GLOBAL_REMAINING_EVENT_COUNT("pipe_global_remaining_event_count"),
+  PIPE_GLOBAL_REMAINING_TIME("pipe_global_remaining_time"),
   // subscription related
   SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT("subscription_uncommitted_event_count"),
   SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 28978144ae8..94fcb820e09 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -686,6 +686,8 @@ struct TShowPipeInfo {
   5: required string pipeProcessor
   6: required string pipeConnector
   7: required string exceptionMessage
+  8: optional i64 remainingEventCount
+  9: optional double EstimatedRemainingTime
 }
 
 struct TGetAllPipeInfoResp {
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 06d0fd8d0e7..68666d16261 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -295,6 +295,8 @@ struct TDataNodeHeartbeatResp {
   12: optional set<common.TEndPoint> confirmedConfigNodeEndPoints
   13: optional map<common.TConsensusGroupId, i64> consensusLogicalTimeMap
   14: optional list<bool> pipeCompletedList
+  15: optional list<i64> pipeRemainingEventCountList
+  16: optional list<double> pipeRemainingTimeList
 }
 
 struct TPipeHeartbeatReq {
@@ -304,6 +306,8 @@ struct TPipeHeartbeatReq {
 struct TPipeHeartbeatResp {
   1: required list<binary> pipeMetaList
   2: optional list<bool> pipeCompletedList
+  3: optional list<i64> pipeRemainingEventCountList
+  4: optional list<double> pipeRemainingTimeList
 }
 
 enum TSchemaLimitLevel{

Reply via email to