This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0cbe3e16abd Pipe: Add metrics for TsFile parsing to tablets (#16668)
0cbe3e16abd is described below
commit 0cbe3e16abd9b15f94b6e1ae5f2b4b1336c2eca0
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Oct 28 19:20:15 2025 +0800
Pipe: Add metrics for TsFile parsing to tablets (#16668)
---
.../tsfile/parser/TsFileInsertionEventParser.java | 68 ++++++++++++++++++----
.../query/TsFileInsertionEventQueryParser.java | 14 ++++-
.../scan/TsFileInsertionEventScanParser.java | 12 +++-
.../table/TsFileInsertionEventTableParser.java | 15 ++++-
.../overview/PipeTsFileToTabletsMetrics.java | 67 +++++++++++++++++++++
.../iotdb/commons/service/metric/enums/Metric.java | 3 +
6 files changed, 162 insertions(+), 17 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
index b68d9492426..04b2a3bfd81 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -35,6 +36,7 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.tsfile.read.filter.factory.TimeFilterApi;
+import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,8 +62,9 @@ public abstract class TsFileInsertionEventParser implements
AutoCloseable {
protected PipeMemoryBlock allocatedMemoryBlockForModifications;
protected PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
currentModifications;
- protected final long initialTimeNano = System.nanoTime();
- protected boolean timeUsageReported = false;
+ protected long parseStartTimeNano = -1;
+ protected boolean parseStartTimeRecorded = false;
+ protected boolean parseEndTimeRecorded = false;
protected final PipeMemoryBlock allocatedMemoryBlockForTablet;
@@ -104,21 +107,62 @@ public abstract class TsFileInsertionEventParser
implements AutoCloseable {
*/
public abstract Iterable<TabletInsertionEvent> toTabletInsertionEvents();
- @Override
- public void close() {
+ /**
+ * Record parse start time when hasNext() is called for the first time and
returns true. Should be
+ * called in Iterator.hasNext() when it's the first call.
+ */
+ protected void recordParseStartTime() {
+ if (pipeName == null || parseStartTimeRecorded) {
+ return;
+ }
+ parseStartTimeNano = System.nanoTime();
+ parseStartTimeRecorded = true;
+ }
- tabletInsertionIterable = null;
+ /**
+ * Record parse end time when hasNext() is called and returns false (last
call). Should be called
+ * in Iterator.hasNext() when it returns false.
+ */
+ protected void recordParseEndTime() {
+ if (pipeName == null || !parseStartTimeRecorded || parseEndTimeRecorded) {
+ return;
+ }
+ try {
+ final long parseEndTimeNano = System.nanoTime();
+ final long totalTimeNanos = parseEndTimeNano - parseStartTimeNano;
+ final String taskID = pipeName + "_" + creationTime;
+
PipeTsFileToTabletsMetrics.getInstance().recordTsFileToTabletTime(taskID,
totalTimeNanos);
+ parseEndTimeRecorded = true;
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to record parse end time for pipe {}", pipeName, e);
+ }
+ }
+ /**
+ * Record metrics when a tablet is generated. Should be called by subclasses
when generating
+ * tablets.
+ *
+ * @param tablet the generated tablet
+ */
+ protected void recordTabletMetrics(final Tablet tablet) {
+ if (pipeName == null || tablet == null) {
+ return;
+ }
try {
- if (pipeName != null && !timeUsageReported) {
- PipeTsFileToTabletsMetrics.getInstance()
- .recordTsFileToTabletTime(
- pipeName + "_" + creationTime, System.nanoTime() -
initialTimeNano);
- timeUsageReported = true;
- }
+ final String taskID = pipeName + "_" + creationTime;
+ final long tabletMemorySize =
PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
+ PipeTsFileToTabletsMetrics.getInstance().recordTabletGenerated(taskID,
tabletMemorySize);
} catch (final Exception e) {
- LOGGER.warn("Failed to report time usage for parsing tsfile for pipe
{}", pipeName, e);
+ LOGGER.warn("Failed to record tablet metrics for pipe {}", pipeName, e);
}
+ }
+
+ @Override
+ public void close() {
+
+ tabletInsertionIterable = null;
+
+ // Time recording is now handled in Iterator.hasNext(), no need to record
here
try {
if (tsFileSequenceReader != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
index dbed85d5a6b..22f1cffdd0d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
@@ -354,8 +354,13 @@ public class TsFileInsertionEventQueryParser extends
TsFileInsertionEventParser
@Override
public boolean hasNext() {
+ boolean hasNext = false;
while (tabletIterator == null || !tabletIterator.hasNext()) {
if (!deviceMeasurementsMapIterator.hasNext()) {
+ // Record end time when no more data
+ if (parseStartTimeRecorded && !parseEndTimeRecorded) {
+ recordParseEndTime();
+ }
close();
return false;
}
@@ -380,7 +385,12 @@ public class TsFileInsertionEventQueryParser extends
TsFileInsertionEventParser
}
}
- return true;
+ hasNext = true;
+ // Record start time on first hasNext() that returns true
+ if (!parseStartTimeRecorded) {
+ recordParseStartTime();
+ }
+ return hasNext;
}
@Override
@@ -391,6 +401,8 @@ public class TsFileInsertionEventQueryParser extends
TsFileInsertionEventParser
}
final Tablet tablet = tabletIterator.next();
+ // Record tablet metrics
+ recordTabletMetrics(tablet);
final boolean isAligned =
deviceIsAlignedMap.getOrDefault(
IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 043cc87fa10..80382bf82b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -164,7 +164,15 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
@Override
public boolean hasNext() {
- return Objects.nonNull(chunkReader);
+ final boolean hasNext = Objects.nonNull(chunkReader);
+ if (hasNext && !parseStartTimeRecorded) {
+ // Record start time on first hasNext() that returns true
+ recordParseStartTime();
+ } else if (!hasNext && parseStartTimeRecorded &&
!parseEndTimeRecorded) {
+ // Record end time on last hasNext() that returns false
+ recordParseEndTime();
+ }
+ return hasNext;
}
@Override
@@ -182,6 +190,8 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
// information.
final boolean isAligned = currentIsAligned;
final Tablet tablet = getNextTablet();
+ // Record tablet metrics
+ recordTabletMetrics(tablet);
final boolean hasNext = hasNext();
try {
return sourceEvent == null
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index 8e874acb683..8ec8106a496 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -163,11 +163,18 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
startTime,
endTime);
}
- if (!tabletIterator.hasNext()) {
+ final boolean hasNext = tabletIterator.hasNext();
+ if (hasNext && !parseStartTimeRecorded) {
+ // Record start time on first hasNext() that returns true
+ recordParseStartTime();
+ } else if (!hasNext && parseStartTimeRecorded &&
!parseEndTimeRecorded) {
+ // Record end time on last hasNext() that returns false
+ recordParseEndTime();
+ close();
+ } else if (!hasNext) {
close();
- return false;
}
- return true;
+ return hasNext;
} catch (Exception e) {
close();
throw new PipeException("Error while parsing tsfile
insertion event", e);
@@ -194,6 +201,8 @@ public class TsFileInsertionEventTableParser extends
TsFileInsertionEventParser
}
final Tablet tablet = tabletIterator.next();
+ // Record tablet metrics
+ recordTabletMetrics(tablet);
final TabletInsertionEvent next;
if (!hasNext()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
index 4f2159f356c..f9436377bb3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource;
import org.apache.iotdb.metrics.AbstractMetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
import org.apache.iotdb.metrics.type.Rate;
import org.apache.iotdb.metrics.type.Timer;
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -48,6 +49,9 @@ public class PipeTsFileToTabletsMetrics implements IMetricSet
{
private final ConcurrentSkipListSet<String> pipe = new
ConcurrentSkipListSet<>();
private final Map<String, Timer> pipeTimerMap = new ConcurrentHashMap<>();
private final Map<String, Rate> pipeRateMap = new ConcurrentHashMap<>();
+ private final Map<String, Counter> pipeTabletCountMap = new
ConcurrentHashMap<>();
+ private final Map<String, Counter> pipeTabletMemoryMap = new
ConcurrentHashMap<>();
+ private final Map<String, Counter> pipeParseFileCountMap = new
ConcurrentHashMap<>();
//////////////////////////// bindTo & unbindFrom (metric framework)
////////////////////////////
@@ -72,6 +76,27 @@ public class PipeTsFileToTabletsMetrics implements
IMetricSet {
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
pipeID));
+ pipeTabletCountMap.putIfAbsent(
+ pipeID,
+ metricService.getOrCreateCounter(
+ Metric.PIPE_TSFILE_TO_TABLETS_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ pipeID));
+ pipeTabletMemoryMap.putIfAbsent(
+ pipeID,
+ metricService.getOrCreateCounter(
+ Metric.PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ pipeID));
+ pipeParseFileCountMap.putIfAbsent(
+ pipeID,
+ metricService.getOrCreateCounter(
+ Metric.PIPE_TSFILE_PARSE_FILE_COUNT.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ pipeID));
}
@Override
@@ -98,6 +123,27 @@ public class PipeTsFileToTabletsMetrics implements
IMetricSet {
Tag.NAME.toString(),
pipeID);
pipeRateMap.remove(pipeID);
+
+ metricService.remove(
+ MetricType.COUNTER,
+ Metric.PIPE_TSFILE_TO_TABLETS_COUNT.toString(),
+ Tag.NAME.toString(),
+ pipeID);
+ pipeTabletCountMap.remove(pipeID);
+
+ metricService.remove(
+ MetricType.COUNTER,
+ Metric.PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY.toString(),
+ Tag.NAME.toString(),
+ pipeID);
+ pipeTabletMemoryMap.remove(pipeID);
+
+ metricService.remove(
+ MetricType.COUNTER,
+ Metric.PIPE_TSFILE_PARSE_FILE_COUNT.toString(),
+ Tag.NAME.toString(),
+ pipeID);
+ pipeParseFileCountMap.remove(pipeID);
}
//////////////////////////// register & deregister
////////////////////////////
@@ -151,6 +197,27 @@ public class PipeTsFileToTabletsMetrics implements
IMetricSet {
return;
}
timer.updateNanos(costTimeInNanos);
+ // Increment file count for this pipe when parsing ends
+ final Counter fileCount = pipeParseFileCountMap.get(taskID);
+ if (fileCount != null) {
+ fileCount.inc();
+ }
+ }
+
+ public void recordTabletGenerated(final String taskID, long
tabletMemorySize) {
+ if (Objects.isNull(metricService)) {
+ return;
+ }
+ final Counter tabletCount = pipeTabletCountMap.get(taskID);
+ if (tabletCount == null) {
+ LOGGER.info("Failed to record tablet generated, pipeID({}) does not
exist", taskID);
+ return;
+ }
+ tabletCount.inc();
+ final Counter tabletMemory = pipeTabletMemoryMap.get(taskID);
+ if (tabletMemory != null) {
+ tabletMemory.inc(tabletMemorySize);
+ }
}
//////////////////////////// singleton ////////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 2eaafb4549d..ad14e90cd57 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -198,6 +198,9 @@ public enum Metric {
PIPE_GLOBAL_REMAINING_TIME("pipe_global_remaining_time"),
PIPE_TSFILE_TO_TABLETS_TIME("pipe_tsfile_to_tablets_time"),
PIPE_TSFILE_TO_TABLETS_RATE("pipe_tsfile_to_tablets_rate"),
+ PIPE_TSFILE_TO_TABLETS_COUNT("pipe_tsfile_to_tablets_count"),
+ PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY("pipe_tsfile_to_tablets_total_memory"),
+ PIPE_TSFILE_PARSE_FILE_COUNT("pipe_tsfile_parse_file_count"),
// subscription related
SUBSCRIPTION_UNCOMMITTED_EVENT_COUNT("subscription_uncommitted_event_count"),
SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),