This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3168bf68870 PipeConsensus: add metrics and fix some bugs for pipeConsensus (#12723)
3168bf68870 is described below

commit 3168bf688701f8aa753032b551749f8da17a6088
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Jun 14 00:43:16 2024 -0500

    PipeConsensus: add metrics and fix some bugs for pipeConsensus (#12723)
---
 .../apache/iotdb/consensus/ConsensusFactory.java   |   3 +
 .../consensus/pipe/PipeConsensusServerImpl.java    |  67 ++-
 .../pipe/consensuspipe/ConsensusPipeConnector.java |  25 ++
 .../pipe/consensuspipe/ConsensusPipeManager.java   |   2 +
 .../pipe/metric/PipeConsensusServerMetrics.java    | 190 +++++++++
 .../pipe/metric/PipeConsensusSyncLagManager.java   | 131 ++++++
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |   5 +
 .../pipeconsensus/PipeConsensusAsyncConnector.java | 151 +++++--
 .../pipeconsensus/PipeConsensusSyncConnector.java  |  25 +-
 .../PipeConsensusTabletBatchEventHandler.java      |   7 +-
 .../PipeConsensusTabletInsertNodeEventHandler.java |   6 +-
 .../PipeConsensusTabletInsertionEventHandler.java  |  14 +-
 .../PipeConsensusTabletRawEventHandler.java        |  45 --
 .../PipeConsensusTsFileInsertionEventHandler.java  |  22 +-
 .../consensus/PipeConsensusConnectorMetrics.java   | 290 +++++++++++++
 .../consensus/PipeConsensusReceiverMetrics.java    | 384 +++++++++++++++++
 .../pipeconsensus/PipeConsensusReceiver.java       | 455 ++++++++++++++-------
 .../subtask/processor/PipeProcessorSubtask.java    |   7 +-
 .../container/PipeConsensusClientMgrContainer.java |   2 +-
 .../config/constant/PipeConnectorConstant.java     |   1 +
 .../iotdb/commons/pipe/event/EnrichedEvent.java    |   4 +-
 .../pipe/progress/PipeEventCommitManager.java      |   8 +
 .../commons/pipe/progress/PipeEventCommitter.java  |   4 +
 .../iotdb/commons/service/metric/enums/Metric.java |   5 +
 24 files changed, 1582 insertions(+), 271 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
index ef9032516d7..8cab79e315e 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus;
 import 
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
 import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;
+import org.apache.iotdb.consensus.pipe.metric.PipeConsensusSyncLagManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +67,8 @@ public class ConsensusFactory {
         className = REAL_PIPE_CONSENSUS;
         // initialize pipeConsensus' thrift component
         PipeConsensusClientMgrContainer.build();
+        // initialize pipeConsensus's metric component
+        PipeConsensusSyncLagManager.build();
       }
       Class<?> executor = Class.forName(className);
       Constructor<?> executorConstructor =
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
index f4a64fc691b..95d5a23cdc2 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java
@@ -28,15 +28,19 @@ import 
org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.config.PipeConsensusConfig;
+import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;
 import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeManager;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ProgressIndexManager;
+import org.apache.iotdb.consensus.pipe.metric.PipeConsensusServerMetrics;
 import org.apache.iotdb.consensus.pipe.thrift.TCheckConsensusPipeCompletedReq;
 import org.apache.iotdb.consensus.pipe.thrift.TCheckConsensusPipeCompletedResp;
 import 
org.apache.iotdb.consensus.pipe.thrift.TNotifyPeerToCreateConsensusPipeReq;
@@ -66,7 +70,8 @@ import java.util.stream.Collectors;
 public class PipeConsensusServerImpl {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConsensusServerImpl.class);
   private static final long 
CHECK_TRANSMISSION_COMPLETION_INTERVAL_IN_MILLISECONDS = 2_000L;
-
+  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
+      PerformanceOverviewMetrics.getInstance();
   private final Peer thisNode;
   private final IStateMachine stateMachine;
   private final Lock stateMachineLock = new ReentrantLock();
@@ -77,6 +82,8 @@ public class PipeConsensusServerImpl {
   private final ConsensusPipeManager consensusPipeManager;
   private final ProgressIndexManager progressIndexManager;
   private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
syncClientManager;
+  private final PipeConsensusServerMetrics pipeConsensusServerMetrics;
+  private final ReplicateMode replicateMode;
 
   private ProgressIndex cachedProgressIndex = MinimumProgressIndex.INSTANCE;
 
@@ -98,6 +105,8 @@ public class PipeConsensusServerImpl {
     this.consensusPipeManager = consensusPipeManager;
     this.progressIndexManager = config.getPipe().getProgressIndexManager();
     this.syncClientManager = syncClientManager;
+    this.pipeConsensusServerMetrics = new PipeConsensusServerMetrics(this);
+    this.replicateMode = config.getReplicateMode();
 
     if (configuration.isEmpty()) {
       peerManager.recover();
@@ -129,6 +138,7 @@ public class PipeConsensusServerImpl {
 
   public synchronized void start(boolean startConsensusPipes) throws 
IOException {
     stateMachine.start();
+    MetricService.getInstance().addMetricSet(this.pipeConsensusServerMetrics);
 
     if (startConsensusPipes) {
       // start all consensus pipes
@@ -153,7 +163,7 @@ public class PipeConsensusServerImpl {
       // do not roll back, because it will stop anyway
       LOGGER.warn("{} cannot stop all consensus pipes", thisNode);
     }
-
+    
MetricService.getInstance().removeMetricSet(this.pipeConsensusServerMetrics);
     stateMachine.stop();
     isStarted.set(false);
   }
@@ -167,6 +177,7 @@ public class PipeConsensusServerImpl {
       LOGGER.warn("{} cannot drop all consensus pipes", thisNode);
     }
 
+    
MetricService.getInstance().removeMetricSet(this.pipeConsensusServerMetrics);
     peerManager.clear();
     stateMachine.stop();
     isStarted.set(false);
@@ -183,7 +194,11 @@ public class PipeConsensusServerImpl {
                 }
                 return true;
               } catch (Exception e) {
-                LOGGER.warn("{} cannot create consensus pipe between {} and 
{}", thisNode, peer, e);
+                LOGGER.warn(
+                    "{}: cannot create consensus pipe between {} and {}",
+                    e.getMessage(),
+                    thisNode,
+                    peer);
                 return false;
               }
             })
@@ -202,11 +217,11 @@ public class PipeConsensusServerImpl {
                 return true;
               } catch (Exception e) {
                 LOGGER.warn(
-                    "{} cannot update consensus pipe between {} and {} to 
status {}",
+                    "{}: cannot update consensus pipe between {} and {} to 
status {}",
+                    e.getMessage(),
                     thisNode,
                     peer,
-                    status,
-                    e);
+                    status);
                 return false;
               }
             })
@@ -274,12 +289,25 @@ public class PipeConsensusServerImpl {
 
   public TSStatus write(IConsensusRequest request) {
     try {
+      long consensusWriteStartTime = System.nanoTime();
       stateMachineLock.lock();
+      long getStateMachineLockTime = System.nanoTime();
+      // statistic the time of acquiring stateMachine lock
+      pipeConsensusServerMetrics.recordGetStateMachineLockTime(
+          getStateMachineLockTime - consensusWriteStartTime);
+      long writeToStateMachineStartTime = System.nanoTime();
       if (request instanceof ComparableConsensusRequest) {
         ((ComparableConsensusRequest) request)
             
.setProgressIndex(progressIndexManager.assignProgressIndex(thisNode.getGroupId()));
       }
-      return stateMachine.write(request);
+      TSStatus result = stateMachine.write(request);
+      long writeToStateMachineEndTime = System.nanoTime();
+      PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(
+          writeToStateMachineEndTime - writeToStateMachineStartTime);
+      // statistic the time of writing request into stateMachine
+      pipeConsensusServerMetrics.recordUserWriteStateMachineTime(
+          writeToStateMachineEndTime - writeToStateMachineStartTime);
+      return result;
     } finally {
       stateMachineLock.unlock();
     }
@@ -287,8 +315,23 @@ public class PipeConsensusServerImpl {
 
   public TSStatus writeOnFollowerReplica(IConsensusRequest request) {
     try {
+      long consensusWriteStartTime = System.nanoTime();
       stateMachineLock.lock();
-      return stateMachine.write(request);
+      long getStateMachineLockTime = System.nanoTime();
+      // statistic the time of acquiring stateMachine lock
+      pipeConsensusServerMetrics.recordGetStateMachineLockTime(
+          getStateMachineLockTime - consensusWriteStartTime);
+
+      long writeToStateMachineStartTime = System.nanoTime();
+      TSStatus result = stateMachine.write(request);
+      long writeToStateMachineEndTime = System.nanoTime();
+
+      PERFORMANCE_OVERVIEW_METRICS.recordEngineCost(
+          writeToStateMachineEndTime - writeToStateMachineStartTime);
+      // statistic the time of writing request into stateMachine
+      pipeConsensusServerMetrics.recordReplicaWriteStateMachineTime(
+          writeToStateMachineEndTime - writeToStateMachineStartTime);
+      return result;
     } finally {
       stateMachineLock.unlock();
     }
@@ -540,4 +583,12 @@ public class PipeConsensusServerImpl {
   public List<Peer> getPeers() {
     return peerManager.getPeers();
   }
+
+  public String getConsensusGroupId() {
+    return consensusGroupId;
+  }
+
+  public long getReplicateMode() {
+    return (replicateMode == ReplicateMode.BATCH) ? 2 : 1;
+  }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
new file mode 100644
index 00000000000..6f1396db972
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.pipe.consensuspipe;
+
+public interface ConsensusPipeConnector {
+  long getConsensusPipeCommitProgress();
+
+  long getConsensusPipeReplicateProgress();
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
index bb1bf2a1769..a6402a8610d 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_PIPE_NAME;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
@@ -78,6 +79,7 @@ public class ConsensusPipeManager {
             .put(
                 CONNECTOR_CONSENSUS_GROUP_ID_KEY,
                 
String.valueOf(consensusPipeName.getConsensusGroupId().getId()))
+            .put(CONNECTOR_CONSENSUS_PIPE_NAME, consensusPipeName.toString())
             .put(CONNECTOR_IOTDB_IP_KEY, receiverPeer.getEndpoint().ip)
             .put(CONNECTOR_IOTDB_PORT_KEY, 
String.valueOf(receiverPeer.getEndpoint().port))
             .put(CONNECTOR_IOTDB_PARALLEL_TASKS_KEY, String.valueOf(1))
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusServerMetrics.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusServerMetrics.java
new file mode 100644
index 00000000000..39bcb1e526f
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusServerMetrics.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.consensus.pipe.PipeConsensusServerImpl;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeConsensusServerMetrics implements IMetricSet {
+  private final PipeConsensusServerImpl impl;
+  private final PipeConsensusSyncLagManager syncLagManager;
+
+  private Timer getStateMachineLockTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer userWriteStateMachineTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer replicaWriteStateMachineTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  public PipeConsensusServerMetrics(PipeConsensusServerImpl impl) {
+    this.impl = impl;
+    this.syncLagManager = 
PipeConsensusSyncLagManager.getInstance(impl.getConsensusGroupId());
+  }
+
+  private static final String IMPL = "PipeConsensusServerImpl";
+
+  public void recordGetStateMachineLockTime(long costTimeInNanos) {
+    getStateMachineLockTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordUserWriteStateMachineTime(long costTimeInNanos) {
+    userWriteStateMachineTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordReplicaWriteStateMachineTime(long costTimeInNanos) {
+    replicaWriteStateMachineTimer.updateNanos(costTimeInNanos);
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindAutoGauge(metricService);
+    bindGauge(metricService);
+    bindStageTimer(metricService);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbindAutoGauge(metricService);
+    unbindGauge(metricService);
+    unbindStageTimer(metricService);
+
+    // release corresponding resource
+    PipeConsensusSyncLagManager.release(impl.getConsensusGroupId());
+  }
+
+  public void bindGauge(AbstractMetricService metricService) {
+    metricService
+        .getOrCreateGauge(
+            Metric.PIPE_CONSENSUS_MODE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            IMPL,
+            Tag.TYPE.toString(),
+            "replicateMode")
+        .set(impl.getReplicateMode());
+  }
+
+  public void unbindGauge(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.GAUGE,
+        Metric.PIPE_CONSENSUS_MODE.toString(),
+        Tag.NAME.toString(),
+        IMPL,
+        Tag.TYPE.toString(),
+        "replicateMode");
+  }
+
+  public void bindAutoGauge(AbstractMetricService metricService) {
+    metricService.createAutoGauge(
+        Metric.PIPE_CONSENSUS.toString(),
+        MetricLevel.IMPORTANT,
+        syncLagManager,
+        PipeConsensusSyncLagManager::calculateSyncLag,
+        Tag.NAME.toString(),
+        IMPL,
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId(),
+        Tag.TYPE.toString(),
+        "syncLag");
+  }
+
+  public void unbindAutoGauge(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_CONSENSUS.toString(),
+        Tag.NAME.toString(),
+        IMPL,
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId(),
+        Tag.TYPE.toString(),
+        "syncLag");
+  }
+
+  public void bindStageTimer(AbstractMetricService metricService) {
+    getStateMachineLockTimer =
+        metricService.getOrCreateTimer(
+            Metric.STAGE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            Metric.PIPE_CONSENSUS.toString(),
+            Tag.TYPE.toString(),
+            "getStateMachineLock",
+            Tag.REGION.toString(),
+            impl.getConsensusGroupId());
+    userWriteStateMachineTimer =
+        metricService.getOrCreateTimer(
+            Metric.STAGE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            Metric.PIPE_CONSENSUS.toString(),
+            Tag.TYPE.toString(),
+            "userWriteStateMachine",
+            Tag.REGION.toString(),
+            impl.getConsensusGroupId());
+    replicaWriteStateMachineTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_RECEIVE_EVENT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            Metric.PIPE_CONSENSUS.toString(),
+            Tag.TYPE.toString(),
+            "replicaWriteStateMachine",
+            Tag.REGION.toString(),
+            impl.getConsensusGroupId());
+  }
+
+  public void unbindStageTimer(AbstractMetricService metricService) {
+    getStateMachineLockTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    userWriteStateMachineTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    replicaWriteStateMachineTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.STAGE.toString(),
+        Tag.NAME.toString(),
+        Metric.PIPE_CONSENSUS.toString(),
+        Tag.TYPE.toString(),
+        "getStateMachineLock",
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId());
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.STAGE.toString(),
+        Tag.NAME.toString(),
+        Metric.PIPE_CONSENSUS.toString(),
+        Tag.TYPE.toString(),
+        "writeStateMachine",
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId());
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_RECEIVE_EVENT.toString(),
+        Tag.NAME.toString(),
+        Metric.PIPE_CONSENSUS.toString(),
+        Tag.TYPE.toString(),
+        "replicaWriteStateMachine",
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId());
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
new file mode 100644
index 00000000000..6ff754e02ef
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.pipe.metric;
+
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeConnector;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * This class is used to aggregate the write progress of all Connectors to 
calculate the minimum
+ * synchronization progress of all follower copies, thereby calculating 
syncLag.
+ *
+ * <p>Note: every consensusGroup/dataRegion has and only has 1 instance of 
this class.
+ */
+public class PipeConsensusSyncLagManager {
+  long userWriteProgress = 0;
+  long minReplicateProgress = Long.MAX_VALUE;
+  List<ConsensusPipeConnector> consensusPipeConnectorList = new 
CopyOnWriteArrayList<>();
+
+  private void updateReplicateProgress() {
+    minReplicateProgress = Long.MAX_VALUE;
+    // if there isn't a consensus pipe task, replicate progress is 
Long.MAX_VALUE.
+    if (consensusPipeConnectorList.isEmpty()) {
+      return;
+    }
+    // else we find the minimum progress in all consensus pipe task.
+    consensusPipeConnectorList.forEach(
+        consensusPipeConnector ->
+            minReplicateProgress =
+                Math.min(
+                    minReplicateProgress,
+                    
consensusPipeConnector.getConsensusPipeReplicateProgress()));
+  }
+
+  private void updateUserWriteProgress() {
+    // if there isn't a consensus pipe task, user write progress is 0.
+    if (consensusPipeConnectorList.isEmpty()) {
+      userWriteProgress = 0;
+      return;
+    }
+    // since the user write progress of different consensus pipes on the same 
DataRegion is the
+    // same, we only need to take out one Connector to calculate
+    try {
+      ConsensusPipeConnector connector = consensusPipeConnectorList.get(0);
+      userWriteProgress = connector.getConsensusPipeCommitProgress();
+    } catch (Exception e) {
+      // if removing the last connector happens after empty check, we may 
encounter
+      // OutOfBoundsException, in this case, we set userWriteProgress to 0.
+      userWriteProgress = 0;
+    }
+  }
+
+  public void addConsensusPipeConnector(ConsensusPipeConnector 
consensusPipeConnector) {
+    consensusPipeConnectorList.add(consensusPipeConnector);
+  }
+
+  public void removeConsensusPipeConnector(ConsensusPipeConnector connector) {
+    consensusPipeConnectorList.remove(connector);
+  }
+
+  /**
+   * SyncLag represents the difference between the current replica users' 
write progress and the
+   * minimum synchronization progress of all other replicas. The semantics is 
how much data the
+   * leader has left to synchronize.
+   */
+  public long calculateSyncLag() {
+    updateUserWriteProgress();
+    updateReplicateProgress();
+    // if there isn't a consensus pipe task, the syncLag is userWriteProgress 
- 0
+    if (minReplicateProgress == Long.MAX_VALUE) {
+      return userWriteProgress;
+    } else {
+      // since we first update userWriteProgress then update 
replicateProgress, there may have some
+      // cases that userWriteProgress is less than replicateProgress. In these 
cases, we return 0.
+      return Math.max(userWriteProgress - minReplicateProgress, 0);
+    }
+  }
+
+  private PipeConsensusSyncLagManager() {
+    // do nothing
+  }
+
+  private static class PipeConsensusSyncLagManagerHolder {
+    private static Map<String, PipeConsensusSyncLagManager> 
CONSENSU_GROUP_ID_2_INSTANCE_MAP;
+
+    private PipeConsensusSyncLagManagerHolder() {
+      // empty constructor
+    }
+
+    private static void build() {
+      if (CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) {
+        CONSENSU_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
+      }
+    }
+  }
+
+  public static PipeConsensusSyncLagManager getInstance(String groupId) {
+    return 
PipeConsensusSyncLagManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
+        groupId, key -> new PipeConsensusSyncLagManager());
+  }
+
+  public static void release(String groupId) {
+    
PipeConsensusSyncLagManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.remove(groupId);
+  }
+
+  // Only when consensus protocol is PipeConsensus, this method will be called 
once when construct
+  // consensus class.
+  public static void build() {
+    PipeConsensusSyncLagManagerHolder.build();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index e50f1edcc69..a6abcbb9e9a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -620,6 +620,11 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   ///////////////////////// Pipe Consensus /////////////////////////
 
+  public long getPipeCreationTime(final String pipeName) {
+    final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
+    return pipeMeta == null ? 0 : pipeMeta.getStaticMeta().getCreationTime();
+  }
+
   public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final 
int consensusGroupId) {
     if (!tryReadLockWithTimeOut(10)) {
       throw new PipeException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 50da4d56839..e3673c0330d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -25,26 +25,33 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
 import 
org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeConnector;
+import org.apache.iotdb.consensus.pipe.metric.PipeConsensusSyncLagManager;
 import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletBatchEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletInsertNodeEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTsFileInsertionEventHandler;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder.PipeConsensusAsyncBatchReqBuilder;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -64,48 +71,51 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_CONSENSUS_PIPE_NAME;
+
 // TODO: Optimize the network and disk io for TsFile onComplete
 // TODO: support Tablet Batch
-public class PipeConsensusAsyncConnector extends IoTDBConnector {
-
+public class PipeConsensusAsyncConnector extends IoTDBConnector implements 
ConsensusPipeConnector {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConsensusAsyncConnector.class);
-
-  private static final String CUSTOMIZE_EXCEPTION_MSG =
-      "Failed to customize pipeConsensusAsyncConnector because there isn't 
consensusGroupId passed in. Please check your construct parameters.";
-
   private static final String ENQUEUE_EXCEPTION_MSG =
       "Timeout: PipeConsensusConnector offers an event into transferBuffer 
failed, because transferBuffer is full.";
-
   private static final String THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT =
       "Failed to borrow client from client pool or exception occurred "
           + "when sending to receiver.";
-
   private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
       "Failed to borrow client from client pool or exception occurred "
           + "when sending to receiver %s:%s.";
-
   private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
-
   private static final long PIPE_CONSENSUS_EVENT_ENQUEUE_TIMEOUT_IN_MS =
       IOTDB_CONFIG.getConnectionTimeoutInMS() / 6;
-
   private final BlockingQueue<Event> retryEventQueue = new 
LinkedBlockingQueue<>();
-
   // We use enrichedEvent here to make use of 
EnrichedEvent.equalsInPipeConsensus
   private final BlockingQueue<EnrichedEvent> transferBuffer =
       new LinkedBlockingDeque<>(IOTDB_CONFIG.getPipeConsensusPipelineSize());
-
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
   private final int thisDataNodeId = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
-
+  private PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;
+  private String consensusPipeName;
   private int consensusGroupId;
-
   private PipeConsensusSyncConnector retryConnector;
-
   private IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
asyncTransferClientManager;
-
   private PipeConsensusAsyncBatchReqBuilder tabletBatchBuilder;
+  private volatile long currentReplicateProgress = 0;
+
+  @Override
+  public void validate(final PipeParameterValidator validator) throws 
Exception {
+    super.validate(validator);
+    // validate consensus pipe's parameters
+    final PipeParameters parameters = validator.getParameters();
+    validator.validate(
+        args -> (boolean) args[0] || (boolean) args[1],
+        String.format(
+            "One of %s, %s must be specified in consensus pipe",
+            CONNECTOR_CONSENSUS_GROUP_ID_KEY, CONNECTOR_CONSENSUS_PIPE_NAME),
+        parameters.hasAttribute(CONNECTOR_CONSENSUS_GROUP_ID_KEY),
+        parameters.hasAttribute(CONNECTOR_CONSENSUS_PIPE_NAME));
+  }
 
   @Override
   public void customize(PipeParameters parameters, 
PipeConnectorRuntimeConfiguration configuration)
@@ -113,15 +123,16 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
     super.customize(parameters, configuration);
 
     // Get consensusGroupId from parameters passed by PipeConsensusImpl
-    if 
(!parameters.hasAttribute(PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY))
 {
-      throw new PipeException(CUSTOMIZE_EXCEPTION_MSG);
-    }
-    consensusGroupId = 
parameters.getInt(PipeConnectorConstant.CONNECTOR_CONSENSUS_GROUP_ID_KEY);
+    consensusGroupId = parameters.getInt(CONNECTOR_CONSENSUS_GROUP_ID_KEY);
+    // Get consensusPipeName from parameters passed by PipeConsensusImpl
+    consensusPipeName = parameters.getString(CONNECTOR_CONSENSUS_PIPE_NAME);
 
     // In PipeConsensus, one pipeConsensusTask corresponds to a 
pipeConsensusConnector. Thus,
     // `nodeUrls` here actually is a singletonList that contains one peer's 
TEndPoint. But here we
     // retain the implementation of list to cope with possible future expansion
-    retryConnector = new PipeConsensusSyncConnector(nodeUrls, 
consensusGroupId, thisDataNodeId);
+    retryConnector =
+        new PipeConsensusSyncConnector(
+            nodeUrls, consensusGroupId, thisDataNodeId, 
pipeConsensusConnectorMetrics);
     retryConnector.customize(parameters, configuration);
     asyncTransferClientManager =
         PipeConsensusClientMgrContainer.getInstance().getAsyncClientManager();
@@ -136,9 +147,17 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
 
     // currently, tablet batch is false by default in PipeConsensus;
     isTabletBatchModeEnabled = false;
+
+    // initialize metric components
+    pipeConsensusConnectorMetrics = new PipeConsensusConnectorMetrics(this);
+    PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr())
+        .addConsensusPipeConnector(this);
+    
MetricService.getInstance().addMetricSet(this.pipeConsensusConnectorMetrics);
   }
 
-  /** Add an event to transferBuffer, whose events will be asynchronizedly 
transfer to receiver. */
+  /**
+   * Add an event to transferBuffer, whose events will be asynchronously 
transferred to receiver.
+   */
   private boolean addEvent2Buffer(EnrichedEvent event) {
     try {
       if (LOGGER.isDebugEnabled()) {
@@ -148,13 +167,21 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
             event.getCommitId(),
             event);
       }
+      long currentTime = System.nanoTime();
       boolean result =
           transferBuffer.offer(
               event, PIPE_CONSENSUS_EVENT_ENQUEUE_TIMEOUT_IN_MS, 
TimeUnit.MILLISECONDS);
+      long duration = System.nanoTime() - currentTime;
+      pipeConsensusConnectorMetrics.recordConnectorEnqueueTimer(duration);
       // add reference
       if (result) {
         
event.increaseReferenceCount(PipeConsensusAsyncConnector.class.getName());
       }
+      // if connector is closed when executing this method, need to clear this 
event's reference
+      // count to avoid unnecessarily pinning some resource such as WAL.
+      if (isClosed.get()) {
+        event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
+      }
       return result;
     } catch (InterruptedException e) {
       LOGGER.info("PipeConsensusConnector transferBuffer queue offer is 
interrupted.", e);
@@ -182,6 +209,8 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
       current = iterator.next();
     }
     iterator.remove();
+    // update replicate progress
+    currentReplicateProgress = Math.max(currentReplicateProgress, 
event.getCommitId());
     // decrease reference count
     event.decreaseReferenceCount(PipeConsensusAsyncConnector.class.getName(), 
true);
   }
@@ -200,18 +229,18 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
 
   @Override
   public void transfer(TabletInsertionEvent tabletInsertionEvent) throws 
Exception {
+    syncTransferQueuedEventsIfNecessary();
+
     boolean enqueueResult = addEvent2Buffer((EnrichedEvent) 
tabletInsertionEvent);
     if (!enqueueResult) {
       throw new PipeException(ENQUEUE_EXCEPTION_MSG);
     }
-
-    syncTransferQueuedEventsIfNecessary();
-
     // batch transfer tablets.
     if (isTabletBatchModeEnabled) {
       if (tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
         final PipeConsensusTabletBatchEventHandler 
pipeConsensusTabletBatchEventHandler =
-            new PipeConsensusTabletBatchEventHandler(tabletBatchBuilder, this);
+            new PipeConsensusTabletBatchEventHandler(
+                tabletBatchBuilder, this, pipeConsensusConnectorMetrics);
 
         transfer(pipeConsensusTabletBatchEventHandler);
 
@@ -252,7 +281,10 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
                   insertNode, tCommitId, tConsensusGroupId, progressIndex, 
thisDataNodeId);
       final PipeConsensusTabletInsertNodeEventHandler 
pipeConsensusInsertNodeReqHandler =
           new PipeConsensusTabletInsertNodeEventHandler(
-              pipeInsertNodeTabletInsertionEvent, pipeConsensusTransferReq, 
this);
+              pipeInsertNodeTabletInsertionEvent,
+              pipeConsensusTransferReq,
+              this,
+              pipeConsensusConnectorMetrics);
 
       transfer(pipeConsensusInsertNodeReqHandler);
     }
@@ -284,11 +316,6 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
 
   @Override
   public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws 
Exception {
-    boolean enqueueResult = addEvent2Buffer((EnrichedEvent) 
tsFileInsertionEvent);
-    if (!enqueueResult) {
-      throw new PipeException(ENQUEUE_EXCEPTION_MSG);
-    }
-
     syncTransferQueuedEventsIfNecessary();
     transferBatchedEventsIfNecessary();
 
@@ -299,6 +326,10 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
       return;
     }
 
+    boolean enqueueResult = addEvent2Buffer((EnrichedEvent) 
tsFileInsertionEvent);
+    if (!enqueueResult) {
+      throw new PipeException(ENQUEUE_EXCEPTION_MSG);
+    }
     final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
         (PipeTsFileInsertionEvent) tsFileInsertionEvent;
     TCommitId tCommitId =
@@ -322,7 +353,12 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
 
       final PipeConsensusTsFileInsertionEventHandler 
pipeConsensusTsFileInsertionEventHandler =
           new PipeConsensusTsFileInsertionEventHandler(
-              pipeTsFileInsertionEvent, this, tCommitId, tConsensusGroupId, 
thisDataNodeId);
+              pipeTsFileInsertionEvent,
+              this,
+              tCommitId,
+              tConsensusGroupId,
+              thisDataNodeId,
+              pipeConsensusConnectorMetrics);
 
       transfer(pipeConsensusTsFileInsertionEventHandler);
     } catch (Exception e) {
@@ -369,8 +405,9 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
       return;
     }
 
-    transfer(new PipeConsensusTabletBatchEventHandler(tabletBatchBuilder, 
this));
-
+    transfer(
+        new PipeConsensusTabletBatchEventHandler(
+            tabletBatchBuilder, this, pipeConsensusConnectorMetrics));
     tabletBatchBuilder.onSuccess();
   }
 
@@ -381,7 +418,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
    *     retry the {@link Event} and mark the {@link Event} as failure and 
stop the pipe if the
    *     retry times exceeds the threshold.
    */
-  private synchronized void syncTransferQueuedEventsIfNecessary() throws 
Exception {
+  private void syncTransferQueuedEventsIfNecessary() throws Exception {
     while (!retryEventQueue.isEmpty()) {
       synchronized (this) {
         if (isClosed.get() || retryEventQueue.isEmpty()) {
@@ -443,7 +480,7 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
       return;
     }
 
-    retryEventQueue.offer(event);
+    boolean ignore = retryEventQueue.offer(event);
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug(
           "PipeConsensus-ConsensusGroup-{}: Event {} transfer failed, will be 
added to retry queue.",
@@ -478,6 +515,13 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
     }
   }
 
+  public synchronized void clearTransferBufferReferenceCount() {
+    while (!transferBuffer.isEmpty()) {
+      final EnrichedEvent event = transferBuffer.poll();
+      event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());
+    }
+  }
+
   private void logOnClientException(
       final AsyncPipeConsensusServiceClient client, final Exception e) {
     if (client == null) {
@@ -506,13 +550,18 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
 
     retryConnector.close();
     clearRetryEventsReferenceCount();
+    clearTransferBufferReferenceCount();
 
     if (tabletBatchBuilder != null) {
       tabletBatchBuilder.close();
     }
+
+    PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr())
+        .removeConsensusPipeConnector(this);
+    
MetricService.getInstance().removeMetricSet(this.pipeConsensusConnectorMetrics);
   }
 
-  //////////////////////////// TODO: APIs provided for metric framework 
////////////////////////////
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
 
   public int getTransferBufferSize() {
     return transferBuffer.size();
@@ -521,4 +570,24 @@ public class PipeConsensusAsyncConnector extends 
IoTDBConnector {
   public int getRetryBufferSize() {
     return retryEventQueue.size();
   }
+
+  @Override
+  public long getConsensusPipeCommitProgress() {
+    long creationTime = 
PipeAgent.task().getPipeCreationTime(consensusPipeName);
+    String committerKey =
+        String.format("%s_%s_%s", consensusPipeName, consensusGroupId, 
creationTime);
+    return 
PipeEventCommitManager.getInstance().getGivenConsensusPipeCommitId(committerKey);
+  }
+
+  @Override
+  public long getConsensusPipeReplicateProgress() {
+    return currentReplicateProgress;
+  }
+
+  public String getConsensusGroupIdStr() {
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.create(
+            TConsensusGroupType.DataRegion.getValue(), consensusGroupId);
+    return groupId.toString();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index e29d5f49398..c10c5bf93bf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceWithModReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
@@ -65,30 +66,24 @@ import java.util.stream.Collectors;
 
 /** This connector is used for PipeConsensus to transfer queued event. */
 public class PipeConsensusSyncConnector extends IoTDBConnector {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConsensusSyncConnector.class);
-
   private static final String PIPE_CONSENSUS_SYNC_CONNECTION_FAILED_FORMAT =
       "PipeConsensus: syncClient connection to %s:%s failed when %s, because: 
%s";
-
   private static final String TABLET_INSERTION_NODE_SCENARIO = "transfer 
insertionNode tablet";
-
   private static final String TSFILE_SCENARIO = "transfer tsfile";
-
   private static final String TABLET_BATCH_SCENARIO = "transfer tablet batch";
-
   private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
syncRetryClientManager;
-
   private final List<TEndPoint> peers;
-
   private final int thisDataNodeId;
-
   private final int consensusGroupId;
-
+  private final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;
   private PipeConsensusSyncBatchReqBuilder tabletBatchBuilder;
 
   public PipeConsensusSyncConnector(
-      List<TEndPoint> peers, int consensusGroupId, int thisDataNodeId) {
+      List<TEndPoint> peers,
+      int consensusGroupId,
+      int thisDataNodeId,
+      PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics) {
     // In PipeConsensus, one pipeConsensusTask corresponds to a 
pipeConsensusConnector. Thus,
     // `peers` here actually is a singletonList that contains one peer's 
TEndPoint. But here we
     // retain the implementation of list to cope with possible future expansion
@@ -97,6 +92,7 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
     this.thisDataNodeId = thisDataNodeId;
     this.syncRetryClientManager =
         PipeConsensusClientMgrContainer.getInstance().getSyncClientManager();
+    this.pipeConsensusConnectorMetrics = pipeConsensusConnectorMetrics;
   }
 
   @Override
@@ -137,7 +133,10 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
           doTransfer();
         }
       } else {
+        long startTime = System.nanoTime();
         doTransferWrapper((PipeInsertNodeTabletInsertionEvent) 
tabletInsertionEvent);
+        long duration = System.nanoTime() - startTime;
+        pipeConsensusConnectorMetrics.recordRetryWALTransferTimer(duration);
       }
     } catch (Exception e) {
       throw new PipeConnectionException(
@@ -154,12 +153,14 @@ public class PipeConsensusSyncConnector extends 
IoTDBConnector {
     // processor and will not change the event type like
     // 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector
     try {
+      long startTime = System.nanoTime();
       // In order to commit in order
       if (isTabletBatchModeEnabled && !tabletBatchBuilder.isEmpty()) {
         doTransfer();
       }
-
       doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
+      long duration = System.nanoTime() - startTime;
+      pipeConsensusConnectorMetrics.recordRetryTsFileTransferTimer(duration);
     } catch (Exception e) {
       throw new PipeConnectionException(
           String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
index 25816e68fa6..de9c7bec37d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletBatchEventHandler.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusBatchTransferResp;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder.PipeConsensusAsyncBatchReqBuilder;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -48,16 +49,19 @@ public class PipeConsensusTabletBatchEventHandler
   private final List<Event> events;
   private final TPipeConsensusBatchTransferReq req;
   private final PipeConsensusAsyncConnector connector;
+  private final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics;
 
   public PipeConsensusTabletBatchEventHandler(
       final PipeConsensusAsyncBatchReqBuilder batchBuilder,
-      final PipeConsensusAsyncConnector connector)
+      final PipeConsensusAsyncConnector connector,
+      final PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics)
       throws IOException {
     // Deep copy to keep Ids' and events' reference
     requestCommitIds = batchBuilder.deepCopyRequestCommitIds();
     events = batchBuilder.deepCopyEvents();
     req = batchBuilder.toTPipeConsensusBatchTransferReq();
 
+    this.pipeConsensusConnectorMetrics = pipeConsensusConnectorMetrics;
     this.connector = connector;
   }
 
@@ -86,6 +90,7 @@ public class PipeConsensusTabletBatchEventHandler
             .filter(tsStatus -> tsStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode())
             .forEach(
                 tsStatus -> {
+                  pipeConsensusConnectorMetrics.recordRetryCounter();
                   connector
                       .statusHandler()
                       .handle(tsStatus, tsStatus.getMessage(), 
events.toString());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertNodeEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertNodeEventHandler.java
index 7b05fe297f2..cdd56d72cce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertNodeEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertNodeEventHandler.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 
 import org.apache.thrift.TException;
@@ -33,8 +34,9 @@ public class PipeConsensusTabletInsertNodeEventHandler
   public PipeConsensusTabletInsertNodeEventHandler(
       PipeInsertNodeTabletInsertionEvent event,
       TPipeConsensusTransferReq req,
-      PipeConsensusAsyncConnector connector) {
-    super(event, req, connector);
+      PipeConsensusAsyncConnector connector,
+      PipeConsensusConnectorMetrics metric) {
+    super(event, req, connector, metric);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
index 3e12b23f463..ae6a1e5334d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertionEventHandler;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -46,13 +47,20 @@ public abstract class 
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
 
   protected final PipeConsensusAsyncConnector connector;
 
+  protected final PipeConsensusConnectorMetrics metric;
+
+  private final long createTime;
+
   protected PipeConsensusTabletInsertionEventHandler(
       TabletInsertionEvent event,
       TPipeConsensusTransferReq req,
-      PipeConsensusAsyncConnector connector) {
+      PipeConsensusAsyncConnector connector,
+      PipeConsensusConnectorMetrics metric) {
     this.event = event;
     this.req = req;
     this.connector = connector;
+    this.metric = metric;
+    this.createTime = System.nanoTime();
   }
 
   public void transfer(AsyncPipeConsensusServiceClient client) throws 
TException {
@@ -88,6 +96,9 @@ public abstract class 
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
             ((EnrichedEvent) event).getCommitId());
         connector.removeEventFromBuffer((EnrichedEvent) event);
       }
+
+      long duration = System.nanoTime() - createTime;
+      metric.recordConnectorWalTransferTimer(duration);
     } catch (Exception e) {
       onError(e);
     }
@@ -105,5 +116,6 @@ public abstract class 
PipeConsensusTabletInsertionEventHandler<E extends TPipeCo
         exception);
 
     connector.addFailureEventToRetryQueue(event);
+    metric.recordRetryCounter();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletRawEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletRawEventHandler.java
deleted file mode 100644
index 35dcae79efe..00000000000
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletRawEventHandler.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler;
-
-import org.apache.iotdb.commons.client.async.AsyncPipeConsensusServiceClient;
-import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
-import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
-import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
-import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
-
-import org.apache.thrift.TException;
-
-public class PipeConsensusTabletRawEventHandler
-    extends 
PipeConsensusTabletInsertionEventHandler<TPipeConsensusTransferResp> {
-
-  public PipeConsensusTabletRawEventHandler(
-      PipeRawTabletInsertionEvent event,
-      TPipeConsensusTransferReq req,
-      PipeConsensusAsyncConnector connector) {
-    super(event, req, connector);
-  }
-
-  @Override
-  protected void doTransfer(AsyncPipeConsensusServiceClient client, 
TPipeConsensusTransferReq req)
-      throws TException {
-    client.pipeConsensusTransfer(req, this);
-  }
-}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
index 4ed42afc63c..bc8879946aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceWithModReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusConnectorMetrics;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -74,12 +75,19 @@ public class PipeConsensusTsFileInsertionEventHandler
 
   private AsyncPipeConsensusServiceClient client;
 
+  private final PipeConsensusConnectorMetrics metric;
+
+  private final long createTime;
+
+  private long startTransferPieceTime;
+
   public PipeConsensusTsFileInsertionEventHandler(
       final PipeTsFileInsertionEvent event,
       final PipeConsensusAsyncConnector connector,
       final TCommitId commitId,
       final TConsensusGroupId consensusGroupId,
-      final int thisDataNodeId)
+      final int thisDataNodeId,
+      final PipeConsensusConnectorMetrics metric)
       throws FileNotFoundException {
     this.event = event;
     this.connector = connector;
@@ -102,15 +110,19 @@ public class PipeConsensusTsFileInsertionEventHandler
             : new RandomAccessFile(tsFile, "r");
 
     isSealSignalSent = new AtomicBoolean(false);
+
+    this.metric = metric;
+    this.createTime = System.nanoTime();
   }
 
   public void transfer(final AsyncPipeConsensusServiceClient client)
       throws TException, IOException {
+    startTransferPieceTime = System.nanoTime();
+
     this.client = client;
     client.setShouldReturnSelf(false);
 
     final int readLength = reader.read(readBuffer);
-
     if (readLength == -1) {
       if (currentFile == modFile) {
         currentFile = tsFile;
@@ -217,6 +229,9 @@ public class PipeConsensusTsFileInsertionEventHandler
           client.setShouldReturnSelf(true);
           client.returnSelf();
         }
+
+        long duration = System.nanoTime() - createTime;
+        metric.recordConnectorTsFileTransferTimer(duration);
       }
       return;
     }
@@ -245,6 +260,8 @@ public class PipeConsensusTsFileInsertionEventHandler
               .handle(status, response.getStatus().getMessage(), 
tsFile.getName());
         }
       }
+      long duration = System.nanoTime() - startTransferPieceTime;
+      metric.recordConnectorTsFilePieceTransferTimer(duration);
 
       transfer(client);
     } catch (final Exception e) {
@@ -269,6 +286,7 @@ public class PipeConsensusTsFileInsertionEventHandler
       LOGGER.warn("Failed to close file reader when failed to transfer file.", 
e);
     } finally {
       connector.addFailureEventToRetryQueue(event);
+      metric.recordRetryCounter();
 
       if (client != null) {
         client.setShouldReturnSelf(true);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusConnectorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusConnectorMetrics.java
new file mode 100644
index 00000000000..7950fd76744
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusConnectorMetrics.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.consensus;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.PipeConsensusAsyncConnector;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+/**
+ * Metric set of one {@link PipeConsensusAsyncConnector}.
+ *
+ * <p>Registers latency timers, a retry counter and two buffer-size auto gauges, all tagged with
+ * the connector name and the connector's consensus group id (as region). Until {@link #bindTo} is
+ * called, and again after {@link #unbindFrom}, every timer and the counter point at the
+ * {@link DoNothingMetricManager} placeholders, so the {@code record*} methods can be called at
+ * any time.
+ */
+public class PipeConsensusConnectorMetrics implements IMetricSet {
+  private final PipeConsensusAsyncConnector pipeConsensusAsyncConnector;
+
+  private Timer connectorEnqueueTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer connectorWALTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer connectorTsFileTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer connectorTsFilePieceTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer retryWALTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer retryTsFileTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Counter retryCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+
+  private static final String CONNECTOR = "pipeConsensusAsyncConnector";
+
+  /**
+   * @param pipeConsensusAsyncConnector the connector whose buffers are sampled by the auto gauges
+   *     and whose consensus group id is used as the region tag
+   */
+  public PipeConsensusConnectorMetrics(PipeConsensusAsyncConnector pipeConsensusAsyncConnector) {
+    this.pipeConsensusAsyncConnector = pipeConsensusAsyncConnector;
+  }
+
+  /** Updates the "connectorEnqueue" timer with the given duration in nanoseconds. */
+  public void recordConnectorEnqueueTimer(long costTimeInNanos) {
+    connectorEnqueueTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Updates the "connectorWALTransfer" timer with the given duration in nanoseconds. */
+  public void recordConnectorWalTransferTimer(long costTimeInNanos) {
+    connectorWALTransferTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Updates the "connectorTsFileTransfer" timer with the given duration in nanoseconds. */
+  public void recordConnectorTsFileTransferTimer(long costTimeInNanos) {
+    connectorTsFileTransferTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Updates the "connectorTsFilePieceTransfer" timer with the given duration in nanoseconds. */
+  public void recordConnectorTsFilePieceTransferTimer(long costTimeInNanos) {
+    connectorTsFilePieceTransferTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Updates the "retryWALTransfer" timer with the given duration in nanoseconds. */
+  public void recordRetryWALTransferTimer(long costTimeInNanos) {
+    retryWALTransferTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Updates the "retryTsFileTransfer" timer with the given duration in nanoseconds. */
+  public void recordRetryTsFileTransferTimer(long costTimeInNanos) {
+    retryTsFileTransferTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Increments the "pipeConsensusRetryCount" counter by one. */
+  public void recordRetryCounter() {
+    retryCounter.inc();
+  }
+
+  /** Registers the counter, the auto gauges and the timers of this set on the given service. */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindCounter(metricService);
+    bindAutoGauge(metricService);
+    bindTimer(metricService);
+  }
+
+  /** Removes every metric registered by {@link #bindTo} and restores the placeholders. */
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbindCounter(metricService);
+    unbindAutoGauge(metricService);
+    unbindTimer(metricService);
+  }
+
+  private void bindCounter(AbstractMetricService metricService) {
+    // The returned counter must be kept: otherwise recordRetryCounter() keeps incrementing the
+    // do-nothing placeholder and the registered metric never changes.
+    retryCounter =
+        metricService.getOrCreateCounter(
+            Metric.PIPE_RETRY_SEND_EVENT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            CONNECTOR,
+            Tag.REGION.toString(),
+            pipeConsensusAsyncConnector.getConsensusGroupIdStr(),
+            Tag.TYPE.toString(),
+            "pipeConsensusRetryCount");
+  }
+
+  private void bindAutoGauge(AbstractMetricService metricService) {
+    metricService.createAutoGauge(
+        Metric.PIPE_SEND_EVENT.toString(),
+        MetricLevel.IMPORTANT,
+        pipeConsensusAsyncConnector,
+        PipeConsensusAsyncConnector::getTransferBufferSize,
+        Tag.NAME.toString(),
+        CONNECTOR,
+        Tag.REGION.toString(),
+        pipeConsensusAsyncConnector.getConsensusGroupIdStr(),
+        Tag.TYPE.toString(),
+        "transferBufferSize");
+    metricService.createAutoGauge(
+        Metric.PIPE_SEND_EVENT.toString(),
+        MetricLevel.IMPORTANT,
+        pipeConsensusAsyncConnector,
+        PipeConsensusAsyncConnector::getRetryBufferSize,
+        Tag.NAME.toString(),
+        CONNECTOR,
+        Tag.REGION.toString(),
+        pipeConsensusAsyncConnector.getConsensusGroupIdStr(),
+        Tag.TYPE.toString(),
+        "retryBufferSize");
+  }
+
+  private void bindTimer(AbstractMetricService metricService) {
+    connectorEnqueueTimer = createTimer(metricService, Metric.PIPE_SEND_EVENT, "connectorEnqueue");
+    connectorTsFilePieceTransferTimer =
+        createTimer(metricService, Metric.PIPE_SEND_EVENT, "connectorTsFilePieceTransfer");
+    connectorTsFileTransferTimer =
+        createTimer(metricService, Metric.PIPE_SEND_EVENT, "connectorTsFileTransfer");
+    connectorWALTransferTimer =
+        createTimer(metricService, Metric.PIPE_SEND_EVENT, "connectorWALTransfer");
+    retryWALTransferTimer =
+        createTimer(metricService, Metric.PIPE_RETRY_SEND_EVENT, "retryWALTransfer");
+    retryTsFileTransferTimer =
+        createTimer(metricService, Metric.PIPE_RETRY_SEND_EVENT, "retryTsFileTransfer");
+  }
+
+  private void unbindCounter(AbstractMetricService metricService) {
+    retryCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.PIPE_RETRY_SEND_EVENT.toString(),
+        Tag.NAME.toString(),
+        CONNECTOR,
+        Tag.REGION.toString(),
+        pipeConsensusAsyncConnector.getConsensusGroupIdStr(),
+        Tag.TYPE.toString(),
+        "pipeConsensusRetryCount");
+  }
+
+  private void unbindAutoGauge(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_SEND_EVENT.toString(),
+        Tag.NAME.toString(),
+        CONNECTOR,
+        Tag.REGION.toString(),
+        pipeConsensusAsyncConnector.getConsensusGroupIdStr(),
+        Tag.TYPE.toString(),
+        "transferBufferSize");
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_SEND_EVENT.toString(),
+        Tag.NAME.toString(),
+        CONNECTOR,
+        Tag.REGION.toString(),
+        pipeConsensusAsyncConnector.getConsensusGroupIdStr(),
+        Tag.TYPE.toString(),
+        "retryBufferSize");
+  }
+
+  private void unbindTimer(AbstractMetricService metricService) {
+    connectorEnqueueTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    connectorWALTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    connectorTsFileTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    connectorTsFilePieceTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    retryWALTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    retryTsFileTransferTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+    // One removal per timer created in bindTimer. "connectorEnqueue" must be listed here too,
+    // otherwise that timer stays registered after the connector is gone.
+    removeTimer(metricService, Metric.PIPE_SEND_EVENT, "connectorEnqueue");
+    removeTimer(metricService, Metric.PIPE_SEND_EVENT, "connectorTsFilePieceTransfer");
+    removeTimer(metricService, Metric.PIPE_SEND_EVENT, "connectorTsFileTransfer");
+    removeTimer(metricService, Metric.PIPE_SEND_EVENT, "connectorWALTransfer");
+    removeTimer(metricService, Metric.PIPE_RETRY_SEND_EVENT, "retryWALTransfer");
+    removeTimer(metricService, Metric.PIPE_RETRY_SEND_EVENT, "retryTsFileTransfer");
+  }
+
+  /** Gets or creates a timer tagged NAME, TYPE, REGION; {@link #removeTimer} mirrors this. */
+  private Timer createTimer(AbstractMetricService metricService, Metric metric, String type) {
+    return metricService.getOrCreateTimer(
+        metric.toString(),
+        MetricLevel.IMPORTANT,
+        Tag.NAME.toString(),
+        CONNECTOR,
+        Tag.TYPE.toString(),
+        type,
+        Tag.REGION.toString(),
+        pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+  }
+
+  /** Removes a timer using the same tag layout as {@link #createTimer}. */
+  private void removeTimer(AbstractMetricService metricService, Metric metric, String type) {
+    metricService.remove(
+        MetricType.TIMER,
+        metric.toString(),
+        Tag.NAME.toString(),
+        CONNECTOR,
+        Tag.TYPE.toString(),
+        type,
+        Tag.REGION.toString(),
+        pipeConsensusAsyncConnector.getConsensusGroupIdStr());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusReceiverMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusReceiverMetrics.java
new file mode 100644
index 00000000000..58d587d8b8d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/PipeConsensusReceiverMetrics.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.consensus;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import 
org.apache.iotdb.db.pipe.receiver.protocol.pipeconsensus.PipeConsensusReceiver;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+/**
+ * Metric set of one {@link PipeConsensusReceiver}.
+ *
+ * <p>It exposes three auto gauges sampled from the receiver, seven timers registered under
+ * {@code Metric.STAGE} and three timers registered under {@code Metric.PIPE_RECEIVE_EVENT}. All
+ * of them carry the receiver name, the receiver's consensus group id (as region) and a type tag.
+ * Each timer field starts as, and is reset on unbind to, the {@link DoNothingMetricManager}
+ * placeholder.
+ */
+public class PipeConsensusReceiverMetrics implements IMetricSet {
+  private static final String RECEIVER = "pipeConsensusReceiver";
+
+  private final PipeConsensusReceiver pipeConsensusReceiver;
+
+  // Timers registered under Metric.STAGE.
+  private Timer tsFilePieceWriteTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer tsFilePiecePreCheckTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer tsFileSealLoadTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer tsFileSealPreCheckTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer borrowTsFileWriterTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer acquireExecutorLockTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer dispatchWaitingTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  // Timers registered under Metric.PIPE_RECEIVE_EVENT.
+  private Timer receiveWALTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer receiveTsFileTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer receiveEventTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  /**
+   * @param pipeConsensusReceiver the receiver sampled by the auto gauges; its consensus group id
+   *     string is used as the region tag of every metric
+   */
+  public PipeConsensusReceiverMetrics(PipeConsensusReceiver pipeConsensusReceiver) {
+    this.pipeConsensusReceiver = pipeConsensusReceiver;
+  }
+
+  /** Feeds a duration in nanoseconds into the "tsFilePieceWrite" stage timer. */
+  public void recordTsFilePieceWriteTime(long costTimeInNanos) {
+    tsFilePieceWriteTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds a duration in nanoseconds into the "tsFilePiecePreCheck" stage timer. */
+  public void recordTsFilePiecePreCheckTime(long costTimeInNanos) {
+    tsFilePiecePreCheckTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds a duration in nanoseconds into the "tsFileSealLoad" stage timer. */
+  public void recordTsFileSealLoadTimer(long costTimeInNanos) {
+    tsFileSealLoadTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds a duration in nanoseconds into the "tsFileSealPreCheck" stage timer. */
+  public void recordTsFileSealPreCheckTimer(long costTimeInNanos) {
+    tsFileSealPreCheckTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds a duration in nanoseconds into the "borrowTsFileWriter" stage timer. */
+  public void recordBorrowTsFileWriterTimer(long costTimeInNanos) {
+    borrowTsFileWriterTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds a duration in nanoseconds into the "acquireExecutorLock" stage timer. */
+  public void recordAcquireExecutorLockTimer(long costTimeInNanos) {
+    acquireExecutorLockTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds a duration in nanoseconds into the "dispatchWaiting" stage timer. */
+  public void recordDispatchWaitingTimer(long costTimeInNanos) {
+    dispatchWaitingTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds a duration in nanoseconds into the "receiveWALEvent" timer. */
+  public void recordReceiveWALTimer(long costTimeInNanos) {
+    receiveWALTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds a duration in nanoseconds into the "receiveTsFileEvent" timer. */
+  public void recordReceiveTsFileTimer(long costTimeInNanos) {
+    receiveTsFileTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Feeds a duration in nanoseconds into the "receiveEvent" timer. */
+  public void recordReceiveEventTimer(long costTimeInNanos) {
+    receiveEventTimer.updateNanos(costTimeInNanos);
+  }
+
+  /** Registers the gauges first, then the stage timers, then the receive timers. */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindAutoGauge(metricService);
+    bindStageTimer(metricService);
+    bindReceiveTimer(metricService);
+  }
+
+  /** Removes everything {@link #bindTo} registered, in the same group order. */
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbindAutoGauge(metricService);
+    unbindStageTimer(metricService);
+    unbindReceiveTimer(metricService);
+  }
+
+  /** Creates the receive-buffer-size, WAL-event-count and TsFile-event-count auto gauges. */
+  public void bindAutoGauge(AbstractMetricService metricService) {
+    final String metricName = Metric.PIPE_RECEIVE_EVENT.toString();
+    metricService.createAutoGauge(
+        metricName,
+        MetricLevel.IMPORTANT,
+        pipeConsensusReceiver,
+        PipeConsensusReceiver::getReceiveBufferSize,
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.REGION.toString(),
+        pipeConsensusReceiver.getConsensusGroupIdStr(),
+        Tag.TYPE.toString(),
+        "receiveBufferSize");
+    metricService.createAutoGauge(
+        metricName,
+        MetricLevel.IMPORTANT,
+        pipeConsensusReceiver,
+        PipeConsensusReceiver::getWALEventCount,
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.REGION.toString(),
+        pipeConsensusReceiver.getConsensusGroupIdStr(),
+        Tag.TYPE.toString(),
+        "WALEventCount");
+    metricService.createAutoGauge(
+        metricName,
+        MetricLevel.IMPORTANT,
+        pipeConsensusReceiver,
+        PipeConsensusReceiver::getTsFileEventCount,
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.REGION.toString(),
+        pipeConsensusReceiver.getConsensusGroupIdStr(),
+        Tag.TYPE.toString(),
+        "tsFileEventCount");
+  }
+
+  /** Gets or creates the seven timers that live under {@code Metric.STAGE}. */
+  public void bindStageTimer(AbstractMetricService metricService) {
+    tsFilePieceWriteTimer = timerOf(metricService, Metric.STAGE, "tsFilePieceWrite");
+    tsFilePiecePreCheckTimer = timerOf(metricService, Metric.STAGE, "tsFilePiecePreCheck");
+    tsFileSealLoadTimer = timerOf(metricService, Metric.STAGE, "tsFileSealLoad");
+    tsFileSealPreCheckTimer = timerOf(metricService, Metric.STAGE, "tsFileSealPreCheck");
+    borrowTsFileWriterTimer = timerOf(metricService, Metric.STAGE, "borrowTsFileWriter");
+    acquireExecutorLockTimer = timerOf(metricService, Metric.STAGE, "acquireExecutorLock");
+    dispatchWaitingTimer = timerOf(metricService, Metric.STAGE, "dispatchWaiting");
+  }
+
+  /** Gets or creates the three timers that live under {@code Metric.PIPE_RECEIVE_EVENT}. */
+  public void bindReceiveTimer(AbstractMetricService metricService) {
+    receiveEventTimer = timerOf(metricService, Metric.PIPE_RECEIVE_EVENT, "receiveEvent");
+    receiveWALTimer = timerOf(metricService, Metric.PIPE_RECEIVE_EVENT, "receiveWALEvent");
+    receiveTsFileTimer = timerOf(metricService, Metric.PIPE_RECEIVE_EVENT, "receiveTsFileEvent");
+  }
+
+  /** Removes the three auto gauges created by {@link #bindAutoGauge}. */
+  public void unbindAutoGauge(AbstractMetricService metricService) {
+    for (String type : new String[] {"receiveBufferSize", "WALEventCount", "tsFileEventCount"}) {
+      drop(metricService, MetricType.AUTO_GAUGE, Metric.PIPE_RECEIVE_EVENT, type);
+    }
+  }
+
+  /** Resets the stage timer fields to the placeholder, then removes the registered timers. */
+  public void unbindStageTimer(AbstractMetricService metricService) {
+    tsFilePieceWriteTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    tsFilePiecePreCheckTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    tsFileSealLoadTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    tsFileSealPreCheckTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    borrowTsFileWriterTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    acquireExecutorLockTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    dispatchWaitingTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+    drop(metricService, MetricType.TIMER, Metric.STAGE, "tsFilePieceWrite");
+    drop(metricService, MetricType.TIMER, Metric.STAGE, "tsFilePiecePreCheck");
+    drop(metricService, MetricType.TIMER, Metric.STAGE, "tsFileSealLoad");
+    drop(metricService, MetricType.TIMER, Metric.STAGE, "tsFileSealPreCheck");
+    drop(metricService, MetricType.TIMER, Metric.STAGE, "borrowTsFileWriter");
+    drop(metricService, MetricType.TIMER, Metric.STAGE, "acquireExecutorLock");
+    drop(metricService, MetricType.TIMER, Metric.STAGE, "dispatchWaiting");
+  }
+
+  /** Resets the receive timer fields to the placeholder, then removes the registered timers. */
+  public void unbindReceiveTimer(AbstractMetricService metricService) {
+    receiveWALTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    receiveTsFileTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    receiveEventTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+    drop(metricService, MetricType.TIMER, Metric.PIPE_RECEIVE_EVENT, "receiveWALEvent");
+    drop(metricService, MetricType.TIMER, Metric.PIPE_RECEIVE_EVENT, "receiveTsFileEvent");
+    drop(metricService, MetricType.TIMER, Metric.PIPE_RECEIVE_EVENT, "receiveEvent");
+  }
+
+  /** Gets or creates an IMPORTANT-level timer tagged NAME, REGION, TYPE for this receiver. */
+  private Timer timerOf(AbstractMetricService metricService, Metric metric, String type) {
+    return metricService.getOrCreateTimer(
+        metric.toString(),
+        MetricLevel.IMPORTANT,
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.REGION.toString(),
+        pipeConsensusReceiver.getConsensusGroupIdStr(),
+        Tag.TYPE.toString(),
+        type);
+  }
+
+  /** Removes one metric of this receiver, addressed by the NAME, REGION, TYPE tag triple. */
+  private void drop(
+      AbstractMetricService metricService, MetricType metricType, Metric metric, String type) {
+    metricService.remove(
+        metricType,
+        metric.toString(),
+        Tag.NAME.toString(),
+        RECEIVER,
+        Tag.REGION.toString(),
+        pipeConsensusReceiver.getConsensusGroupIdStr(),
+        Tag.TYPE.toString(),
+        type);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 5d9eb0947c1..d6b106705a1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.Pip
 import 
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.PipeConsensusTransferFilePieceResp;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
+import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.pipe.PipeConsensus;
 import org.apache.iotdb.consensus.pipe.PipeConsensusServerImpl;
@@ -46,6 +47,7 @@ import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealReq;
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
+import org.apache.iotdb.db.pipe.consensus.PipeConsensusReceiverMetrics;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -89,16 +91,17 @@ public class PipeConsensusReceiver {
           * IOTDB_CONFIG.getPipeConsensusPipelineSize();
   private static final long CLOSE_TSFILE_WRITER_MAX_WAIT_TIME_IN_MS = 5000;
   private static final long RETRY_WAIT_TIME = 500;
-  private final RequestExecutor requestExecutor = new RequestExecutor();
+  private final RequestExecutor requestExecutor;
   private final PipeConsensus pipeConsensus;
   private final ConsensusGroupId consensusGroupId;
-  // Used to buffer TsFile when transfer TsFile asynchronously.
   private final ConsensusPipeName consensusPipeName;
   private final List<String> receiverBaseDirsName;
+  // Used to buffer TsFile when transfer TsFile asynchronously.
   private final PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool =
       new PipeConsensusTsFileWriterPool();
   private final AtomicReference<File> receiverFileDirWithIdSuffix = new 
AtomicReference<>();
-  private FolderManager folderManager;
+  private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics;
+  private final FolderManager folderManager;
 
   public PipeConsensusReceiver(
       PipeConsensus pipeConsensus,
@@ -106,7 +109,10 @@ public class PipeConsensusReceiver {
       ConsensusPipeName consensusPipeName) {
     this.pipeConsensus = pipeConsensus;
     this.consensusGroupId = consensusGroupId;
+    this.pipeConsensusReceiverMetrics = new PipeConsensusReceiverMetrics(this);
+    this.requestExecutor = new RequestExecutor(pipeConsensusReceiverMetrics);
     this.consensusPipeName = consensusPipeName;
+    MetricService.getInstance().addMetricSet(pipeConsensusReceiverMetrics);
 
     // Each pipeConsensusReceiver has its own base directories. for example, a 
default dir path is
     // 
data/datanode/system/pipe/consensus/receiver/__consensus.{consensusGroupId}_{leaderDataNodeId}_{followerDataNodeId}
@@ -116,13 +122,19 @@ public class PipeConsensusReceiver {
     try {
       this.folderManager =
           new FolderManager(receiverBaseDirsName, 
DirectoryStrategyType.SEQUENCE_STRATEGY);
-      initiateTsFileBufferFolder();
     } catch (Exception e) {
       LOGGER.error(
           "Fail to create pipeConsensus receiver file folders allocation 
strategy because all disks of folders are full.",
           e);
       throw new RuntimeException(e);
     }
+
+    try {
+      initiateTsFileBufferFolder();
+    } catch (Exception e) {
+      LOGGER.error("Fail to initiate file buffer folder, Error msg: {}", 
e.getMessage());
+      throw new RuntimeException(e);
+    }
   }
 
   /**
@@ -130,6 +142,7 @@ public class PipeConsensusReceiver {
    * load event must be synchronized.
    */
   public TPipeConsensusTransferResp receive(final TPipeConsensusTransferReq 
req) {
+    long startNanos = System.nanoTime();
     // PreCheck: if there are these cases: read-only; null impl; inactive 
impl, etc. The receiver
     // will reject synchronization.
     TPipeConsensusTransferResp resp = preCheckForReceiver(req);
@@ -144,18 +157,26 @@ public class PipeConsensusReceiver {
         case TRANSFER_TS_FILE_PIECE_WITH_MOD:
           // Just take a place in requestExecutor's buffer, the further seal 
request will remove
           // its place from buffer.
-          requestExecutor.onRequest(req, true);
-          return loadEvent(req);
+          requestExecutor.onRequest(req, true, false);
+          resp = loadEvent(req);
+          break;
         case TRANSFER_TS_FILE_SEAL:
         case TRANSFER_TS_FILE_SEAL_WITH_MOD:
-          // TODO: check memory when logging wal(in further version)
+          // TODO: check memory when logging WAL(in further version)
+          resp = requestExecutor.onRequest(req, false, true);
+          break;
         case TRANSFER_TABLET_BINARY:
         case TRANSFER_TABLET_INSERT_NODE:
           // TODO: support batch transfer(in further version)
         case TRANSFER_TABLET_BATCH:
         default:
-          return requestExecutor.onRequest(req, false);
+          resp = requestExecutor.onRequest(req, false, false);
+          break;
       }
+      // update receive an event's duration
+      long durationNanos = System.nanoTime() - startNanos;
+      pipeConsensusReceiverMetrics.recordReceiveEventTimer(durationNanos);
+      return resp;
     }
     // Unknown request type, which means the request can not be handled by 
this receiver,
     // maybe the version of the receiver is not compatible with the sender
@@ -185,8 +206,7 @@ public class PipeConsensusReceiver {
     if (impl.isReadOnly()) {
       String message =
           String.format(
-              "PipeConsensus-ConsensusGroupId-%s: fail to receive because 
system is read-only.",
-              groupId);
+              "PipeConsensus-PipeName-%s: fail to receive because system is 
read-only.", groupId);
       if (LOGGER.isErrorEnabled()) {
         LOGGER.error(message);
       }
@@ -196,7 +216,7 @@ public class PipeConsensusReceiver {
     if (!impl.isActive()) {
       String message =
           String.format(
-              "PipeConsensus-ConsensusGroupId-%s: fail to receive because peer 
is inactive and not ready.",
+              "PipeConsensus-PipeName-%s: fail to receive because peer is 
inactive and not ready.",
               groupId);
       if (LOGGER.isWarnEnabled()) {
         LOGGER.warn(message);
@@ -245,13 +265,13 @@ public class PipeConsensusReceiver {
               TSStatusCode.PIPE_CONSENSUS_TYPE_ERROR,
               String.format("Unknown PipeConsensusRequestType %s.", 
rawRequestType));
       LOGGER.warn(
-          "PipeConsensus-ConsensusGroupId-{}: Unknown PipeRequestType, 
response status = {}.",
-          consensusGroupId,
+          "PipeConsensus-PipeName-{}: Unknown PipeRequestType, response status 
= {}.",
+          consensusPipeName,
           status);
       return new TPipeConsensusTransferResp(status);
     } catch (Exception e) {
       final String error = String.format("Serialization error during pipe 
receiving, %s", e);
-      LOGGER.warn("PipeConsensus-ConsensusGroupId-{}: {}", consensusGroupId, 
error, e);
+      LOGGER.warn("PipeConsensus-PipeName-{}: {}", consensusPipeName, error, 
e);
       return new 
TPipeConsensusTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_ERROR, error));
     }
   }
@@ -259,8 +279,7 @@ public class PipeConsensusReceiver {
   private TPipeConsensusTransferResp handleTransferTabletInsertNode(
       final PipeConsensusTabletInsertNodeReq req) throws 
ConsensusGroupNotExistException {
     LOGGER.info(
-        "PipeConsensus-ConsensusGroupId-{}: starting to receive tablet 
insertNode",
-        consensusGroupId);
+        "PipeConsensus-PipeName-{}: starting to receive tablet insertNode", 
consensusPipeName);
     PipeConsensusServerImpl impl =
         Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId))
             .orElseThrow(() -> new 
ConsensusGroupNotExistException(consensusGroupId));
@@ -272,8 +291,7 @@ public class PipeConsensusReceiver {
 
   private TPipeConsensusTransferResp handleTransferTabletBinary(
       final PipeConsensusTabletBinaryReq req) throws 
ConsensusGroupNotExistException {
-    LOGGER.info(
-        "PipeConsensus-ConsensusGroupId-{}: starting to receive tablet 
binary", consensusGroupId);
+    LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tablet 
binary", consensusPipeName);
     PipeConsensusServerImpl impl =
         Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId))
             .orElseThrow(() -> new 
ConsensusGroupNotExistException(consensusGroupId));
@@ -285,17 +303,20 @@ public class PipeConsensusReceiver {
 
   private TPipeConsensusTransferResp handleTransferFilePiece(
       final PipeConsensusTransferFilePieceReq req, final boolean isSingleFile) 
{
-    LOGGER.info(
-        "PipeConsensus-ConsensusGroupId-{}: starting to receive tsFile 
pieces", consensusGroupId);
-    PipeConsensusTsFileWriter diskBuffer =
+    LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tsFile 
pieces", consensusPipeName);
+    long startBorrowTsFileWriterNanos = System.nanoTime();
+    PipeConsensusTsFileWriter tsFileWriter =
         
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
+    long startPreCheckNanos = System.nanoTime();
+    pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(
+        startPreCheckNanos - startBorrowTsFileWriterNanos);
 
     try {
-      updateWritingFileIfNeeded(diskBuffer, req.getFileName(), isSingleFile);
-      final File writingFile = diskBuffer.getWritingFile();
-      final RandomAccessFile writingFileWriter = 
diskBuffer.getWritingFileWriter();
+      updateWritingFileIfNeeded(tsFileWriter, req.getFileName(), isSingleFile);
+      final File writingFile = tsFileWriter.getWritingFile();
+      final RandomAccessFile writingFileWriter = 
tsFileWriter.getWritingFileWriter();
 
-      if (isWritingFileOffsetNonCorrect(diskBuffer, 
req.getStartWritingOffset())) {
+      if (isWritingFileOffsetNonCorrect(tsFileWriter, 
req.getStartWritingOffset())) {
         if (!writingFile.getName().endsWith(TsFileConstant.TSFILE_SUFFIX)) {
           // If the file is a tsFile, then the content will not be changed for 
a specific
           // filename. However, for other files (mod, snapshot, etc.) the 
content varies for the
@@ -311,20 +332,24 @@ public class PipeConsensusReceiver {
                     "Request sender to reset file reader's offset from %s to 
%s.",
                     req.getStartWritingOffset(), writingFileWriter.length()));
         LOGGER.warn(
-            "PipeConsensus-ConsensusGroupId-{}: File offset reset requested by 
receiver, response status = {}.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: File offset reset requested by 
receiver, response status = {}.",
+            consensusPipeName,
             status);
         return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
             status, writingFileWriter.length());
       }
 
+      long endPreCheckNanos = System.nanoTime();
+      pipeConsensusReceiverMetrics.recordTsFilePiecePreCheckTime(
+          endPreCheckNanos - startPreCheckNanos);
       writingFileWriter.write(req.getFilePiece());
+      
pipeConsensusReceiverMetrics.recordTsFilePieceWriteTime(System.nanoTime() - 
endPreCheckNanos);
       return PipeConsensusTransferFilePieceResp.toTPipeConsensusTransferResp(
           RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
     } catch (Exception e) {
       LOGGER.warn(
-          "PipeConsensus-ConsensusGroupId-{}: Failed to write file piece from 
req {}.",
-          consensusGroupId,
+          "PipeConsensus-PipeName-{}: Failed to write file piece from req {}.",
+          consensusPipeName,
           req,
           e);
       final TSStatus status =
@@ -341,10 +366,13 @@ public class PipeConsensusReceiver {
   }
 
   private TPipeConsensusTransferResp handleTransferFileSeal(final 
PipeConsensusTsFileSealReq req) {
-    LOGGER.info(
-        "PipeConsensus-ConsensusGroupId-{}: starting to receive tsFile seal", 
consensusGroupId);
+    LOGGER.info("PipeConsensus-PipeName-{}: starting to receive tsFile seal", 
consensusPipeName);
+    long startBorrowTsFileWriterNanos = System.nanoTime();
     PipeConsensusTsFileWriter tsFileWriter =
         
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
+    long startPreCheckNanos = System.nanoTime();
+    pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(
+        startPreCheckNanos - startBorrowTsFileWriterNanos);
     File writingFile = tsFileWriter.getWritingFile();
     RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
 
@@ -381,29 +409,33 @@ public class PipeConsensusReceiver {
       // writingFile will be deleted after load if no exception occurs
       tsFileWriter.setWritingFile(null);
 
+      long endPreCheckNanos = System.nanoTime();
+      pipeConsensusReceiverMetrics.recordTsFileSealPreCheckTimer(
+          endPreCheckNanos - startPreCheckNanos);
       final TSStatus status =
           loadFileToDataRegion(
               fileAbsolutePath,
               
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
+      pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime() 
- endPreCheckNanos);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         // if transfer success, disk buffer will be released.
         tsFileWriter.returnSelf();
         LOGGER.info(
-            "PipeConsensus-ConsensusGroupId-{}: Seal file {} successfully.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: Seal file {} successfully.",
+            consensusPipeName,
             fileAbsolutePath);
       } else {
         LOGGER.warn(
-            "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {}, 
because {}.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: Failed to seal file {}, because {}.",
+            consensusPipeName,
             fileAbsolutePath,
             status.getMessage());
       }
       return new TPipeConsensusTransferResp(status);
     } catch (IOException e) {
       LOGGER.warn(
-          "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {} from req 
{}.",
-          consensusGroupId,
+          "PipeConsensus-PipeName-{}: Failed to seal file {} from req {}.",
+          consensusPipeName,
           writingFile,
           req,
           e);
@@ -413,8 +445,8 @@ public class PipeConsensusReceiver {
               String.format("Failed to seal file %s because %s", writingFile, 
e.getMessage())));
     } catch (LoadFileException e) {
       LOGGER.warn(
-          "PipeConsensus-ConsensusGroupId-{}: Failed to load file {} from req 
{}.",
-          consensusGroupId,
+          "PipeConsensus-PipeName-{}: Failed to load file {} from req {}.",
+          consensusPipeName,
           writingFile,
           req,
           e);
@@ -434,10 +466,13 @@ public class PipeConsensusReceiver {
   private TPipeConsensusTransferResp handleTransferFileSealWithMods(
       final PipeConsensusTsFileSealWithModReq req) {
     LOGGER.info(
-        "PipeConsensus-ConsensusGroupId-{}: starting to receive tsFile seal 
with mods",
-        consensusGroupId);
+        "PipeConsensus-PipeName-{}: starting to receive tsFile seal with 
mods", consensusPipeName);
+    long startBorrowTsFileWriterNanos = System.nanoTime();
     PipeConsensusTsFileWriter tsFileWriter =
         
pipeConsensusTsFileWriterPool.borrowCorrespondingWriter(req.getCommitId());
+    long startPreCheckNanos = System.nanoTime();
+    pipeConsensusReceiverMetrics.recordBorrowTsFileWriterTimer(
+        startPreCheckNanos - startBorrowTsFileWriterNanos);
     File writingFile = tsFileWriter.getWritingFile();
     RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
 
@@ -491,29 +526,33 @@ public class PipeConsensusReceiver {
       final List<String> fileAbsolutePaths =
           
files.stream().map(File::getAbsolutePath).collect(Collectors.toList());
 
+      long endPreCheckNanos = System.nanoTime();
+      pipeConsensusReceiverMetrics.recordTsFileSealPreCheckTimer(
+          endPreCheckNanos - startPreCheckNanos);
       final TSStatus status =
           loadFileToDataRegion(
               fileAbsolutePaths.get(1),
               
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
+      pipeConsensusReceiverMetrics.recordTsFileSealLoadTimer(System.nanoTime() 
- endPreCheckNanos);
       if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         // if transfer success, disk buffer will be released.
         tsFileWriter.returnSelf();
         LOGGER.info(
-            "PipeConsensus-ConsensusGroupId-{}: Seal file with mods {} 
successfully.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: Seal file with mods {} successfully.",
+            consensusPipeName,
             fileAbsolutePaths);
       } else {
         LOGGER.warn(
-            "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {}, status 
is {}.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: Failed to seal file {}, status is {}.",
+            consensusPipeName,
             fileAbsolutePaths,
             status);
       }
       return new TPipeConsensusTransferResp(status);
     } catch (Exception e) {
       LOGGER.warn(
-          "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {} from req 
{}.",
-          consensusGroupId,
+          "PipeConsensus-PipeName-{}: Failed to seal file {} from req {}.",
+          consensusPipeName,
           files,
           req,
           e);
@@ -546,8 +585,8 @@ public class PipeConsensusReceiver {
               TSStatusCode.PIPE_CONSENSUS_TRANSFER_FILE_ERROR,
               String.format("Failed to seal file %s, the file does not 
exist.", fileName));
       LOGGER.warn(
-          "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {}, because 
the file does not exist.",
-          consensusGroupId,
+          "PipeConsensus-PipeName-{}: Failed to seal file {}, because the file 
does not exist.",
+          consensusPipeName,
           fileName);
       return new TPipeConsensusTransferResp(status);
     }
@@ -561,9 +600,9 @@ public class PipeConsensusReceiver {
                       + "The original file has length %s, but receiver file 
has length %s.",
                   fileName, fileLength, writingFileWriter.length()));
       LOGGER.warn(
-          "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {} when 
check non final seal, because the length of file is not correct. "
+          "PipeConsensus-PipeName-{}: Failed to seal file {} when check non 
final seal, because the length of file is not correct. "
               + "The original file has length {}, but receiver file has length 
{}.",
-          consensusGroupId,
+          consensusPipeName,
           fileName,
           fileLength,
           writingFileWriter.length());
@@ -604,9 +643,9 @@ public class PipeConsensusReceiver {
         writingFile != null && writingFile.exists() && writingFileWriter != 
null;
     if (!isWritingFileAvailable) {
       LOGGER.info(
-          "PipeConsensus-ConsensusGroupId-{}: Writing file {} is not 
available. "
+          "PipeConsensus-PipeName-{}: Writing file {} is not available. "
               + "Writing file is null: {}, writing file exists: {}, writing 
file writer is null: {}.",
-          consensusGroupId,
+          consensusPipeName,
           writingFile,
           writingFile == null,
           writingFile != null && writingFile.exists(),
@@ -628,8 +667,8 @@ public class PipeConsensusReceiver {
               String.format(
                   "Failed to seal file %s, because writing file is %s.", 
fileName, writingFile));
       LOGGER.warn(
-          "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {}, because 
writing file is {}.",
-          consensusGroupId,
+          "PipeConsensus-PipeName-{}: Failed to seal file {}, because writing 
file is {}.",
+          consensusPipeName,
           fileName,
           writingFile);
       return new TPipeConsensusTransferResp(status);
@@ -644,9 +683,9 @@ public class PipeConsensusReceiver {
                       + "The original file has length %s, but receiver file 
has length %s.",
                   fileName, fileLength, writingFileWriter.length()));
       LOGGER.warn(
-          "PipeConsensus-ConsensusGroupId-{}: Failed to seal file {} when 
check final seal file, because the length of file is not correct. "
+          "PipeConsensus-PipeName-{}: Failed to seal file {} when check final 
seal file, because the length of file is not correct. "
               + "The original file has length {}, but receiver file has length 
{}.",
-          consensusGroupId,
+          consensusPipeName,
           fileName,
           fileLength,
           writingFileWriter.length());
@@ -670,8 +709,8 @@ public class PipeConsensusReceiver {
     final boolean offsetCorrect = writingFileWriter.length() == offset;
     if (!offsetCorrect) {
       LOGGER.warn(
-          "PipeConsensus-ConsensusGroupId-{}: Writing file {}'s offset is {}, 
but request sender's offset is {}.",
-          consensusGroupId,
+          "PipeConsensus-PipeName-{}: Writing file {}'s offset is {}, but 
request sender's offset is {}.",
+          consensusPipeName,
           writingFile.getPath(),
           writingFileWriter.length(),
           offset);
@@ -684,13 +723,13 @@ public class PipeConsensusReceiver {
       try {
         diskBuffer.getWritingFileWriter().close();
         LOGGER.info(
-            "PipeConsensus-ConsensusGroupId-{}: Current writing file writer {} 
was closed.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: Current writing file writer {} was 
closed.",
+            consensusPipeName,
             diskBuffer.getWritingFile() == null ? "null" : 
diskBuffer.getWritingFile().getPath());
       } catch (IOException e) {
         LOGGER.warn(
-            "PipeConsensus-ConsensusGroupId-{}: Failed to close current 
writing file writer {}, because {}.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: Failed to close current writing file 
writer {}, because {}.",
+            consensusPipeName,
             diskBuffer.getWritingFile() == null ? "null" : 
diskBuffer.getWritingFile().getPath(),
             e.getMessage(),
             e);
@@ -699,8 +738,8 @@ public class PipeConsensusReceiver {
     } else {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
-            "PipeConsensus-ConsensusGroupId-{}: Current writing file writer is 
null. No need to close.",
-            consensusGroupId.getId());
+            "PipeConsensus-PipeName-{}: Current writing file writer is null. 
No need to close.",
+            consensusPipeName.toString());
       }
     }
   }
@@ -710,13 +749,13 @@ public class PipeConsensusReceiver {
       try {
         FileUtils.delete(file);
         LOGGER.info(
-            "PipeConsensus-ConsensusGroupId-{}: Original writing file {} was 
deleted.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: Original writing file {} was deleted.",
+            consensusPipeName,
             file.getPath());
       } catch (IOException e) {
         LOGGER.warn(
-            "PipeConsensus-ConsensusGroupId-{}: Failed to delete original 
writing file {}, because {}.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: Failed to delete original writing file 
{}, because {}.",
+            consensusPipeName,
             file.getPath(),
             e.getMessage(),
             e);
@@ -724,8 +763,8 @@ public class PipeConsensusReceiver {
     } else {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
-            "PipeConsensus-ConsensusGroupId-{}: Original file {} is not 
existed. No need to delete.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: Original file {} is not existed. No 
need to delete.",
+            consensusPipeName,
             file.getPath());
       }
     }
@@ -737,8 +776,8 @@ public class PipeConsensusReceiver {
     } else {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
-            "PipeConsensus-ConsensusGroupId-{}: Current writing file is null. 
No need to delete.",
-            consensusGroupId.getId());
+            "PipeConsensus-PipeName-{}: Current writing file is null. No need 
to delete.",
+            consensusPipeName.toString());
       }
     }
   }
@@ -751,9 +790,9 @@ public class PipeConsensusReceiver {
     }
 
     LOGGER.info(
-        "PipeConsensus-ConsensusGroupId-{}: Writing file {} is not existed or 
name is not correct, try to create it. "
+        "PipeConsensus-PipeName-{}: Writing file {} is not existed or name is 
not correct, try to create it. "
             + "Current writing file is {}.",
-        consensusGroupId,
+        consensusPipeName,
         fileName,
         diskBuffer.getWritingFile() == null ? "null" : 
diskBuffer.getWritingFile().getPath());
 
@@ -770,13 +809,13 @@ public class PipeConsensusReceiver {
     if (!receiverFileDirWithIdSuffix.get().exists()) {
       if (receiverFileDirWithIdSuffix.get().mkdirs()) {
         LOGGER.info(
-            "PipeConsensus-ConsensusGroupId-{}: Receiver file dir {} was 
created.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: Receiver file dir {} was created.",
+            consensusPipeName,
             receiverFileDirWithIdSuffix.get().getPath());
       } else {
         LOGGER.error(
-            "PipeConsensus-ConsensusGroupId-{}: Failed to create receiver file 
dir {}.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: Failed to create receiver file dir 
{}.",
+            consensusPipeName,
             receiverFileDirWithIdSuffix.get().getPath());
       }
     }
@@ -784,8 +823,8 @@ public class PipeConsensusReceiver {
     diskBuffer.setWritingFile(new File(receiverFileDirWithIdSuffix.get(), 
fileName));
     diskBuffer.setWritingFileWriter(new 
RandomAccessFile(diskBuffer.getWritingFile(), "rw"));
     LOGGER.info(
-        "PipeConsensus-ConsensusGroupId-{}: Writing file {} was created. Ready 
to write file pieces.",
-        consensusGroupId,
+        "PipeConsensus-PipeName-{}: Writing file {} was created. Ready to 
write file pieces.",
+        consensusPipeName,
         diskBuffer.getWritingFile().getPath());
   }
 
@@ -801,13 +840,13 @@ public class PipeConsensusReceiver {
         try {
           FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
           LOGGER.info(
-              "PipeConsensus-ConsensusGroupId-{}: Original receiver file dir 
{} was deleted successfully.",
-              consensusGroupId,
+              "PipeConsensus-PipeName-{}: Original receiver file dir {} was 
deleted successfully.",
+              consensusPipeName,
               receiverFileDirWithIdSuffix.get().getPath());
         } catch (IOException e) {
           LOGGER.warn(
-              "PipeConsensus-ConsensusGroupId-{}: Failed to delete original 
receiver file dir {}, because {}.",
-              consensusGroupId,
+              "PipeConsensus-PipeName-{}: Failed to delete original receiver 
file dir {}, because {}.",
+              consensusPipeName,
               receiverFileDirWithIdSuffix.get().getPath(),
               e.getMessage(),
               e);
@@ -815,16 +854,16 @@ public class PipeConsensusReceiver {
       } else {
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug(
-              "PipeConsensus-ConsensusGroupId-{}: Original receiver file dir 
{} is not existed. No need to delete.",
-              consensusGroupId,
+              "PipeConsensus-PipeName-{}: Original receiver file dir {} is not 
existed. No need to delete.",
+              consensusPipeName,
               receiverFileDirWithIdSuffix.get().getPath());
         }
       }
       receiverFileDirWithIdSuffix.set(null);
     } else {
       LOGGER.debug(
-          "PipeConsensus-ConsensusGroupId-{}: Current receiver file dir is 
null. No need to delete.",
-          consensusGroupId.getId());
+          "PipeConsensus-PipeName-{}: Current receiver file dir is null. No 
need to delete.",
+          consensusPipeName.toString());
     }
 
     // initiate receiverFileDirWithIdSuffix
@@ -832,8 +871,8 @@ public class PipeConsensusReceiver {
       final String receiverFileBaseDir = getReceiverFileBaseDir();
       if (Objects.isNull(receiverFileBaseDir)) {
         LOGGER.warn(
-            "PipeConsensus-ConsensusGroupId-{}: Failed to init pipeConsensus 
receiver file folder manager because all disks of folders are full.",
-            consensusGroupId.getId());
+            "PipeConsensus-PipeName-{}: Failed to get pipeConsensus receiver 
file base directory, because your folderManager is null. May because the disk 
is full.",
+            consensusPipeName.toString());
         throw new DiskSpaceInsufficientException(receiverBaseDirsName);
       }
       // Create a new receiver file dir
@@ -841,19 +880,19 @@ public class PipeConsensusReceiver {
       if (newReceiverDir.exists()) {
         FileUtils.deleteDirectory(newReceiverDir);
         LOGGER.info(
-            "PipeConsensus-ConsensusGroupId-{}: Origin receiver file dir {} 
was deleted.",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: Origin receiver file dir {} was 
deleted.",
+            consensusPipeName,
             newReceiverDir.getPath());
       }
       if (!newReceiverDir.mkdirs()) {
         LOGGER.warn(
-            "PipeConsensus-ConsensusGroupId-{}: Failed to create receiver file 
dir {}.",
-            newReceiverDir.getPath(),
-            consensusGroupId.getId());
+            "PipeConsensus-PipeName-{}: Failed to create receiver file dir {}. 
May because authority or dir already exists etc.",
+            consensusPipeName,
+            newReceiverDir.getPath());
         throw new IOException(
             String.format(
-                "PipeConsensus-ConsensusGroupId-%s: Failed to create receiver 
file dir %s.",
-                newReceiverDir.getPath(), consensusGroupId.getId()));
+                "PipeConsensus-PipeName-%s: Failed to create receiver file dir 
%s. May because authority or dir already exists etc.",
+                consensusPipeName, newReceiverDir.getPath()));
       }
       receiverFileDirWithIdSuffix.set(newReceiverDir);
 
@@ -871,7 +910,7 @@ public class PipeConsensusReceiver {
 
   public synchronized void handleExit() {
     // Clear the diskBuffers
-    pipeConsensusTsFileWriterPool.handleExit(consensusGroupId);
+    pipeConsensusTsFileWriterPool.handleExit(consensusPipeName);
 
     // Clear the original receiver file dir if exists
     if (receiverFileDirWithIdSuffix.get() != null) {
@@ -879,21 +918,21 @@ public class PipeConsensusReceiver {
         try {
           FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
           LOGGER.info(
-              "PipeConsensus-ConsensusGroupId-{}: Receiver exit: Original 
receiver file dir {} was deleted.",
-              consensusGroupId,
+              "PipeConsensus-PipeName-{}: Receiver exit: Original receiver 
file dir {} was deleted.",
+              consensusPipeName,
               receiverFileDirWithIdSuffix.get().getPath());
         } catch (IOException e) {
           LOGGER.warn(
-              "PipeConsensus-ConsensusGroupId-{}: Receiver exit: Delete 
original receiver file dir {} error.",
-              consensusGroupId,
+              "PipeConsensus-PipeName-{}: Receiver exit: Delete original 
receiver file dir {} error.",
+              consensusPipeName,
               receiverFileDirWithIdSuffix.get().getPath(),
               e);
         }
       } else {
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug(
-              "PipeConsensus-ConsensusGroupId-{}: Receiver exit: Original 
receiver file dir {} does not exist. No need to delete.",
-              consensusGroupId,
+              "PipeConsensus-PipeName-{}: Receiver exit: Original receiver 
file dir {} does not exist. No need to delete.",
+              consensusPipeName,
               receiverFileDirWithIdSuffix.get().getPath());
         }
       }
@@ -901,14 +940,16 @@ public class PipeConsensusReceiver {
     } else {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
-            "PipeConsensus-ConsensusGroupId-{}: Receiver exit: Original 
receiver file dir is null. No need to delete.",
-            consensusGroupId.getId());
+            "PipeConsensus-PipeName-{}: Receiver exit: Original receiver file 
dir is null. No need to delete.",
+            consensusPipeName.toString());
       }
     }
 
+    // remove metric
+    MetricService.getInstance().removeMetricSet(pipeConsensusReceiverMetrics);
+
     LOGGER.info(
-        "PipeConsensus-ConsensusGroupId-{}: Receiver exit: Receiver exited.",
-        consensusGroupId.getId());
+        "PipeConsensus-PipeName-{}: Receiver exit: Receiver exited.", 
consensusPipeName.toString());
   }
 
   private static class PipeConsensusTsFileWriterPool {
@@ -948,7 +989,7 @@ public class PipeConsensusReceiver {
       return diskBuffer.get();
     }
 
-    public void handleExit(ConsensusGroupId consensusGroupId) {
+    public void handleExit(ConsensusPipeName consensusPipeName) {
       pipeConsensusTsFileWriterPool.forEach(
           diskBuffer -> {
             // Wait until diskBuffer is not used by TsFileInsertionEvent or 
timeout.
@@ -960,13 +1001,13 @@ public class PipeConsensusReceiver {
                 Thread.sleep(RETRY_WAIT_TIME);
               } catch (InterruptedException e) {
                 LOGGER.warn(
-                    "PipeConsensus-ConsensusGroupId-{}: receiver thread get 
interrupted when exiting.",
-                    consensusGroupId.getId());
+                    "PipeConsensus-PipeName-{}: receiver thread get 
interrupted when exiting.",
+                    consensusPipeName.toString());
                 // avoid infinite loop
                 break;
               }
             }
-            diskBuffer.closeSelf(consensusGroupId);
+            diskBuffer.closeSelf(consensusPipeName);
           });
     }
   }
@@ -1023,19 +1064,19 @@ public class PipeConsensusReceiver {
       this.commitIdOfCorrespondingHolderEvent = null;
     }
 
-    public void closeSelf(ConsensusGroupId consensusGroupId) {
+    public void closeSelf(ConsensusPipeName consensusPipeName) {
       // close file writer
       if (writingFileWriter != null) {
         try {
           writingFileWriter.close();
           LOGGER.info(
-              "PipeConsensus-ConsensusGroupId-{}: TsFileWriter-{} exit: 
Writing file writer was closed.",
-              consensusGroupId.getId(),
+              "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file 
writer was closed.",
+              consensusPipeName.toString(),
               index);
         } catch (Exception e) {
           LOGGER.warn(
-              "PipeConsensus-ConsensusGroupId-{}: TsFileWriter-{} exit: Close 
Writing file writer error.",
-              consensusGroupId,
+              "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Close Writing 
file writer error.",
+              consensusPipeName,
               index,
               e);
         }
@@ -1043,8 +1084,8 @@ public class PipeConsensusReceiver {
       } else {
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug(
-              "PipeConsensus-ConsensusGroupId-{}: TsFileWriter-{} exit: 
Writing file writer is null. No need to close.",
-              consensusGroupId.getId(),
+              "PipeConsensus-PipeName-{}: TsFileWriter-{} exit: Writing file 
writer is null. No need to close.",
+              consensusPipeName.toString(),
               index);
         }
       }
@@ -1054,13 +1095,13 @@ public class PipeConsensusReceiver {
         try {
           FileUtils.delete(writingFile);
           LOGGER.info(
-              "PipeConsensus-ConsensusGroupId-{}: TsFileWriter exit: Writing 
file {} was deleted.",
-              consensusGroupId,
+              "PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file {} 
was deleted.",
+              consensusPipeName,
               writingFile.getPath());
         } catch (Exception e) {
           LOGGER.warn(
-              "PipeConsensus-ConsensusGroupId-{}: TsFileWriter exit: Delete 
writing file {} error.",
-              consensusGroupId,
+              "PipeConsensus-PipeName-{}: TsFileWriter exit: Delete writing 
file {} error.",
+              consensusPipeName,
               writingFile.getPath(),
               e);
         }
@@ -1068,8 +1109,8 @@ public class PipeConsensusReceiver {
       } else {
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug(
-              "PipeConsensus-ConsensusGroupId-{}: TsFileWriter exit: Writing 
file is null. No need to delete.",
-              consensusGroupId.getId());
+              "PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file is 
null. No need to delete.",
+              consensusPipeName.toString());
         }
       }
     }
@@ -1083,38 +1124,57 @@ public class PipeConsensusReceiver {
     // An ordered set that buffers transfer requests' TCommitId, whose length 
is not larger than
     // PIPE_CONSENSUS_PIPELINE_SIZE.
     // Here we use set is to avoid duplicate events being received in some 
special cases
-    private final TreeSet<TCommitId> reqExecutionOrderBuffer;
+    private final TreeSet<RequestMeta> reqExecutionOrderBuffer;
     private final Lock lock;
     private final Condition condition;
-    private long onSyncedCommitIndex = -1;
+    private final PipeConsensusReceiverMetrics metric;
+    private long onSyncedCommitIndex = 0;
     private int connectorRebootTimes = 0;
+    private volatile int WALEventCount = 0;
+    private volatile int tsFileEventCount = 0;
 
-    public RequestExecutor() {
-      reqExecutionOrderBuffer =
+    public RequestExecutor(PipeConsensusReceiverMetrics metric) {
+      this.reqExecutionOrderBuffer =
           new TreeSet<>(
-              Comparator.comparingInt(TCommitId::getRebootTimes)
-                  .thenComparingLong(TCommitId::getCommitIndex));
-      lock = new ReentrantLock();
-      condition = lock.newCondition();
+              Comparator.comparingInt(RequestMeta::getRebootTimes)
+                  .thenComparingLong(RequestMeta::getCommitIndex));
+      this.lock = new ReentrantLock();
+      this.condition = lock.newCondition();
+      this.metric = metric;
     }
 
-    private void onSuccess(long nextSyncedCommitIndex) {
+    private void onSuccess(long nextSyncedCommitIndex, boolean 
isTransferTsFileSeal) {
       LOGGER.info(
-          "PipeConsensus-ConsensusGroupId-{}: process no.{} event 
successfully!",
-          consensusGroupId,
+          "PipeConsensus-PipeName-{}: process no.{} event successfully!",
+          consensusPipeName,
           nextSyncedCommitIndex);
-      reqExecutionOrderBuffer.pollFirst();
+      RequestMeta curMeta = reqExecutionOrderBuffer.pollFirst();
       onSyncedCommitIndex = nextSyncedCommitIndex;
+      // update metric, notice that curMeta is never null.
+      if (isTransferTsFileSeal) {
+        tsFileEventCount--;
+        metric.recordReceiveTsFileTimer(System.nanoTime() - 
curMeta.getStartApplyNanos());
+      } else {
+        WALEventCount--;
+        metric.recordReceiveWALTimer(System.nanoTime() - 
curMeta.getStartApplyNanos());
+      }
     }
 
     private TPipeConsensusTransferResp onRequest(
-        final TPipeConsensusTransferReq req, final boolean 
isTransferTsFilePiece) {
+        final TPipeConsensusTransferReq req,
+        final boolean isTransferTsFilePiece,
+        final boolean isTransferTsFileSeal) {
+      long startAcquireLockNanos = System.nanoTime();
       lock.lock();
       try {
+        long startDispatchNanos = System.nanoTime();
+        metric.recordAcquireExecutorLockTimer(startDispatchNanos - 
startAcquireLockNanos);
+
         TCommitId tCommitId = req.getCommitId();
+        RequestMeta requestMeta = new RequestMeta(tCommitId);
         LOGGER.info(
-            "PipeConsensus-ConsensusGroup-{}: start to receive no.{} event",
-            consensusGroupId,
+            "PipeConsensus-PipeName-{}: start to receive no.{} event",
+            consensusPipeName,
             tCommitId.getCommitIndex());
         // if a req is deprecated, we will discard it
         // This case may happen in this scenario: leader has transferred {1,2} 
and is intending to
@@ -1132,8 +1192,8 @@ public class PipeConsensusReceiver {
                       TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
                       "PipeConsensus receiver received a deprecated request, 
which may be sent before the connector restart. Consider to discard it"));
           LOGGER.info(
-              "PipeConsensus-ConsensusGroup-{}: received a deprecated request, 
which may be sent before the connector restart. Consider to discard it",
-              consensusGroupId);
+              "PipeConsensus-PipeName-{}: received a deprecated request, which 
may be sent before the connector restart. Consider to discard it",
+              consensusPipeName);
           return new TPipeConsensusTransferResp(status);
         }
         // Judge whether connector has rebooted or not, if the rebootTimes 
increases compared to
@@ -1141,16 +1201,28 @@ public class PipeConsensusReceiver {
         if (tCommitId.getRebootTimes() > connectorRebootTimes) {
           resetWithNewestRebootTime(tCommitId.getRebootTimes());
         }
-        reqExecutionOrderBuffer.add(tCommitId);
+        // update metric
+        if (isTransferTsFilePiece && 
!reqExecutionOrderBuffer.contains(requestMeta)) {
+          // only update tsFileEventCount when tsFileEvent is first enqueue.
+          tsFileEventCount++;
+        }
+        if (!isTransferTsFileSeal && !isTransferTsFilePiece) {
+          WALEventCount++;
+        }
+        reqExecutionOrderBuffer.add(requestMeta);
+
         // TsFilePieceTransferEvent will not enter further procedure, it just 
holds a place in
         // buffer. Only after the corresponding sealing event is processed, 
this event can be
         // dequeued.
         if (isTransferTsFilePiece) {
+          long startApplyNanos = System.nanoTime();
+          metric.recordDispatchWaitingTimer(startApplyNanos - 
startDispatchNanos);
+          requestMeta.setStartApplyNanos(startApplyNanos);
           return null;
         }
 
         if (reqExecutionOrderBuffer.size() >= 
IOTDB_CONFIG.getPipeConsensusPipelineSize()
-            && !reqExecutionOrderBuffer.first().equals(tCommitId)) {
+            && !reqExecutionOrderBuffer.first().equals(requestMeta)) {
           // If reqBuffer is full and current thread do not hold the 
reqBuffer's peek, this req
           // is not supposed to be processed. So current thread should notify 
the corresponding
           // threads to process the peek.
@@ -1159,8 +1231,11 @@ public class PipeConsensusReceiver {
 
         // Polling to process
         while (true) {
-          if (reqExecutionOrderBuffer.first().equals(tCommitId)
+          if (reqExecutionOrderBuffer.first().equals(requestMeta)
               && tCommitId.getCommitIndex() == onSyncedCommitIndex + 1) {
+            long startApplyNanos = System.nanoTime();
+            metric.recordDispatchWaitingTimer(startApplyNanos - 
startDispatchNanos);
+            requestMeta.setStartApplyNanos(startApplyNanos);
             // If current req is supposed to be process, load this event 
through
             // DataRegionStateMachine.
             TPipeConsensusTransferResp resp = loadEvent(req);
@@ -1171,19 +1246,22 @@ public class PipeConsensusReceiver {
             // when the last seal req is applied, we can discard this event.
             if (resp != null
                 && resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-              onSuccess(onSyncedCommitIndex + 1);
+              onSuccess(onSyncedCommitIndex + 1, isTransferTsFileSeal);
             }
             return resp;
           }
 
           if (reqExecutionOrderBuffer.size() >= 
IOTDB_CONFIG.getPipeConsensusPipelineSize()
-              && reqExecutionOrderBuffer.first().equals(tCommitId)) {
+              && reqExecutionOrderBuffer.first().equals(requestMeta)) {
+            long startApplyNanos = System.nanoTime();
+            metric.recordDispatchWaitingTimer(startApplyNanos - 
startDispatchNanos);
+            requestMeta.setStartApplyNanos(startApplyNanos);
             // If the reqBuffer is full and its peek is hold by current 
thread, load this event.
             TPipeConsensusTransferResp resp = loadEvent(req);
 
             if (resp != null
                 && resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-              onSuccess(tCommitId.getCommitIndex());
+              onSuccess(tCommitId.getCommitIndex(), isTransferTsFileSeal);
               // signal all other reqs that may wait for this event
               condition.signalAll();
             }
@@ -1203,12 +1281,15 @@ public class PipeConsensusReceiver {
               if (timeout
                   && reqExecutionOrderBuffer.size() < 
IOTDB_CONFIG.getPipeConsensusPipelineSize()
                   && reqExecutionOrderBuffer.first() != null
-                  && reqExecutionOrderBuffer.first().equals(tCommitId)) {
+                  && reqExecutionOrderBuffer.first().equals(requestMeta)) {
+                long startApplyNanos = System.nanoTime();
+                metric.recordDispatchWaitingTimer(startApplyNanos - 
startDispatchNanos);
+                requestMeta.setStartApplyNanos(startApplyNanos);
                 TPipeConsensusTransferResp resp = loadEvent(req);
 
                 if (resp != null
                     && resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                  onSuccess(tCommitId.getCommitIndex());
+                  onSuccess(tCommitId.getCommitIndex(), isTransferTsFileSeal);
                   // signal all other reqs that may wait for this event
                   condition.signalAll();
                 }
@@ -1216,8 +1297,8 @@ public class PipeConsensusReceiver {
               }
             } catch (InterruptedException e) {
               LOGGER.warn(
-                  "PipeConsensus-ConsensusGroup-{}: current waiting is 
interrupted. onSyncedCommitIndex: {}. Exception: ",
-                  consensusGroupId,
+                  "PipeConsensus-PipeName-{}: current waiting is interrupted. 
onSyncedCommitIndex: {}. Exception: ",
+                  consensusPipeName,
                   tCommitId.getCommitIndex(),
                   e);
               Thread.currentThread().interrupt();
@@ -1240,12 +1321,76 @@ public class PipeConsensusReceiver {
      */
     private void resetWithNewestRebootTime(int connectorRebootTimes) {
       LOGGER.info(
-          "PipeConsensus-ConsensusGroup-{}: receiver detected an newer 
rebootTimes, which indicates the leader has rebooted. receiver will reset all 
its data.",
-          consensusGroupId);
+          "PipeConsensus-PipeName-{}: receiver detected an newer rebootTimes, 
which indicates the leader has rebooted. receiver will reset all its data.",
+          consensusPipeName);
       this.reqExecutionOrderBuffer.clear();
       this.onSyncedCommitIndex = -1;
       // sync the follower's connectorRebootTimes with connector's actual 
rebootTimes
       this.connectorRebootTimes = connectorRebootTimes;
     }
   }
+
+  private static class RequestMeta {
+    private final TCommitId commitId;
+    private long startApplyNanos = 0;
+
+    public RequestMeta(TCommitId commitId) {
+      this.commitId = commitId;
+    }
+
+    public int getRebootTimes() {
+      return commitId.getRebootTimes();
+    }
+
+    public long getCommitIndex() {
+      return commitId.getCommitIndex();
+    }
+
+    public void setStartApplyNanos(long startApplyNanos) {
+      // A tsFileInsertionEvent enters RequestExecutor multiple times; keep only the first
+      // apply time, so test the field (not the shadowing parameter) for being unset.
+      if (this.startApplyNanos == 0) {
+        this.startApplyNanos = startApplyNanos;
+      }
+    }
+
+    public long getStartApplyNanos() {
+      return startApplyNanos;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      RequestMeta that = (RequestMeta) o;
+      return commitId.equals(that.commitId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(commitId);
+    }
+  }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  public int getReceiveBufferSize() {
+    return this.requestExecutor.reqExecutionOrderBuffer.size();
+  }
+
+  public int getWALEventCount() {
+    return this.requestExecutor.WALEventCount;
+  }
+
+  public int getTsFileEventCount() {
+    return this.requestExecutor.tsFileEventCount;
+  }
+
+  public String getConsensusGroupIdStr() {
+    return consensusGroupId.toString();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 2b48becaed2..32f37e20818 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
+import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.utils.ErrorHandlingUtils;
@@ -148,8 +149,12 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
               outputEventCollector);
         }
       }
+
       final boolean shouldReport =
-          !isClosed.get() && 
outputEventCollector.hasNoCollectInvocationAfterReset();
+          !isClosed.get()
+              && outputEventCollector.hasNoCollectInvocationAfterReset()
+              // Events generated from consensusPipe's transferred data should 
never be reported.
+              && !(pipeProcessor instanceof PipeConsensusProcessor);
       if (shouldReport && event instanceof EnrichedEvent) {
         PipeEventCommitManager.getInstance()
             .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, 
creationTime, regionId);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
index 5a52eb34409..6fc04163433 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/container/PipeConsensusClientMgrContainer.java
@@ -42,7 +42,7 @@ public class PipeConsensusClientMgrContainer {
   private final IClientManager<TEndPoint, AsyncPipeConsensusServiceClient> 
asyncClientManager;
   private final IClientManager<TEndPoint, SyncPipeConsensusServiceClient> 
syncClientManager;
 
-  public PipeConsensusClientMgrContainer() {
+  private PipeConsensusClientMgrContainer() {
     // load rpc client config
     PipeConsensusClientProperty config =
         PipeConsensusClientProperty.newBuilder()
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 8d6879ae8c2..dfcf35c03ca 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -195,6 +195,7 @@ public class PipeConnectorConstant {
   public static final String SINK_CONSUMER_GROUP_KEY = "sink.consumer-group";
 
   public static final String CONNECTOR_CONSENSUS_GROUP_ID_KEY = 
"connector.consensus.group-id";
+  public static final String CONNECTOR_CONSENSUS_PIPE_NAME = 
"connector.consensus.pipe-name";
 
   private PipeConnectorConstant() {
     throw new IllegalStateException("Utility class");
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 3742c3c6028..b43e37788cd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -342,8 +342,8 @@ public abstract class EnrichedEvent implements Event {
   }
 
   /**
-   * Used for pipeConsensus. In PipeConsensus, we only need commiterKey, 
commitId and rebootTimes to
-   * uniquely identify an event
+   * Used for pipeConsensus. In PipeConsensus, we only need committerKey, 
commitId and rebootTimes
+   * to uniquely identify an event
    */
   public boolean equalsInPipeConsensus(Object o) {
     if (this == o) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
index 9f9369ae56b..0b740f7e12a 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
@@ -137,6 +137,14 @@ public class PipeEventCommitManager {
     // Do nothing but make it private.
   }
 
+  public long getGivenConsensusPipeCommitId(String committerKey) {
+    final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
+    if (committer == null) {
+      return 0;
+    }
+    return committer.getCurrentCommitId();
+  }
+
   private static class PipeEventCommitManagerHolder {
     private static final PipeEventCommitManager INSTANCE = new 
PipeEventCommitManager();
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
index 04502441daf..559e53ee15b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
@@ -136,4 +136,8 @@ public class PipeEventCommitter {
   public long commitQueueSize() {
     return commitQueue.size();
   }
+
+  public long getCurrentCommitId() {
+    return commitIdGenerator.get();
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index c207435d3f1..76a40ce2e3d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -55,6 +55,11 @@ public enum Metric {
   IOT_CONSENSUS("iot_consensus"),
   IOT_SEND_LOG("iot_send_log"),
   IOT_RECEIVE_LOG("iot_receive_log"),
+  PIPE_CONSENSUS("pipe_consensus"),
+  PIPE_CONSENSUS_MODE("pipe_consensus_mode"),
+  PIPE_SEND_EVENT("pipe_send_event"),
+  PIPE_RETRY_SEND_EVENT("pipe_retry_send_event"),
+  PIPE_RECEIVE_EVENT("pipe_receive_event"),
   RATIS_CONSENSUS_WRITE("ratis_consensus_write"),
   RATIS_CONSENSUS_READ("ratis_consensus_read"),
   // storage engine related


Reply via email to