This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch tsfile-2-tablet-metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f4eeace825c0da661745b5ee707a0e9ea099f2c8
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Mar 26 19:49:02 2025 +0800

    impl
---
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  2 +
 .../tsfile/parser/TsFileInsertionEventParser.java  | 18 +++++++
 .../parser/TsFileInsertionEventParserProvider.java | 55 ++++++++++++++++++++--
 .../query/TsFileInsertionEventQueryParser.java     | 19 ++++++--
 .../scan/TsFileInsertionEventScanParser.java       | 15 +++++-
 .../table/TsFileInsertionEventTableParser.java     | 16 ++++++-
 6 files changed, 115 insertions(+), 10 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 357e11e2f39..04cf1d58e59 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -664,6 +664,8 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
       eventParser.compareAndSet(
           null,
           new TsFileInsertionEventParserProvider(
+                  pipeName,
+                  creationTime,
                   tsFile,
                   treePattern,
                   tablePattern,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
index a8b3a7f33c5..da574de46f6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java
@@ -39,6 +39,9 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TsFileInsertionEventParser.class);
 
+  protected final String pipeName;
+  protected final long creationTime;
+
   protected final TreePattern treePattern; // used to filter data
   protected final TablePattern tablePattern; // used to filter data
   protected final GlobalTimeExpression timeFilterExpression; // used to filter data
@@ -48,17 +51,24 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable {
   protected final PipeTaskMeta pipeTaskMeta; // used to report progress
   protected final PipeInsertionEvent sourceEvent; // used to report progress
 
+  protected final long initialTimeNano = System.nanoTime();
+
   protected final PipeMemoryBlock allocatedMemoryBlockForTablet;
 
   protected TsFileSequenceReader tsFileSequenceReader;
 
   protected TsFileInsertionEventParser(
+      final String pipeName,
+      final long creationTime,
       final TreePattern treePattern,
       final TablePattern tablePattern,
       final long startTime,
       final long endTime,
       final PipeTaskMeta pipeTaskMeta,
       final PipeInsertionEvent sourceEvent) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
+
     this.treePattern = treePattern;
     this.tablePattern = tablePattern;
     timeFilterExpression =
@@ -83,6 +93,14 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable {
 
   @Override
   public void close() {
+    try {
+      if (pipeName != null) {
+        // report time usage
+      }
+    } catch (final Exception e) {
+      LOGGER.warn("Failed to report time usage for parsing tsfile for pipe {}", pipeName, e);
+    }
+
     try {
       if (tsFileSequenceReader != null) {
         tsFileSequenceReader.close();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
index 200130f6e14..4cf6af7f4db 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java
@@ -41,6 +41,9 @@ import java.util.stream.Collectors;
 
 public class TsFileInsertionEventParserProvider {
 
+  private final String pipeName;
+  private final long creationTime;
+
   private final File tsFile;
   private final TreePattern treePattern;
   private final TablePattern tablePattern;
@@ -52,6 +55,8 @@ public class TsFileInsertionEventParserProvider {
   private final String userName;
 
   public TsFileInsertionEventParserProvider(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final TreePattern treePattern,
       final TablePattern tablePattern,
@@ -60,6 +65,8 @@ public class TsFileInsertionEventParserProvider {
       final PipeTaskMeta pipeTaskMeta,
       final String userName,
       final PipeTsFileInsertionEvent sourceEvent) {
+    this.pipeName = pipeName;
+    this.creationTime = creationTime;
     this.tsFile = tsFile;
     this.treePattern = treePattern;
     this.tablePattern = tablePattern;
@@ -73,7 +80,15 @@ public class TsFileInsertionEventParserProvider {
   public TsFileInsertionEventParser provide() throws IOException {
     if (sourceEvent.isTableModelEvent()) {
       return new TsFileInsertionEventTableParser(
-          tsFile, tablePattern, startTime, endTime, pipeTaskMeta, userName, sourceEvent);
+          pipeName,
+          creationTime,
+          tsFile,
+          tablePattern,
+          startTime,
+          endTime,
+          pipeTaskMeta,
+          userName,
+          sourceEvent);
     }
 
     // Use scan container to save memory
@@ -81,7 +96,14 @@ public class TsFileInsertionEventParserProvider {
             / PipeDataNodeResourceManager.memory().getTotalMemorySizeInBytes()
         > PipeTsFileResource.MEMORY_SUFFICIENT_THRESHOLD) {
       return new TsFileInsertionEventScanParser(
-          tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+          pipeName,
+          creationTime,
+          tsFile,
+          treePattern,
+          startTime,
+          endTime,
+          pipeTaskMeta,
+          sourceEvent);
     }
 
     if (treePattern instanceof IoTDBTreePattern
@@ -93,7 +115,14 @@ public class TsFileInsertionEventParserProvider {
       // hard to know whether it only matches one timeseries, while matching multiple is often the
       // case.
       return new TsFileInsertionEventQueryParser(
-          tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+          pipeName,
+          creationTime,
+          tsFile,
+          treePattern,
+          startTime,
+          endTime,
+          pipeTaskMeta,
+          sourceEvent);
     }
 
     final Map<IDeviceID, Boolean> deviceIsAlignedMap =
@@ -102,7 +131,14 @@ public class TsFileInsertionEventParserProvider {
       // If we failed to get from cache, it indicates that the memory usage is high.
       // We use scan data container because it requires less memory.
       return new TsFileInsertionEventScanParser(
-          tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+          pipeName,
+          creationTime,
+          tsFile,
+          treePattern,
+          startTime,
+          endTime,
+          pipeTaskMeta,
+          sourceEvent);
     }
 
     final int originalSize = deviceIsAlignedMap.size();
@@ -112,8 +148,17 @@ public class TsFileInsertionEventParserProvider {
     return (double) filteredDeviceIsAlignedMap.size() / originalSize
             > PipeConfig.getInstance().getPipeTsFileScanParsingThreshold()
         ? new TsFileInsertionEventScanParser(
-            tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent)
+            pipeName,
+            creationTime,
+            tsFile,
+            treePattern,
+            startTime,
+            endTime,
+            pipeTaskMeta,
+            sourceEvent)
         : new TsFileInsertionEventQueryParser(
+            pipeName,
+            creationTime,
             tsFile,
             treePattern,
             startTime,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
index 52a2b054c35..ba07299d66b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
@@ -75,10 +75,12 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
       final long endTime,
       final PipeInsertionEvent sourceEvent)
       throws IOException {
-    this(tsFile, pattern, startTime, endTime, null, sourceEvent);
+    this(null, 0, tsFile, pattern, startTime, endTime, null, sourceEvent);
   }
 
   public TsFileInsertionEventQueryParser(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final TreePattern pattern,
       final long startTime,
@@ -86,10 +88,21 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
       final PipeTaskMeta pipeTaskMeta,
       final PipeInsertionEvent sourceEvent)
       throws IOException {
-    this(tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, null);
+    this(
+        pipeName,
+        creationTime,
+        tsFile,
+        pattern,
+        startTime,
+        endTime,
+        pipeTaskMeta,
+        sourceEvent,
+        null);
   }
 
   public TsFileInsertionEventQueryParser(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final TreePattern pattern,
       final long startTime,
@@ -98,7 +111,7 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
       final PipeInsertionEvent sourceEvent,
       final Map<IDeviceID, Boolean> deviceIsAlignedMap)
       throws IOException {
-    super(pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent);
+    super(pipeName, creationTime, pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent);
 
     try {
       final PipeTsFileResourceManager tsFileResourceManager = PipeDataNodeResourceManager.tsfile();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 54bdbc174a8..1559d472e04 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -87,6 +87,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
   private byte lastMarker = Byte.MIN_VALUE;
 
   public TsFileInsertionEventScanParser(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final TreePattern pattern,
       final long startTime,
@@ -94,7 +96,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
       final PipeTaskMeta pipeTaskMeta,
       final PipeInsertionEvent sourceEvent)
       throws IOException {
-    super(pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent);
+    super(pipeName, creationTime, pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent);
 
     this.startTime = startTime;
     this.endTime = endTime;
@@ -115,6 +117,17 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     }
   }
 
+  public TsFileInsertionEventScanParser(
+      final File tsFile,
+      final TreePattern pattern,
+      final long startTime,
+      final long endTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipeInsertionEvent sourceEvent)
+      throws IOException {
+    this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+  }
+
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
     return () ->
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
index b363a163063..8011f9c3830 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java
@@ -53,6 +53,8 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser
   private final PipeMemoryBlock allocatedMemoryBlockForTableSchemas;
 
   public TsFileInsertionEventTableParser(
+      final String pipeName,
+      final long creationTime,
       final File tsFile,
       final TablePattern pattern,
       final long startTime,
@@ -61,7 +63,7 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser
       final String userName,
       final PipeInsertionEvent sourceEvent)
       throws IOException {
-    super(null, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
+    super(pipeName, creationTime, null, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
 
     try {
       this.allocatedMemoryBlockForChunk =
@@ -85,6 +87,18 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser
     }
   }
 
+  public TsFileInsertionEventTableParser(
+      final File tsFile,
+      final TablePattern pattern,
+      final long startTime,
+      final long endTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final String userName,
+      final PipeInsertionEvent sourceEvent)
+      throws IOException {
+    this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, userName, sourceEvent);
+  }
+
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
     return () ->

Reply via email to