This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cc5ed977507 Pipe: refined key set of tags for pipe metrics & fixed hit 
rate in PipeWALInsertNodeCacheMetrics always `1.0`(#11484)
cc5ed977507 is described below

commit cc5ed977507c3eb0aa5d51cbeadd44c71cf9744f
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Nov 7 14:19:20 2023 +0800

    Pipe: refined key set of tags for pipe metrics & fixed hit rate in 
PipeWALInsertNodeCacheMetrics always `1.0`(#11484)
    
    To provide more detailed legend information to Grafana and enable 
aggregation by pipe name or data region, this PR modifies some of the tag key 
sets for pipe metrics:
    
    - before: `{name (TaskID)}`
    - after this PR: `{Name (PipeName / AttributeSortedString), CreationTime, 
Index, DataRegion}`
    
    Additionally, it fixes the issue where the hit rate in 
PipeWALInsertNodeCacheMetrics is always `1.0`.
    
    ------
    
    * chore: refined set of tags for pipe metrics
    
    * fix: enable lruCache stats
---
 .../pipe/extractor/IoTDBDataRegionExtractor.java   |  21 +++-
 .../iotdb/db/pipe/metric/PipeConnectorMetrics.java |  86 +++++++++++++---
 .../iotdb/db/pipe/metric/PipeExtractorMetrics.java | 114 +++++++++++++++++----
 .../iotdb/db/pipe/metric/PipeProcessorMetrics.java |  86 +++++++++++++---
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |   3 +
 .../iotdb/db/pipe/task/subtask/PipeSubtask.java    |  10 +-
 .../subtask/connector/PipeConnectorSubtask.java    |  43 +++++---
 .../connector/PipeConnectorSubtaskManager.java     |  10 +-
 .../subtask/processor/PipeProcessorSubtask.java    |  21 +++-
 .../dataregion/wal/utils/WALInsertNodeCache.java   |   1 +
 .../PipeConnectorSubtaskExecutorTest.java          |   3 +
 .../PipeProcessorSubtaskExecutorTest.java          |   3 +
 .../iotdb/commons/service/metric/enums/Tag.java    |   4 +-
 13 files changed, 336 insertions(+), 69 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index 15f6696118d..b5c6aea4f15 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -72,7 +72,10 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
   private PipeHistoricalDataRegionExtractor historicalExtractor;
   private PipeRealtimeDataRegionExtractor realtimeExtractor;
 
+  // Record these variables to provide corresponding value to tag key of 
monitoring metrics
   private String taskID;
+  private String pipeName;
+  private long creationTime;
   private int dataRegionId;
 
   public IoTDBDataRegionExtractor() {
@@ -182,8 +185,8 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
       throws Exception {
     dataRegionId =
         ((PipeTaskExtractorRuntimeEnvironment) 
configuration.getRuntimeEnvironment()).getRegionId();
-    String pipeName = configuration.getRuntimeEnvironment().getPipeName();
-    long creationTime = 
configuration.getRuntimeEnvironment().getCreationTime();
+    pipeName = configuration.getRuntimeEnvironment().getPipeName();
+    creationTime = configuration.getRuntimeEnvironment().getCreationTime();
     taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
 
     historicalExtractor.customize(parameters, configuration);
@@ -282,10 +285,24 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
     PipeExtractorMetrics.getInstance().deregister(taskID);
   }
 
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
   public String getTaskID() {
     return taskID;
   }
 
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public int getDataRegionId() {
+    return dataRegionId;
+  }
+
+  public long getCreationTime() {
+    return creationTime;
+  }
+
   public int getHistoricalTsFileInsertionEventCount() {
     return hasBeenStarted.get() ? historicalExtractor.getPendingQueueSize() : 
0;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
index f2dde61092c..fff2bcae55a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
@@ -70,51 +70,79 @@ public class PipeConnectorMetrics implements IMetricSet {
   }
 
   private void createAutoGauge(String taskID) {
+    PipeConnectorSubtask connector = connectorMap.get(taskID);
+    // pending event count
     metricService.createAutoGauge(
         Metric.UNTRANSFERRED_TABLET_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        connectorMap.get(taskID),
+        connector,
         PipeConnectorSubtask::getTabletInsertionEventCount,
         Tag.NAME.toString(),
-        taskID);
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
     metricService.createAutoGauge(
         Metric.UNTRANSFERRED_TSFILE_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        connectorMap.get(taskID),
+        connector,
         PipeConnectorSubtask::getTsFileInsertionEventCount,
         Tag.NAME.toString(),
-        taskID);
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
     metricService.createAutoGauge(
         Metric.UNTRANSFERRED_HEARTBEAT_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        connectorMap.get(taskID),
+        connector,
         PipeConnectorSubtask::getPipeHeartbeatEventCount,
         Tag.NAME.toString(),
-        taskID);
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
   }
 
   private void createRate(String taskID) {
+    PipeConnectorSubtask connector = connectorMap.get(taskID);
+    // transfer event rate
     tabletRateMap.put(
         taskID,
         metricService.getOrCreateRate(
             Metric.PIPE_CONNECTOR_TABLET_TRANSFER.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            taskID));
+            connector.getAttributeSortedString(),
+            Tag.INDEX.toString(),
+            String.valueOf(connector.getConnectorIndex()),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(connector.getCreationTime())));
     tsFileRateMap.put(
         taskID,
         metricService.getOrCreateRate(
             Metric.PIPE_CONNECTOR_TSFILE_TRANSFER.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            taskID));
+            connector.getAttributeSortedString(),
+            Tag.INDEX.toString(),
+            String.valueOf(connector.getConnectorIndex()),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(connector.getCreationTime())));
     pipeHeartbeatRateMap.put(
         taskID,
         metricService.getOrCreateRate(
             Metric.PIPE_CONNECTOR_HEARTBEAT_TRANSFER.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            taskID));
+            connector.getAttributeSortedString(),
+            Tag.INDEX.toString(),
+            String.valueOf(connector.getConnectorIndex()),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(connector.getCreationTime())));
   }
 
   @Override
@@ -134,39 +162,67 @@ public class PipeConnectorMetrics implements IMetricSet {
   }
 
   private void removeAutoGauge(String taskID) {
+    PipeConnectorSubtask connector = connectorMap.get(taskID);
+    // pending event count
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNTRANSFERRED_TABLET_COUNT.toString(),
         Tag.NAME.toString(),
-        taskID);
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNTRANSFERRED_TSFILE_COUNT.toString(),
         Tag.NAME.toString(),
-        taskID);
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNTRANSFERRED_HEARTBEAT_COUNT.toString(),
         Tag.NAME.toString(),
-        taskID);
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
   }
 
   private void removeRate(String taskID) {
+    PipeConnectorSubtask connector = connectorMap.get(taskID);
+    // transfer event rate
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_CONNECTOR_TABLET_TRANSFER.toString(),
         Tag.NAME.toString(),
-        taskID);
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_CONNECTOR_TSFILE_TRANSFER.toString(),
         Tag.NAME.toString(),
-        taskID);
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_CONNECTOR_HEARTBEAT_TRANSFER.toString(),
         Tag.NAME.toString(),
-        taskID);
+        connector.getAttributeSortedString(),
+        Tag.INDEX.toString(),
+        String.valueOf(connector.getConnectorIndex()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(connector.getCreationTime()));
     tabletRateMap.remove(taskID);
     tsFileRateMap.remove(taskID);
     pipeHeartbeatRateMap.remove(taskID);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
index b7f5738b152..f56672c7d6d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
@@ -75,69 +75,106 @@ public class PipeExtractorMetrics implements IMetricSet {
   }
 
   private void createAutoGauge(String taskID) {
+    IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
     // pending event count
     metricService.createAutoGauge(
         Metric.UNPROCESSED_HISTORICAL_TSFILE_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        extractorMap.get(taskID),
+        extractor,
         IoTDBDataRegionExtractor::getHistoricalTsFileInsertionEventCount,
         Tag.NAME.toString(),
-        taskID);
+        extractor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(extractor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(extractor.getCreationTime()));
     metricService.createAutoGauge(
         Metric.UNPROCESSED_REALTIME_TSFILE_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        extractorMap.get(taskID),
+        extractor,
         IoTDBDataRegionExtractor::getRealtimeTsFileInsertionEventCount,
         Tag.NAME.toString(),
-        taskID);
+        extractor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(extractor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(extractor.getCreationTime()));
     metricService.createAutoGauge(
         Metric.UNPROCESSED_TABLET_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        extractorMap.get(taskID),
+        extractor,
         IoTDBDataRegionExtractor::getTabletInsertionEventCount,
         Tag.NAME.toString(),
-        taskID);
+        extractor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(extractor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(extractor.getCreationTime()));
     metricService.createAutoGauge(
         Metric.UNPROCESSED_HEARTBEAT_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        extractorMap.get(taskID),
+        extractor,
         IoTDBDataRegionExtractor::getPipeHeartbeatEventCount,
         Tag.NAME.toString(),
-        taskID);
+        extractor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(extractor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(extractor.getCreationTime()));
   }
 
   private void createRate(String taskID) {
+    IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
+    // supply event rate
     tabletRateMap.put(
         taskID,
         metricService.getOrCreateRate(
             Metric.PIPE_EXTRACTOR_TABLET_SUPPLY.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            taskID));
+            extractor.getPipeName(),
+            Tag.REGION.toString(),
+            String.valueOf(extractor.getDataRegionId()),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(extractor.getCreationTime())));
     tsFileRateMap.put(
         taskID,
         metricService.getOrCreateRate(
             Metric.PIPE_EXTRACTOR_TSFILE_SUPPLY.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            taskID));
+            extractor.getPipeName(),
+            Tag.REGION.toString(),
+            String.valueOf(extractor.getDataRegionId()),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(extractor.getCreationTime())));
     pipeHeartbeatRateMap.put(
         taskID,
         metricService.getOrCreateRate(
             Metric.PIPE_EXTRACTOR_HEARTBEAT_SUPPLY.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            taskID));
+            extractor.getPipeName(),
+            Tag.REGION.toString(),
+            String.valueOf(extractor.getDataRegionId()),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(extractor.getCreationTime())));
   }
 
   private void createGauge(String taskID) {
+    IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
+    // tsfile epoch state
     recentProcessedTsFileEpochStateMap.put(
         taskID,
         metricService.getOrCreateGauge(
             Metric.PIPE_EXTRACTOR_TSFILE_EPOCH_STATE.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            taskID));
+            extractor.getPipeName(),
+            Tag.REGION.toString(),
+            String.valueOf(extractor.getDataRegionId()),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(extractor.getCreationTime())));
   }
 
   @Override
@@ -158,56 +195,93 @@ public class PipeExtractorMetrics implements IMetricSet {
   }
 
   private void removeAutoGauge(String taskID) {
+    IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
     // pending event count
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNPROCESSED_HISTORICAL_TSFILE_COUNT.toString(),
         Tag.NAME.toString(),
-        taskID);
+        extractor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(extractor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(extractor.getCreationTime()));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNPROCESSED_REALTIME_TSFILE_COUNT.toString(),
         Tag.NAME.toString(),
-        taskID);
+        extractor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(extractor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(extractor.getCreationTime()));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNPROCESSED_TABLET_COUNT.toString(),
         Tag.NAME.toString(),
-        taskID);
+        extractor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(extractor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(extractor.getCreationTime()));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNPROCESSED_HEARTBEAT_COUNT.toString(),
         Tag.NAME.toString(),
-        taskID);
+        extractor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(extractor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(extractor.getCreationTime()));
   }
 
   private void removeRate(String taskID) {
+    IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
+    // supply event rate
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_EXTRACTOR_TABLET_SUPPLY.toString(),
         Tag.NAME.toString(),
-        taskID);
+        extractor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(extractor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(extractor.getCreationTime()));
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_EXTRACTOR_TSFILE_SUPPLY.toString(),
         Tag.NAME.toString(),
-        taskID);
+        extractor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(extractor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(extractor.getCreationTime()));
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_EXTRACTOR_HEARTBEAT_SUPPLY.toString(),
         Tag.NAME.toString(),
-        taskID);
+        extractor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(extractor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(extractor.getCreationTime()));
     tabletRateMap.remove(taskID);
     tsFileRateMap.remove(taskID);
     pipeHeartbeatRateMap.remove(taskID);
   }
 
   private void removeGauge(String taskID) {
+    IoTDBDataRegionExtractor extractor = extractorMap.get(taskID);
+    // tsfile epoch state
     metricService.remove(
         MetricType.GAUGE,
         Metric.PIPE_EXTRACTOR_TSFILE_EPOCH_STATE.toString(),
         Tag.NAME.toString(),
-        taskID);
+        extractor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(extractor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(extractor.getCreationTime()));
   }
 
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
index 7ca6be329a7..291c3529d8b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -70,51 +70,79 @@ public class PipeProcessorMetrics implements IMetricSet {
   }
 
   private void createAutoGauge(String taskID) {
+    PipeProcessorSubtask processor = processorMap.get(taskID);
+    // pending event count
     metricService.createAutoGauge(
         Metric.BUFFERED_TABLET_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        processorMap.get(taskID),
+        processor,
         PipeProcessorSubtask::getTabletInsertionEventCount,
         Tag.NAME.toString(),
-        taskID);
+        processor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(processor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(processor.getCreationTime()));
     metricService.createAutoGauge(
         Metric.BUFFERED_TSFILE_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        processorMap.get(taskID),
+        processor,
         PipeProcessorSubtask::getTsFileInsertionEventCount,
         Tag.NAME.toString(),
-        taskID);
+        processor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(processor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(processor.getCreationTime()));
     metricService.createAutoGauge(
         Metric.BUFFERED_HEARTBEAT_COUNT.toString(),
         MetricLevel.IMPORTANT,
-        processorMap.get(taskID),
+        processor,
         PipeProcessorSubtask::getPipeHeartbeatEventCount,
         Tag.NAME.toString(),
-        taskID);
+        processor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(processor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(processor.getCreationTime()));
   }
 
   private void createRate(String taskID) {
+    PipeProcessorSubtask processor = processorMap.get(taskID);
+    // process event rate
     tabletRateMap.put(
         taskID,
         metricService.getOrCreateRate(
             Metric.PIPE_PROCESSOR_TABLET_PROCESS.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            taskID));
+            processor.getPipeName(),
+            Tag.REGION.toString(),
+            String.valueOf(processor.getDataRegionId()),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(processor.getCreationTime())));
     tsFileRateMap.put(
         taskID,
         metricService.getOrCreateRate(
             Metric.PIPE_PROCESSOR_TSFILE_PROCESS.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            taskID));
+            processor.getPipeName(),
+            Tag.REGION.toString(),
+            String.valueOf(processor.getDataRegionId()),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(processor.getCreationTime())));
     pipeHeartbeatRateMap.put(
         taskID,
         metricService.getOrCreateRate(
             Metric.PIPE_PROCESSOR_HEARTBEAT_PROCESS.toString(),
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
-            taskID));
+            processor.getPipeName(),
+            Tag.REGION.toString(),
+            String.valueOf(processor.getDataRegionId()),
+            Tag.CREATION_TIME.toString(),
+            String.valueOf(processor.getCreationTime())));
   }
 
   @Override
@@ -134,39 +162,67 @@ public class PipeProcessorMetrics implements IMetricSet {
   }
 
   private void removeAutoGauge(String taskID) {
+    PipeProcessorSubtask processor = processorMap.get(taskID);
+    // pending event count
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.BUFFERED_TABLET_COUNT.toString(),
         Tag.NAME.toString(),
-        taskID);
+        processor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(processor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(processor.getCreationTime()));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.BUFFERED_TSFILE_COUNT.toString(),
         Tag.NAME.toString(),
-        taskID);
+        processor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(processor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(processor.getCreationTime()));
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.BUFFERED_HEARTBEAT_COUNT.toString(),
         Tag.NAME.toString(),
-        taskID);
+        processor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(processor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(processor.getCreationTime()));
   }
 
   private void removeRate(String taskID) {
+    PipeProcessorSubtask processor = processorMap.get(taskID);
+    // process event rate
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_PROCESSOR_TABLET_PROCESS.toString(),
         Tag.NAME.toString(),
-        taskID);
+        processor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(processor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(processor.getCreationTime()));
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_PROCESSOR_TSFILE_PROCESS.toString(),
         Tag.NAME.toString(),
-        taskID);
+        processor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(processor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(processor.getCreationTime()));
     metricService.remove(
         MetricType.RATE,
         Metric.PIPE_PROCESSOR_HEARTBEAT_PROCESS.toString(),
         Tag.NAME.toString(),
-        taskID);
+        processor.getPipeName(),
+        Tag.REGION.toString(),
+        String.valueOf(processor.getDataRegionId()),
+        Tag.CREATION_TIME.toString(),
+        String.valueOf(processor.getCreationTime()));
     tabletRateMap.remove(taskID);
     tsFileRateMap.remove(taskID);
     pipeHeartbeatRateMap.remove(taskID);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 4835810dd17..66d48365b04 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -96,6 +96,9 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
     this.pipeProcessorSubtask =
         new PipeProcessorSubtask(
             taskId,
+            creationTime,
+            pipeName,
+            dataRegionId.getId(),
             pipeExtractorInputEventSupplier,
             pipeProcessor,
             pipeConnectorOutputEventCollector);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
index d4e35c35200..4614a25d8c5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeSubtask.java
@@ -46,6 +46,9 @@ public abstract class PipeSubtask
   // Used for identifying the subtask
   protected final String taskID;
 
+  // Record these variables to provide corresponding value to tag key of 
monitoring metrics
+  protected long creationTime;
+
   // For thread pool to execute subtasks
   protected ListeningExecutorService subtaskWorkerThreadPoolExecutor;
 
@@ -59,9 +62,10 @@ public abstract class PipeSubtask
   protected final AtomicInteger retryCount = new AtomicInteger(0);
   protected Event lastEvent;
 
-  protected PipeSubtask(String taskID) {
+  protected PipeSubtask(String taskID, long creationTime) {
     super();
     this.taskID = taskID;
+    this.creationTime = creationTime;
   }
 
   public abstract void bindExecutors(
@@ -237,6 +241,10 @@ public abstract class PipeSubtask
     return taskID;
   }
 
+  public long getCreationTime() {
+    return creationTime;
+  }
+
   public int getRetryCount() {
     return retryCount.get();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 05119db0ca2..283a3f71510 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -59,23 +59,20 @@ public class PipeConnectorSubtask extends PipeSubtask {
   protected final DecoratingLock callbackDecoratingLock = new DecoratingLock();
   protected ExecutorService subtaskCallbackListeningExecutor;
 
-  public Integer getTsFileInsertionEventCount() {
-    return inputPendingQueue.getTsFileInsertionEventCount();
-  }
-
-  public Integer getTabletInsertionEventCount() {
-    return inputPendingQueue.getTabletInsertionEventCount();
-  }
-
-  public Integer getPipeHeartbeatEventCount() {
-    return inputPendingQueue.getPipeHeartbeatEventCount();
-  }
+  // Record these variables to provide corresponding value to tag key of 
monitoring metrics
+  private final String attributeSortedString;
+  private final int connectorIndex;
 
   public PipeConnectorSubtask(
       String taskID,
+      long creationTime,
+      String attributeSortedString,
+      int connectorIndex,
       BoundedBlockingPendingQueue<Event> inputPendingQueue,
       PipeConnector outputPipeConnector) {
-    super(taskID);
+    super(taskID, creationTime);
+    this.attributeSortedString = attributeSortedString;
+    this.connectorIndex = connectorIndex;
     this.inputPendingQueue = inputPendingQueue;
     this.outputPipeConnector = outputPipeConnector;
     PipeConnectorMetrics.getInstance().register(this);
@@ -292,4 +289,26 @@ public class PipeConnectorSubtask extends PipeSubtask {
       super.close();
     }
   }
+
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  public String getAttributeSortedString() {
+    return attributeSortedString;
+  }
+
+  public int getConnectorIndex() {
+    return connectorIndex;
+  }
+
+  public Integer getTsFileInsertionEventCount() {
+    return inputPendingQueue.getTsFileInsertionEventCount();
+  }
+
+  public Integer getTabletInsertionEventCount() {
+    return inputPendingQueue.getTabletInsertionEventCount();
+  }
+
+  public Integer getPipeHeartbeatEventCount() {
+    return inputPendingQueue.getPipeHeartbeatEventCount();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 1c07e78f029..76fa00cd652 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -86,7 +86,7 @@ public class PipeConnectorSubtaskManager {
           new BoundedBlockingPendingQueue<>(
               PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
 
-      for (int i = 0; i < connectorNum; i++) {
+      for (int connectorIndex = 0; connectorIndex < connectorNum; 
connectorIndex++) {
         final PipeConnector pipeConnector =
             CONNECTOR_CONSTRUCTORS
                 .getOrDefault(
@@ -110,7 +110,13 @@ public class PipeConnectorSubtaskManager {
         final PipeConnectorSubtask pipeConnectorSubtask =
             new PipeConnectorSubtask(
                 String.format(
-                    "%s_%s_%s", attributeSortedString, 
pipeRuntimeEnvironment.getCreationTime(), i),
+                    "%s_%s_%s",
+                    attributeSortedString,
+                    pipeRuntimeEnvironment.getCreationTime(),
+                    connectorIndex),
+                pipeRuntimeEnvironment.getCreationTime(),
+                attributeSortedString,
+                connectorIndex,
                 pendingQueue,
                 pipeConnector);
         final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index d20d0731b7b..35fdac641fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -49,12 +49,21 @@ public class PipeProcessorSubtask extends PipeSubtask {
   private final PipeProcessor pipeProcessor;
   private final PipeEventCollector outputEventCollector;
 
+  // Record these variables to provide corresponding value to tag key of 
monitoring metrics
+  private final String pipeName;
+  private final int dataRegionId;
+
   public PipeProcessorSubtask(
       String taskID,
+      long creationTime,
+      String pipeName,
+      int dataRegionId,
       EventSupplier inputEventSupplier,
       PipeProcessor pipeProcessor,
       PipeEventCollector outputEventCollector) {
-    super(taskID);
+    super(taskID, creationTime);
+    this.pipeName = pipeName;
+    this.dataRegionId = dataRegionId;
     this.inputEventSupplier = inputEventSupplier;
     this.pipeProcessor = pipeProcessor;
     this.outputEventCollector = outputEventCollector;
@@ -180,6 +189,16 @@ public class PipeProcessorSubtask extends PipeSubtask {
     return taskID.hashCode();
   }
 
+  //////////////////////////// APIs provided for metric framework 
////////////////////////////
+
+  public String getPipeName() {
+    return pipeName;
+  }
+
+  public int getDataRegionId() {
+    return dataRegionId;
+  }
+
   public int getTabletInsertionEventCount() {
     return outputEventCollector.getTabletInsertionEventCount();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index da651cf4322..ecb4bb3483a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -83,6 +83,7 @@ public class WALInsertNodeCache {
             .weigher(
                 (Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
                     (position, pair) -> position.getSize())
+            .recordStats()
             .build(new WALInsertNodeCacheLoader());
     PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
index dbc66e9a099..9dc36a14667 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
@@ -39,6 +39,9 @@ public class PipeConnectorSubtaskExecutorTest extends 
PipeSubtaskExecutorTest {
         Mockito.spy(
             new PipeConnectorSubtask(
                 "PipeConnectorSubtaskExecutorTest",
+                System.currentTimeMillis(),
+                "TestAttributeSortedString",
+                0,
                 mock(BoundedBlockingPendingQueue.class),
                 mock(PipeConnector.class)));
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java
index eea84c9bb6d..8a8be617152 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeProcessorSubtaskExecutorTest.java
@@ -40,6 +40,9 @@ public class PipeProcessorSubtaskExecutorTest extends 
PipeSubtaskExecutorTest {
         Mockito.spy(
             new PipeProcessorSubtask(
                 "PipeProcessorSubtaskExecutorTest",
+                System.currentTimeMillis(),
+                "TestPipe",
+                0,
                 mock(EventSupplier.class),
                 mock(PipeProcessor.class),
                 mock(PipeEventCollector.class)));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
index 786d5e2a68c..f3062737d92 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Tag.java
@@ -28,7 +28,9 @@ public enum Tag {
   FROM("from"),
   STAGE("stage"),
   OPERATION("operation"),
-  INTERFACE("interface");
+  INTERFACE("interface"),
+  CREATION_TIME("creation_time"),
+  INDEX("index");
 
   final String value;
 


Reply via email to