This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 6c92f630e4c Pipe: Add metrics for tsfile to tablets invocation call 
count and time (#15202) & Pipe: Prevent Duplicate Time Usage Reporting on Close 
in PipeTsFileToTabletsMetrics (#15220) (#15224)
6c92f630e4c is described below

commit 6c92f630e4cbe77437390b72829e0700dcf10248
Author: nanxiang xia <[email protected]>
AuthorDate: Tue Apr 1 15:32:43 2025 +0800

    Pipe: Add metrics for tsfile to tablets invocation call count and time 
(#15202) & Pipe: Prevent Duplicate Time Usage Reporting on Close in 
PipeTsFileToTabletsMetrics (#15220) (#15224)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  11 +-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |   9 +-
 .../container/TsFileInsertionDataContainer.java    |  23 +++
 .../TsFileInsertionDataContainerProvider.java      |  23 ++-
 .../query/TsFileInsertionQueryDataContainer.java   |  19 ++-
 .../scan/TsFileInsertionScanDataContainer.java     |  15 +-
 .../dataregion/IoTDBDataRegionExtractor.java       |   2 +
 .../iotdb/db/pipe/metric/PipeDataNodeMetrics.java  |   3 +
 .../overview/PipeTsFileToTabletsMetrics.java       | 170 +++++++++++++++++++++
 .../iotdb/commons/service/metric/enums/Metric.java |   2 +
 10 files changed, 264 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index f521698b87c..6714ef5e065 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -48,6 +48,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -317,8 +318,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       return false;
     }
 
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-        .deregister(pipeName + "_" + creationTime);
+    final String taskId = pipeName + "_" + creationTime;
+    PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
+    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
 
     return true;
   }
@@ -334,8 +336,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
     if (Objects.nonNull(pipeMeta)) {
       final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
-      PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
-          .deregister(pipeName + "_" + creationTime);
+      final String taskId = pipeName + "_" + creationTime;
+      PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
+      
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
     }
 
     return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 4f9823709a3..edbad61b631 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -495,7 +495,14 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
       dataContainer.compareAndSet(
           null,
           new TsFileInsertionDataContainerProvider(
-                  tsFile, pipePattern, startTime, endTime, pipeTaskMeta, this)
+                  pipeName,
+                  creationTime,
+                  tsFile,
+                  pipePattern,
+                  startTime,
+                  endTime,
+                  pipeTaskMeta,
+                  this)
               .provide());
       return dataContainer.get();
     } catch (final IOException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
index 66a6ebb0d25..16ca1aff79d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
@@ -22,6 +22,7 @@ package 
org.apache.iotdb.db.pipe.event.common.tsfile.container;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -38,22 +39,33 @@ public abstract class TsFileInsertionDataContainer 
implements AutoCloseable {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TsFileInsertionDataContainer.class);
 
+  protected final String pipeName;
+  protected final long creationTime;
+
   protected final PipePattern pattern; // used to filter data
   protected final GlobalTimeExpression timeFilterExpression; // used to filter 
data
 
   protected final PipeTaskMeta pipeTaskMeta; // used to report progress
   protected final EnrichedEvent sourceEvent; // used to report progress
 
+  protected final long initialTimeNano = System.nanoTime();
+  protected boolean timeUsageReported = false;
+
   protected final PipeMemoryBlock allocatedMemoryBlockForTablet;
 
   protected TsFileSequenceReader tsFileSequenceReader;
 
   protected TsFileInsertionDataContainer(
+      final String pipeName,
+      final long creationTime,
       final PipePattern pattern,
       final long startTime,
       final long endTime,
       final PipeTaskMeta pipeTaskMeta,
       final EnrichedEvent sourceEvent) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+
     this.pattern = pattern;
     timeFilterExpression =
         (startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE)
@@ -75,6 +87,17 @@ public abstract class TsFileInsertionDataContainer 
implements AutoCloseable {
 
   @Override
   public void close() {
+    try {
+      if (pipeName != null && !timeUsageReported) {
+        PipeTsFileToTabletsMetrics.getInstance()
+            .recordTsFileToTabletTime(
+                pipeName + "_" + creationTime, System.nanoTime() - 
initialTimeNano);
+        timeUsageReported = true;
+      }
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to report time usage for parsing tsfile for pipe 
{}", pipeName, e);
+    }
+
     try {
       if (tsFileSequenceReader != null) {
         tsFileSequenceReader.close();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
index 5c8b87f9b20..a5af5aec4f1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainerProvider.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResource;
 
@@ -40,6 +41,9 @@ import java.util.stream.Collectors;
 
 public class TsFileInsertionDataContainerProvider {
 
+  private final String pipeName;
+  private final long creationTime;
+
   private final File tsFile;
   private final PipePattern pattern;
   private final long startTime;
@@ -49,12 +53,16 @@ public class TsFileInsertionDataContainerProvider {
   protected final PipeTsFileInsertionEvent sourceEvent;
 
   public TsFileInsertionDataContainerProvider(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final PipePattern pipePattern,
       final long startTime,
       final long endTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipeTsFileInsertionEvent sourceEvent) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
     this.tsFile = tsFile;
     this.pattern = pipePattern;
     this.startTime = startTime;
@@ -64,12 +72,17 @@ public class TsFileInsertionDataContainerProvider {
   }
 
   public TsFileInsertionDataContainer provide() throws IOException {
+    if (pipeName != null) {
+      PipeTsFileToTabletsMetrics.getInstance()
+          .markTsFileToTabletInvocation(pipeName + "_" + creationTime);
+    }
+
     // Use scan container to save memory
     if ((double) 
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes()
             / PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
         > PipeTsFileResource.MEMORY_SUFFICIENT_THRESHOLD) {
       return new TsFileInsertionScanDataContainer(
-          tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+          pipeName, creationTime, tsFile, pattern, startTime, endTime, 
pipeTaskMeta, sourceEvent);
     }
 
     if (pattern instanceof IoTDBPipePattern
@@ -81,7 +94,7 @@ public class TsFileInsertionDataContainerProvider {
       // hard to know whether it only matches one timeseries, while matching 
multiple is often the
       // case.
       return new TsFileInsertionQueryDataContainer(
-          tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+          pipeName, creationTime, tsFile, pattern, startTime, endTime, 
pipeTaskMeta, sourceEvent);
     }
 
     final Map<IDeviceID, Boolean> deviceIsAlignedMap =
@@ -90,7 +103,7 @@ public class TsFileInsertionDataContainerProvider {
       // If we failed to get from cache, it indicates that the memory usage is 
high.
       // We use scan data container because it requires less memory.
       return new TsFileInsertionScanDataContainer(
-          tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+          pipeName, creationTime, tsFile, pattern, startTime, endTime, 
pipeTaskMeta, sourceEvent);
     }
 
     final int originalSize = deviceIsAlignedMap.size();
@@ -100,8 +113,10 @@ public class TsFileInsertionDataContainerProvider {
     return (double) filteredDeviceIsAlignedMap.size() / originalSize
             > PipeConfig.getInstance().getPipeTsFileScanParsingThreshold()
         ? new TsFileInsertionScanDataContainer(
-            tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent)
+            pipeName, creationTime, tsFile, pattern, startTime, endTime, 
pipeTaskMeta, sourceEvent)
         : new TsFileInsertionQueryDataContainer(
+            pipeName,
+            creationTime,
             tsFile,
             pattern,
             startTime,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
index d5106b3f135..d020228863a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
@@ -72,10 +72,12 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
   public TsFileInsertionQueryDataContainer(
       final File tsFile, final PipePattern pattern, final long startTime, 
final long endTime)
       throws IOException {
-    this(tsFile, pattern, startTime, endTime, null, null);
+    this(null, 0, tsFile, pattern, startTime, endTime, null, null);
   }
 
   public TsFileInsertionQueryDataContainer(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final PipePattern pattern,
       final long startTime,
@@ -83,10 +85,21 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
       final PipeTaskMeta pipeTaskMeta,
       final EnrichedEvent sourceEvent)
       throws IOException {
-    this(tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, null);
+    this(
+        pipeName,
+        creationTime,
+        tsFile,
+        pattern,
+        startTime,
+        endTime,
+        pipeTaskMeta,
+        sourceEvent,
+        null);
   }
 
   public TsFileInsertionQueryDataContainer(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final PipePattern pattern,
       final long startTime,
@@ -95,7 +108,7 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
       final EnrichedEvent sourceEvent,
       final Map<IDeviceID, Boolean> deviceIsAlignedMap)
       throws IOException {
-    super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+    super(pipeName, creationTime, pattern, startTime, endTime, pipeTaskMeta, 
sourceEvent);
 
     try {
       final PipeTsFileResourceManager tsFileResourceManager = 
PipeDataNodeResourceManager.tsfile();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 318e70b5605..eca344eaeca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -90,6 +90,8 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
   private byte lastMarker = Byte.MIN_VALUE;
 
   public TsFileInsertionScanDataContainer(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final PipePattern pattern,
       final long startTime,
@@ -97,7 +99,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
       final PipeTaskMeta pipeTaskMeta,
       final EnrichedEvent sourceEvent)
       throws IOException {
-    super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+    super(pipeName, creationTime, pattern, startTime, endTime, pipeTaskMeta, 
sourceEvent);
 
     this.startTime = startTime;
     this.endTime = endTime;
@@ -118,6 +120,17 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     }
   }
 
+  public TsFileInsertionScanDataContainer(
+      final File tsFile,
+      final PipePattern pattern,
+      final long startTime,
+      final long endTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final EnrichedEvent sourceEvent)
+      throws IOException {
+    this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, 
sourceEvent);
+  }
+
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
     return () ->
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
index 7a5fde4612c..b0ccea5cc43 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
@@ -343,6 +344,7 @@ public class IoTDBDataRegionExtractor extends 
IoTDBExtractor {
 
     // register metric after generating taskID
     PipeDataRegionExtractorMetrics.getInstance().register(this);
+    PipeTsFileToTabletsMetrics.getInstance().register(this);
     PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
index 004b1bccf77..3f03ce580fd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeMetrics.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.commons.pipe.metric.PipeEventCommitMetrics;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
+import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
 import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
 import org.apache.iotdb.db.pipe.metric.receiver.PipeDataNodeReceiverMetrics;
@@ -54,6 +55,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
     PipeSchemaRegionConnectorMetrics.getInstance().bindTo(metricService);
     
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().bindTo(metricService);
     PipeDataNodeReceiverMetrics.getInstance().bindTo(metricService);
+    PipeTsFileToTabletsMetrics.getInstance().bindTo(metricService);
   }
 
   @Override
@@ -71,6 +73,7 @@ public class PipeDataNodeMetrics implements IMetricSet {
     PipeSchemaRegionConnectorMetrics.getInstance().unbindFrom(metricService);
     
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().unbindFrom(metricService);
     PipeDataNodeReceiverMetrics.getInstance().unbindFrom(metricService);
+    PipeTsFileToTabletsMetrics.getInstance().unbindFrom(metricService);
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
new file mode 100644
index 00000000000..04015fe6f82
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric.overview;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+public class PipeTsFileToTabletsMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileToTabletsMetrics.class);
+
+  @SuppressWarnings("java:S3077")
+  private volatile AbstractMetricService metricService;
+
+  private final ConcurrentSkipListSet<String> pipe = new 
ConcurrentSkipListSet<>();
+  private final Map<String, Timer> pipeTimerMap = new ConcurrentHashMap<>();
+  private final Map<String, Rate> pipeRateMap = new ConcurrentHashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(final AbstractMetricService metricService) {
+    this.metricService = metricService;
+    ImmutableSet.copyOf(pipe).forEach(this::createMetrics);
+  }
+
+  private void createMetrics(final String pipeID) {
+    pipeTimerMap.putIfAbsent(
+        pipeID,
+        metricService.getOrCreateTimer(
+            Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID));
+    pipeRateMap.putIfAbsent(
+        pipeID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID));
+  }
+
+  @Override
+  public void unbindFrom(final AbstractMetricService metricService) {
+    ImmutableSet.copyOf(pipe).forEach(this::deregister);
+    if (!pipe.isEmpty()) {
+      LOGGER.warn(
+          "Failed to unbind from pipe tsfile to tablets metrics, pipe map is 
not empty, pipe: {}",
+          pipe);
+    }
+  }
+
+  private void removeMetrics(final String pipeID) {
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(),
+        Tag.NAME.toString(),
+        pipeID);
+    pipeTimerMap.remove(pipeID);
+
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(),
+        Tag.NAME.toString(),
+        pipeID);
+    pipeRateMap.remove(pipeID);
+  }
+
+  //////////////////////////// register & deregister 
////////////////////////////
+
+  public void register(final IoTDBDataRegionExtractor extractor) {
+    final String pipeID = extractor.getPipeName() + "_" + 
extractor.getCreationTime();
+    pipe.add(pipeID);
+    if (Objects.nonNull(metricService)) {
+      createMetrics(pipeID);
+    }
+  }
+
+  public void deregister(final String pipeID) {
+    if (!pipe.contains(pipeID)) {
+      LOGGER.warn(
+          "Failed to deregister pipe tsfile to tablets metrics, pipeID({}) 
does not exist", pipeID);
+      return;
+    }
+    try {
+      if (Objects.nonNull(metricService)) {
+        removeMetrics(pipeID);
+      }
+    } finally {
+      pipe.remove(pipeID);
+    }
+  }
+
+  //////////////////////////// pipe integration ////////////////////////////
+
+  public void markTsFileToTabletInvocation(final String taskID) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Rate rate = pipeRateMap.get(taskID);
+    if (rate == null) {
+      LOGGER.info(
+          "Failed to mark pipe tsfile to tablets invocation, pipeID({}) does 
not exist", taskID);
+      return;
+    }
+    rate.mark();
+  }
+
+  public void recordTsFileToTabletTime(final String taskID, long 
costTimeInNanos) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Timer timer = pipeTimerMap.get(taskID);
+    if (timer == null) {
+      LOGGER.info(
+          "Failed to record pipe tsfile to tablets time, pipeID({}) does not 
exist", taskID);
+      return;
+    }
+    timer.updateNanos(costTimeInNanos);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class Holder {
+
+    private static final PipeTsFileToTabletsMetrics INSTANCE = new 
PipeTsFileToTabletsMetrics();
+
+    private Holder() {
+      // Empty constructor
+    }
+  }
+
+  public static PipeTsFileToTabletsMetrics getInstance() {
+    return Holder.INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index cef90d673b7..39dd9a23697 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -175,6 +175,8 @@ public enum Metric {
   PIPE_CONFIGNODE_REMAINING_TIME("pipe_confignode_remaining_time"),
   PIPE_GLOBAL_REMAINING_EVENT_COUNT("pipe_global_remaining_event_count"),
   PIPE_GLOBAL_REMAINING_TIME("pipe_global_remaining_time"),
+  PIPE_TSFILE_TO_TABLETS_TIME("pipe_tsfile_to_tablets_time"),
+  PIPE_TSFILE_TO_TABLETS_RATE("pipe_tsfile_to_tablets_rate"),
   // subscription related
   SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT("subscription_uncommitted_event_count"),
   SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),

Reply via email to