This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch tsfile-2-tablet-metrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f4eeace825c0da661745b5ee707a0e9ea099f2c8 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Mar 26 19:49:02 2025 +0800 impl --- .../common/tsfile/PipeTsFileInsertionEvent.java | 2 + .../tsfile/parser/TsFileInsertionEventParser.java | 18 +++++++ .../parser/TsFileInsertionEventParserProvider.java | 55 ++++++++++++++++++++-- .../query/TsFileInsertionEventQueryParser.java | 19 ++++++-- .../scan/TsFileInsertionEventScanParser.java | 15 +++++- .../table/TsFileInsertionEventTableParser.java | 16 ++++++- 6 files changed, 115 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 357e11e2f39..04cf1d58e59 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -664,6 +664,8 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent eventParser.compareAndSet( null, new TsFileInsertionEventParserProvider( + pipeName, + creationTime, tsFile, treePattern, tablePattern, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java index a8b3a7f33c5..da574de46f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java @@ -39,6 +39,9 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(TsFileInsertionEventParser.class); + protected final String pipeName; + protected final long creationTime; + protected final TreePattern treePattern; // used to filter data protected final TablePattern tablePattern; // used to filter data protected final GlobalTimeExpression timeFilterExpression; // used to filter data @@ -48,17 +51,24 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable { protected final PipeTaskMeta pipeTaskMeta; // used to report progress protected final PipeInsertionEvent sourceEvent; // used to report progress + protected final long initialTimeNano = System.nanoTime(); + protected final PipeMemoryBlock allocatedMemoryBlockForTablet; protected TsFileSequenceReader tsFileSequenceReader; protected TsFileInsertionEventParser( + final String pipeName, + final long creationTime, final TreePattern treePattern, final TablePattern tablePattern, final long startTime, final long endTime, final PipeTaskMeta pipeTaskMeta, final PipeInsertionEvent sourceEvent) { + this.pipeName = pipeName; + this.creationTime = creationTime; + this.treePattern = treePattern; this.tablePattern = tablePattern; timeFilterExpression = @@ -83,6 +93,14 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable { @Override public void close() { + try { + if (pipeName != null) { + // report time usage + } + } catch (final Exception e) { + LOGGER.warn("Failed to report time usage for parsing tsfile for pipe {}", pipeName, e); + } + try { if (tsFileSequenceReader != null) { tsFileSequenceReader.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java index 200130f6e14..4cf6af7f4db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java @@ -41,6 +41,9 @@ import java.util.stream.Collectors; public class TsFileInsertionEventParserProvider { + private final String pipeName; + private final long creationTime; + private final File tsFile; private final TreePattern treePattern; private final TablePattern tablePattern; @@ -52,6 +55,8 @@ public class TsFileInsertionEventParserProvider { private final String userName; public TsFileInsertionEventParserProvider( + final String pipeName, + final long creationTime, final File tsFile, final TreePattern treePattern, final TablePattern tablePattern, @@ -60,6 +65,8 @@ public class TsFileInsertionEventParserProvider { final PipeTaskMeta pipeTaskMeta, final String userName, final PipeTsFileInsertionEvent sourceEvent) { + this.pipeName = pipeName; + this.creationTime = creationTime; this.tsFile = tsFile; this.treePattern = treePattern; this.tablePattern = tablePattern; @@ -73,7 +80,15 @@ public class TsFileInsertionEventParserProvider { public TsFileInsertionEventParser provide() throws IOException { if (sourceEvent.isTableModelEvent()) { return new TsFileInsertionEventTableParser( - tsFile, tablePattern, startTime, endTime, pipeTaskMeta, userName, sourceEvent); + pipeName, + creationTime, + tsFile, + tablePattern, + startTime, + endTime, + pipeTaskMeta, + userName, + sourceEvent); } // Use scan container to save memory @@ -81,7 +96,14 @@ public class TsFileInsertionEventParserProvider { / PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes() > PipeTsFileResource.MEMORY_SUFFICIENT_THRESHOLD) { return new TsFileInsertionEventScanParser( - tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent); + pipeName, + creationTime, + tsFile, + treePattern, + startTime, + endTime, + pipeTaskMeta, + sourceEvent); } if (treePattern instanceof IoTDBTreePattern @@ -93,7 +115,14 @@ public class TsFileInsertionEventParserProvider { // hard to know whether it only matches one timeseries, while matching multiple is often the // case. return new TsFileInsertionEventQueryParser( - tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent); + pipeName, + creationTime, + tsFile, + treePattern, + startTime, + endTime, + pipeTaskMeta, + sourceEvent); } final Map<IDeviceID, Boolean> deviceIsAlignedMap = @@ -102,7 +131,14 @@ public class TsFileInsertionEventParserProvider { // If we failed to get from cache, it indicates that the memory usage is high. // We use scan data container because it requires less memory. return new TsFileInsertionEventScanParser( - tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent); + pipeName, + creationTime, + tsFile, + treePattern, + startTime, + endTime, + pipeTaskMeta, + sourceEvent); } final int originalSize = deviceIsAlignedMap.size(); @@ -112,8 +148,17 @@ public class TsFileInsertionEventParserProvider { return (double) filteredDeviceIsAlignedMap.size() / originalSize > PipeConfig.getInstance().getPipeTsFileScanParsingThreshold() ? new TsFileInsertionEventScanParser( - tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent) + pipeName, + creationTime, + tsFile, + treePattern, + startTime, + endTime, + pipeTaskMeta, + sourceEvent) : new TsFileInsertionEventQueryParser( + pipeName, + creationTime, tsFile, treePattern, startTime, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java index 52a2b054c35..ba07299d66b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java @@ -75,10 +75,12 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser final long endTime, final PipeInsertionEvent sourceEvent) throws IOException { - this(tsFile, pattern, startTime, endTime, null, sourceEvent); + this(null, 0, tsFile, pattern, startTime, endTime, null, sourceEvent); } public TsFileInsertionEventQueryParser( + final String pipeName, + final long creationTime, final File tsFile, final TreePattern pattern, final long startTime, @@ -86,10 +88,21 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser final PipeTaskMeta pipeTaskMeta, final PipeInsertionEvent sourceEvent) throws IOException { - this(tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, null); + this( + pipeName, + creationTime, + tsFile, + pattern, + startTime, + endTime, + pipeTaskMeta, + sourceEvent, + null); } public TsFileInsertionEventQueryParser( + final String pipeName, + final long creationTime, final File tsFile, final TreePattern pattern, final long startTime, @@ -98,7 +111,7 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser final PipeInsertionEvent sourceEvent, final Map<IDeviceID, Boolean> deviceIsAlignedMap) throws IOException { - super(pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent); + super(pipeName, creationTime, pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent); try { final PipeTsFileResourceManager tsFileResourceManager = PipeDataNodeResourceManager.tsfile(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 54bdbc174a8..1559d472e04 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -87,6 +87,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { private byte lastMarker = Byte.MIN_VALUE; public TsFileInsertionEventScanParser( + final String pipeName, + final long creationTime, final File tsFile, final TreePattern pattern, final long startTime, @@ -94,7 +96,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { final PipeTaskMeta pipeTaskMeta, final PipeInsertionEvent sourceEvent) throws IOException { - super(pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent); + super(pipeName, creationTime, pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent); this.startTime = startTime; this.endTime = endTime; @@ -115,6 +117,17 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { } } + public TsFileInsertionEventScanParser( + final File tsFile, + final TreePattern pattern, + final long startTime, + final long endTime, + final PipeTaskMeta pipeTaskMeta, + final PipeInsertionEvent sourceEvent) + throws IOException { + this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent); + } + @Override public Iterable<TabletInsertionEvent> toTabletInsertionEvents() { return () -> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java index b363a163063..8011f9c3830 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java @@ -53,6 +53,8 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser private final PipeMemoryBlock allocatedMemoryBlockForTableSchemas; public TsFileInsertionEventTableParser( + final String pipeName, + final long creationTime, final File tsFile, final TablePattern pattern, final long startTime, @@ -61,7 +63,7 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser final String userName, final PipeInsertionEvent sourceEvent) throws IOException { - super(null, pattern, startTime, endTime, pipeTaskMeta, sourceEvent); + super(pipeName, creationTime, null, pattern, startTime, endTime, pipeTaskMeta, sourceEvent); try { this.allocatedMemoryBlockForChunk = @@ -85,6 +87,18 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser } } + public TsFileInsertionEventTableParser( + final File tsFile, + final TablePattern pattern, + final long startTime, + final long endTime, + final PipeTaskMeta pipeTaskMeta, + final String userName, + final PipeInsertionEvent sourceEvent) + throws IOException { + this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, userName, sourceEvent); + } + @Override public Iterable<TabletInsertionEvent> toTabletInsertionEvents() { return () ->
