This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ee459bdc81c Pipe: add memory / cache hit rate / resource reference 
count / tsfile epoch metrics (#11388)
ee459bdc81c is described below

commit ee459bdc81c69330433737a8b8cd1e366b96be02
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Oct 27 01:26:31 2023 +0800

    Pipe: add memory / cache hit rate / resource reference count / tsfile epoch 
metrics (#11388)
    
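    As the title says. This also fixes a potential NPE when marking an event
    rate for a task whose metrics are not registered: the rate lookup is now
    null-checked and a warning is logged instead.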
    
    **NOTE:** `WALInsertNodeCache` currently has no observable end-of-lifecycle
    logic, so nothing in the cache itself calls
    `PipeWALInsertNodeCacheMetrics::deregister` yet; for now it is only invoked
    when the metric set is unbound.
---
 .../pipe/extractor/IoTDBDataRegionExtractor.java   |   6 +-
 .../realtime/PipeRealtimeDataRegionExtractor.java  |  13 ++
 .../pipe/extractor/realtime/epoch/TsFileEpoch.java |  26 +++-
 .../realtime/epoch/TsFileEpochManager.java         |   4 +
 .../iotdb/db/pipe/metric/PipeConnectorMetrics.java |  35 ++++--
 .../iotdb/db/pipe/metric/PipeExtractorMetrics.java |  74 +++++++++--
 .../apache/iotdb/db/pipe/metric/PipeMetrics.java   |   4 +
 .../iotdb/db/pipe/metric/PipeProcessorMetrics.java |  35 ++++--
 .../iotdb/db/pipe/metric/PipeResourceMetrics.java  |  90 ++++++++++++++
 .../pipe/metric/PipeWALInsertNodeCacheMetrics.java | 137 +++++++++++++++++++++
 .../db/pipe/resource/memory/PipeMemoryManager.java |   4 +
 .../resource/tsfile/PipeTsFileResourceManager.java |   4 +
 .../pipe/resource/wal/PipeWALResourceManager.java  |   7 ++
 .../subtask/connector/PipeConnectorSubtask.java    |   6 +-
 .../subtask/processor/PipeProcessorSubtask.java    |   6 +-
 .../dataregion/wal/utils/WALInsertNodeCache.java   |  11 +-
 .../iotdb/commons/service/metric/enums/Metric.java |   6 +
 17 files changed, 431 insertions(+), 37 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index c0f120a0d40..ee7adb96601 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -234,11 +234,11 @@ public class IoTDBDataRegionExtractor implements 
PipeExtractor {
             : historicalExtractor.supply();
     if (Objects.nonNull(event)) {
       if (event instanceof TabletInsertionEvent) {
-        PipeExtractorMetrics.getInstance().getTabletRate(taskID).mark();
+        PipeExtractorMetrics.getInstance().markTabletEvent(taskID);
       } else if (event instanceof TsFileInsertionEvent) {
-        PipeExtractorMetrics.getInstance().getTsFileRate(taskID).mark();
+        PipeExtractorMetrics.getInstance().markTsFileEvent(taskID);
       } else if (event instanceof PipeHeartbeatEvent) {
-        PipeExtractorMetrics.getInstance().getPipeHeartbeatRate(taskID).mark();
+        PipeExtractorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
       }
     }
     return event;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index 2634424ed0d..3087a6ef41f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -57,6 +57,8 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
 
   protected final AtomicBoolean isClosed = new AtomicBoolean(false);
 
+  private String taskID;
+
   protected PipeRealtimeDataRegionExtractor() {
     // Do nothing
   }
@@ -95,6 +97,13 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
         parameters.getBooleanOrDefault(
             PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_KEY,
             
PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE);
+
+    // Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. 
These metrics are
+    // indexed by the taskID of IoTDBDataRegionExtractor. To avoid 
PipeRealtimeDataRegionExtractor
+    // holding a reference to IoTDBDataRegionExtractor, the taskID should be 
constructed to
+    // match that of IoTDBDataRegionExtractor.
+    long creationTime = 
configuration.getRuntimeEnvironment().getCreationTime();
+    taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
   }
 
   @Override
@@ -190,4 +199,8 @@ public abstract class PipeRealtimeDataRegionExtractor 
implements PipeExtractor {
   public int getPipeHeartbeatEventCount() {
     return pendingQueue.getPipeHeartbeatEventCount();
   }
+
+  public String getTaskID() {
+    return taskID;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
index f57408a9058..b0ec157f016 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpoch.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.extractor.realtime.epoch;
 
 import 
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
+import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -49,6 +50,13 @@ public class TsFileEpoch {
         .getAndUpdate(visitor::migrate);
   }
 
+  public void setExtractorsRecentProcessedTsFileEpochState() {
+    dataRegionExtractor2State.forEach(
+        (extractor, state) ->
+            PipeExtractorMetrics.getInstance()
+                .setRecentProcessedTsFileEpochState(extractor.getTaskID(), 
state.get()));
+  }
+
   @Override
   public String toString() {
     return "TsFileEpoch{"
@@ -61,9 +69,19 @@ public class TsFileEpoch {
   }
 
   public enum State {
-    EMPTY,
-    USING_TABLET,
-    USING_TSFILE,
-    USING_BOTH
+    EMPTY(0),
+    USING_TABLET(1),
+    USING_BOTH(2),
+    USING_TSFILE(3);
+
+    private final int id;
+
+    State(int id) {
+      this.id = id;
+    }
+
+    public int getId() {
+      return id;
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
index 5b92828c00c..fdd2ad85c62 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/epoch/TsFileEpochManager.java
@@ -54,6 +54,10 @@ public class TsFileEpochManager {
         });
 
     final TsFileEpoch epoch = filePath2Epoch.remove(filePath);
+    // When all data corresponding to this TsFileEpoch have been extracted, 
update the state
+    // of the extractors processing this TsFileEpoch.
+    epoch.setExtractorsRecentProcessedTsFileEpochState();
+
     LOGGER.info("All data in TsFileEpoch {} was extracted", epoch);
     return new PipeRealtimeEvent(
         event,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
index b37aa2f056e..f2dde61092c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeConnectorMetrics.java
@@ -187,7 +187,7 @@ public class PipeConnectorMetrics implements IMetricSet {
   public void deregister(String taskID) {
     synchronized (this) {
       if (!connectorMap.containsKey(taskID)) {
-        LOGGER.info(
+        LOGGER.warn(
             "Failed to deregister pipe connector metrics, 
PipeConnectorSubtask({}) does not exist",
             taskID);
         return;
@@ -199,16 +199,37 @@ public class PipeConnectorMetrics implements IMetricSet {
     }
   }
 
-  public Rate getTabletRate(String taskID) {
-    return tabletRateMap.get(taskID);
+  public void markTabletEvent(String taskID) {
+    Rate rate = tabletRateMap.get(taskID);
+    if (rate == null) {
+      LOGGER.warn(
+          "Failed to mark pipe connector tablet event, 
PipeConnectorSubtask({}) does not exist",
+          taskID);
+      return;
+    }
+    rate.mark();
   }
 
-  public Rate getTsFileRate(String taskID) {
-    return tsFileRateMap.get(taskID);
+  public void markTsFileEvent(String taskID) {
+    Rate rate = tsFileRateMap.get(taskID);
+    if (rate == null) {
+      LOGGER.warn(
+          "Failed to mark pipe connector tsfile event, 
PipeConnectorSubtask({}) does not exist",
+          taskID);
+      return;
+    }
+    rate.mark();
   }
 
-  public Rate getPipeHeartbeatRate(String taskID) {
-    return pipeHeartbeatRateMap.get(taskID);
+  public void markPipeHeartbeatEvent(String taskID) {
+    Rate rate = pipeHeartbeatRateMap.get(taskID);
+    if (rate == null) {
+      LOGGER.warn(
+          "Failed to mark pipe connector heartbeat event, 
PipeConnectorSubtask({}) does not exist",
+          taskID);
+      return;
+    }
+    rate.mark();
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
index eb30a045e00..b7f5738b152 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeExtractorMetrics.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.db.pipe.metric;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.pipe.extractor.IoTDBDataRegionExtractor;
+import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Gauge;
 import org.apache.iotdb.metrics.type.Rate;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
@@ -42,6 +44,8 @@ public class PipeExtractorMetrics implements IMetricSet {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeExtractorMetrics.class);
 
+  private AbstractMetricService metricService;
+
   private final Map<String, IoTDBDataRegionExtractor> extractorMap = new 
HashMap<>();
 
   private final Map<String, Rate> tabletRateMap = new ConcurrentHashMap<>();
@@ -50,7 +54,7 @@ public class PipeExtractorMetrics implements IMetricSet {
 
   private final Map<String, Rate> pipeHeartbeatRateMap = new 
ConcurrentHashMap<>();
 
-  private AbstractMetricService metricService;
+  private final Map<String, Gauge> recentProcessedTsFileEpochStateMap = new 
ConcurrentHashMap<>();
 
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
@@ -67,9 +71,11 @@ public class PipeExtractorMetrics implements IMetricSet {
   private void createMetrics(String taskID) {
     createAutoGauge(taskID);
     createRate(taskID);
+    createGauge(taskID);
   }
 
   private void createAutoGauge(String taskID) {
+    // pending event count
     metricService.createAutoGauge(
         Metric.UNPROCESSED_HISTORICAL_TSFILE_COUNT.toString(),
         MetricLevel.IMPORTANT,
@@ -124,6 +130,16 @@ public class PipeExtractorMetrics implements IMetricSet {
             taskID));
   }
 
+  private void createGauge(String taskID) {
+    recentProcessedTsFileEpochStateMap.put(
+        taskID,
+        metricService.getOrCreateGauge(
+            Metric.PIPE_EXTRACTOR_TSFILE_EPOCH_STATE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            taskID));
+  }
+
   @Override
   public void unbindFrom(AbstractMetricService metricService) {
     ImmutableSet<String> taskIDs = ImmutableSet.copyOf(extractorMap.keySet());
@@ -138,9 +154,11 @@ public class PipeExtractorMetrics implements IMetricSet {
   private void removeMetrics(String taskID) {
     removeAutoGauge(taskID);
     removeRate(taskID);
+    removeGauge(taskID);
   }
 
   private void removeAutoGauge(String taskID) {
+    // pending event count
     metricService.remove(
         MetricType.AUTO_GAUGE,
         Metric.UNPROCESSED_HISTORICAL_TSFILE_COUNT.toString(),
@@ -184,6 +202,14 @@ public class PipeExtractorMetrics implements IMetricSet {
     pipeHeartbeatRateMap.remove(taskID);
   }
 
+  private void removeGauge(String taskID) {
+    metricService.remove(
+        MetricType.GAUGE,
+        Metric.PIPE_EXTRACTOR_TSFILE_EPOCH_STATE.toString(),
+        Tag.NAME.toString(),
+        taskID);
+  }
+
   //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
 
   public void register(@NonNull IoTDBDataRegionExtractor extractor) {
@@ -199,7 +225,7 @@ public class PipeExtractorMetrics implements IMetricSet {
   public void deregister(String taskID) {
     synchronized (this) {
       if (!extractorMap.containsKey(taskID)) {
-        LOGGER.info(
+        LOGGER.warn(
             "Failed to deregister pipe extractor metrics, 
IoTDBDataRegionExtractor({}) does not exist",
             taskID);
         return;
@@ -211,16 +237,48 @@ public class PipeExtractorMetrics implements IMetricSet {
     }
   }
 
-  public Rate getTabletRate(String taskID) {
-    return tabletRateMap.get(taskID);
+  public void markTabletEvent(String taskID) {
+    Rate rate = tabletRateMap.get(taskID);
+    if (rate == null) {
+      LOGGER.warn(
+          "Failed to mark pipe extractor tablet event, 
IoTDBDataRegionExtractor({}) does not exist",
+          taskID);
+      return;
+    }
+    rate.mark();
+  }
+
+  public void markTsFileEvent(String taskID) {
+    Rate rate = tsFileRateMap.get(taskID);
+    if (rate == null) {
+      LOGGER.warn(
+          "Failed to mark pipe extractor tsfile event, 
IoTDBDataRegionExtractor({}) does not exist",
+          taskID);
+      return;
+    }
+    rate.mark();
   }
 
-  public Rate getTsFileRate(String taskID) {
-    return tsFileRateMap.get(taskID);
+  public void markPipeHeartbeatEvent(String taskID) {
+    Rate rate = pipeHeartbeatRateMap.get(taskID);
+    if (rate == null) {
+      LOGGER.warn(
+          "Failed to mark pipe extractor heartbeat event, 
IoTDBDataRegionExtractor({}) does not exist",
+          taskID);
+      return;
+    }
+    rate.mark();
   }
 
-  public Rate getPipeHeartbeatRate(String taskID) {
-    return pipeHeartbeatRateMap.get(taskID);
+  public void setRecentProcessedTsFileEpochState(String taskID, 
TsFileEpoch.State state) {
+    Gauge gauge = recentProcessedTsFileEpochStateMap.get(taskID);
+    if (gauge == null) {
+      LOGGER.warn(
+          "Failed to set recent processed tsfile epoch state, 
PipeRealtimeDataRegionExtractor({}) does not exist",
+          taskID);
+      return;
+    }
+    gauge.set(state.getId());
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
index 8fa6e0c1999..e437f1bd457 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeMetrics.java
@@ -33,6 +33,8 @@ public class PipeMetrics implements IMetricSet {
     PipeProcessorMetrics.getInstance().bindTo(metricService);
     PipeConnectorMetrics.getInstance().bindTo(metricService);
     PipeHeartbeatEventMetrics.getInstance().bindTo(metricService);
+    PipeWALInsertNodeCacheMetrics.getInstance().bindTo(metricService);
+    PipeResourceMetrics.getInstance().bindTo(metricService);
   }
 
   @Override
@@ -42,6 +44,8 @@ public class PipeMetrics implements IMetricSet {
     PipeProcessorMetrics.getInstance().unbindFrom(metricService);
     PipeConnectorMetrics.getInstance().unbindFrom(metricService);
     PipeHeartbeatEventMetrics.getInstance().unbindFrom(metricService);
+    PipeWALInsertNodeCacheMetrics.getInstance().unbindFrom(metricService);
+    PipeResourceMetrics.getInstance().unbindFrom(metricService);
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
index 550b5bd56cd..7ca6be329a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -187,7 +187,7 @@ public class PipeProcessorMetrics implements IMetricSet {
   public void deregister(String taskID) {
     synchronized (this) {
       if (!processorMap.containsKey(taskID)) {
-        LOGGER.info(
+        LOGGER.warn(
             "Failed to deregister pipe processor metrics, 
PipeProcessorSubtask({}) does not exist",
             taskID);
         return;
@@ -199,16 +199,37 @@ public class PipeProcessorMetrics implements IMetricSet {
     }
   }
 
-  public Rate getTabletRate(String taskID) {
-    return tabletRateMap.get(taskID);
+  public void markTabletEvent(String taskID) {
+    Rate rate = tabletRateMap.get(taskID);
+    if (rate == null) {
+      LOGGER.warn(
+          "Failed to mark pipe processor tablet event, 
PipeProcessorSubtask({}) does not exist",
+          taskID);
+      return;
+    }
+    rate.mark();
   }
 
-  public Rate getTsFileRate(String taskID) {
-    return tsFileRateMap.get(taskID);
+  public void markTsFileEvent(String taskID) {
+    Rate rate = tsFileRateMap.get(taskID);
+    if (rate == null) {
+      LOGGER.warn(
+          "Failed to mark pipe processor tsfile event, 
PipeProcessorSubtask({}) does not exist",
+          taskID);
+      return;
+    }
+    rate.mark();
   }
 
-  public Rate getPipeHeartbeatRate(String taskID) {
-    return pipeHeartbeatRateMap.get(taskID);
+  public void markPipeHeartbeatEvent(String taskID) {
+    Rate rate = pipeHeartbeatRateMap.get(taskID);
+    if (rate == null) {
+      LOGGER.warn(
+          "Failed to mark pipe processor heartbeat event, 
PipeProcessorSubtask({}) does not exist",
+          taskID);
+      return;
+    }
+    rate.mark();
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
new file mode 100644
index 00000000000..eb777dfbdb1
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeResourceMetrics.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
+import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class PipeResourceMetrics implements IMetricSet {
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    // pipe memory related
+    metricService.createAutoGauge(
+        Metric.PIPE_MEM_COST.toString(),
+        MetricLevel.IMPORTANT,
+        PipeResourceManager.memory(),
+        PipeMemoryManager::getUsedMemorySizeInBytes);
+    metricService.createAutoGauge(
+        Metric.PIPE_MEM_USAGE.toString(),
+        MetricLevel.IMPORTANT,
+        PipeResourceManager.memory(),
+        PipeMemoryManager::getMemoryUsage);
+    // resource reference count
+    metricService.createAutoGauge(
+        Metric.PIPE_PINNED_MEMTABLE_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        PipeResourceManager.wal(),
+        PipeWALResourceManager::getPinnedWalCount);
+    metricService.createAutoGauge(
+        Metric.PIPE_LINKED_TSFILE_COUNT.toString(),
+        MetricLevel.IMPORTANT,
+        PipeResourceManager.tsfile(),
+        PipeTsFileResourceManager::getLinkedTsfileCount);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    // pipe memory related
+    metricService.remove(MetricType.AUTO_GAUGE, 
Metric.PIPE_MEM_COST.toString());
+    metricService.remove(MetricType.AUTO_GAUGE, 
Metric.PIPE_MEM_USAGE.toString());
+    // resource reference count
+    metricService.remove(MetricType.AUTO_GAUGE, 
Metric.PIPE_PINNED_MEMTABLE_COUNT.toString());
+    metricService.remove(MetricType.AUTO_GAUGE, 
Metric.PIPE_LINKED_TSFILE_COUNT.toString());
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeResourceMetricsHolder {
+
+    private static final PipeResourceMetrics INSTANCE = new 
PipeResourceMetrics();
+
+    private PipeResourceMetricsHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeResourceMetrics getInstance() {
+    return PipeResourceMetrics.PipeResourceMetricsHolder.INSTANCE;
+  }
+
+  private PipeResourceMetrics() {
+    // empty constructor
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
new file mode 100644
index 00000000000..2f246281f82
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeWALInsertNodeCacheMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALInsertNodeCache;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeWALInsertNodeCacheMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeWALInsertNodeCacheMetrics.class);
+
+  private volatile AbstractMetricService metricService;
+
+  private final Map<Integer, WALInsertNodeCache> cacheMap = new 
ConcurrentHashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    this.metricService = metricService;
+    ImmutableSet<Integer> dataRegionIds = 
ImmutableSet.copyOf(cacheMap.keySet());
+    for (Integer dataRegionId : dataRegionIds) {
+      createMetrics(dataRegionId);
+    }
+  }
+
+  private void createMetrics(Integer dataRegionId) {
+    createAutoGauge(dataRegionId);
+  }
+
+  private void createAutoGauge(Integer dataRegionId) {
+    metricService.createAutoGauge(
+        Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString(),
+        MetricLevel.IMPORTANT,
+        cacheMap.get(dataRegionId),
+        WALInsertNodeCache::getCacheHitRate,
+        Tag.REGION.toString(),
+        String.valueOf(dataRegionId));
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    ImmutableSet<Integer> dataRegionIds = 
ImmutableSet.copyOf(cacheMap.keySet());
+    for (Integer dataRegionId : dataRegionIds) {
+      deregister(dataRegionId);
+    }
+    if (!cacheMap.isEmpty()) {
+      LOGGER.warn("Failed to unbind from wal insert node cache metrics, cache 
map not empty");
+    }
+  }
+
+  private void removeMetrics(Integer dataRegionId) {
+    removeAutoGauge(dataRegionId);
+  }
+
+  private void removeAutoGauge(Integer dataRegionId) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE.toString(),
+        Tag.REGION.toString(),
+        String.valueOf(dataRegionId));
+  }
+
+  //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
+
+  public void register(@NonNull WALInsertNodeCache walInsertNodeCache, Integer 
dataRegionId) {
+    cacheMap.putIfAbsent(dataRegionId, walInsertNodeCache);
+    if (Objects.nonNull(metricService)) {
+      createMetrics(dataRegionId);
+    }
+  }
+
+  public void deregister(Integer dataRegionId) {
+    // TODO: waiting called by WALInsertNodeCache
+    if (!cacheMap.containsKey(dataRegionId)) {
+      LOGGER.warn(
+          "Failed to deregister wal insert node cache metrics, 
WALInsertNodeCache({}) does not exist",
+          dataRegionId);
+      return;
+    }
+    if (Objects.nonNull(metricService)) {
+      removeMetrics(dataRegionId);
+    }
+    cacheMap.remove(dataRegionId);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeWALInsertNodeCacheMetricsHolder {
+
+    private static final PipeWALInsertNodeCacheMetrics INSTANCE =
+        new PipeWALInsertNodeCacheMetrics();
+
+    private PipeWALInsertNodeCacheMetricsHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeWALInsertNodeCacheMetrics getInstance() {
+    return PipeWALInsertNodeCacheMetricsHolder.INSTANCE;
+  }
+
+  private PipeWALInsertNodeCacheMetrics() {
+    // empty constructor
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 19a6f80246c..86a8020b7f3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -85,4 +85,8 @@ public class PipeMemoryManager {
   public long getTotalMemorySizeInBytes() {
     return TOTAL_MEMORY_SIZE_IN_BYTES;
   }
+
+  public double getMemoryUsage() {
+    return (double) usedMemorySizeInBytes / TOTAL_MEMORY_SIZE_IN_BYTES;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 9839d793a25..73c0240dd76 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -187,4 +187,8 @@ public class PipeTsFileResourceManager {
   public synchronized void unpinTsFileResource(TsFileResource resource) throws 
IOException {
     
decreaseFileReference(getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
   }
+
+  public int getLinkedTsfileCount() {
+    return hardlinkOrCopiedFileToReferenceMap.size();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index f8202372554..1b3dff90fd8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -120,4 +121,10 @@ public abstract class PipeWALResourceManager {
 
   protected abstract void unpinInternal(long memtableId, WALEntryHandler 
walEntryHandler)
       throws IOException;
+
+  public int getPinnedWalCount() {
+    return Objects.nonNull(memtableIdToPipeWALResourceMap)
+        ? memtableIdToPipeWALResourceMap.size()
+        : 0;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 26149d791a2..05119db0ca2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -118,10 +118,10 @@ public class PipeConnectorSubtask extends PipeSubtask {
     try {
       if (event instanceof TabletInsertionEvent) {
         outputPipeConnector.transfer((TabletInsertionEvent) event);
-        PipeConnectorMetrics.getInstance().getTabletRate(taskID).mark();
+        PipeConnectorMetrics.getInstance().markTabletEvent(taskID);
       } else if (event instanceof TsFileInsertionEvent) {
         outputPipeConnector.transfer((TsFileInsertionEvent) event);
-        PipeConnectorMetrics.getInstance().getTsFileRate(taskID).mark();
+        PipeConnectorMetrics.getInstance().markTsFileEvent(taskID);
       } else if (event instanceof PipeHeartbeatEvent) {
         try {
           outputPipeConnector.heartbeat();
@@ -132,7 +132,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
               e);
         }
         ((PipeHeartbeatEvent) event).onTransferred();
-        PipeConnectorMetrics.getInstance().getPipeHeartbeatRate(taskID).mark();
+        PipeConnectorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
       } else {
         outputPipeConnector.transfer(event);
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 0cc38fda9d8..d20d0731b7b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -106,14 +106,14 @@ public class PipeProcessorSubtask extends PipeSubtask {
       if (!isClosed.get()) {
         if (event instanceof TabletInsertionEvent) {
           pipeProcessor.process((TabletInsertionEvent) event, outputEventCollector);
-          PipeProcessorMetrics.getInstance().getTabletRate(taskID).mark();
+          PipeProcessorMetrics.getInstance().markTabletEvent(taskID);
         } else if (event instanceof TsFileInsertionEvent) {
           pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
-          PipeProcessorMetrics.getInstance().getTsFileRate(taskID).mark();
+          PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
         } else if (event instanceof PipeHeartbeatEvent) {
           pipeProcessor.process(event, outputEventCollector);
           ((PipeHeartbeatEvent) event).onProcessed();
-          PipeProcessorMetrics.getInstance().getPipeHeartbeatRate(taskID).mark();
+          PipeProcessorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
         } else {
           pipeProcessor.process(event, outputEventCollector);
         }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 31f5694d0e4..d60b964c9ac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.wal.utils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.metric.PipeWALInsertNodeCacheMetrics;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
@@ -44,6 +45,7 @@ import java.nio.channels.FileChannel;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -61,7 +63,7 @@ public class WALInsertNodeCache {
 
   private volatile boolean hasPipeRunning = false;
 
-  private WALInsertNodeCache() {
+  private WALInsertNodeCache(Integer dataRegionId) {
     // TODO: try allocate memory 2 * config.getWalFileSizeThresholdInByte() for the cache
     // If allocate memory failed, disable batch load
     isBatchLoadEnabled = true;
@@ -72,6 +74,7 @@ public class WALInsertNodeCache {
                 (Weigher<WALEntryPosition, Pair<ByteBuffer, InsertNode>>)
                     (position, pair) -> position.getSize())
             .build(new WALInsertNodeCacheLoader());
+    PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
   }
 
   /////////////////////////// Getter & Setter ///////////////////////////
@@ -157,6 +160,10 @@ public class WALInsertNodeCache {
     }
   }
 
+  public double getCacheHitRate() {
+    return Objects.nonNull(lruCache) ? lruCache.stats().hitRate() : 0;
+  }
+
   /////////////////////////// MemTable ///////////////////////////
 
   public void addMemTable(long memTableId) {
@@ -252,7 +259,7 @@ public class WALInsertNodeCache {
     private static final Map<Integer, WALInsertNodeCache> INSTANCE_MAP = new ConcurrentHashMap<>();
 
     public static WALInsertNodeCache getOrCreateInstance(Integer key) {
-      return INSTANCE_MAP.computeIfAbsent(key, k -> new WALInsertNodeCache());
+      return INSTANCE_MAP.computeIfAbsent(key, k -> new WALInsertNodeCache(key));
     }
 
     private InstanceHolder() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 77455bffa8b..18cc8955a64 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -118,6 +118,12 @@ public enum Metric {
   PIPE_CONNECTOR_TSFILE_TRANSFER("pipe_connector_tsfile_transfer"),
   PIPE_CONNECTOR_HEARTBEAT_TRANSFER("pipe_connector_heartbeat_transfer"),
   PIPE_HEARTBEAT_EVENT("pipe_heartbeat_event"),
+  PIPE_WAL_INSERT_NODE_CACHE_HIT_RATE("pipe_wal_insert_node_cache_hit_rate"),
+  PIPE_EXTRACTOR_TSFILE_EPOCH_STATE("pipe_extractor_tsfile_epoch_state"),
+  PIPE_MEM_COST("pipe_mem_cost"),
+  PIPE_MEM_USAGE("pipe_mem_usage"),
+  PIPE_PINNED_MEMTABLE_COUNT("pipe_pinned_memtable_count"),
+  PIPE_LINKED_TSFILE_COUNT("pipe_linked_tsfile_count"),
   ;
 
   final String value;


Reply via email to