This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 15f867c4aad Load: Supports metrics in the Async Analyzer stage (#15312) (#15409)
15f867c4aad is described below
commit 15f867c4aad835fba9faaafdd2010b370aa52cf4
Author: Zhenyu Luo <[email protected]>
AuthorDate: Sun Apr 27 16:23:15 2025 +0800
Load: Supports metrics in the Async Analyzer stage (#15312) (#15409)
---
.../plan/analyze/LoadTsFileAnalyzer.java | 63 ++++++++++++----------
.../load/metrics/LoadTsFileCostMetricsSet.java | 14 ++++-
2 files changed, 48 insertions(+), 29 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 0c92abdee74..95caf7d81a9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -109,6 +109,7 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check;
import static
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
+import static
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS_ASYNC_MOVE;
public class LoadTsFileAnalyzer implements AutoCloseable {
@@ -238,37 +239,43 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
private boolean doAsyncLoad(final Analysis analysis) {
- final String[] loadActiveListeningDirs =
- IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
- String targetFilePath = null;
- for (int i = 0, size = loadActiveListeningDirs == null ? 0 :
loadActiveListeningDirs.length;
- i < size;
- i++) {
- if (loadActiveListeningDirs[i] != null) {
- targetFilePath = loadActiveListeningDirs[i];
- break;
+ long startTime = System.nanoTime();
+ try {
+ final String[] loadActiveListeningDirs =
+
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
+ String targetFilePath = null;
+ for (int i = 0, size = loadActiveListeningDirs == null ? 0 :
loadActiveListeningDirs.length;
+ i < size;
+ i++) {
+ if (loadActiveListeningDirs[i] != null) {
+ targetFilePath = loadActiveListeningDirs[i];
+ break;
+ }
+ }
+ if (targetFilePath == null) {
+ LOGGER.warn("Load active listening dir is not set. Will try sync load
instead.");
+ return false;
}
- }
- if (targetFilePath == null) {
- LOGGER.warn("Load active listening dir is not set. Will try sync load
instead.");
- return false;
- }
- try {
- loadTsFilesAsyncToTargetDir(new File(targetFilePath), tsFiles);
- } catch (Exception e) {
- LOGGER.warn(
- "Failed to async load tsfiles {} to target dir {}. Will try sync
load instead.",
- tsFiles,
- targetFilePath,
- e);
- return false;
- }
+ try {
+ loadTsFilesAsyncToTargetDir(new File(targetFilePath), tsFiles);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to async load tsfiles {} to target dir {}. Will try sync
load instead.",
+ tsFiles,
+ targetFilePath,
+ e);
+ return false;
+ }
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
- analysis.setStatement(loadTsFileStatement);
- return true;
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
+ analysis.setStatement(loadTsFileStatement);
+ return true;
+ } finally {
+ LoadTsFileCostMetricsSet.getInstance()
+ .recordPhaseTimeCost(ANALYSIS_ASYNC_MOVE, System.nanoTime() -
startTime);
+ }
}
private void loadTsFilesAsyncToTargetDir(final File targetDir, final
List<File> files)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
index 1ccd00cb9bd..28bd40c2d29 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/metrics/LoadTsFileCostMetricsSet.java
@@ -42,6 +42,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
public static final String LOAD_LOCALLY = "load_locally";
public static final String SCHEDULER_CAST_TABLETS = "scheduler_cast_tablets";
public static final String ANALYSIS_CAST_TABLETS = "analysis_cast_tablets";
+ public static final String ANALYSIS_ASYNC_MOVE = "analysis_async_move";
private LoadTsFileCostMetricsSet() {
// empty constructor
@@ -53,6 +54,7 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
private Timer loadLocallyTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer schedulerCastTabletsTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer analysisCastTabletsTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer analysisAsyncMoveTimer =
DoNothingMetricManager.DO_NOTHING_TIMER;
private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
@@ -76,6 +78,9 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
case ANALYSIS_CAST_TABLETS:
analysisCastTabletsTimer.updateNanos(costTimeInNanos);
break;
+ case ANALYSIS_ASYNC_MOVE:
+ analysisAsyncMoveTimer.updateNanos(costTimeInNanos);
+ break;
default:
throw new UnsupportedOperationException("Unsupported stage: " + stage);
}
@@ -120,6 +125,12 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
ANALYSIS_CAST_TABLETS);
+ analysisAsyncMoveTimer =
+ metricService.getOrCreateTimer(
+ Metric.LOAD_TIME_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ ANALYSIS_ASYNC_MOVE);
diskIOCounter =
metricService.getOrCreateCounter(
@@ -137,7 +148,8 @@ public class LoadTsFileCostMetricsSet implements IMetricSet {
SECOND_PHASE,
LOAD_LOCALLY,
SCHEDULER_CAST_TABLETS,
- ANALYSIS_CAST_TABLETS)
+ ANALYSIS_CAST_TABLETS,
+ ANALYSIS_ASYNC_MOVE)
.forEach(
stage ->
metricService.remove(