This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new bb96c818048 [To dev/1.3] Pipe: Add metrics for TsFile parsing to 
tablets (#16668) (#16673)
bb96c818048 is described below

commit bb96c81804851e7895a2c2d389131e99797092e3
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Oct 31 09:51:01 2025 +0800

    [To dev/1.3] Pipe: Add metrics for TsFile parsing to tablets (#16668) 
(#16673)
---
 .../container/TsFileInsertionDataContainer.java    | 68 ++++++++++++++++++----
 .../query/TsFileInsertionQueryDataContainer.java   | 14 ++++-
 .../scan/TsFileInsertionScanDataContainer.java     | 12 +++-
 .../overview/PipeTsFileToTabletsMetrics.java       | 67 +++++++++++++++++++++
 .../iotdb/commons/service/metric/enums/Metric.java |  3 +
 5 files changed, 150 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
index 9f779528f52..cbbfea0a5b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -34,6 +35,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.tsfile.read.filter.factory.TimeFilterApi;
+import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,8 +58,9 @@ public abstract class TsFileInsertionDataContainer implements 
AutoCloseable {
   protected PipeMemoryBlock allocatedMemoryBlockForModifications;
   protected PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> 
currentModifications;
 
-  protected final long initialTimeNano = System.nanoTime();
-  protected boolean timeUsageReported = false;
+  protected long parseStartTimeNano = -1;
+  protected boolean parseStartTimeRecorded = false;
+  protected boolean parseEndTimeRecorded = false;
 
   protected final PipeMemoryBlock allocatedMemoryBlockForTablet;
 
@@ -97,21 +100,62 @@ public abstract class TsFileInsertionDataContainer 
implements AutoCloseable {
    */
   public abstract Iterable<TabletInsertionEvent> toTabletInsertionEvents();
 
-  @Override
-  public void close() {
+  /**
+   * Record parse start time when hasNext() is called for the first time and 
returns true. Should be
+   * called in Iterator.hasNext() when it's the first call.
+   */
+  protected void recordParseStartTime() {
+    if (pipeName == null || parseStartTimeRecorded) {
+      return;
+    }
+    parseStartTimeNano = System.nanoTime();
+    parseStartTimeRecorded = true;
+  }
 
-    tabletInsertionIterable = null;
+  /**
+   * Record parse end time when hasNext() is called and returns false (last 
call). Should be called
+   * in Iterator.hasNext() when it returns false.
+   */
+  protected void recordParseEndTime() {
+    if (pipeName == null || !parseStartTimeRecorded || parseEndTimeRecorded) {
+      return;
+    }
+    try {
+      final long parseEndTimeNano = System.nanoTime();
+      final long totalTimeNanos = parseEndTimeNano - parseStartTimeNano;
+      final String taskID = pipeName + "_" + creationTime;
+      
PipeTsFileToTabletsMetrics.getInstance().recordTsFileToTabletTime(taskID, 
totalTimeNanos);
+      parseEndTimeRecorded = true;
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to record parse end time for pipe {}", pipeName, e);
+    }
+  }
 
+  /**
+   * Record metrics when a tablet is generated. Should be called by subclasses 
when generating
+   * tablets.
+   *
+   * @param tablet the generated tablet
+   */
+  protected void recordTabletMetrics(final Tablet tablet) {
+    if (pipeName == null || tablet == null) {
+      return;
+    }
     try {
-      if (pipeName != null && !timeUsageReported) {
-        PipeTsFileToTabletsMetrics.getInstance()
-            .recordTsFileToTabletTime(
-                pipeName + "_" + creationTime, System.nanoTime() - 
initialTimeNano);
-        timeUsageReported = true;
-      }
+      final String taskID = pipeName + "_" + creationTime;
+      final long tabletMemorySize = 
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
+      PipeTsFileToTabletsMetrics.getInstance().recordTabletGenerated(taskID, 
tabletMemorySize);
     } catch (final Exception e) {
-      LOGGER.warn("Failed to report time usage for parsing tsfile for pipe 
{}", pipeName, e);
+      LOGGER.warn("Failed to record tablet metrics for pipe {}", pipeName, e);
     }
+  }
+
+  @Override
+  public void close() {
+
+    tabletInsertionIterable = null;
+
+    // Time recording is now handled in Iterator.hasNext(), no need to record 
here
 
     try {
       if (tsFileSequenceReader != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
index 4a0c2be0bf0..15732fd0e8d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
@@ -353,8 +353,13 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
 
                 @Override
                 public boolean hasNext() {
+                  boolean hasNext = false;
                   while (tabletIterator == null || !tabletIterator.hasNext()) {
                     if (!deviceMeasurementsMapIterator.hasNext()) {
+                      // Record end time when no more data
+                      if (parseStartTimeRecorded && !parseEndTimeRecorded) {
+                        recordParseEndTime();
+                      }
                       close();
                       return false;
                     }
@@ -379,7 +384,12 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
                     }
                   }
 
-                  return true;
+                  hasNext = true;
+                  // Record start time on first hasNext() that returns true
+                  if (!parseStartTimeRecorded) {
+                    recordParseStartTime();
+                  }
+                  return hasNext;
                 }
 
                 @Override
@@ -390,6 +400,8 @@ public class TsFileInsertionQueryDataContainer extends 
TsFileInsertionDataContai
                   }
 
                   final Tablet tablet = tabletIterator.next();
+                  // Record tablet metrics
+                  recordTabletMetrics(tablet);
                   final boolean isAligned =
                       deviceIsAlignedMap.getOrDefault(new 
PlainDeviceID(tablet.deviceId), false);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index da2a94fab18..ce2c2b8e467 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -173,7 +173,15 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
                 @Override
                 public boolean hasNext() {
-                  return Objects.nonNull(chunkReader);
+                  final boolean hasNext = Objects.nonNull(chunkReader);
+                  if (hasNext && !parseStartTimeRecorded) {
+                    // Record start time on first hasNext() that returns true
+                    recordParseStartTime();
+                  } else if (!hasNext && parseStartTimeRecorded && 
!parseEndTimeRecorded) {
+                    // Record end time on last hasNext() that returns false
+                    recordParseEndTime();
+                  }
+                  return hasNext;
                 }
 
                 @Override
@@ -191,6 +199,8 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
                   // information.
                   final boolean isAligned = currentIsAligned;
                   final Tablet tablet = getNextTablet();
+                  // Record tablet metrics
+                  recordTabletMetrics(tablet);
                   final boolean hasNext = hasNext();
                   try {
                     return new PipeRawTabletInsertionEvent(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
index 4f2159f356c..f9436377bb3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
 import org.apache.iotdb.metrics.type.Rate;
 import org.apache.iotdb.metrics.type.Timer;
 import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -48,6 +49,9 @@ public class PipeTsFileToTabletsMetrics implements IMetricSet 
{
   private final ConcurrentSkipListSet<String> pipe = new 
ConcurrentSkipListSet<>();
   private final Map<String, Timer> pipeTimerMap = new ConcurrentHashMap<>();
   private final Map<String, Rate> pipeRateMap = new ConcurrentHashMap<>();
+  private final Map<String, Counter> pipeTabletCountMap = new 
ConcurrentHashMap<>();
+  private final Map<String, Counter> pipeTabletMemoryMap = new 
ConcurrentHashMap<>();
+  private final Map<String, Counter> pipeParseFileCountMap = new 
ConcurrentHashMap<>();
 
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
@@ -72,6 +76,27 @@ public class PipeTsFileToTabletsMetrics implements 
IMetricSet {
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
             pipeID));
+    pipeTabletCountMap.putIfAbsent(
+        pipeID,
+        metricService.getOrCreateCounter(
+            Metric.PIPE_TSFILE_TO_TABLETS_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID));
+    pipeTabletMemoryMap.putIfAbsent(
+        pipeID,
+        metricService.getOrCreateCounter(
+            Metric.PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID));
+    pipeParseFileCountMap.putIfAbsent(
+        pipeID,
+        metricService.getOrCreateCounter(
+            Metric.PIPE_TSFILE_PARSE_FILE_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID));
   }
 
   @Override
@@ -98,6 +123,27 @@ public class PipeTsFileToTabletsMetrics implements 
IMetricSet {
         Tag.NAME.toString(),
         pipeID);
     pipeRateMap.remove(pipeID);
+
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.PIPE_TSFILE_TO_TABLETS_COUNT.toString(),
+        Tag.NAME.toString(),
+        pipeID);
+    pipeTabletCountMap.remove(pipeID);
+
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY.toString(),
+        Tag.NAME.toString(),
+        pipeID);
+    pipeTabletMemoryMap.remove(pipeID);
+
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.PIPE_TSFILE_PARSE_FILE_COUNT.toString(),
+        Tag.NAME.toString(),
+        pipeID);
+    pipeParseFileCountMap.remove(pipeID);
   }
 
   //////////////////////////// register & deregister 
////////////////////////////
@@ -151,6 +197,27 @@ public class PipeTsFileToTabletsMetrics implements 
IMetricSet {
       return;
     }
     timer.updateNanos(costTimeInNanos);
+    // Increment file count for this pipe when parsing ends
+    final Counter fileCount = pipeParseFileCountMap.get(taskID);
+    if (fileCount != null) {
+      fileCount.inc();
+    }
+  }
+
+  public void recordTabletGenerated(final String taskID, long 
tabletMemorySize) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Counter tabletCount = pipeTabletCountMap.get(taskID);
+    if (tabletCount == null) {
+      LOGGER.info("Failed to record tablet generated, pipeID({}) does not 
exist", taskID);
+      return;
+    }
+    tabletCount.inc();
+    final Counter tabletMemory = pipeTabletMemoryMap.get(taskID);
+    if (tabletMemory != null) {
+      tabletMemory.inc(tabletMemorySize);
+    }
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index db3a6b9d79e..965f5b31271 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -188,6 +188,9 @@ public enum Metric {
   PIPE_GLOBAL_REMAINING_TIME("pipe_global_remaining_time"),
   PIPE_TSFILE_TO_TABLETS_TIME("pipe_tsfile_to_tablets_time"),
   PIPE_TSFILE_TO_TABLETS_RATE("pipe_tsfile_to_tablets_rate"),
+  PIPE_TSFILE_TO_TABLETS_COUNT("pipe_tsfile_to_tablets_count"),
+  PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY("pipe_tsfile_to_tablets_total_memory"),
+  PIPE_TSFILE_PARSE_FILE_COUNT("pipe_tsfile_parse_file_count"),
   // subscription related
   SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT("subscription_uncommitted_event_count"),
   SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),

Reply via email to