This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 15f867c4aad Load: Supports metrics in the Async Analyzer stage (#15312) (#15409)
15f867c4aad is described below

commit 15f867c4aad835fba9faaafdd2010b370aa52cf4
Author: Zhenyu Luo <[email protected]>
AuthorDate: Sun Apr 27 16:23:15 2025 +0800

    Load: Supports metrics in the Async Analyzer stage (#15312) (#15409)
---
 .../plan/analyze/LoadTsFileAnalyzer.java           | 63 ++++++++++++----------
 .../load/metrics/LoadTsFileCostMetricsSet.java     | 14 ++++-
 2 files changed, 48 insertions(+), 29 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 0c92abdee74..95caf7d81a9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -109,6 +109,7 @@ import java.util.stream.Collectors;
 import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
 import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check;
 import static org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
+import static org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS_ASYNC_MOVE;
 
 public class LoadTsFileAnalyzer implements AutoCloseable {
 
@@ -238,37 +239,43 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
   }
 
   private boolean doAsyncLoad(final Analysis analysis) {
-    final String[] loadActiveListeningDirs =
-        IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
-    String targetFilePath = null;
-    for (int i = 0, size = loadActiveListeningDirs == null ? 0 : loadActiveListeningDirs.length;
-        i < size;
-        i++) {
-      if (loadActiveListeningDirs[i] != null) {
-        targetFilePath = loadActiveListeningDirs[i];
-        break;
+    long startTime = System.nanoTime();
+    try {
+      final String[] loadActiveListeningDirs =
+          IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
+      String targetFilePath = null;
+      for (int i = 0, size = loadActiveListeningDirs == null ? 0 : loadActiveListeningDirs.length;
+          i < size;
+          i++) {
+        if (loadActiveListeningDirs[i] != null) {
+          targetFilePath = loadActiveListeningDirs[i];
+          break;
+        }
+      }
+      if (targetFilePath == null) {
+        LOGGER.warn("Load active listening dir is not set. Will try sync load instead.");
+        return false;
       }
-    }
-    if (targetFilePath == null) {
-      LOGGER.warn("Load active listening dir is not set. Will try sync load instead.");
-      return false;
-    }
 
-    try {
-      loadTsFilesAsyncToTargetDir(new File(targetFilePath), tsFiles);
-    } catch (Exception e) {
-      LOGGER.warn(
-          "Failed to async load tsfiles {} to target dir {}. Will try sync load instead.",
-          tsFiles,
-          targetFilePath,
-          e);
-      return false;
-    }
+      try {
+        loadTsFilesAsyncToTargetDir(new File(targetFilePath), tsFiles);
+      } catch (Exception e) {
+        LOGGER.warn(
+            "Failed to async load tsfiles {} to target dir {}. Will try sync load instead.",
+            tsFiles,
+            targetFilePath,
+            e);
+        return false;
+      }
 
-    analysis.setFinishQueryAfterAnalyze(true);
-    analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
-    analysis.setStatement(loadTsFileStatement);
-    return true;
+      analysis.setFinishQueryAfterAnalyze(true);
+      analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+      analysis.setStatement(loadTsFileStatement);
+      return true;
+    } finally {
+      LoadTsFileCostMetricsSet.getInstance()
+          .recordPhaseTimeCost(ANALYSIS_ASYNC_MOVE, System.nanoTime() - startTime);
+    }
   }
 
   private void loadTsFilesAsyncToTargetDir(final File targetDir, final List<File> files)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
index 1ccd00cb9bd..28bd40c2d29 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
@@ -42,6 +42,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
   public static final String LOAD_LOCALLY = "load_locally";
   public static final String SCHEDULER_CAST_TABLETS = "scheduler_cast_tablets";
   public static final String ANALYSIS_CAST_TABLETS = "analysis_cast_tablets";
+  public static final String ANALYSIS_ASYNC_MOVE = "analysis_async_move";
 
   private LoadTsFileCostMetricsSet() {
     // empty constructor
@@ -53,6 +54,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
   private Timer loadLocallyTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer schedulerCastTabletsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
   private Timer analysisCastTabletsTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer analysisAsyncMoveTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
 
   private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
 
@@ -76,6 +78,9 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
       case ANALYSIS_CAST_TABLETS:
         analysisCastTabletsTimer.updateNanos(costTimeInNanos);
         break;
+      case ANALYSIS_ASYNC_MOVE:
+        analysisAsyncMoveTimer.updateNanos(costTimeInNanos);
+        break;
       default:
         throw new UnsupportedOperationException("Unsupported stage: " + stage);
     }
@@ -120,6 +125,12 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
             MetricLevel.IMPORTANT,
             Tag.NAME.toString(),
             ANALYSIS_CAST_TABLETS);
+    analysisAsyncMoveTimer =
+        metricService.getOrCreateTimer(
+            Metric.LOAD_TIME_COST.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            ANALYSIS_ASYNC_MOVE);
 
     diskIOCounter =
         metricService.getOrCreateCounter(
@@ -137,7 +148,8 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
             SECOND_PHASE,
             LOAD_LOCALLY,
             SCHEDULER_CAST_TABLETS,
-            ANALYSIS_CAST_TABLETS)
+            ANALYSIS_CAST_TABLETS,
+            ANALYSIS_ASYNC_MOVE)
         .forEach(
             stage ->
                 metricService.remove(

Reply via email to