This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cbe3e16abd Pipe: Add metrics for TsFile parsing to tablets (#16668)
0cbe3e16abd is described below

commit 0cbe3e16abd9b15f94b6e1ae5f2b4b1336c2eca0
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Oct 28 19:20:15 2025 +0800

    Pipe: Add metrics for TsFile parsing to tablets (#16668)
---
 .../tsfile/parser/TsFileInsertionEventParser.java  | 68 ++++++++++++++++++----
 .../query/TsFileInsertionEventQueryParser.java     | 14 ++++-
 .../scan/TsFileInsertionEventScanParser.java       | 12 +++-
 .../table/TsFileInsertionEventTableParser.java     | 15 ++++-
 .../overview/PipeTsFileToTabletsMetrics.java       | 67 +++++++++++++++++++++
 .../iotdb/commons/service/metric/enums/Metric.java |  3 +
 6 files changed, 162 insertions(+), 17 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
index b68d9492426..04b2a3bfd81 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -35,6 +36,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.tsfile.read.filter.factory.TimeFilterApi;
+import org.apache.tsfile.write.record.Tablet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,8 +62,9 @@ public abstract class TsFileInsertionEventParser implements 
AutoCloseable {
   protected PipeMemoryBlock allocatedMemoryBlockForModifications;
   protected PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
currentModifications;
 
-  protected final long initialTimeNano = System.nanoTime();
-  protected boolean timeUsageReported = false;
+  protected long parseStartTimeNano = -1;
+  protected boolean parseStartTimeRecorded = false;
+  protected boolean parseEndTimeRecorded = false;
 
   protected final PipeMemoryBlock allocatedMemoryBlockForTablet;
 
@@ -104,21 +107,62 @@ public abstract class TsFileInsertionEventParser 
implements AutoCloseable {
    */
   public abstract Iterable<TabletInsertionEvent> toTabletInsertionEvents();
 
-  @Override
-  public void close() {
+  /**
+   * Record parse start time when hasNext() is called for the first time and 
returns true. Should be
+   * called in Iterator.hasNext() when it's the first call.
+   */
+  protected void recordParseStartTime() {
+    if (pipeName == null || parseStartTimeRecorded) {
+      return;
+    }
+    parseStartTimeNano = System.nanoTime();
+    parseStartTimeRecorded = true;
+  }
 
-    tabletInsertionIterable = null;
+  /**
+   * Record parse end time when hasNext() is called and returns false (last 
call). Should be called
+   * in Iterator.hasNext() when it returns false.
+   */
+  protected void recordParseEndTime() {
+    if (pipeName == null || !parseStartTimeRecorded || parseEndTimeRecorded) {
+      return;
+    }
+    try {
+      final long parseEndTimeNano = System.nanoTime();
+      final long totalTimeNanos = parseEndTimeNano - parseStartTimeNano;
+      final String taskID = pipeName + "_" + creationTime;
+      
PipeTsFileToTabletsMetrics.getInstance().recordTsFileToTabletTime(taskID, 
totalTimeNanos);
+      parseEndTimeRecorded = true;
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to record parse end time for pipe {}", pipeName, e);
+    }
+  }
 
+  /**
+   * Record metrics when a tablet is generated. Should be called by subclasses 
when generating
+   * tablets.
+   *
+   * @param tablet the generated tablet
+   */
+  protected void recordTabletMetrics(final Tablet tablet) {
+    if (pipeName == null || tablet == null) {
+      return;
+    }
     try {
-      if (pipeName != null && !timeUsageReported) {
-        PipeTsFileToTabletsMetrics.getInstance()
-            .recordTsFileToTabletTime(
-                pipeName + "_" + creationTime, System.nanoTime() - 
initialTimeNano);
-        timeUsageReported = true;
-      }
+      final String taskID = pipeName + "_" + creationTime;
+      final long tabletMemorySize = 
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
+      PipeTsFileToTabletsMetrics.getInstance().recordTabletGenerated(taskID, 
tabletMemorySize);
     } catch (final Exception e) {
-      LOGGER.warn("Failed to report time usage for parsing tsfile for pipe 
{}", pipeName, e);
+      LOGGER.warn("Failed to record tablet metrics for pipe {}", pipeName, e);
     }
+  }
+
+  @Override
+  public void close() {
+
+    tabletInsertionIterable = null;
+
+    // Time recording is now handled in Iterator.hasNext(), no need to record 
here
 
     try {
       if (tsFileSequenceReader != null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
index dbed85d5a6b..22f1cffdd0d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
@@ -354,8 +354,13 @@ public class TsFileInsertionEventQueryParser extends 
TsFileInsertionEventParser
 
                 @Override
                 public boolean hasNext() {
+                  boolean hasNext = false;
                   while (tabletIterator == null || !tabletIterator.hasNext()) {
                     if (!deviceMeasurementsMapIterator.hasNext()) {
+                      // Record end time when no more data
+                      if (parseStartTimeRecorded && !parseEndTimeRecorded) {
+                        recordParseEndTime();
+                      }
                       close();
                       return false;
                     }
@@ -380,7 +385,12 @@ public class TsFileInsertionEventQueryParser extends 
TsFileInsertionEventParser
                     }
                   }
 
-                  return true;
+                  hasNext = true;
+                  // Record start time on first hasNext() that returns true
+                  if (!parseStartTimeRecorded) {
+                    recordParseStartTime();
+                  }
+                  return hasNext;
                 }
 
                 @Override
@@ -391,6 +401,8 @@ public class TsFileInsertionEventQueryParser extends 
TsFileInsertionEventParser
                   }
 
                   final Tablet tablet = tabletIterator.next();
+                  // Record tablet metrics
+                  recordTabletMetrics(tablet);
                   final boolean isAligned =
                       deviceIsAlignedMap.getOrDefault(
                           
IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 043cc87fa10..80382bf82b5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -164,7 +164,15 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
 
                 @Override
                 public boolean hasNext() {
-                  return Objects.nonNull(chunkReader);
+                  final boolean hasNext = Objects.nonNull(chunkReader);
+                  if (hasNext && !parseStartTimeRecorded) {
+                    // Record start time on first hasNext() that returns true
+                    recordParseStartTime();
+                  } else if (!hasNext && parseStartTimeRecorded && 
!parseEndTimeRecorded) {
+                    // Record end time on last hasNext() that returns false
+                    recordParseEndTime();
+                  }
+                  return hasNext;
                 }
 
                 @Override
@@ -182,6 +190,8 @@ public class TsFileInsertionEventScanParser extends 
TsFileInsertionEventParser {
                   // information.
                   final boolean isAligned = currentIsAligned;
                   final Tablet tablet = getNextTablet();
+                  // Record tablet metrics
+                  recordTabletMetrics(tablet);
                   final boolean hasNext = hasNext();
                   try {
                     return sourceEvent == null
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 8e874acb683..8ec8106a496 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -163,11 +163,18 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
                               startTime,
                               endTime);
                     }
-                    if (!tabletIterator.hasNext()) {
+                    final boolean hasNext = tabletIterator.hasNext();
+                    if (hasNext && !parseStartTimeRecorded) {
+                      // Record start time on first hasNext() that returns true
+                      recordParseStartTime();
+                    } else if (!hasNext && parseStartTimeRecorded && 
!parseEndTimeRecorded) {
+                      // Record end time on last hasNext() that returns false
+                      recordParseEndTime();
+                      close();
+                    } else if (!hasNext) {
                       close();
-                      return false;
                     }
-                    return true;
+                    return hasNext;
                   } catch (Exception e) {
                     close();
                     throw new PipeException("Error while parsing tsfile 
insertion event", e);
@@ -194,6 +201,8 @@ public class TsFileInsertionEventTableParser extends 
TsFileInsertionEventParser
                   }
 
                   final Tablet tablet = tabletIterator.next();
+                  // Record tablet metrics
+                  recordTabletMetrics(tablet);
 
                   final TabletInsertionEvent next;
                   if (!hasNext()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
index 4f2159f356c..f9436377bb3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
 import org.apache.iotdb.metrics.AbstractMetricService;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
 import org.apache.iotdb.metrics.type.Rate;
 import org.apache.iotdb.metrics.type.Timer;
 import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -48,6 +49,9 @@ public class PipeTsFileToTabletsMetrics implements IMetricSet 
{
   private final ConcurrentSkipListSet<String> pipe = new 
ConcurrentSkipListSet<>();
   private final Map<String, Timer> pipeTimerMap = new ConcurrentHashMap<>();
   private final Map<String, Rate> pipeRateMap = new ConcurrentHashMap<>();
+  private final Map<String, Counter> pipeTabletCountMap = new 
ConcurrentHashMap<>();
+  private final Map<String, Counter> pipeTabletMemoryMap = new 
ConcurrentHashMap<>();
+  private final Map<String, Counter> pipeParseFileCountMap = new 
ConcurrentHashMap<>();
 
   //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
 
@@ -72,6 +76,27 @@ public class PipeTsFileToTabletsMetrics implements 
IMetricSet {
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
             pipeID));
+    pipeTabletCountMap.putIfAbsent(
+        pipeID,
+        metricService.getOrCreateCounter(
+            Metric.PIPE_TSFILE_TO_TABLETS_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID));
+    pipeTabletMemoryMap.putIfAbsent(
+        pipeID,
+        metricService.getOrCreateCounter(
+            Metric.PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID));
+    pipeParseFileCountMap.putIfAbsent(
+        pipeID,
+        metricService.getOrCreateCounter(
+            Metric.PIPE_TSFILE_PARSE_FILE_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID));
   }
 
   @Override
@@ -98,6 +123,27 @@ public class PipeTsFileToTabletsMetrics implements 
IMetricSet {
         Tag.NAME.toString(),
         pipeID);
     pipeRateMap.remove(pipeID);
+
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.PIPE_TSFILE_TO_TABLETS_COUNT.toString(),
+        Tag.NAME.toString(),
+        pipeID);
+    pipeTabletCountMap.remove(pipeID);
+
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY.toString(),
+        Tag.NAME.toString(),
+        pipeID);
+    pipeTabletMemoryMap.remove(pipeID);
+
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.PIPE_TSFILE_PARSE_FILE_COUNT.toString(),
+        Tag.NAME.toString(),
+        pipeID);
+    pipeParseFileCountMap.remove(pipeID);
   }
 
   //////////////////////////// register & deregister 
////////////////////////////
@@ -151,6 +197,27 @@ public class PipeTsFileToTabletsMetrics implements 
IMetricSet {
       return;
     }
     timer.updateNanos(costTimeInNanos);
+    // Increment file count for this pipe when parsing ends
+    final Counter fileCount = pipeParseFileCountMap.get(taskID);
+    if (fileCount != null) {
+      fileCount.inc();
+    }
+  }
+
+  public void recordTabletGenerated(final String taskID, long 
tabletMemorySize) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Counter tabletCount = pipeTabletCountMap.get(taskID);
+    if (tabletCount == null) {
+      LOGGER.info("Failed to record tablet generated, pipeID({}) does not 
exist", taskID);
+      return;
+    }
+    tabletCount.inc();
+    final Counter tabletMemory = pipeTabletMemoryMap.get(taskID);
+    if (tabletMemory != null) {
+      tabletMemory.inc(tabletMemorySize);
+    }
   }
 
   //////////////////////////// singleton ////////////////////////////
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 2eaafb4549d..ad14e90cd57 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -198,6 +198,9 @@ public enum Metric {
   PIPE_GLOBAL_REMAINING_TIME("pipe_global_remaining_time"),
   PIPE_TSFILE_TO_TABLETS_TIME("pipe_tsfile_to_tablets_time"),
   PIPE_TSFILE_TO_TABLETS_RATE("pipe_tsfile_to_tablets_rate"),
+  PIPE_TSFILE_TO_TABLETS_COUNT("pipe_tsfile_to_tablets_count"),
+  PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY("pipe_tsfile_to_tablets_total_memory"),
+  PIPE_TSFILE_PARSE_FILE_COUNT("pipe_tsfile_parse_file_count"),
   // subscription related
   SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT("subscription_uncommitted_event_count"),
   SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),

Reply via email to