This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cb544e39b2 Pipe: Reduce the memory usage & Enable insertNode memory control for stream mode degrade and stuck restart (#14102)
3cb544e39b2 is described below

commit 3cb544e39b250b56bdc8d4c4e3ff0b50c2922e5f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Nov 21 15:45:59 2024 +0800

    Pipe: Reduce the memory usage & Enable insertNode memory control for stream mode degrade and stuck restart (#14102)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/runtime/PipeHandleMetaChangePlan.java     |   8 +-
 .../response/pipe/task/PipeTableResp.java          |   5 +-
 .../pipe/agent/task/PipeConfigNodeTaskAgent.java   |   4 +-
 .../runtime/heartbeat/PipeHeartbeat.java           |   3 +-
 .../runtime/heartbeat/PipeHeartbeatParser.java     |   5 +-
 .../manager/pipe/metric/PipeConfigNodeMetrics.java |   4 +-
 ... => PipeTemporaryMetaInCoordinatorMetrics.java} |  29 +++---
 .../confignode/persistence/pipe/PipeInfo.java      |  16 +--
 .../confignode/persistence/pipe/PipeTaskInfo.java  |   4 +-
 .../db/pipe/agent/runtime/PipeAgentLauncher.java   |   2 +-
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  30 ++++--
 .../evolvable/batch/PipeTabletEventPlainBatch.java |  10 +-
 .../async/IoTDBDataRegionAsyncConnector.java       |   2 +-
 .../tablet/PipeInsertNodeTabletInsertionEvent.java |  30 +++++-
 .../common/tablet/PipeRawTabletInsertionEvent.java |   6 +-
 .../PipeRealtimeDataRegionHybridExtractor.java     |  12 ++-
 .../impl/DataNodeInternalRPCServiceImpl.java       |   7 +-
 .../config/executor/ClusterConfigTaskExecutor.java |   6 +-
 .../dataregion/wal/utils/WALEntryHandler.java      |  12 +--
 .../TsFileResourceProgressIndexTest.java           |   5 +
 .../commons/consensus/index/ProgressIndex.java     |  12 ++-
 .../consensus/index/impl/HybridProgressIndex.java  |  36 ++++---
 .../consensus/index/impl/IoTProgressIndex.java     |  12 +++
 .../consensus/index/impl/MetaProgressIndex.java    |   9 +-
 .../consensus/index/impl/MinimumProgressIndex.java |   5 +
 .../consensus/index/impl/RecoverProgressIndex.java |  13 +++
 .../consensus/index/impl/SimpleProgressIndex.java  |   9 +-
 .../consensus/index/impl/StateProgressIndex.java   |  17 +++-
 .../index/impl/TimeWindowStateProgressIndex.java   |  24 +++++
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  49 ++++++++++
 .../commons/pipe/agent/task/meta/PipeMeta.java     |  17 +++-
 .../pipe/agent/task/meta/PipeTemporaryMeta.java    |  73 +-------------
 .../agent/task/meta/PipeTemporaryMetaInAgent.java  | 107 +++++++++++++++++++++
 ...ta.java => PipeTemporaryMetaInCoordinator.java} |   5 +-
 .../pipe/agent/task/progress/CommitterKey.java     |   2 +-
 .../task/progress/PipeEventCommitManager.java      |  12 ++-
 .../iotdb/commons/pipe/event/EnrichedEvent.java    |   6 --
 .../iotdb/commons/pipe/task/PipeMetaDeSerTest.java |  19 ++--
 38 files changed, 447 insertions(+), 180 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java
index 07d70e92db4..74a3cf37c94 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/runtime/PipeHandleMetaChangePlan.java
@@ -48,7 +48,7 @@ public class PipeHandleMetaChangePlan extends 
ConfigPhysicalPlan {
   }
 
   @Override
-  protected void serializeImpl(DataOutputStream stream) throws IOException {
+  protected void serializeImpl(final DataOutputStream stream) throws 
IOException {
     stream.writeShort(getType().getPlanType());
 
     stream.writeInt(pipeMetaList.size());
@@ -58,16 +58,16 @@ public class PipeHandleMetaChangePlan extends 
ConfigPhysicalPlan {
   }
 
   @Override
-  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+  protected void deserializeImpl(final ByteBuffer buffer) throws IOException {
     int size = buffer.getInt();
     for (int i = 0; i < size; i++) {
-      PipeMeta pipeMeta = PipeMeta.deserialize(buffer);
+      PipeMeta pipeMeta = PipeMeta.deserialize4Coordinator(buffer);
       pipeMetaList.add(pipeMeta);
     }
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
index f2cbe60cb97..020690f2465 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
+import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningFilter;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
@@ -175,7 +175,8 @@ public class PipeTableResp implements DataSet {
               staticMeta.getProcessorParameters().toString(),
               staticMeta.getConnectorParameters().toString(),
               exceptionMessageBuilder.toString());
-      final PipeTemporaryMeta temporaryMeta = pipeMeta.getTemporaryMeta();
+      final PipeTemporaryMetaInCoordinator temporaryMeta =
+          (PipeTemporaryMetaInCoordinator) pipeMeta.getTemporaryMeta();
       final boolean canCalculateOnLocal = canCalculateOnLocal(pipeMeta);
 
       showPipeInfo.setRemainingEventCount(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
index d5204b1c99b..2451e055c72 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java
@@ -121,7 +121,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
       final PipeMeta pipeMetaFromCoordinator) {
     try {
       return PipeConfigNodeAgent.runtime().isLeaderReady()
-          ? 
super.handleSinglePipeMetaChangesInternal(pipeMetaFromCoordinator.deepCopy())
+          ? 
super.handleSinglePipeMetaChangesInternal(pipeMetaFromCoordinator.deepCopy4TaskAgent())
           : null;
     } catch (final Exception e) {
       return new TPushPipeMetaRespExceptionMessage(
@@ -152,7 +152,7 @@ public class PipeConfigNodeTaskAgent extends PipeTaskAgent {
                   .map(
                       pipeMeta -> {
                         try {
-                          return pipeMeta.deepCopy();
+                          return pipeMeta.deepCopy4TaskAgent();
                         } catch (Exception e) {
                           throw new PipeException("failed to deep copy 
pipeMeta", e);
                         }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
index 9c909149896..02ed8cca2fc 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java
@@ -42,7 +42,8 @@ public class PipeHeartbeat {
       /* @Nullable */ final List<Long> pipeRemainingEventCountListFromAgent,
       /* @Nullable */ final List<Double> pipeRemainingTimeListFromAgent) {
     for (int i = 0; i < pipeMetaByteBufferListFromAgent.size(); ++i) {
-      final PipeMeta pipeMeta = 
PipeMeta.deserialize(pipeMetaByteBufferListFromAgent.get(i));
+      final PipeMeta pipeMeta =
+          
PipeMeta.deserialize4TaskAgent(pipeMetaByteBufferListFromAgent.get(i));
       pipeMetaMap.put(pipeMeta.getStaticMeta(), pipeMeta);
       isCompletedMap.put(
           pipeMeta.getStaticMeta(),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 16e2e6a5b73..e2303fecdea 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -28,7 +28,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
-import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
+import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
 import org.apache.iotdb.confignode.manager.ConfigManager;
@@ -144,7 +144,8 @@ public class PipeHeartbeatParser {
         continue;
       }
 
-      final PipeTemporaryMeta temporaryMeta = 
pipeMetaFromCoordinator.getTemporaryMeta();
+      final PipeTemporaryMetaInCoordinator temporaryMeta =
+          (PipeTemporaryMetaInCoordinator) 
pipeMetaFromCoordinator.getTemporaryMeta();
 
       // Remove completed pipes
       final Boolean isPipeCompletedFromAgent = 
pipeHeartbeat.isCompleted(staticMeta);
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
index 264ed997b84..cc9652cb6f4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeConfigNodeMetrics.java
@@ -41,7 +41,7 @@ public class PipeConfigNodeMetrics implements IMetricSet {
     PipeConfigRegionExtractorMetrics.getInstance().bindTo(metricService);
     PipeConfigRegionConnectorMetrics.getInstance().bindTo(metricService);
     PipeConfigNodeRemainingTimeMetrics.getInstance().bindTo(metricService);
-    PipeTemporaryMetaMetrics.getInstance().bindTo(metricService);
+    PipeTemporaryMetaInCoordinatorMetrics.getInstance().bindTo(metricService);
     PipeConfigNodeReceiverMetrics.getInstance().bindTo(metricService);
     PipeConfigNodeResourceMetrics.getInstance().bindTo(metricService);
   }
@@ -54,7 +54,7 @@ public class PipeConfigNodeMetrics implements IMetricSet {
     PipeConfigRegionExtractorMetrics.getInstance().unbindFrom(metricService);
     PipeConfigRegionConnectorMetrics.getInstance().unbindFrom(metricService);
     PipeConfigNodeRemainingTimeMetrics.getInstance().unbindFrom(metricService);
-    PipeTemporaryMetaMetrics.getInstance().unbindFrom(metricService);
+    
PipeTemporaryMetaInCoordinatorMetrics.getInstance().unbindFrom(metricService);
     PipeConfigNodeReceiverMetrics.getInstance().unbindFrom(metricService);
     PipeConfigNodeResourceMetrics.getInstance().unbindFrom(metricService);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaInCoordinatorMetrics.java
similarity index 82%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaInCoordinatorMetrics.java
index 2296a53d91e..cc46a1a015e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaMetrics.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/metric/PipeTemporaryMetaInCoordinatorMetrics.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.manager.pipe.metric;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
+import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.metrics.AbstractMetricService;
@@ -39,17 +40,19 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * The {@link PipeTemporaryMetaMetrics} is to calculate the pipe-statistics 
from the {@link
- * PipeTemporaryMeta}. The class is lock-free and can only read from the 
thread-safe variables from
- * the {@link PipeTemporaryMeta}.
+ * The {@link PipeTemporaryMetaInCoordinatorMetrics} is to calculate the 
pipe-statistics from the
+ * {@link PipeTemporaryMeta}. The class is lock-free and can only read from 
the thread-safe
+ * variables from the {@link PipeTemporaryMeta}.
  */
-public class PipeTemporaryMetaMetrics implements IMetricSet {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTemporaryMetaMetrics.class);
+public class PipeTemporaryMetaInCoordinatorMetrics implements IMetricSet {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeTemporaryMetaInCoordinatorMetrics.class);
 
   @SuppressWarnings("java:S3077")
   private volatile AbstractMetricService metricService;
 
-  private final Map<String, PipeTemporaryMeta> pipeTemporaryMetaMap = new 
ConcurrentHashMap<>();
+  private final Map<String, PipeTemporaryMetaInCoordinator> 
pipeTemporaryMetaMap =
+      new ConcurrentHashMap<>();
 
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
@@ -64,13 +67,13 @@ public class PipeTemporaryMetaMetrics implements IMetricSet 
{
   }
 
   private void createAutoGauge(final String pipeID) {
-    final PipeTemporaryMeta pipeTemporaryMeta = 
pipeTemporaryMetaMap.get(pipeID);
+    final PipeTemporaryMetaInCoordinator pipeTemporaryMeta = 
pipeTemporaryMetaMap.get(pipeID);
     final String[] pipeNameAndCreationTime = pipeID.split("_");
     metricService.createAutoGauge(
         Metric.PIPE_GLOBAL_REMAINING_EVENT_COUNT.toString(),
         MetricLevel.IMPORTANT,
         pipeTemporaryMeta,
-        PipeTemporaryMeta::getGlobalRemainingEvents,
+        PipeTemporaryMetaInCoordinator::getGlobalRemainingEvents,
         Tag.NAME.toString(),
         pipeNameAndCreationTime[0],
         Tag.CREATION_TIME.toString(),
@@ -79,7 +82,7 @@ public class PipeTemporaryMetaMetrics implements IMetricSet {
         Metric.PIPE_GLOBAL_REMAINING_TIME.toString(),
         MetricLevel.IMPORTANT,
         pipeTemporaryMeta,
-        PipeTemporaryMeta::getGlobalRemainingTime,
+        PipeTemporaryMetaInCoordinator::getGlobalRemainingTime,
         Tag.NAME.toString(),
         pipeNameAndCreationTime[0],
         Tag.CREATION_TIME.toString(),
@@ -123,7 +126,8 @@ public class PipeTemporaryMetaMetrics implements IMetricSet 
{
   public void register(final PipeMeta pipeMeta) {
     final String taskID =
         pipeMeta.getStaticMeta().getPipeName() + "_" + 
pipeMeta.getStaticMeta().getCreationTime();
-    pipeTemporaryMetaMap.putIfAbsent(taskID, pipeMeta.getTemporaryMeta());
+    pipeTemporaryMetaMap.putIfAbsent(
+        taskID, (PipeTemporaryMetaInCoordinator) pipeMeta.getTemporaryMeta());
     if (Objects.nonNull(metricService)) {
       createMetrics(taskID);
     }
@@ -163,14 +167,15 @@ public class PipeTemporaryMetaMetrics implements 
IMetricSet {
 
   private static class PipeTemporaryMetaMetricsHolder {
 
-    private static final PipeTemporaryMetaMetrics INSTANCE = new 
PipeTemporaryMetaMetrics();
+    private static final PipeTemporaryMetaInCoordinatorMetrics INSTANCE =
+        new PipeTemporaryMetaInCoordinatorMetrics();
 
     private PipeTemporaryMetaMetricsHolder() {
       // Empty constructor
     }
   }
 
-  public static PipeTemporaryMetaMetrics getInstance() {
+  public static PipeTemporaryMetaInCoordinatorMetrics getInstance() {
     return PipeTemporaryMetaMetricsHolder.INSTANCE;
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
index 48a9d705dc4..f1f15b02528 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java
@@ -33,7 +33,7 @@ import 
org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import 
org.apache.iotdb.confignode.manager.pipe.agent.runtime.PipeConfigRegionListener;
 import 
org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeSubtask;
 import 
org.apache.iotdb.confignode.manager.pipe.agent.task.PipeConfigNodeTaskAgent;
-import 
org.apache.iotdb.confignode.manager.pipe.metric.PipeTemporaryMetaMetrics;
+import 
org.apache.iotdb.confignode.manager.pipe.metric.PipeTemporaryMetaInCoordinatorMetrics;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -94,7 +94,7 @@ public class PipeInfo implements SnapshotProcessor {
                 throw new PipeException("Failed to increase listener 
reference", e);
               }
             });
-        PipeTemporaryMetaMetrics.getInstance()
+        PipeTemporaryMetaInCoordinatorMetrics.getInstance()
             .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
         return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       } else {
@@ -114,7 +114,7 @@ public class PipeInfo implements SnapshotProcessor {
 
       PipeConfigNodeAgent.task()
           
.handleSinglePipeMetaChanges(pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeName()));
-      PipeTemporaryMetaMetrics.getInstance()
+      PipeTemporaryMetaInCoordinatorMetrics.getInstance()
           .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (final Exception e) {
@@ -143,7 +143,7 @@ public class PipeInfo implements SnapshotProcessor {
                 throw new PipeException("Failed to decrease listener 
reference", e);
               }
             });
-        PipeTemporaryMetaMetrics.getInstance()
+        PipeTemporaryMetaInCoordinatorMetrics.getInstance()
             .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
         return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       } else {
@@ -181,7 +181,7 @@ public class PipeInfo implements SnapshotProcessor {
                 throw new PipeException("Failed to decrease listener 
reference", e);
               }
             });
-        PipeTemporaryMetaMetrics.getInstance()
+        PipeTemporaryMetaInCoordinatorMetrics.getInstance()
             .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
         return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
       } else {
@@ -206,7 +206,7 @@ public class PipeInfo implements SnapshotProcessor {
   public TSStatus operateMultiplePipes(final OperateMultiplePipesPlanV2 plans) 
{
     try {
       final TSStatus status = pipeTaskInfo.operateMultiplePipes(plans);
-      PipeTemporaryMetaMetrics.getInstance()
+      PipeTemporaryMetaInCoordinatorMetrics.getInstance()
           .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
       return status;
     } catch (final Exception e) {
@@ -225,7 +225,7 @@ public class PipeInfo implements SnapshotProcessor {
         pipeMetaListFromCoordinator.add(pipeMeta);
       }
       
PipeConfigNodeAgent.task().handlePipeMetaChanges(pipeMetaListFromCoordinator);
-      PipeTemporaryMetaMetrics.getInstance()
+      PipeTemporaryMetaInCoordinatorMetrics.getInstance()
           .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (final Exception e) {
@@ -245,7 +245,7 @@ public class PipeInfo implements SnapshotProcessor {
             
pipeTaskInfo.getPipeMetaByPipeName(pipeMeta.getStaticMeta().getPipeName()));
       }
       
PipeConfigNodeAgent.task().handlePipeMetaChanges(pipeMetaListFromCoordinator);
-      PipeTemporaryMetaMetrics.getInstance()
+      PipeTemporaryMetaInCoordinatorMetrics.getInstance()
           .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (final Exception e) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 936220f77b3..70d23a47cc7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
+import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
@@ -602,7 +603,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
                                   .get(consensusGroupId.getId())
                                   .setLeaderNodeId(newLeader);
                               // New region leader may contain un-transferred 
events
-                              
pipeMeta.getTemporaryMeta().markDataNodeUncompleted(newLeader);
+                              ((PipeTemporaryMetaInCoordinator) 
pipeMeta.getTemporaryMeta())
+                                  .markDataNodeUncompleted(newLeader);
                             } else {
                               
consensusGroupIdToTaskMetaMap.remove(consensusGroupId.getId());
                             }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 0b8fb380cd9..4cfb7185def 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -168,7 +168,7 @@ class PipeAgentLauncher {
               getAllPipeInfoResp.getAllPipeInfo().stream()
                   .map(
                       byteBuffer -> {
-                        final PipeMeta pipeMeta = 
PipeMeta.deserialize(byteBuffer);
+                        final PipeMeta pipeMeta = 
PipeMeta.deserialize4TaskAgent(byteBuffer);
                         LOGGER.info(
                             "Pulled pipe meta from config node: {}, recovering 
...", pipeMeta);
                         return pipeMeta;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 5e5143e5f06..a25029f6a58 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -528,15 +528,25 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       }
 
       // Only restart the stream mode pipes for releasing memTables.
-      if (extractors.get(0).isStreamMode()
-          && 
extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)
-          && (mayMemTablePinnedCountReachDangerousThreshold()
-              || mayWalSizeReachThrottleThreshold())) {
-        // Extractors of this pipe may be stuck and is pinning too many 
MemTables.
-        LOGGER.warn(
-            "Pipe {} needs to restart because too many memTables are pinned.",
-            pipeMeta.getStaticMeta());
-        stuckPipes.add(pipeMeta);
+      if (extractors.get(0).isStreamMode()) {
+        if 
(extractors.stream().anyMatch(IoTDBDataRegionExtractor::hasConsumedAllHistoricalTsFiles)
+            && (mayMemTablePinnedCountReachDangerousThreshold()
+                || mayWalSizeReachThrottleThreshold())) {
+          // Extractors of this pipe may be stuck and is pinning too many 
MemTables.
+          LOGGER.warn(
+              "Pipe {} needs to restart because too many memTables are 
pinned.",
+              pipeMeta.getStaticMeta());
+          stuckPipes.add(pipeMeta);
+        } else if (getFloatingMemoryUsageInByte(pipeName)
+            >= 
(PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+                    - 
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
+                / pipeMetaKeeper.getPipeMetaCount()) {
+          // Extractors of this pipe may have too many insert nodes
+          LOGGER.warn(
+              "Pipe {} needs to restart because too many insertNodes are 
extracted.",
+              pipeMeta.getStaticMeta());
+          stuckPipes.add(pipeMeta);
+        }
       }
     }
 
@@ -582,7 +592,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
     acquireWriteLock();
     try {
       final long startTime = System.currentTimeMillis();
-      final PipeMeta originalPipeMeta = pipeMeta.deepCopy();
+      final PipeMeta originalPipeMeta = pipeMeta.deepCopy4TaskAgent();
       handleDropPipe(pipeMeta.getStaticMeta().getPipeName());
       handleSinglePipeMetaChanges(originalPipeMeta);
       LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index fae2338a215..ce865b1fb21 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -54,7 +54,7 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
 
   private static final String TREE_MODEL_DATABASE_PLACEHOLDER = null;
   private final List<String> binaryDataBases = new ArrayList<>();
-  private final List<String> inertNodeDataBases = new ArrayList<>();
+  private final List<String> insertNodeDataBases = new ArrayList<>();
   private final List<String> tabletDataBases = new ArrayList<>();
 
   // limit in buffer size
@@ -111,7 +111,7 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
     tabletBuffers.clear();
 
     binaryDataBases.clear();
-    inertNodeDataBases.clear();
+    insertNodeDataBases.clear();
     tabletDataBases.clear();
 
     pipe2BytesAccumulated.clear();
@@ -123,7 +123,7 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
         insertNodeBuffers,
         tabletBuffers,
         binaryDataBases,
-        inertNodeDataBases,
+        insertNodeDataBases,
         tabletDataBases);
   }
 
@@ -168,10 +168,10 @@ public class PipeTabletEventPlainBatch extends 
PipeTabletEventBatch {
         if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) {
           databaseEstimateSize =
               
pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName().length();
-          
inertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
+          
insertNodeDataBases.add(pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName());
         } else {
           databaseEstimateSize = 4;
-          inertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
+          insertNodeDataBases.add(TREE_MODEL_DATABASE_PLACEHOLDER);
         }
       }
     } else {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index e473017a489..ca5982d9ea5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -396,7 +396,7 @@ public class IoTDBDataRegionAsyncConnector extends 
IoTDBConnector {
   private void logOnClientException(
       final AsyncPipeDataTransferServiceClient client, final Exception e) {
     if (client == null) {
-      LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT, e);
+      LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT);
     } else {
       LOGGER.warn(
           String.format(THRIFT_ERROR_FORMATTER_WITH_ENDPOINT, client.getIp(), 
client.getPort()), e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
index 616ce606fc0..3cd8cc72c25 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
 import 
org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
 import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventParser;
@@ -42,11 +43,14 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
+import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryPosition;
 import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
+import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
@@ -63,10 +67,16 @@ import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent
-    implements TabletInsertionEvent, ReferenceTrackableEvent {
+    implements TabletInsertionEvent, ReferenceTrackableEvent, Accountable {
 
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeInsertNodeTabletInsertionEvent.class);
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(PipeInsertNodeTabletInsertionEvent.class)
+          + RamUsageEstimator.shallowSizeOfInstance(WALEntryHandler.class)
+          + RamUsageEstimator.shallowSizeOfInstance(WALEntryPosition.class)
+          + RamUsageEstimator.shallowSizeOfInstance(AtomicInteger.class)
+          + RamUsageEstimator.shallowSizeOfInstance(AtomicBoolean.class);
 
   private final WALEntryHandler walEntryHandler;
   private final boolean isAligned;
@@ -164,12 +174,13 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
       if (Objects.nonNull(pipeName)) {
         PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
             .increaseTabletEventCount(pipeName, creationTime);
+        PipeDataNodeAgent.task().addFloatingMemoryUsageInByte(pipeName, 
ramBytesUsed());
       }
       return true;
     } catch (final Exception e) {
       LOGGER.warn(
           String.format(
-              "Increase reference count for memtable %d error. Holder Message: 
%s",
+              "Increase reference count for memTable %d error. Holder Message: 
%s",
               walEntryHandler.getMemTableId(), holderMessage),
           e);
       return false;
@@ -180,7 +191,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
   public boolean internallyDecreaseResourceReferenceCount(final String 
holderMessage) {
     try {
       PipeDataNodeResourceManager.wal().unpin(walEntryHandler);
-      // Release the containers' memory.
+      // Release the parsers' memory.
       if (eventParsers != null) {
         eventParsers.clear();
         eventParsers = null;
@@ -195,6 +206,7 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
       return false;
     } finally {
       if (Objects.nonNull(pipeName)) {
+        PipeDataNodeAgent.task().decreaseFloatingMemoryUsageInByte(pipeName, 
ramBytesUsed());
         PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
             .decreaseTabletEventCount(pipeName, creationTime);
       }
@@ -481,6 +493,18 @@ public class PipeInsertNodeTabletInsertionEvent extends 
PipeInsertionEvent
         this.isReleased, this.referenceCount, this.walEntryHandler);
   }
 
+  // Notes:
+  // 1. We only consider insertion event's memory for degrade and restart, 
because degrade/restart
+  // may not be of use for releasing other events' memory.
+  // 2. We do not consider eventParsers because they may not exist, and if they have been
+  // initialized, the event will soon be released.
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + (Objects.nonNull(devicePath) ? PartialPath.estimateSize(devicePath) 
: 0)
+        + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0);
+  }
+
   private static class PipeInsertNodeTabletInsertionEventResource extends 
PipeEventResource {
 
     private final WALEntryHandler walEntryHandler;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 5157ece8810..2782e04c9aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.pipe.api.access.Row;
 import org.apache.iotdb.pipe.api.collector.RowCollector;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.record.Tablet;
 
 import java.util.Objects;
@@ -51,6 +52,9 @@ import java.util.function.BiConsumer;
 public class PipeRawTabletInsertionEvent extends PipeInsertionEvent
     implements TabletInsertionEvent, ReferenceTrackableEvent {
 
+  // Shallow size of this event, added to the tablet size when allocating the memory block
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(PipeRawTabletInsertionEvent.class);
   private Tablet tablet;
   private String deviceId; // Only used when the tablet is released.
   private final boolean isAligned;
@@ -168,7 +172,7 @@ public class PipeRawTabletInsertionEvent extends 
PipeInsertionEvent
     allocatedMemoryBlock =
         PipeDataNodeResourceManager.memory()
             .forceAllocateForTabletWithRetry(
-                PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet));
+                PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 
INSTANCE_SIZE);
     if (Objects.nonNull(pipeName)) {
       PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
           .increaseTabletEventCount(pipeName, creationTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index bf2a5b6a966..95437f162b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -214,7 +214,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
         || mayMemTablePinnedCountReachDangerousThreshold()
         || isHistoricalTsFileEventCountExceededLimit()
         || isRealtimeTsFileEventCountExceededLimit()
-        || mayTsFileLinkedCountReachDangerousThreshold();
+        || mayTsFileLinkedCountReachDangerousThreshold()
+        || mayInsertNodeMemoryReachDangerousThreshold();
   }
 
   private boolean mayWalSizeReachThrottleThreshold() {
@@ -245,6 +246,15 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
         >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
   }
 
+  private boolean mayInsertNodeMemoryReachDangerousThreshold() {
+    return 3
+            * PipeDataNodeAgent.task().getFloatingMemoryUsageInByte(pipeName)
+            * PipeDataNodeAgent.task().getPipeCount()
+        >= (PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
+                - 
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes())
+            * 2;
+  }
+
   @Override
   public Event supply() {
     PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) 
pendingQueue.directPoll();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 3d40633bdf1..d028fb1f182 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1118,7 +1118,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
           PipeDataNodeAgent.task()
               .handlePipeMetaChanges(
                   req.getPipeMetas().stream()
-                      .map(PipeMeta::deserialize)
+                      .map(PipeMeta::deserialize4TaskAgent)
                       .collect(Collectors.toList()));
 
       return exceptionMessages.isEmpty()
@@ -1141,7 +1141,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
       if (req.isSetPipeNameToDrop()) {
         exceptionMessage = 
PipeDataNodeAgent.task().handleDropPipe(req.getPipeNameToDrop());
       } else if (req.isSetPipeMeta()) {
-        final PipeMeta pipeMeta = 
PipeMeta.deserialize(ByteBuffer.wrap(req.getPipeMeta()));
+        final PipeMeta pipeMeta =
+            PipeMeta.deserialize4TaskAgent(ByteBuffer.wrap(req.getPipeMeta()));
         exceptionMessage = 
PipeDataNodeAgent.task().handleSinglePipeMetaChanges(pipeMeta);
       } else {
         throw new Exception("Invalid TPushSinglePipeMetaReq");
@@ -1178,7 +1179,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
         }
       } else if (req.isSetPipeMetas()) {
         for (ByteBuffer byteBuffer : req.getPipeMetas()) {
-          final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
+          final PipeMeta pipeMeta = PipeMeta.deserialize4TaskAgent(byteBuffer);
           TPushPipeMetaRespExceptionMessage message =
               PipeDataNodeAgent.task().handleSinglePipeMetaChanges(pipeMeta);
           exceptionMessages.add(message);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 0b2fad3a64d..8bde753ee72 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1945,7 +1945,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
       pipeMetaFromCoordinator =
           getAllPipeInfoResp.getAllPipeInfo().stream()
-              .map(PipeMeta::deserialize)
+              .map(PipeMeta::deserialize4Coordinator)
               .filter(
                   pipeMeta ->
                       pipeMeta
@@ -1964,7 +1964,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
             new IoTDBException(exceptionMessage, 
TSStatusCode.PIPE_ERROR.getStatusCode()));
         return future;
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       final String exceptionMessage =
           String.format(
               "Failed to alter pipe %s, because %s",
@@ -2024,7 +2024,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
                   
pipeMetaFromCoordinator.getStaticMeta().getConnectorParameters().getAttribute());
         }
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       LOGGER.info("Failed to validate alter pipe statement, because {}", 
e.getMessage(), e);
       future.setException(
           new IoTDBException(e.getMessage(), 
TSStatusCode.PIPE_ERROR.getStatusCode()));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index ea9570033ae..edc67edccfa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -58,7 +58,7 @@ public class WALEntryHandler {
   private volatile boolean isHardlink = false;
   private final AtomicReference<File> hardlinkFile = new AtomicReference<>();
 
-  public WALEntryHandler(WALEntryValue value) {
+  public WALEntryHandler(final WALEntryValue value) {
     this.value = value;
   }
 
@@ -105,7 +105,7 @@ public class WALEntryHandler {
    */
   public InsertNode getInsertNode() throws WALPipeException {
     // return local cache
-    WALEntryValue res = value;
+    final WALEntryValue res = value;
     if (res != null) {
       if (res instanceof InsertNode) {
         return (InsertNode) res;
@@ -120,7 +120,7 @@ public class WALEntryHandler {
         synchronized (this) {
           this.wait();
         }
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         logger.warn("Interrupted when waiting for result.", e);
         Thread.currentThread().interrupt();
       }
@@ -179,7 +179,7 @@ public class WALEntryHandler {
     }
   }
 
-  public void setWalNode(WALNode walNode, long memTableId) {
+  public void setWalNode(final WALNode walNode, final long memTableId) {
     this.walNode = walNode;
     this.memTableId = memTableId;
     walEntryPosition.setWalNode(walNode, memTableId);
@@ -189,7 +189,7 @@ public class WALEntryHandler {
     return memTableId;
   }
 
-  public void setEntryPosition(long walFileVersionId, long position) {
+  public void setEntryPosition(final long walFileVersionId, final long 
position) {
     this.walEntryPosition.setEntryPosition(walFileVersionId, position, value);
     this.value = null;
     synchronized (this) {
@@ -205,7 +205,7 @@ public class WALEntryHandler {
     return walEntryPosition.getSize();
   }
 
-  public void setSize(int size) {
+  public void setSize(final int size) {
     this.walEntryPosition.setSize(size);
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
index 4d1f2b8bb3e..07ae50a5f88 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
@@ -227,6 +227,11 @@ public class TsFileResourceProgressIndexTest {
     public TotalOrderSumTuple getTotalOrderSumTuple() {
       return new TotalOrderSumTuple((long) val);
     }
+
+    @Override
+    public long ramBytesUsed() {
+      return 0;
+    }
   }
 
   @Test
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
index 43f934c5b9b..b54d6db4dab 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ProgressIndex.java
@@ -24,6 +24,8 @@ import 
org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
 
 import javax.annotation.Nonnull;
 
@@ -32,6 +34,7 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
@@ -53,7 +56,14 @@ import java.util.stream.LongStream;
  * immutability contract. This prevents unintended modifications to the 
underlying mutable state
  * from affecting other parts of the program.
  */
-public abstract class ProgressIndex {
+public abstract class ProgressIndex implements Accountable {
+
+  protected static final long LOCK_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(ReentrantReadWriteLock.class)
+          + 
RamUsageEstimator.shallowSizeOfInstance(ReentrantReadWriteLock.ReadLock.class)
+          + 
RamUsageEstimator.shallowSizeOfInstance(ReentrantReadWriteLock.WriteLock.class)
+          + ((long) RamUsageEstimator.NUM_BYTES_OBJECT_HEADER << 1)
+          + 64;
 
   /** Serialize this progress index to the given byte buffer. */
   public abstract void serialize(ByteBuffer byteBuffer);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
index ac0e50b317c..b27c2666cc9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/HybridProgressIndex.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.Nonnull;
@@ -40,6 +41,11 @@ import java.util.stream.Collectors;
 
 public class HybridProgressIndex extends ProgressIndex {
 
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(HybridProgressIndex.class) + 
ProgressIndex.LOCK_SIZE;
+  private static final long ENTRY_SIZE =
+      RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+          + RamUsageEstimator.alignObjectSize(Short.BYTES);
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   private final Map<Short, ProgressIndex> type2Index;
@@ -48,11 +54,11 @@ public class HybridProgressIndex extends ProgressIndex {
     this(Collections.emptyMap());
   }
 
-  public HybridProgressIndex(ProgressIndex progressIndex) {
+  public HybridProgressIndex(final ProgressIndex progressIndex) {
     this(Collections.singletonMap(progressIndex.getType().getType(), 
progressIndex));
   }
 
-  private HybridProgressIndex(Map<Short, ProgressIndex> type2Index) {
+  private HybridProgressIndex(final Map<Short, ProgressIndex> type2Index) {
     this.type2Index = new HashMap<>(type2Index);
   }
 
@@ -61,7 +67,7 @@ public class HybridProgressIndex extends ProgressIndex {
   }
 
   @Override
-  public void serialize(ByteBuffer byteBuffer) {
+  public void serialize(final ByteBuffer byteBuffer) {
     lock.readLock().lock();
     try {
       ProgressIndexType.HYBRID_PROGRESS_INDEX.serialize(byteBuffer);
@@ -77,7 +83,7 @@ public class HybridProgressIndex extends ProgressIndex {
   }
 
   @Override
-  public void serialize(OutputStream stream) throws IOException {
+  public void serialize(final OutputStream stream) throws IOException {
     lock.readLock().lock();
     try {
       ProgressIndexType.HYBRID_PROGRESS_INDEX.serialize(stream);
@@ -93,7 +99,7 @@ public class HybridProgressIndex extends ProgressIndex {
   }
 
   @Override
-  public boolean isAfter(@Nonnull ProgressIndex progressIndex) {
+  public boolean isAfter(@Nonnull final ProgressIndex progressIndex) {
     lock.readLock().lock();
     try {
       if (progressIndex instanceof MinimumProgressIndex) {
@@ -121,14 +127,14 @@ public class HybridProgressIndex extends ProgressIndex {
     }
   }
 
-  public boolean isGivenProgressIndexAfterSelf(ProgressIndex progressIndex) {
+  public boolean isGivenProgressIndexAfterSelf(final ProgressIndex 
progressIndex) {
     return type2Index.size() == 1
         && type2Index.containsKey(progressIndex.getType().getType())
         && 
progressIndex.isAfter(type2Index.get(progressIndex.getType().getType()));
   }
 
   @Override
-  public boolean equals(ProgressIndex progressIndex) {
+  public boolean equals(final ProgressIndex progressIndex) {
     lock.readLock().lock();
     try {
       if (!(progressIndex instanceof HybridProgressIndex)) {
@@ -152,7 +158,7 @@ public class HybridProgressIndex extends ProgressIndex {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (obj == null) {
       return false;
     }
@@ -171,7 +177,8 @@ public class HybridProgressIndex extends ProgressIndex {
   }
 
   @Override
-  public ProgressIndex 
updateToMinimumEqualOrIsAfterProgressIndex(ProgressIndex progressIndex) {
+  public ProgressIndex updateToMinimumEqualOrIsAfterProgressIndex(
+      final ProgressIndex progressIndex) {
     lock.writeLock().lock();
     try {
       if (progressIndex == null || progressIndex instanceof 
MinimumProgressIndex) {
@@ -229,7 +236,7 @@ public class HybridProgressIndex extends ProgressIndex {
     }
   }
 
-  public static HybridProgressIndex deserializeFrom(ByteBuffer byteBuffer) {
+  public static HybridProgressIndex deserializeFrom(final ByteBuffer 
byteBuffer) {
     final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
     final int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; i++) {
@@ -240,7 +247,7 @@ public class HybridProgressIndex extends ProgressIndex {
     return hybridProgressIndex;
   }
 
-  public static HybridProgressIndex deserializeFrom(InputStream stream) throws 
IOException {
+  public static HybridProgressIndex deserializeFrom(final InputStream stream) 
throws IOException {
     final HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
     final int size = ReadWriteIOUtils.readInt(stream);
     for (int i = 0; i < size; i++) {
@@ -255,4 +262,11 @@ public class HybridProgressIndex extends ProgressIndex {
   public String toString() {
     return "HybridProgressIndex{" + "type2Index=" + type2Index + '}';
   }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + type2Index.size() * ENTRY_SIZE
+        + 
type2Index.values().stream().map(ProgressIndex::ramBytesUsed).reduce(0L, 
Long::sum);
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
index 5c7ffb2cc81..8b02d85da5a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.Nonnull;
@@ -37,6 +38,12 @@ import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class IoTProgressIndex extends ProgressIndex {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(IoTProgressIndex.class) + 
ProgressIndex.LOCK_SIZE;
+
+  // We assume that all the integers are cached, while none of the longs are
+  private static final long ENTRY_SIZE =
+      RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + Long.BYTES;
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
@@ -232,4 +239,9 @@ public class IoTProgressIndex extends ProgressIndex {
   public String toString() {
     return "IoTProgressIndex{" + "peerId2SearchIndex=" + peerId2SearchIndex + 
'}';
   }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE + peerId2SearchIndex.size() * ENTRY_SIZE;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
index c181345e4ad..75322152d45 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MetaProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.Nonnull;
@@ -34,7 +35,8 @@ import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class MetaProgressIndex extends ProgressIndex {
-
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(MetaProgressIndex.class) + 
ProgressIndex.LOCK_SIZE;
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   private final long index;
@@ -173,4 +175,9 @@ public class MetaProgressIndex extends ProgressIndex {
   public String toString() {
     return "MetaProgressIndex{" + "index=" + index + '}';
   }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
index e476409a30b..e22f82c9fbb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumProgressIndex.java
@@ -99,4 +99,9 @@ public class MinimumProgressIndex extends ProgressIndex {
   public String toString() {
     return "MinimumProgressIndex{}";
   }
+
+  @Override
+  public long ramBytesUsed() {
+    return 0;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
index c2511222eab..5756594abeb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.Nonnull;
@@ -39,6 +40,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 public class RecoverProgressIndex extends ProgressIndex {
+  // Shallow size of this instance plus the estimated footprint of its read-write lock.
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(RecoverProgressIndex.class) + ProgressIndex.LOCK_SIZE;
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
@@ -239,4 +243,13 @@ public class RecoverProgressIndex extends ProgressIndex {
   public String toString() {
     return "RecoverProgressIndex{" + "dataNodeId2LocalIndex=" + 
dataNodeId2LocalIndex + '}';
   }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + dataNodeId2LocalIndex.size() * 
RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+        + dataNodeId2LocalIndex.values().stream()
+            .map(SimpleProgressIndex::ramBytesUsed)
+            .reduce(0L, Long::sum);
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
index d852f0cc001..162dc9f128d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/SimpleProgressIndex.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.consensus.index.impl;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.Nonnull;
@@ -34,7 +35,8 @@ import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class SimpleProgressIndex extends ProgressIndex {
-
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(SimpleProgressIndex.class) + 
ProgressIndex.LOCK_SIZE;
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   private final int rebootTimes;
@@ -211,4 +213,9 @@ public class SimpleProgressIndex extends ProgressIndex {
         + memTableFlushOrderId
         + '}';
   }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
index 9eca0795e51..b6c51665e19 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/StateProgressIndex.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.Nonnull;
@@ -44,7 +45,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * integrity and independence of the progress index instances.
  */
 public class StateProgressIndex extends ProgressIndex {
-
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(StateProgressIndex.class) + 
ProgressIndex.LOCK_SIZE;
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   private final long version;
@@ -52,7 +54,7 @@ public class StateProgressIndex extends ProgressIndex {
   private final ProgressIndex innerProgressIndex;
 
   public StateProgressIndex(
-      long version, Map<String, Binary> state, ProgressIndex 
innerProgressIndex) {
+      final long version, final Map<String, Binary> state, final ProgressIndex 
innerProgressIndex) {
     this.version = version;
     this.state = new HashMap<>(state);
     this.innerProgressIndex = innerProgressIndex;
@@ -242,4 +244,15 @@ public class StateProgressIndex extends ProgressIndex {
         + innerProgressIndex
         + '}';
   }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + innerProgressIndex.ramBytesUsed()
+        + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY * state.size()
+        + state.entrySet().stream()
+            .map(
+                entry -> RamUsageEstimator.sizeOf(entry.getKey()) + 
entry.getValue().ramBytesUsed())
+            .reduce(0L, Long::sum);
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
index 37d6b6fdf38..bb1b2c1ce5e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/TimeWindowStateProgressIndex.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.consensus.index.ProgressIndexType;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.Nonnull;
@@ -47,6 +48,13 @@ import java.util.stream.Collectors;
  */
 public class TimeWindowStateProgressIndex extends ProgressIndex {
 
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(TimeWindowStateProgressIndex.class)
+          + ProgressIndex.LOCK_SIZE;
+  private static final long ENTRY_SIZE =
+      RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
+          + RamUsageEstimator.shallowSizeOfInstance(Pair.class);
+
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   // Only the byteBuffer is nullable, the timeSeries, pair and timestamp must 
not be null
@@ -292,4 +300,20 @@ public class TimeWindowStateProgressIndex extends 
ProgressIndex {
         + timeSeries2TimestampWindowBufferPairMap
         + "'}";
   }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + timeSeries2TimestampWindowBufferPairMap.size() * ENTRY_SIZE
+        + timeSeries2TimestampWindowBufferPairMap.entrySet().stream()
+            .map(
+                entry ->
+                    RamUsageEstimator.sizeOf(entry.getKey())
+                        + RamUsageEstimator.sizeOf(entry.getValue().getLeft())
+                        + (Objects.nonNull(entry.getValue().getRight())
+                            ? 
(RamUsageEstimator.shallowSizeOfInstance(ByteBuffer.class)
+                                + 
RamUsageEstimator.sizeOf(entry.getValue().getRight().array()))
+                            : 0))
+            .reduce(0L, Long::sum);
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index 00afcee7280..a42b7d5ea7e 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -29,6 +29,9 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInAgent;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
@@ -81,6 +84,7 @@ public abstract class PipeTaskAgent {
 
     // Help PipeEndPointRateLimiter to check if the pipe is still alive
     PipeEndPointRateLimiter.setTaskAgent(this);
+    PipeEventCommitManager.getInstance().setTaskAgent(this);
   }
 
   ////////////////////////// PipeMeta Lock Control //////////////////////////
@@ -1030,8 +1034,53 @@ public abstract class PipeTaskAgent {
   protected abstract void collectPipeMetaListInternal(
       final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws 
TException;
 
+  ///////////////////////// Maintain meta info /////////////////////////
+
   public long getPipeCreationTime(final String pipeName) {
     final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     return pipeMeta == null ? 0 : pipeMeta.getStaticMeta().getCreationTime();
   }
+
+  public String getPipeNameWithCreationTime(final String pipeName, final long 
creationTime) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    return pipeMeta == null
+        ? pipeName + "_" + creationTime
+        : ((PipeTemporaryMetaInAgent) 
pipeMeta.getTemporaryMeta()).getPipeNameWithCreationTime();
+  }
+
+  public CommitterKey getCommitterKey(
+      final String pipeName, final long creationTime, final int regionId, 
final int restartTime) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    return pipeMeta == null
+        ? new CommitterKey(pipeName, creationTime, regionId, restartTime)
+        : ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta())
+            .getCommitterKey(pipeName, creationTime, regionId, restartTime);
+  }
+
+  public long getFloatingMemoryUsageInByte(final String pipeName) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    return pipeMeta == null
+        ? 0
+        : ((PipeTemporaryMetaInAgent) 
pipeMeta.getTemporaryMeta()).getFloatingMemoryUsageInByte();
+  }
+
+  public void addFloatingMemoryUsageInByte(final String pipeName, final long 
sizeInByte) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    if (Objects.nonNull(pipeMeta)) {
+      ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta())
+          .addFloatingMemoryUsageInByte(sizeInByte);
+    }
+  }
+
+  public void decreaseFloatingMemoryUsageInByte(final String pipeName, final 
long sizeInByte) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    if (Objects.nonNull(pipeMeta)) {
+      ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta())
+          .decreaseFloatingMemoryUsageInByte(sizeInByte);
+    }
+  }
+
+  public int getPipeCount() {
+    return pipeMetaKeeper.getPipeMetaCount();
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
index e536a3a7af2..997278010e9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeMeta.java
@@ -37,7 +37,7 @@ public class PipeMeta {
   private final PipeTemporaryMeta temporaryMeta;
 
   public PipeMeta(final PipeStaticMeta staticMeta, final PipeRuntimeMeta 
runtimeMeta) {
-    this(staticMeta, runtimeMeta, new PipeTemporaryMeta());
+    this(staticMeta, runtimeMeta, new PipeTemporaryMetaInCoordinator());
   }
 
   public PipeMeta(
@@ -79,14 +79,23 @@ public class PipeMeta {
     return new PipeMeta(staticMeta, runtimeMeta);
   }
 
-  public static PipeMeta deserialize(final ByteBuffer byteBuffer) {
+  public static PipeMeta deserialize4TaskAgent(final ByteBuffer byteBuffer) {
+    final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer);
+    final PipeRuntimeMeta runtimeMeta = 
PipeRuntimeMeta.deserialize(byteBuffer);
+    return new PipeMeta(
+        staticMeta,
+        runtimeMeta,
+        new PipeTemporaryMetaInAgent(staticMeta.getPipeName(), 
staticMeta.getCreationTime()));
+  }
+
+  public static PipeMeta deserialize4Coordinator(final ByteBuffer byteBuffer) {
     final PipeStaticMeta staticMeta = PipeStaticMeta.deserialize(byteBuffer);
     final PipeRuntimeMeta runtimeMeta = 
PipeRuntimeMeta.deserialize(byteBuffer);
     return new PipeMeta(staticMeta, runtimeMeta);
   }
 
-  public PipeMeta deepCopy() throws IOException {
-    return PipeMeta.deserialize(serialize());
+  public PipeMeta deepCopy4TaskAgent() throws IOException {
+    return PipeMeta.deserialize4TaskAgent(serialize());
   }
 
   public String coreReportMessage() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
index aeffe7fb3d9..5363f47190b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
@@ -19,75 +19,4 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.meta;
 
-import java.util.Collections;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-public class PipeTemporaryMeta {
-
-  private final Set<Integer> completedDataNodeIds =
-      Collections.newSetFromMap(new ConcurrentHashMap<>());
-  private final ConcurrentMap<Integer, Long> nodeId2RemainingEventMap = new 
ConcurrentHashMap<>();
-  private final ConcurrentMap<Integer, Double> nodeId2RemainingTimeMap = new 
ConcurrentHashMap<>();
-
-  public void markDataNodeCompleted(final int dataNodeId) {
-    completedDataNodeIds.add(dataNodeId);
-  }
-
-  public void markDataNodeUncompleted(final int dataNodeId) {
-    completedDataNodeIds.remove(dataNodeId);
-  }
-
-  public void setRemainingEvent(final int dataNodeId, final long 
remainingEventCount) {
-    nodeId2RemainingEventMap.put(dataNodeId, remainingEventCount);
-  }
-
-  public void setRemainingTime(final int dataNodeId, final double 
remainingTime) {
-    nodeId2RemainingTimeMap.put(dataNodeId, remainingTime);
-  }
-
-  public Set<Integer> getCompletedDataNodeIds() {
-    return completedDataNodeIds;
-  }
-
-  public long getGlobalRemainingEvents() {
-    return 
nodeId2RemainingEventMap.values().stream().reduce(Long::sum).orElse(0L);
-  }
-
-  public double getGlobalRemainingTime() {
-    return 
nodeId2RemainingTimeMap.values().stream().reduce(Math::max).orElse(0d);
-  }
-
-  @Override
-  public boolean equals(final Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    final PipeTemporaryMeta that = (PipeTemporaryMeta) o;
-    return Objects.equals(this.completedDataNodeIds, that.completedDataNodeIds)
-        && Objects.equals(this.nodeId2RemainingEventMap, 
that.nodeId2RemainingEventMap)
-        && Objects.equals(this.nodeId2RemainingTimeMap, 
that.nodeId2RemainingTimeMap);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(completedDataNodeIds, nodeId2RemainingEventMap, 
nodeId2RemainingTimeMap);
-  }
-
-  @Override
-  public String toString() {
-    return "PipeTemporaryMeta{"
-        + "completedDataNodeIds="
-        + completedDataNodeIds
-        + ", nodeId2RemainingEventMap="
-        + nodeId2RemainingEventMap
-        + ", nodeId2RemainingTimeMap"
-        + nodeId2RemainingTimeMap
-        + '}';
-  }
-}
+public interface PipeTemporaryMeta {}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java
new file mode 100644
index 00000000000..23914fa8d84
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInAgent.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.task.meta;
+
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Temporary per-pipe meta held by the pipe task agent: a floating memory usage counter plus
+ * cached objects derived from the pipe's name and creation time.
+ */
+public class PipeTemporaryMetaInAgent implements PipeTemporaryMeta {
+
+  // Statistics
+  private final AtomicLong floatingMemoryUsageInByte = new AtomicLong(0L);
+
+  // Object pool
+  private final String pipeNameWithCreationTime;
+  private final Map<Integer, CommitterKey> regionId2CommitterKeyMap = new ConcurrentHashMap<>();
+
+  PipeTemporaryMetaInAgent(final String pipeName, final long creationTime) {
+    this.pipeNameWithCreationTime = pipeName + "_" + creationTime;
+  }
+
+  /////////////////////////////// DataNode ///////////////////////////////
+
+  /** Adds {@code usage} bytes to the floating memory usage counter. */
+  public void addFloatingMemoryUsageInByte(final long usage) {
+    floatingMemoryUsageInByte.addAndGet(usage);
+  }
+
+  /** Subtracts {@code usage} bytes from the floating memory usage counter. */
+  public void decreaseFloatingMemoryUsageInByte(final long usage) {
+    floatingMemoryUsageInByte.addAndGet(-usage);
+  }
+
+  /** Returns the current value of the floating memory usage counter. */
+  public long getFloatingMemoryUsageInByte() {
+    return floatingMemoryUsageInByte.get();
+  }
+
+  /** Returns the cached {@code pipeName + "_" + creationTime} string. */
+  public String getPipeNameWithCreationTime() {
+    return pipeNameWithCreationTime;
+  }
+
+  /**
+   * Returns a {@link CommitterKey} for the given region, reusing the cached key when its restart
+   * times equal {@code restartTime}.
+   *
+   * <p>Otherwise a new key is created. It replaces the cached key unless the cached key has a
+   * larger restart times value, in which case the new key is returned without being cached.
+   */
+  public CommitterKey getCommitterKey(
+      final String pipeName, final long creationTime, final int regionId, final int restartTime) {
+    final CommitterKey key = regionId2CommitterKeyMap.get(regionId);
+    if (Objects.nonNull(key) && key.getRestartTimes() == restartTime) {
+      return key;
+    }
+    final CommitterKey newKey = new CommitterKey(pipeName, creationTime, regionId, restartTime);
+    if (Objects.nonNull(key) && restartTime < key.getRestartTimes()) {
+      return newKey;
+    }
+    // No key cached yet, or restartTime > key.getRestartTimes()
+    regionId2CommitterKeyMap.put(regionId, newKey);
+    return newKey;
+  }
+
+  /////////////////////////////// Object ///////////////////////////////
+
+  // We assume that the "pipeNameWithCreationTime" does not contain extra information
+  // thus we do not consider it here
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final PipeTemporaryMetaInAgent that = (PipeTemporaryMetaInAgent) o;
+    return Objects.equals(
+            this.floatingMemoryUsageInByte.get(), that.floatingMemoryUsageInByte.get())
+        && Objects.equals(this.regionId2CommitterKeyMap, that.regionId2CommitterKeyMap);
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash the counter's current value rather than the AtomicLong object: AtomicLong does not
+    // override hashCode(), so two instances that are equal per equals() would otherwise hash
+    // differently.
+    return Objects.hash(floatingMemoryUsageInByte.get(), regionId2CommitterKeyMap);
+  }
+
+  @Override
+  public String toString() {
+    return "PipeTemporaryMeta{"
+        + "floatingMemoryUsage="
+        + floatingMemoryUsageInByte
+        + ", regionId2CommitterKeyMap="
+        + regionId2CommitterKeyMap
+        + '}';
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java
similarity index 94%
copy from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
copy to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java
index aeffe7fb3d9..ee127bbae44 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTemporaryMetaInCoordinator.java
@@ -25,8 +25,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-public class PipeTemporaryMeta {
+public class PipeTemporaryMetaInCoordinator implements PipeTemporaryMeta {
 
+  // ConfigNode statistics
   private final Set<Integer> completedDataNodeIds =
       Collections.newSetFromMap(new ConcurrentHashMap<>());
   private final ConcurrentMap<Integer, Long> nodeId2RemainingEventMap = new 
ConcurrentHashMap<>();
@@ -68,7 +69,7 @@ public class PipeTemporaryMeta {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    final PipeTemporaryMeta that = (PipeTemporaryMeta) o;
+    final PipeTemporaryMetaInCoordinator that = 
(PipeTemporaryMetaInCoordinator) o;
     return Objects.equals(this.completedDataNodeIds, that.completedDataNodeIds)
         && Objects.equals(this.nodeId2RemainingEventMap, 
that.nodeId2RemainingEventMap)
         && Objects.equals(this.nodeId2RemainingTimeMap, 
that.nodeId2RemainingTimeMap);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitterKey.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitterKey.java
index d135ba27a3e..8335d504551 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitterKey.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/CommitterKey.java
@@ -32,7 +32,7 @@ public class CommitterKey {
     this(pipeName, creationTime, regionId, -1);
   }
 
-  CommitterKey(
+  public CommitterKey(
       final String pipeName, final long creationTime, final int regionId, 
final int restartTimes) {
     this.pipeName = pipeName;
     this.creationTime = creationTime;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
index c429f3155c0..af84d89f3da 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.pipe.agent.task.progress;
 
+import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics;
 
@@ -34,6 +35,7 @@ public class PipeEventCommitManager {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeEventCommitManager.class);
 
+  private volatile PipeTaskAgent taskAgent;
   private final Map<CommitterKey, PipeEventCommitter> eventCommitterMap = new 
ConcurrentHashMap<>();
 
   // the restartTimes in the committer key is always -1
@@ -104,7 +106,9 @@ public class PipeEventCommitManager {
     }
     if (Objects.nonNull(commitRateMarker)) {
       try {
-        commitRateMarker.accept(event.getPipeNameWithCreationTime(), 
event.isDataRegionEvent());
+        commitRateMarker.accept(
+            taskAgent.getPipeNameWithCreationTime(event.getPipeName(), 
event.getCreationTime()),
+            event.isDataRegionEvent());
       } catch (final Exception e) {
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug(
@@ -144,7 +148,7 @@ public class PipeEventCommitManager {
 
   private CommitterKey generateCommitterKey(
       final String pipeName, final long creationTime, final int regionId) {
-    return new CommitterKey(
+    return taskAgent.getCommitterKey(
         pipeName,
         creationTime,
         regionId,
@@ -162,6 +166,10 @@ public class PipeEventCommitManager {
         committerKey.getPipeName(), committerKey.getCreationTime(), 
committerKey.getRegionId());
   }
 
+  public void setTaskAgent(final PipeTaskAgent taskAgent) {
+    this.taskAgent = taskAgent;
+  }
+
   public void setCommitRateMarker(final BiConsumer<String, Boolean> 
commitRateMarker) {
     this.commitRateMarker = commitRateMarker;
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 2fdef0730a8..30c1d1dca6b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -55,7 +55,6 @@ public abstract class EnrichedEvent implements Event {
 
   protected final String pipeName;
   protected final long creationTime;
-  private final String pipeNameWithCreationTime; // cache for better 
performance
 
   protected final PipeTaskMeta pipeTaskMeta;
 
@@ -89,7 +88,6 @@ public abstract class EnrichedEvent implements Event {
 
     this.pipeName = pipeName;
     this.creationTime = creationTime;
-    this.pipeNameWithCreationTime = pipeName + "_" + creationTime;
     this.pipeTaskMeta = pipeTaskMeta;
     this.treePattern = treePattern;
     this.tablePattern = tablePattern;
@@ -313,10 +311,6 @@ public abstract class EnrichedEvent implements Event {
     return creationTime;
   }
 
-  public String getPipeNameWithCreationTime() {
-    return pipeNameWithCreationTime;
-  }
-
   public final int getRegionId() {
     // TODO: persist regionId in EnrichedEvent
     return committerKey == null ? -1 : committerKey.getRegionId();
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
index ad77c8c3901..8e04baf7cc0 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java
@@ -51,7 +51,7 @@ public class PipeMetaDeSerTest {
 
   @Test
   public void test() throws IOException {
-    PipeStaticMeta pipeStaticMeta =
+    final PipeStaticMeta pipeStaticMeta =
         new PipeStaticMeta(
             "pipeName",
             123L,
@@ -67,8 +67,8 @@ public class PipeMetaDeSerTest {
               }
             },
             new HashMap<String, String>() {});
-    ByteBuffer staticByteBuffer = pipeStaticMeta.serialize();
-    PipeStaticMeta pipeStaticMeta1 = 
PipeStaticMeta.deserialize(staticByteBuffer);
+    final ByteBuffer staticByteBuffer = pipeStaticMeta.serialize();
+    final PipeStaticMeta pipeStaticMeta1 = 
PipeStaticMeta.deserialize(staticByteBuffer);
     Assert.assertEquals(pipeStaticMeta, pipeStaticMeta1);
 
     HybridProgressIndex hybridProgressIndex =
@@ -82,8 +82,9 @@ public class PipeMetaDeSerTest {
             hybridProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(
                 new IoTProgressIndex(3, 6L));
 
-    Map<String, Pair<Long, ByteBuffer>> 
timeSeries2TimestampWindowBufferPairMap = new HashMap<>();
-    ByteBuffer buffer;
+    final Map<String, Pair<Long, ByteBuffer>> 
timeSeries2TimestampWindowBufferPairMap =
+        new HashMap<>();
+    final ByteBuffer buffer;
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
         final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       ReadWriteIOUtils.write("123", outputStream);
@@ -92,7 +93,7 @@ public class PipeMetaDeSerTest {
     timeSeries2TimestampWindowBufferPairMap.put("root.test.a1", new 
Pair<>(123L, buffer));
 
     final HybridProgressIndex finalHybridProgressIndex = hybridProgressIndex;
-    PipeRuntimeMeta pipeRuntimeMeta =
+    final PipeRuntimeMeta pipeRuntimeMeta =
         new PipeRuntimeMeta(
             new ConcurrentHashMap<Integer, PipeTaskMeta>() {
               {
@@ -145,9 +146,9 @@ public class PipeMetaDeSerTest {
     pipeRuntimeMeta1 = PipeRuntimeMeta.deserialize(runtimeByteBuffer);
     Assert.assertEquals(pipeRuntimeMeta, pipeRuntimeMeta1);
 
-    PipeMeta pipeMeta = new PipeMeta(pipeStaticMeta, pipeRuntimeMeta);
-    ByteBuffer byteBuffer = pipeMeta.serialize();
-    PipeMeta pipeMeta1 = PipeMeta.deserialize(byteBuffer);
+    final PipeMeta pipeMeta = new PipeMeta(pipeStaticMeta, pipeRuntimeMeta);
+    final ByteBuffer byteBuffer = pipeMeta.serialize();
+    final PipeMeta pipeMeta1 = PipeMeta.deserialize4Coordinator(byteBuffer);
     Assert.assertEquals(pipeMeta, pipeMeta1);
   }
 }


Reply via email to