This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5eff10ec013 Pipe: integrate pipe with metrics framework & add assigner / extractor / processor / connector / heartbeat metrics (#11302)
5eff10ec013 is described below

commit 5eff10ec013f58f61c268e3ef36549c51ce75413
Author: V_Galaxy <[email protected]>
AuthorDate: Thu Oct 19 23:38:07 2023 +0800

    Pipe: integrate pipe with metrics framework & add assigner / extractor / processor / connector / heartbeat metrics (#11302)
---
 .../event/common/heartbeat/PipeHeartbeatEvent.java |   7 +
 .../pipe/extractor/IoTDBDataRegionExtractor.java   |  50 ++++-
 .../PipeHistoricalDataRegionExtractor.java         |   2 +
 .../PipeHistoricalDataRegionTsFileExtractor.java   |   5 +
 .../realtime/PipeRealtimeDataRegionExtractor.java  |  12 +
 .../realtime/assigner/DisruptorQueue.java          |  23 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |  24 +-
 .../listener/PipeInsertionDataNodeListener.java    |   2 +-
 .../iotdb/db/pipe/metric/PipeAssignerMetrics.java  | 165 ++++++++++++++
 .../iotdb/db/pipe/metric/PipeConnectorMetrics.java | 232 ++++++++++++++++++++
 .../iotdb/db/pipe/metric/PipeEventCounter.java     |  79 +++++++
 .../iotdb/db/pipe/metric/PipeExtractorMetrics.java | 244 +++++++++++++++++++++
 .../db/pipe/metric/PipeHeartbeatEventMetrics.java  | 125 +++++++++++
 .../apache/iotdb/db/pipe/metric/PipeMetrics.java   |  65 ++++++
 .../iotdb/db/pipe/metric/PipeProcessorMetrics.java | 232 ++++++++++++++++++++
 .../pipe/task/connection/BlockingPendingQueue.java |  52 ++---
 .../db/pipe/task/connection/EnrichedDeque.java     |  33 +--
 .../pipe/task/connection/PipeEventCollector.java   |  12 +
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |   2 +-
 .../subtask/connector/PipeConnectorSubtask.java    |  18 ++
 .../subtask/processor/PipeProcessorSubtask.java    |  18 ++
 .../db/service/metrics/DataNodeMetricsHelper.java  |   4 +
 .../iotdb/commons/service/metric/enums/Metric.java |  27 ++-
 23 files changed, 1365 insertions(+), 68 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index abbfdd8c24d..397babdc431 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor;
+import org.apache.iotdb.db.pipe.metric.PipeHeartbeatEventMetrics;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.connection.EnrichedDeque;
 import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
@@ -133,18 +134,24 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   public void onAssigned() {
     if (shouldPrintMessage) {
       timeAssigned = System.currentTimeMillis();
+      PipeHeartbeatEventMetrics.getInstance()
+          .recordPublishedToAssignedTime(timeAssigned - timePublished);
     }
   }
 
   public void onProcessed() {
     if (shouldPrintMessage) {
       timeProcessed = System.currentTimeMillis();
+      PipeHeartbeatEventMetrics.getInstance()
+          .recordAssignedToProcessedTime(timeProcessed - timeAssigned);
     }
   }
 
   public void onTransferred() {
     if (shouldPrintMessage) {
       timeTransferred = System.currentTimeMillis();
+      PipeHeartbeatEventMetrics.getInstance()
+          .recordProcessedToTransferredTime(timeTransferred - timeProcessed);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index a895ef39fd9..c0f120a0d40 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.extractor;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionTsFileExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
@@ -28,17 +29,21 @@ import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionFakeExt
 import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionLogExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionTsFileExtractor;
+import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.pipe.api.PipeExtractor;
 import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -59,6 +64,7 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
   private PipeHistoricalDataRegionExtractor historicalExtractor;
   private PipeRealtimeDataRegionExtractor realtimeExtractor;
 
+  private String taskID;
   private int dataRegionId;
 
   public IoTDBDataRegionExtractor() {
@@ -145,9 +151,15 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
       throws Exception {
     dataRegionId =
         ((PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment()).getRegionId();
+    String pipeName = configuration.getRuntimeEnvironment().getPipeName();
+    long creationTime = 
configuration.getRuntimeEnvironment().getCreationTime();
+    taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
 
     historicalExtractor.customize(parameters, configuration);
     realtimeExtractor.customize(parameters, configuration);
+
+    // register metric after generating taskID
+    PipeExtractorMetrics.getInstance().register(this);
   }
 
   @Override
@@ -216,14 +228,46 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
 
   @Override
   public Event supply() throws Exception {
-    return historicalExtractor.hasConsumedAll()
-        ? realtimeExtractor.supply()
-        : historicalExtractor.supply();
+    Event event =
+        historicalExtractor.hasConsumedAll()
+            ? realtimeExtractor.supply()
+            : historicalExtractor.supply();
+    if (Objects.nonNull(event)) {
+      if (event instanceof TabletInsertionEvent) {
+        PipeExtractorMetrics.getInstance().getTabletRate(taskID).mark();
+      } else if (event instanceof TsFileInsertionEvent) {
+        PipeExtractorMetrics.getInstance().getTsFileRate(taskID).mark();
+      } else if (event instanceof PipeHeartbeatEvent) {
+        PipeExtractorMetrics.getInstance().getPipeHeartbeatRate(taskID).mark();
+      }
+    }
+    return event;
   }
 
   @Override
   public void close() throws Exception {
     historicalExtractor.close();
     realtimeExtractor.close();
+    PipeExtractorMetrics.getInstance().deregister(taskID);
+  }
+
+  public String getTaskID() {
+    return taskID;
+  }
+
+  public int getHistoricalTsFileInsertionEventCount() {
+    return hasBeenStarted.get() ? historicalExtractor.getPendingQueueSize() : 
0;
+  }
+
+  public int getTabletInsertionEventCount() {
+    return hasBeenStarted.get() ? 
realtimeExtractor.getTabletInsertionEventCount() : 0;
+  }
+
+  public int getRealtimeTsFileInsertionEventCount() {
+    return hasBeenStarted.get() ? 
realtimeExtractor.getTsFileInsertionEventCount() : 0;
+  }
+
+  public int getPipeHeartbeatEventCount() {
+    return hasBeenStarted.get() ? 
realtimeExtractor.getPipeHeartbeatEventCount() : 0;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java
index 4cba6de3f01..3a841d72033 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionExtractor.java
@@ -24,4 +24,6 @@ import org.apache.iotdb.pipe.api.PipeExtractor;
 public interface PipeHistoricalDataRegionExtractor extends PipeExtractor {
 
   boolean hasConsumedAll();
+
+  int getPendingQueueSize();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 6094b9766a0..80e22d8c220 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -304,6 +304,11 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     return pendingQueue != null && pendingQueue.isEmpty();
   }
 
+  @Override
+  public int getPendingQueueSize() {
+    return pendingQueue.size();
+  }
+
   @Override
   public synchronized void close() {
     if (pendingQueue != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 01a3df8b3f6..07d7a1c4183 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -157,4 +157,16 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
         + '\''
         + '}';
   }
+
+  public int getTabletInsertionEventCount() {
+    return pendingQueue.getTabletInsertionEventCount();
+  }
+
+  public int getTsFileInsertionEventCount() {
+    return pendingQueue.getTsFileInsertionEventCount();
+  }
+
+  public int getPipeHeartbeatEventCount() {
+    return pendingQueue.getPipeHeartbeatEventCount();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
index 6113045ae04..5a0a0142e29 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import org.apache.iotdb.db.pipe.metric.PipeEventCounter;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventHandler;
@@ -41,6 +42,8 @@ public class DisruptorQueue {
   private final Disruptor<EventContainer> disruptor;
   private final RingBuffer<EventContainer> ringBuffer;
 
+  private final PipeEventCounter eventCounter = new PipeEventCounter();
+
   public DisruptorQueue(EventHandler<PipeRealtimeEvent> eventHandler) {
     disruptor =
         new Disruptor<>(
@@ -50,8 +53,11 @@ public class DisruptorQueue {
             ProducerType.MULTI,
             new BlockingWaitStrategy());
     disruptor.handleEventsWith(
-        (container, sequence, endOfBatch) ->
-            eventHandler.onEvent(container.getEvent(), sequence, endOfBatch));
+        (container, sequence, endOfBatch) -> {
+          eventHandler.onEvent(container.getEvent(), sequence, endOfBatch);
+          EnrichedEvent innerEvent = container.getEvent().getEvent();
+          eventCounter.decreaseEventCount(innerEvent);
+        });
     disruptor.setDefaultExceptionHandler(new DisruptorQueueExceptionHandler());
 
     ringBuffer = disruptor.start();
@@ -63,6 +69,7 @@ public class DisruptorQueue {
       ((PipeHeartbeatEvent) internalEvent).recordDisruptorSize(ringBuffer);
     }
     ringBuffer.publishEvent((container, sequence, o) -> 
container.setEvent(event), event);
+    eventCounter.increaseEventCount(internalEvent);
   }
 
   public void clear() {
@@ -83,4 +90,16 @@ public class DisruptorQueue {
       this.event = event;
     }
   }
+
+  public int getTabletInsertionEventCount() {
+    return eventCounter.getTabletInsertionEventCount();
+  }
+
+  public int getTsFileInsertionEventCount() {
+    return eventCounter.getTsFileInsertionEventCount();
+  }
+
+  public int getPipeHeartbeatEventCount() {
+    return eventCounter.getPipeHeartbeatEventCount();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
index 7ac257fcc74..0042c9d3c91 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/PipeDataRegionAssigner.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.realtime.matcher.CachedSchemaPatternMatcher;
 import 
org.apache.iotdb.db.pipe.extractor.realtime.matcher.PipeDataRegionMatcher;
+import org.apache.iotdb.db.pipe.metric.PipeAssignerMetrics;
 
 public class PipeDataRegionAssigner {
 
@@ -34,9 +35,17 @@ public class PipeDataRegionAssigner {
   /** The disruptor is used to assign the event to the extractor. */
   private final DisruptorQueue disruptor;
 
-  public PipeDataRegionAssigner() {
+  private final String dataRegionId;
+
+  public String getDataRegionId() {
+    return dataRegionId;
+  }
+
+  public PipeDataRegionAssigner(String dataRegionId) {
     this.matcher = new CachedSchemaPatternMatcher();
     this.disruptor = new DisruptorQueue(this::assignToExtractor);
+    this.dataRegionId = dataRegionId;
+    PipeAssignerMetrics.getInstance().register(this);
   }
 
   public void publishToAssign(PipeRealtimeEvent event) {
@@ -92,7 +101,20 @@ public class PipeDataRegionAssigner {
    * method.
    */
   public void gc() {
+    PipeAssignerMetrics.getInstance().deregister(dataRegionId);
     matcher.clear();
     disruptor.clear();
   }
+
+  public int getTabletInsertionEventCount() {
+    return disruptor.getTabletInsertionEventCount();
+  }
+
+  public int getTsFileInsertionEventCount() {
+    return disruptor.getTsFileInsertionEventCount();
+  }
+
+  public int getPipeHeartbeatEventCount() {
+    return disruptor.getPipeHeartbeatEventCount();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index e3c7a62b6d9..5bee5216451 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -54,7 +54,7 @@ public class PipeInsertionDataNodeListener {
   public synchronized void startListenAndAssign(
       String dataRegionId, PipeRealtimeDataRegionExtractor extractor) {
     dataRegionId2Assigner
-        .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner())
+        .computeIfAbsent(dataRegionId, o -> new 
PipeDataRegionAssigner(dataRegionId))
         .startAssignTo(extractor);
 
     if (extractor.isNeedListenToTsFile()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeAssignerMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeAssignerMetrics.java
new file mode 100644
index 00000000000..30069b90c84
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeAssignerMetrics.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.assigner.PipeDataRegionAssigner;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class PipeAssignerMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeAssignerMetrics.class);
+
+  private AbstractMetricService metricService;
+
+  private final Map<String, PipeDataRegionAssigner> assignerMap = new 
HashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    this.metricService = metricService;
+    synchronized (this) {
+      for (String dataRegionId : assignerMap.keySet()) {
+        createMetrics(dataRegionId);
+      }
+    }
+  }
+
+  private void createMetrics(String dataRegionId) {
+    createAutoGauge(dataRegionId);
+  }
+
+  private void createAutoGauge(String dataRegionId) {
+    metricService.createAutoGauge(
+        Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        assignerMap.get(dataRegionId),
+        PipeDataRegionAssigner::getPipeHeartbeatEventCount,
+        Tag.REGION.toString(),
+        dataRegionId);
+    metricService.createAutoGauge(
+        Metric.UNASSIGNED_TABLET_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        assignerMap.get(dataRegionId),
+        PipeDataRegionAssigner::getTabletInsertionEventCount,
+        Tag.REGION.toString(),
+        dataRegionId);
+    metricService.createAutoGauge(
+        Metric.UNASSIGNED_TSFILE_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        assignerMap.get(dataRegionId),
+        PipeDataRegionAssigner::getTsFileInsertionEventCount,
+        Tag.REGION.toString(),
+        dataRegionId);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    ImmutableSet<String> dataRegionIds = 
ImmutableSet.copyOf(assignerMap.keySet());
+    for (String dataRegionId : dataRegionIds) {
+      deregister(dataRegionId);
+    }
+    if (!assignerMap.isEmpty()) {
+      LOGGER.warn("Failed to unbind from pipe assigner metrics, assigner map 
not empty");
+    }
+  }
+
+  private void removeMetrics(String dataRegionId) {
+    removeAutoGauge(dataRegionId);
+  }
+
+  private void removeAutoGauge(String dataRegionId) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(),
+        Tag.REGION.toString(),
+        dataRegionId);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.UNASSIGNED_TABLET_COUNT.toString(),
+        Tag.REGION.toString(),
+        dataRegionId);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.UNASSIGNED_TSFILE_COUNT.toString(),
+        Tag.REGION.toString(),
+        dataRegionId);
+  }
+
+  //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
+
+  public void register(@NonNull PipeDataRegionAssigner pipeDataRegionAssigner) 
{
+    String dataRegionId = pipeDataRegionAssigner.getDataRegionId();
+    synchronized (this) {
+      assignerMap.putIfAbsent(dataRegionId, pipeDataRegionAssigner);
+      if (Objects.nonNull(metricService)) {
+        createMetrics(dataRegionId);
+      }
+    }
+  }
+
+  public void deregister(String dataRegionId) {
+    synchronized (this) {
+      if (!assignerMap.containsKey(dataRegionId)) {
+        LOGGER.warn(
+            "Failed to deregister pipe assigner metrics, 
PipeDataRegionAssigner({}) does not exist",
+            dataRegionId);
+        return;
+      }
+      if (Objects.nonNull(metricService)) {
+        removeMetrics(dataRegionId);
+      }
+      assignerMap.remove(dataRegionId);
+    }
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeAssignerMetricsHolder {
+
+    private static final PipeAssignerMetrics INSTANCE = new 
PipeAssignerMetrics();
+
+    private PipeAssignerMetricsHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeAssignerMetrics getInstance() {
+    return PipeAssignerMetrics.PipeAssignerMetricsHolder.INSTANCE;
+  }
+
+  private PipeAssignerMetrics() {
+    // empty constructor
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
new file mode 100644
index 00000000000..b37aa2f056e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeConnectorMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConnectorMetrics.class);
+
+  private AbstractMetricService metricService;
+
+  private final Map<String, PipeConnectorSubtask> connectorMap = new 
HashMap<>();
+
+  private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
+
+  private final Map<String, Rate> tsFileRateMap = new ConcurrentHashMap<>();
+
+  private final Map<String, Rate> pipeHeartbeatRateMap = new 
ConcurrentHashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    this.metricService = metricService;
+    synchronized (this) {
+      for (String taskID : connectorMap.keySet()) {
+        createMetrics(taskID);
+      }
+    }
+  }
+
+  private void createMetrics(String taskID) {
+    createAutoGauge(taskID);
+    createRate(taskID);
+  }
+
+  private void createAutoGauge(String taskID) {
+    metricService.createAutoGauge(
+        Metric.UNTRANSFERRED_TABLET_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        connectorMap.get(taskID),
+        PipeConnectorSubtask::getTabletInsertionEventCount,
+        Tag.NAME.toString(),
+        taskID);
+    metricService.createAutoGauge(
+        Metric.UNTRANSFERRED_TSFILE_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        connectorMap.get(taskID),
+        PipeConnectorSubtask::getTsFileInsertionEventCount,
+        Tag.NAME.toString(),
+        taskID);
+    metricService.createAutoGauge(
+        Metric.UNTRANSFERRED_HEARTBEAT_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        connectorMap.get(taskID),
+        PipeConnectorSubtask::getPipeHeartbeatEventCount,
+        Tag.NAME.toString(),
+        taskID);
+  }
+
+  private void createRate(String taskID) {
+    tabletRateMap.put(
+        taskID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_CONNECTOR_TABLET_TRANSFER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            taskID));
+    tsFileRateMap.put(
+        taskID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_CONNECTOR_TSFILE_TRANSFER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            taskID));
+    pipeHeartbeatRateMap.put(
+        taskID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_CONNECTOR_HEARTBEAT_TRANSFER.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            taskID));
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    ImmutableSet<String> taskIDs = ImmutableSet.copyOf(connectorMap.keySet());
+    for (String taskID : taskIDs) {
+      deregister(taskID);
+    }
+    if (!connectorMap.isEmpty()) {
+      LOGGER.warn("Failed to unbind from pipe connector metrics, connector map 
not empty");
+    }
+  }
+
+  private void removeMetrics(String taskID) {
+    removeAutoGauge(taskID);
+    removeRate(taskID);
+  }
+
+  private void removeAutoGauge(String taskID) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.UNTRANSFERRED_TABLET_COUNT.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.UNTRANSFERRED_TSFILE_COUNT.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.UNTRANSFERRED_HEARTBEAT_COUNT.toString(),
+        Tag.NAME.toString(),
+        taskID);
+  }
+
+  private void removeRate(String taskID) {
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_CONNECTOR_TABLET_TRANSFER.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_CONNECTOR_TSFILE_TRANSFER.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_CONNECTOR_HEARTBEAT_TRANSFER.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    tabletRateMap.remove(taskID);
+    tsFileRateMap.remove(taskID);
+    pipeHeartbeatRateMap.remove(taskID);
+  }
+
+  //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
+
+  public void register(@NonNull PipeConnectorSubtask pipeConnectorSubtask) {
+    String taskID = pipeConnectorSubtask.getTaskID();
+    synchronized (this) {
+      connectorMap.putIfAbsent(taskID, pipeConnectorSubtask);
+      if (Objects.nonNull(metricService)) {
+        createMetrics(taskID);
+      }
+    }
+  }
+
+  public void deregister(String taskID) {
+    synchronized (this) {
+      if (!connectorMap.containsKey(taskID)) {
+        LOGGER.info(
+            "Failed to deregister pipe connector metrics, 
PipeConnectorSubtask({}) does not exist",
+            taskID);
+        return;
+      }
+      if (Objects.nonNull(metricService)) {
+        removeMetrics(taskID);
+      }
+      connectorMap.remove(taskID);
+    }
+  }
+
+  public Rate getTabletRate(String taskID) {
+    return tabletRateMap.get(taskID);
+  }
+
+  public Rate getTsFileRate(String taskID) {
+    return tsFileRateMap.get(taskID);
+  }
+
+  public Rate getPipeHeartbeatRate(String taskID) {
+    return pipeHeartbeatRateMap.get(taskID);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeConnectorMetricsHolder {
+
+    private static final PipeConnectorMetrics INSTANCE = new 
PipeConnectorMetrics();
+
+    private PipeConnectorMetricsHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeConnectorMetrics getInstance() {
+    return PipeConnectorMetrics.PipeConnectorMetricsHolder.INSTANCE;
+  }
+
+  private PipeConnectorMetrics() {
+    // empty constructor
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeEventCounter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeEventCounter.java
new file mode 100644
index 00000000000..551d5ed1699
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeEventCounter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Keeps three independent event tallies: tablet insertion events, tsfile insertion events and
+ * pipe heartbeat events. Each tally is an {@link AtomicInteger}; {@link #reset()} zeroes them one
+ * after another rather than as a single atomic step.
+ */
+public class PipeEventCounter {
+
+  private final AtomicInteger tabletInsertionEventCount = new AtomicInteger(0);
+  private final AtomicInteger tsFileInsertionEventCount = new AtomicInteger(0);
+  private final AtomicInteger pipeHeartbeatEventCount = new AtomicInteger(0);
+
+  /** Returns the current tsfile insertion event tally. */
+  public Integer getTsFileInsertionEventCount() {
+    return tsFileInsertionEventCount.get();
+  }
+
+  /** Returns the current tablet insertion event tally. */
+  public Integer getTabletInsertionEventCount() {
+    return tabletInsertionEventCount.get();
+  }
+
+  /** Returns the current pipe heartbeat event tally. */
+  public Integer getPipeHeartbeatEventCount() {
+    return pipeHeartbeatEventCount.get();
+  }
+
+  /**
+   * Adds one to the tally matching the event's category. A {@code null} event, or an event of no
+   * tracked category, changes nothing.
+   */
+  public void increaseEventCount(Event event) {
+    final AtomicInteger tally = selectTally(event);
+    if (Objects.nonNull(tally)) {
+      tally.incrementAndGet();
+    }
+  }
+
+  /**
+   * Subtracts one from the tally matching the event's category. A {@code null} event, or an event
+   * of no tracked category, changes nothing.
+   */
+  public void decreaseEventCount(Event event) {
+    final AtomicInteger tally = selectTally(event);
+    if (Objects.nonNull(tally)) {
+      tally.decrementAndGet();
+    }
+  }
+
+  /** Sets all three tallies back to zero. */
+  public void reset() {
+    tabletInsertionEventCount.set(0);
+    tsFileInsertionEventCount.set(0);
+    pipeHeartbeatEventCount.set(0);
+  }
+
+  /**
+   * Picks the tally for the given event, or {@code null} when the event is {@code null} or of an
+   * untracked type. The heartbeat check comes first, then tablet, then tsfile, so an event that
+   * matches several types is counted under the first match only.
+   */
+  private AtomicInteger selectTally(Event event) {
+    if (Objects.isNull(event)) {
+      return null;
+    }
+    if (event instanceof PipeHeartbeatEvent) {
+      return pipeHeartbeatEventCount;
+    }
+    if (event instanceof TabletInsertionEvent) {
+      return tabletInsertionEventCount;
+    }
+    if (event instanceof TsFileInsertionEvent) {
+      return tsFileInsertionEventCount;
+    }
+    return null;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
new file mode 100644
index 00000000000..eb30a045e00
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.extractor.IoTDBDataRegionExtractor;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Metric set for {@link IoTDBDataRegionExtractor}s. For every registered task ID it creates four
+ * auto gauges (unprocessed historical tsfile, realtime tsfile, tablet and heartbeat counts) and
+ * three rates (tablet, tsfile and heartbeat supply), all tagged with the task ID.
+ *
+ * <p>{@code extractorMap} and {@code metricService} are only touched while holding this
+ * instance's monitor; the rate maps are concurrent maps and may be read without it.
+ */
+public class PipeExtractorMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeExtractorMetrics.class);
+
+  // Plain HashMap: every read and write must happen inside synchronized (this).
+  private final Map<String, IoTDBDataRegionExtractor> extractorMap = new HashMap<>();
+
+  private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
+
+  private final Map<String, Rate> tsFileRateMap = new ConcurrentHashMap<>();
+
+  private final Map<String, Rate> pipeHeartbeatRateMap = new ConcurrentHashMap<>();
+
+  // Written by bindTo and read by register/deregister, always under synchronized (this).
+  private AbstractMetricService metricService;
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////
+
+  /**
+   * Remembers the metric service and creates the metrics of every extractor registered so far.
+   *
+   * <p>The assignment happens inside the lock so that a concurrent {@link #register} either sees
+   * the service (and creates its own metrics) or is already in the map when the loop runs.
+   */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    synchronized (this) {
+      this.metricService = metricService;
+      for (String taskID : extractorMap.keySet()) {
+        createMetrics(taskID);
+      }
+    }
+  }
+
+  private void createMetrics(String taskID) {
+    createAutoGauge(taskID);
+    createRate(taskID);
+  }
+
+  /** Creates the four auto gauges that read their values from the registered extractor. */
+  private void createAutoGauge(String taskID) {
+    final IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
+    metricService.createAutoGauge(
+        Metric.UNPROCESSED_HISTORICAL_TSFILE_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        extractor,
+        IoTDBDataRegionExtractor::getHistoricalTsFileInsertionEventCount,
+        Tag.NAME.toString(),
+        taskID);
+    metricService.createAutoGauge(
+        Metric.UNPROCESSED_REALTIME_TSFILE_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        extractor,
+        IoTDBDataRegionExtractor::getRealtimeTsFileInsertionEventCount,
+        Tag.NAME.toString(),
+        taskID);
+    metricService.createAutoGauge(
+        Metric.UNPROCESSED_TABLET_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        extractor,
+        IoTDBDataRegionExtractor::getTabletInsertionEventCount,
+        Tag.NAME.toString(),
+        taskID);
+    metricService.createAutoGauge(
+        Metric.UNPROCESSED_HEARTBEAT_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        extractor,
+        IoTDBDataRegionExtractor::getPipeHeartbeatEventCount,
+        Tag.NAME.toString(),
+        taskID);
+  }
+
+  /** Creates (or fetches) the three supply rates and caches them by task ID. */
+  private void createRate(String taskID) {
+    tabletRateMap.put(
+        taskID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_EXTRACTOR_TABLET_SUPPLY.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            taskID));
+    tsFileRateMap.put(
+        taskID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_EXTRACTOR_TSFILE_SUPPLY.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            taskID));
+    pipeHeartbeatRateMap.put(
+        taskID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_EXTRACTOR_HEARTBEAT_SUPPLY.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            taskID));
+  }
+
+  /**
+   * Deregisters every extractor that is currently registered.
+   *
+   * <p>The whole operation holds the monitor: the key snapshot is taken from a plain HashMap that
+   * register/deregister mutate under the same lock. The nested {@link #deregister} call re-enters
+   * the monitor, which Java intrinsic locks allow.
+   */
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    synchronized (this) {
+      // Snapshot first: deregister removes entries from extractorMap while we iterate.
+      ImmutableSet<String> taskIDs = ImmutableSet.copyOf(extractorMap.keySet());
+      for (String taskID : taskIDs) {
+        deregister(taskID);
+      }
+      if (!extractorMap.isEmpty()) {
+        LOGGER.warn("Failed to unbind from pipe extractor metrics, extractor map not empty");
+      }
+    }
+  }
+
+  private void removeMetrics(String taskID) {
+    removeAutoGauge(taskID);
+    removeRate(taskID);
+  }
+
+  private void removeAutoGauge(String taskID) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.UNPROCESSED_HISTORICAL_TSFILE_COUNT.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.UNPROCESSED_REALTIME_TSFILE_COUNT.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.UNPROCESSED_TABLET_COUNT.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.UNPROCESSED_HEARTBEAT_COUNT.toString(),
+        Tag.NAME.toString(),
+        taskID);
+  }
+
+  /** Removes the three rates from the metric service and drops the cached references. */
+  private void removeRate(String taskID) {
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_EXTRACTOR_TABLET_SUPPLY.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_EXTRACTOR_TSFILE_SUPPLY.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_EXTRACTOR_HEARTBEAT_SUPPLY.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    tabletRateMap.remove(taskID);
+    tsFileRateMap.remove(taskID);
+    pipeHeartbeatRateMap.remove(taskID);
+  }
+
+  //////////////////////////// register & deregister (pipe integration) ////////////////////////////
+
+  /**
+   * Stores the extractor under its task ID (an existing entry for the same ID is kept) and, if a
+   * metric service has been bound, creates the metrics for that ID.
+   */
+  public void register(@NonNull IoTDBDataRegionExtractor extractor) {
+    String taskID = extractor.getTaskID();
+    synchronized (this) {
+      extractorMap.putIfAbsent(taskID, extractor);
+      if (Objects.nonNull(metricService)) {
+        createMetrics(taskID);
+      }
+    }
+  }
+
+  /**
+   * Removes the extractor registered under {@code taskID}, removing its metrics first when a
+   * metric service has been bound. An unknown ID is only logged.
+   */
+  public void deregister(String taskID) {
+    synchronized (this) {
+      if (!extractorMap.containsKey(taskID)) {
+        LOGGER.info(
+            "Failed to deregister pipe extractor metrics, IoTDBDataRegionExtractor({}) does not exist",
+            taskID);
+        return;
+      }
+      if (Objects.nonNull(metricService)) {
+        removeMetrics(taskID);
+      }
+      extractorMap.remove(taskID);
+    }
+  }
+
+  /**
+   * @return the tablet supply rate cached for {@code taskID}, or {@code null} if no rate is
+   *     currently cached for that ID
+   */
+  public Rate getTabletRate(String taskID) {
+    return tabletRateMap.get(taskID);
+  }
+
+  /**
+   * @return the tsfile supply rate cached for {@code taskID}, or {@code null} if no rate is
+   *     currently cached for that ID
+   */
+  public Rate getTsFileRate(String taskID) {
+    return tsFileRateMap.get(taskID);
+  }
+
+  /**
+   * @return the heartbeat supply rate cached for {@code taskID}, or {@code null} if no rate is
+   *     currently cached for that ID
+   */
+  public Rate getPipeHeartbeatRate(String taskID) {
+    return pipeHeartbeatRateMap.get(taskID);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeExtractorMetricsHolder {
+
+    private static final PipeExtractorMetrics INSTANCE = new PipeExtractorMetrics();
+
+    private PipeExtractorMetricsHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeExtractorMetrics getInstance() {
+    return PipeExtractorMetrics.PipeExtractorMetricsHolder.INSTANCE;
+  }
+
+  private PipeExtractorMetrics() {
+    // empty constructor
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeHeartbeatEventMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeHeartbeatEventMetrics.java
new file mode 100644
index 00000000000..882fc51aaa5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeHeartbeatEventMetrics.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+/**
+ * Metric set holding three timers for pipe heartbeat events, one per stage tag value:
+ * {@code publishedToAssigned}, {@code assignedToProcessed} and {@code processedToTransferred}.
+ * Until {@link #bindTo} is called (and again after {@link #unbindFrom}) the fields point at
+ * {@link DoNothingMetricManager#DO_NOTHING_TIMER}.
+ */
+public class PipeHeartbeatEventMetrics implements IMetricSet {
+
+  private static final String STAGE_PUBLISHED_TO_ASSIGNED = "publishedToAssigned";
+  private static final String STAGE_ASSIGNED_TO_PROCESSED = "assignedToProcessed";
+  private static final String STAGE_PROCESSED_TO_TRANSFERRED = "processedToTransferred";
+
+  private Timer publishedToAssignedTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  private Timer assignedToProcessedTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  private Timer processedToTransferredTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindStageTimer(metricService);
+  }
+
+  /** Points each field at the timer obtained from the metric service for its stage. */
+  private void bindStageTimer(AbstractMetricService metricService) {
+    publishedToAssignedTimer = obtainStageTimer(metricService, STAGE_PUBLISHED_TO_ASSIGNED);
+    assignedToProcessedTimer = obtainStageTimer(metricService, STAGE_ASSIGNED_TO_PROCESSED);
+    processedToTransferredTimer = obtainStageTimer(metricService, STAGE_PROCESSED_TO_TRANSFERRED);
+  }
+
+  /** Gets or creates the heartbeat-event timer tagged with the given stage. */
+  private static Timer obtainStageTimer(AbstractMetricService metricService, String stage) {
+    return metricService.getOrCreateTimer(
+        Metric.PIPE_HEARTBEAT_EVENT.toString(),
+        MetricLevel.IMPORTANT,
+        Tag.STAGE.toString(),
+        stage);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbindStageTimer(metricService);
+  }
+
+  /** Resets all fields to the do-nothing timer first, then removes the three timers. */
+  private void unbindStageTimer(AbstractMetricService metricService) {
+    publishedToAssignedTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    assignedToProcessedTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    processedToTransferredTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    discardStageTimer(metricService, STAGE_PUBLISHED_TO_ASSIGNED);
+    discardStageTimer(metricService, STAGE_ASSIGNED_TO_PROCESSED);
+    discardStageTimer(metricService, STAGE_PROCESSED_TO_TRANSFERRED);
+  }
+
+  /** Removes the heartbeat-event timer tagged with the given stage from the metric service. */
+  private static void discardStageTimer(AbstractMetricService metricService, String stage) {
+    metricService.remove(
+        MetricType.TIMER, Metric.PIPE_HEARTBEAT_EVENT.toString(), Tag.STAGE.toString(), stage);
+  }
+
+  //////////////////////////// pipe integration ////////////////////////////
+
+  /** Feeds {@code costTimeInMillis} into the published-to-assigned timer. */
+  public void recordPublishedToAssignedTime(long costTimeInMillis) {
+    publishedToAssignedTimer.updateMillis(costTimeInMillis);
+  }
+
+  /** Feeds {@code costTimeInMillis} into the assigned-to-processed timer. */
+  public void recordAssignedToProcessedTime(long costTimeInMillis) {
+    assignedToProcessedTimer.updateMillis(costTimeInMillis);
+  }
+
+  /** Feeds {@code costTimeInMillis} into the processed-to-transferred timer. */
+  public void recordProcessedToTransferredTime(long costTimeInMillis) {
+    processedToTransferredTimer.updateMillis(costTimeInMillis);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeHeartbeatEventMetricsHolder {
+
+    private static final PipeHeartbeatEventMetrics INSTANCE = new PipeHeartbeatEventMetrics();
+
+    private PipeHeartbeatEventMetricsHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeHeartbeatEventMetrics getInstance() {
+    return PipeHeartbeatEventMetricsHolder.INSTANCE;
+  }
+
+  private PipeHeartbeatEventMetrics() {
+    // empty constructor
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
new file mode 100644
index 00000000000..8fa6e0c1999
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+
+/**
+ * Aggregating metric set: binding or unbinding it forwards the call, in a fixed order, to the
+ * assigner, extractor, processor, connector and heartbeat-event pipe metric singletons.
+ */
+public class PipeMetrics implements IMetricSet {
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////
+
+  /** Forwards {@code bindTo} to each pipe metric singleton. */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    PipeAssignerMetrics.getInstance().bindTo(metricService);
+    PipeExtractorMetrics.getInstance().bindTo(metricService);
+    PipeProcessorMetrics.getInstance().bindTo(metricService);
+    PipeConnectorMetrics.getInstance().bindTo(metricService);
+    PipeHeartbeatEventMetrics.getInstance().bindTo(metricService);
+  }
+
+  /** Forwards {@code unbindFrom} to each pipe metric singleton, in the same order as binding. */
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    PipeAssignerMetrics.getInstance().unbindFrom(metricService);
+    PipeExtractorMetrics.getInstance().unbindFrom(metricService);
+    PipeProcessorMetrics.getInstance().unbindFrom(metricService);
+    PipeConnectorMetrics.getInstance().unbindFrom(metricService);
+    PipeHeartbeatEventMetrics.getInstance().unbindFrom(metricService);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  /** Nested holder whose static field carries the single {@code PipeMetrics} instance. */
+  private static class PipeMetricsHolder {
+
+    private static final PipeMetrics INSTANCE = new PipeMetrics();
+
+    private PipeMetricsHolder() {
+      // never instantiated; exists only to own INSTANCE
+    }
+  }
+
+  public static PipeMetrics getInstance() {
+    return PipeMetricsHolder.INSTANCE;
+  }
+
+  private PipeMetrics() {
+    // instances are only created through the holder
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
new file mode 100644
index 00000000000..550b5bd56cd
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Metric set for {@link PipeProcessorSubtask}s. For every registered task ID it creates three
+ * auto gauges (buffered tablet, tsfile and heartbeat counts) and three rates (tablet, tsfile and
+ * heartbeat process), all tagged with the task ID.
+ *
+ * <p>{@code processorMap} and {@code metricService} are only touched while holding this
+ * instance's monitor; the rate maps are concurrent maps and may be read without it.
+ */
+public class PipeProcessorMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeProcessorMetrics.class);
+
+  // Written by bindTo and read by register/deregister, always under synchronized (this).
+  private AbstractMetricService metricService;
+
+  // Plain HashMap: every read and write must happen inside synchronized (this).
+  private final Map<String, PipeProcessorSubtask> processorMap = new HashMap<>();
+
+  private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
+
+  private final Map<String, Rate> tsFileRateMap = new ConcurrentHashMap<>();
+
+  private final Map<String, Rate> pipeHeartbeatRateMap = new ConcurrentHashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////
+
+  /**
+   * Remembers the metric service and creates the metrics of every subtask registered so far.
+   *
+   * <p>The assignment happens inside the lock so that a concurrent {@link #register} either sees
+   * the service (and creates its own metrics) or is already in the map when the loop runs.
+   */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    synchronized (this) {
+      this.metricService = metricService;
+      for (String taskID : processorMap.keySet()) {
+        createMetrics(taskID);
+      }
+    }
+  }
+
+  private void createMetrics(String taskID) {
+    createAutoGauge(taskID);
+    createRate(taskID);
+  }
+
+  /** Creates the three auto gauges that read their values from the registered subtask. */
+  private void createAutoGauge(String taskID) {
+    final PipeProcessorSubtask subtask = processorMap.get(taskID);
+    metricService.createAutoGauge(
+        Metric.BUFFERED_TABLET_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        subtask,
+        PipeProcessorSubtask::getTabletInsertionEventCount,
+        Tag.NAME.toString(),
+        taskID);
+    metricService.createAutoGauge(
+        Metric.BUFFERED_TSFILE_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        subtask,
+        PipeProcessorSubtask::getTsFileInsertionEventCount,
+        Tag.NAME.toString(),
+        taskID);
+    metricService.createAutoGauge(
+        Metric.BUFFERED_HEARTBEAT_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        subtask,
+        PipeProcessorSubtask::getPipeHeartbeatEventCount,
+        Tag.NAME.toString(),
+        taskID);
+  }
+
+  /** Creates (or fetches) the three process rates and caches them by task ID. */
+  private void createRate(String taskID) {
+    tabletRateMap.put(
+        taskID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_PROCESSOR_TABLET_PROCESS.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            taskID));
+    tsFileRateMap.put(
+        taskID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_PROCESSOR_TSFILE_PROCESS.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            taskID));
+    pipeHeartbeatRateMap.put(
+        taskID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_PROCESSOR_HEARTBEAT_PROCESS.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            taskID));
+  }
+
+  /**
+   * Deregisters every subtask that is currently registered.
+   *
+   * <p>The whole operation holds the monitor: the key snapshot is taken from a plain HashMap that
+   * register/deregister mutate under the same lock. The nested {@link #deregister} call re-enters
+   * the monitor, which Java intrinsic locks allow.
+   */
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    synchronized (this) {
+      // Snapshot first: deregister removes entries from processorMap while we iterate.
+      ImmutableSet<String> taskIDs = ImmutableSet.copyOf(processorMap.keySet());
+      for (String taskID : taskIDs) {
+        deregister(taskID);
+      }
+      if (!processorMap.isEmpty()) {
+        LOGGER.warn("Failed to unbind from pipe processor metrics, processor map not empty");
+      }
+    }
+  }
+
+  private void removeMetrics(String taskID) {
+    removeAutoGauge(taskID);
+    removeRate(taskID);
+  }
+
+  private void removeAutoGauge(String taskID) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.BUFFERED_TABLET_COUNT.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.BUFFERED_TSFILE_COUNT.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.BUFFERED_HEARTBEAT_COUNT.toString(),
+        Tag.NAME.toString(),
+        taskID);
+  }
+
+  /** Removes the three rates from the metric service and drops the cached references. */
+  private void removeRate(String taskID) {
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_PROCESSOR_TABLET_PROCESS.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_PROCESSOR_TSFILE_PROCESS.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_PROCESSOR_HEARTBEAT_PROCESS.toString(),
+        Tag.NAME.toString(),
+        taskID);
+    tabletRateMap.remove(taskID);
+    tsFileRateMap.remove(taskID);
+    pipeHeartbeatRateMap.remove(taskID);
+  }
+
+  //////////////////////////// register & deregister (pipe integration) ////////////////////////////
+
+  /**
+   * Stores the subtask under its task ID (an existing entry for the same ID is kept) and, if a
+   * metric service has been bound, creates the metrics for that ID.
+   */
+  public void register(@NonNull PipeProcessorSubtask pipeProcessorSubtask) {
+    String taskID = pipeProcessorSubtask.getTaskID();
+    synchronized (this) {
+      processorMap.putIfAbsent(taskID, pipeProcessorSubtask);
+      if (Objects.nonNull(metricService)) {
+        createMetrics(taskID);
+      }
+    }
+  }
+
+  /**
+   * Removes the subtask registered under {@code taskID}, removing its metrics first when a metric
+   * service has been bound. An unknown ID is only logged.
+   */
+  public void deregister(String taskID) {
+    synchronized (this) {
+      if (!processorMap.containsKey(taskID)) {
+        LOGGER.info(
+            "Failed to deregister pipe processor metrics, PipeProcessorSubtask({}) does not exist",
+            taskID);
+        return;
+      }
+      if (Objects.nonNull(metricService)) {
+        removeMetrics(taskID);
+      }
+      processorMap.remove(taskID);
+    }
+  }
+
+  /**
+   * @return the tablet process rate cached for {@code taskID}, or {@code null} if no rate is
+   *     currently cached for that ID
+   */
+  public Rate getTabletRate(String taskID) {
+    return tabletRateMap.get(taskID);
+  }
+
+  /**
+   * @return the tsfile process rate cached for {@code taskID}, or {@code null} if no rate is
+   *     currently cached for that ID
+   */
+  public Rate getTsFileRate(String taskID) {
+    return tsFileRateMap.get(taskID);
+  }
+
+  /**
+   * @return the heartbeat process rate cached for {@code taskID}, or {@code null} if no rate is
+   *     currently cached for that ID
+   */
+  public Rate getPipeHeartbeatRate(String taskID) {
+    return pipeHeartbeatRateMap.get(taskID);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeProcessorMetricsHolder {
+
+    private static final PipeProcessorMetrics INSTANCE = new PipeProcessorMetrics();
+
+    private PipeProcessorMetricsHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeProcessorMetrics getInstance() {
+    return PipeProcessorMetrics.PipeProcessorMetricsHolder.INSTANCE;
+  }
+
+  private PipeProcessorMetrics() {
+    // empty constructor
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
index 51d5435ac01..8b3ca3891a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
@@ -20,16 +20,14 @@
 package org.apache.iotdb.db.pipe.task.connection;
 
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 public abstract class BlockingPendingQueue<E extends Event> {
@@ -39,14 +37,12 @@ public abstract class BlockingPendingQueue<E extends Event> 
{
   private static final long MAX_BLOCKING_TIME_MS =
       
PipeConfig.getInstance().getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
 
-  private final AtomicInteger tabletInsertionEventCount;
-  private final AtomicInteger tsFileInsertionEventCount;
   protected final BlockingQueue<E> pendingQueue;
 
+  private final PipeEventCounter eventCounter = new PipeEventCounter();
+
   protected BlockingPendingQueue(BlockingQueue<E> pendingQueue) {
     this.pendingQueue = pendingQueue;
-    tabletInsertionEventCount = new AtomicInteger(0);
-    tsFileInsertionEventCount = new AtomicInteger(0);
   }
 
   public boolean waitedOffer(E event) {
@@ -54,11 +50,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
       final boolean offered =
           pendingQueue.offer(event, MAX_BLOCKING_TIME_MS, 
TimeUnit.MILLISECONDS);
       if (offered) {
-        if (event instanceof TabletInsertionEvent) {
-          tabletInsertionEventCount.incrementAndGet();
-        } else if (event instanceof TsFileInsertionEvent) {
-          tsFileInsertionEventCount.incrementAndGet();
-        }
+        eventCounter.increaseEventCount(event);
       }
       return offered;
     } catch (InterruptedException e) {
@@ -71,11 +63,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
   public boolean directOffer(E event) {
     final boolean offered = pendingQueue.offer(event);
     if (offered) {
-      if (event instanceof TabletInsertionEvent) {
-        tabletInsertionEventCount.incrementAndGet();
-      } else if (event instanceof TsFileInsertionEvent) {
-        tsFileInsertionEventCount.incrementAndGet();
-      }
+      eventCounter.increaseEventCount(event);
     }
     return offered;
   }
@@ -83,11 +71,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
   public boolean put(E event) {
     try {
       pendingQueue.put(event);
-      if (event instanceof TabletInsertionEvent) {
-        tabletInsertionEventCount.incrementAndGet();
-      } else if (event instanceof TsFileInsertionEvent) {
-        tsFileInsertionEventCount.incrementAndGet();
-      }
+      eventCounter.increaseEventCount(event);
       return true;
     } catch (InterruptedException e) {
       LOGGER.info("pending queue put is interrupted.", e);
@@ -98,12 +82,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
 
   public E directPoll() {
     final E event = pendingQueue.poll();
-    if (event instanceof TabletInsertionEvent) {
-      tabletInsertionEventCount.decrementAndGet();
-    }
-    if (event instanceof TsFileInsertionEvent) {
-      tsFileInsertionEventCount.decrementAndGet();
-    }
+    eventCounter.decreaseEventCount(event);
     return event;
   }
 
@@ -111,11 +90,7 @@ public abstract class BlockingPendingQueue<E extends Event> 
{
     E event = null;
     try {
       event = pendingQueue.poll(MAX_BLOCKING_TIME_MS, TimeUnit.MILLISECONDS);
-      if (event instanceof TabletInsertionEvent) {
-        tabletInsertionEventCount.decrementAndGet();
-      } else if (event instanceof TsFileInsertionEvent) {
-        tsFileInsertionEventCount.decrementAndGet();
-      }
+      eventCounter.decreaseEventCount(event);
     } catch (InterruptedException e) {
       LOGGER.info("pending queue poll is interrupted.", e);
       Thread.currentThread().interrupt();
@@ -125,8 +100,7 @@ public abstract class BlockingPendingQueue<E extends Event> 
{
 
   public void clear() {
     pendingQueue.clear();
-    tabletInsertionEventCount.set(0);
-    tsFileInsertionEventCount.set(0);
+    eventCounter.reset();
   }
 
   public void forEach(Consumer<? super E> action) {
@@ -142,10 +116,14 @@ public abstract class BlockingPendingQueue<E extends Event> {
   }
 
   public int getTabletInsertionEventCount() {
-    return tabletInsertionEventCount.get();
+    return eventCounter.getTabletInsertionEventCount();
   }
 
   public int getTsFileInsertionEventCount() {
-    return tsFileInsertionEventCount.get();
+    return eventCounter.getTsFileInsertionEventCount();
+  }
+
+  public int getPipeHeartbeatEventCount() {
+    return eventCounter.getPipeHeartbeatEventCount();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java
index a77a87c712f..a3696522041 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/EnrichedDeque.java
@@ -19,53 +19,38 @@
 
 package org.apache.iotdb.db.pipe.task.connection;
 
+import org.apache.iotdb.db.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.pipe.api.event.Event;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 
 import java.util.Deque;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 public class EnrichedDeque<E extends Event> {
 
-  private final AtomicInteger tabletInsertionEventCount;
-  private final AtomicInteger tsFileInsertionEventCount;
+  private final PipeEventCounter eventCounter = new PipeEventCounter();
   protected final Deque<E> deque;
 
   protected EnrichedDeque(Deque<E> deque) {
     this.deque = deque;
-    tabletInsertionEventCount = new AtomicInteger(0);
-    tsFileInsertionEventCount = new AtomicInteger(0);
   }
 
   public boolean offer(E event) {
     final boolean offered = deque.offer(event);
     if (offered) {
-      if (event instanceof TabletInsertionEvent) {
-        tabletInsertionEventCount.incrementAndGet();
-      } else if (event instanceof TsFileInsertionEvent) {
-        tsFileInsertionEventCount.incrementAndGet();
-      }
+      eventCounter.increaseEventCount(event);
     }
     return offered;
   }
 
   public E poll() {
     final E event = deque.poll();
-    if (event instanceof TabletInsertionEvent) {
-      tabletInsertionEventCount.decrementAndGet();
-    }
-    if (event instanceof TsFileInsertionEvent) {
-      tsFileInsertionEventCount.decrementAndGet();
-    }
+    eventCounter.decreaseEventCount(event);
     return event;
   }
 
   public void clear() {
     deque.clear();
-    tabletInsertionEventCount.set(0);
-    tsFileInsertionEventCount.set(0);
+    eventCounter.reset();
   }
 
   public int size() {
@@ -89,10 +74,14 @@ public class EnrichedDeque<E extends Event> {
   }
 
   public int getTabletInsertionEventCount() {
-    return tabletInsertionEventCount.get();
+    return eventCounter.getTabletInsertionEventCount();
   }
 
   public int getTsFileInsertionEventCount() {
-    return tsFileInsertionEventCount.get();
+    return eventCounter.getTsFileInsertionEventCount();
+  }
+
+  public int getPipeHeartbeatEventCount() {
+    return eventCounter.getPipeHeartbeatEventCount();
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index a341f8f9fa0..cfb0aabeb9d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -108,4 +108,16 @@ public class PipeEventCollector implements EventCollector, AutoCloseable {
         });
     bufferQueue.clear();
   }
+
+  public int getTabletInsertionEventCount() {
+    return bufferQueue.getTabletInsertionEventCount();
+  }
+
+  public int getTsFileInsertionEventCount() {
+    return bufferQueue.getTsFileInsertionEventCount();
+  }
+
+  public int getPipeHeartbeatEventCount() {
+    return bufferQueue.getPipeHeartbeatEventCount();
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index ef7997cb443..4835810dd17 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -90,7 +90,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     // a timed thread. If a pipe is deleted and created again before its subtask is
     // removed, the new subtask will have the same pipeName and dataRegionId as the
     // old one, so we need creationTime to make their hash code different in the map.
-    final String taskId = pipeName + "_" + dataRegionId + "_" + creationTime;
+    final String taskId = pipeName + "_" + dataRegionId.getId() + "_" + creationTime;
     final PipeEventCollector pipeConnectorOutputEventCollector =
         new PipeEventCollector(pipeConnectorOutputPendingQueue);
     this.pipeProcessorSubtask =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index ffa339671d2..26149d791a2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
+import org.apache.iotdb.db.pipe.metric.PipeConnectorMetrics;
 import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.task.subtask.DecoratingLock;
@@ -58,6 +59,18 @@ public class PipeConnectorSubtask extends PipeSubtask {
   protected final DecoratingLock callbackDecoratingLock = new DecoratingLock();
   protected ExecutorService subtaskCallbackListeningExecutor;
 
+  public Integer getTsFileInsertionEventCount() {
+    return inputPendingQueue.getTsFileInsertionEventCount();
+  }
+
+  public Integer getTabletInsertionEventCount() {
+    return inputPendingQueue.getTabletInsertionEventCount();
+  }
+
+  public Integer getPipeHeartbeatEventCount() {
+    return inputPendingQueue.getPipeHeartbeatEventCount();
+  }
+
   public PipeConnectorSubtask(
       String taskID,
       BoundedBlockingPendingQueue<Event> inputPendingQueue,
@@ -65,6 +78,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
     super(taskID);
     this.inputPendingQueue = inputPendingQueue;
     this.outputPipeConnector = outputPipeConnector;
+    PipeConnectorMetrics.getInstance().register(this);
   }
 
   @Override
@@ -104,8 +118,10 @@ public class PipeConnectorSubtask extends PipeSubtask {
     try {
       if (event instanceof TabletInsertionEvent) {
         outputPipeConnector.transfer((TabletInsertionEvent) event);
+        PipeConnectorMetrics.getInstance().getTabletRate(taskID).mark();
       } else if (event instanceof TsFileInsertionEvent) {
         outputPipeConnector.transfer((TsFileInsertionEvent) event);
+        PipeConnectorMetrics.getInstance().getTsFileRate(taskID).mark();
       } else if (event instanceof PipeHeartbeatEvent) {
         try {
           outputPipeConnector.heartbeat();
@@ -116,6 +132,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
               e);
         }
         ((PipeHeartbeatEvent) event).onTransferred();
+        PipeConnectorMetrics.getInstance().getPipeHeartbeatRate(taskID).mark();
       } else {
         outputPipeConnector.transfer(event);
       }
@@ -253,6 +270,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
 
   @Override
   public void close() {
+    PipeConnectorMetrics.getInstance().deregister(taskID);
     isClosed.set(true);
     try {
       outputPipeConnector.close();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index b5caed5245f..0cc38fda9d8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask.processor;
 
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
+import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
 import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
@@ -57,6 +58,7 @@ public class PipeProcessorSubtask extends PipeSubtask {
     this.inputEventSupplier = inputEventSupplier;
     this.pipeProcessor = pipeProcessor;
     this.outputEventCollector = outputEventCollector;
+    PipeProcessorMetrics.getInstance().register(this);
   }
 
   @Override
@@ -104,11 +106,14 @@ public class PipeProcessorSubtask extends PipeSubtask {
       if (!isClosed.get()) {
         if (event instanceof TabletInsertionEvent) {
           pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
+          PipeProcessorMetrics.getInstance().getTabletRate(taskID).mark();
         } else if (event instanceof TsFileInsertionEvent) {
           pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
+          PipeProcessorMetrics.getInstance().getTsFileRate(taskID).mark();
         } else if (event instanceof PipeHeartbeatEvent) {
           pipeProcessor.process(event, outputEventCollector);
           ((PipeHeartbeatEvent) event).onProcessed();
+          PipeProcessorMetrics.getInstance().getPipeHeartbeatRate(taskID).mark();
         } else {
           pipeProcessor.process(event, outputEventCollector);
         }
@@ -140,6 +145,7 @@ public class PipeProcessorSubtask extends PipeSubtask {
 
   @Override
   public void close() {
+    PipeProcessorMetrics.getInstance().deregister(taskID);
     try {
       isClosed.set(true);
 
@@ -173,4 +179,16 @@ public class PipeProcessorSubtask extends PipeSubtask {
   public int hashCode() {
     return taskID.hashCode();
   }
+
+  public int getTabletInsertionEventCount() {
+    return outputEventCollector.getTabletInsertionEventCount();
+  }
+
+  public int getTsFileInsertionEventCount() {
+    return outputEventCollector.getTsFileInsertionEventCount();
+  }
+
+  public int getPipeHeartbeatEventCount() {
+    return outputEventCollector.getPipeHeartbeatEventCount();
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index 9d76bb9b97d..f673cdc4a4e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.metric.PipeMetrics;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
 import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
 import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
@@ -81,6 +82,9 @@ public class DataNodeMetricsHelper {
 
     // bind gc metrics
     MetricService.getInstance().addMetricSet(JvmGcMonitorMetrics.getInstance());
+
+    // bind pipe related metrics
+    MetricService.getInstance().addMetricSet(PipeMetrics.getInstance());
   }
 
   private static void initSystemMetrics() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 510617eafd2..77455bffa8b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -93,7 +93,32 @@ public enum Metric {
   LOCAL_EXECUTION_PLANNER("local_execution_planner"),
   // file related
   FILE_SIZE("file_size"),
-  FILE_COUNT("file_count");
+  FILE_COUNT("file_count"),
+  // pipe related
+  UNASSIGNED_TABLET_COUNT("unassigned_tablet_count"),
+  UNASSIGNED_TSFILE_COUNT("unassigned_tsfile_count"),
+  UNASSIGNED_HEARTBEAT_COUNT("unassigned_heartbeat_count"),
+  UNPROCESSED_TABLET_COUNT("unprocessed_tablet_count"),
+  UNPROCESSED_HISTORICAL_TSFILE_COUNT("unprocessed_historical_tsfile_count"),
+  UNPROCESSED_REALTIME_TSFILE_COUNT("unprocessed_realtime_tsfile_count"),
+  UNPROCESSED_HEARTBEAT_COUNT("unprocessed_heartbeat_count"),
+  BUFFERED_TABLET_COUNT("buffered_tablet_count"),
+  BUFFERED_TSFILE_COUNT("buffered_tsfile_count"),
+  BUFFERED_HEARTBEAT_COUNT("buffered_heartbeat_count"),
+  UNTRANSFERRED_TABLET_COUNT("untransferred_tablet_count"),
+  UNTRANSFERRED_TSFILE_COUNT("untransferred_tsfile_count"),
+  UNTRANSFERRED_HEARTBEAT_COUNT("untransferred_heartbeat_count"),
+  PIPE_EXTRACTOR_TABLET_SUPPLY("pipe_extractor_tablet_supply"),
+  PIPE_EXTRACTOR_TSFILE_SUPPLY("pipe_extractor_tsfile_supply"),
+  PIPE_EXTRACTOR_HEARTBEAT_SUPPLY("pipe_extractor_heartbeat_supply"),
+  PIPE_PROCESSOR_TABLET_PROCESS("pipe_processor_tablet_process"),
+  PIPE_PROCESSOR_TSFILE_PROCESS("pipe_processor_tsfile_process"),
+  PIPE_PROCESSOR_HEARTBEAT_PROCESS("pipe_processor_heartbeat_process"),
+  PIPE_CONNECTOR_TABLET_TRANSFER("pipe_connector_tablet_transfer"),
+  PIPE_CONNECTOR_TSFILE_TRANSFER("pipe_connector_tsfile_transfer"),
+  PIPE_CONNECTOR_HEARTBEAT_TRANSFER("pipe_connector_heartbeat_transfer"),
+  PIPE_HEARTBEAT_EVENT("pipe_heartbeat_event"),
+  ;
 
   final String value;
 

Reply via email to