This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fe52d5de3e6 Load: Add some load metrics of time cost, write point and
disk throughput (#12735)
fe52d5de3e6 is described below
commit fe52d5de3e6559d8d75ba44ef756d2af27bc77cd
Author: Itami Sho <[email protected]>
AuthorDate: Thu Jun 20 16:43:13 2024 +0800
Load: Add some load metrics of time cost, write point and disk throughput
(#12735)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../execution/load/LoadTsFileManager.java | 5 +-
.../execution/load/LoadTsFileRateLimiter.java | 3 +
.../load/LoadTsFileAnalyzeSchemaMemoryBlock.java | 2 +-
.../metric/load/LoadTsFileCostMetricsSet.java | 131 +++++++++++++++++++++
.../metric/{ => load}/LoadTsFileMemMetricSet.java | 2 +-
.../queryengine/plan/analyze/AnalyzeVisitor.java | 6 +
.../plan/scheduler/load/LoadTsFileScheduler.java | 42 ++++++-
.../db/service/metrics/DataNodeMetricsHelper.java | 6 +-
.../iotdb/commons/service/metric/enums/Metric.java | 6 +-
9 files changed, 192 insertions(+), 11 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 25d524154cd..61de9f2c905 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -412,6 +412,7 @@ public class LoadTsFileManager {
DataRegion dataRegion = entry.getKey().getDataRegion();
dataRegion.loadNewTsFile(generateResource(writer, progressIndex),
true, isGeneratedByPipe);
+ // Metrics
dataRegion
.getNonSystemDatabaseName()
.ifPresent(
@@ -431,7 +432,9 @@ public class LoadTsFileManager {
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
- dataRegion.getDataRegionId());
+ dataRegion.getDataRegionId(),
+ Tag.TYPE.toString(),
+ Metric.LOAD_POINT_COUNT.toString());
});
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
index d98a1d102fc..dedda70996d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileRateLimiter.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.load;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.RateLimiter;
@@ -37,6 +38,8 @@ public class LoadTsFileRateLimiter {
private final RateLimiter loadWriteRateLimiter;
public void acquire(long bytes) {
+ LoadTsFileCostMetricsSet.getInstance().recordDiskIO(bytes);
+
if (reloadParams()) {
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
index 17aa4150bcd..17b52745f23 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.queryengine.load;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
-import org.apache.iotdb.db.queryengine.metric.LoadTsFileMemMetricSet;
+import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileMemMetricSet;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.slf4j.Logger;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileCostMetricsSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileCostMetricsSet.java
new file mode 100644
index 00000000000..b0ce2ca69a4
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileCostMetricsSet.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.metric.load;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import java.util.Arrays;
+
+public class LoadTsFileCostMetricsSet implements IMetricSet {
+
+ private static final LoadTsFileCostMetricsSet INSTANCE = new
LoadTsFileCostMetricsSet();
+
+ public static final String ANALYSIS = "analysis";
+ public static final String FIRST_PHASE = "first_phase";
+ public static final String SECOND_PHASE = "second_phase";
+ public static final String LOAD_LOCALLY = "load_locally";
+
+ private LoadTsFileCostMetricsSet() {
+ // empty constructor
+ }
+
+ private Timer analyzerTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer firstPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer secondPhaseTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer loadLocallyTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+ private Rate diskIORate = DoNothingMetricManager.DO_NOTHING_RATE;
+
+ public void recordPhaseTimeCost(String stage, long costTimeInNanos) {
+ switch (stage) {
+ case ANALYSIS:
+ analyzerTimer.updateNanos(costTimeInNanos);
+ break;
+ case FIRST_PHASE:
+ firstPhaseTimer.updateNanos(costTimeInNanos);
+ break;
+ case SECOND_PHASE:
+ secondPhaseTimer.updateNanos(costTimeInNanos);
+ break;
+ case LOAD_LOCALLY:
+ loadLocallyTimer.updateNanos(costTimeInNanos);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported stage: " + stage);
+ }
+ }
+
+ public void recordDiskIO(long bytes) {
+ diskIORate.mark(bytes);
+ }
+
+ @Override
+ public void bindTo(AbstractMetricService metricService) {
+ analyzerTimer =
+ metricService.getOrCreateTimer(
+ Metric.LOAD_TIME_COST.toString(), MetricLevel.IMPORTANT,
Tag.NAME.toString(), ANALYSIS);
+ firstPhaseTimer =
+ metricService.getOrCreateTimer(
+ Metric.LOAD_TIME_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ FIRST_PHASE);
+ secondPhaseTimer =
+ metricService.getOrCreateTimer(
+ Metric.LOAD_TIME_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ SECOND_PHASE);
+ loadLocallyTimer =
+ metricService.getOrCreateTimer(
+ Metric.LOAD_TIME_COST.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ LOAD_LOCALLY);
+
+ diskIORate =
+ metricService.getOrCreateRate(
+ Metric.LOAD_DISK_IO.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "DataNode " +
IoTDBDescriptor.getInstance().getConfig().getDataNodeId());
+ }
+
+ @Override
+ public void unbindFrom(AbstractMetricService metricService) {
+ Arrays.asList(ANALYSIS, FIRST_PHASE, SECOND_PHASE, LOAD_LOCALLY)
+ .forEach(
+ stage ->
+ metricService.remove(
+ MetricType.TIMER,
+ Metric.LOAD_TIME_COST.toString(),
+ Tag.NAME.toString(),
+ stage));
+
+ metricService.remove(
+ MetricType.RATE,
+ Metric.LOAD_DISK_IO.toString(),
+ Tag.NAME.toString(),
+
String.valueOf(IoTDBDescriptor.getInstance().getConfig().getDataNodeId()));
+ }
+
+ public static LoadTsFileCostMetricsSet getInstance() {
+ return LoadTsFileCostMetricsSet.INSTANCE;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/LoadTsFileMemMetricSet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileMemMetricSet.java
similarity index 98%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/LoadTsFileMemMetricSet.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileMemMetricSet.java
index 7e822421302..c1953815854 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/LoadTsFileMemMetricSet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/load/LoadTsFileMemMetricSet.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.metric;
+package org.apache.iotdb.db.queryengine.metric.load;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 6d5058e5417..bcd90ceaf1e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -60,6 +60,7 @@ import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.execution.operator.window.WindowType;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
+import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
import
org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
@@ -186,6 +187,7 @@ import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant
import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHEMA_FETCHER;
+import static
org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet.ANALYSIS;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.bindSchemaForExpression;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
@@ -2846,6 +2848,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
@Override
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement,
MPPQueryContext context) {
+ long startTime = System.nanoTime();
try (final LoadTsfileAnalyzer loadTsfileAnalyzer =
new LoadTsfileAnalyzer(loadTsFileStatement, context, partitionFetcher,
schemaFetcher)) {
return loadTsfileAnalyzer.analyzeFileByFile();
@@ -2860,6 +2863,9 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR,
exceptionMessage));
return analysis;
+ } finally {
+ LoadTsFileCostMetricsSet.getInstance()
+ .recordPhaseTimeCost(ANALYSIS, System.nanoTime() - startTime);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index a36e75aed2f..91b01c47949 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -50,6 +50,7 @@ import
org.apache.iotdb.db.queryengine.execution.load.TsFileData;
import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
import org.apache.iotdb.db.queryengine.load.LoadTsFileDataCacheMemoryBlock;
import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager;
+import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
@@ -108,6 +109,9 @@ public class LoadTsFileScheduler implements IScheduler {
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private static final LoadTsFileCostMetricsSet LOAD_TSFILE_COST_METRICS_SET =
+ LoadTsFileCostMetricsSet.getInstance();
+
private static final long SINGLE_SCHEDULER_MAX_MEMORY_SIZE =
IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize() >> 2;
private static final int TRANSMIT_LIMIT =
@@ -177,16 +181,38 @@ public class LoadTsFileScheduler implements IScheduler {
partitionFetcher.queryDataPartition(
slotList,
queryContext.getSession().getUserName()))) { // do not
decode, load locally
- isLoadSingleTsFileSuccess = loadLocally(node);
+ final long startTime = System.nanoTime();
+ try {
+ isLoadSingleTsFileSuccess = loadLocally(node);
+ } finally {
+ LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
+ LoadTsFileCostMetricsSet.LOAD_LOCALLY, System.nanoTime() -
startTime);
+ }
+
node.clean();
} else { // need decode, load locally or remotely, use two phases
method
String uuid = UUID.randomUUID().toString();
dispatcher.setUuid(uuid);
allReplicaSets.clear();
- boolean isFirstPhaseSuccess = firstPhase(node);
- boolean isSecondPhaseSuccess =
- secondPhase(isFirstPhaseSuccess, uuid,
node.getTsFileResource());
+ long startTime = System.nanoTime();
+ final boolean isFirstPhaseSuccess;
+ try {
+ isFirstPhaseSuccess = firstPhase(node);
+ } finally {
+ LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
+ LoadTsFileCostMetricsSet.FIRST_PHASE, System.nanoTime() -
startTime);
+ }
+
+ startTime = System.nanoTime();
+ final boolean isSecondPhaseSuccess;
+ try {
+ isSecondPhaseSuccess =
+ secondPhase(isFirstPhaseSuccess, uuid,
node.getTsFileResource());
+ } finally {
+ LOAD_TSFILE_COST_METRICS_SET.recordPhaseTimeCost(
+ LoadTsFileCostMetricsSet.SECOND_PHASE, System.nanoTime() -
startTime);
+ }
node.clean();
if (!isFirstPhaseSuccess || !isSecondPhaseSuccess) {
@@ -433,7 +459,9 @@ public class LoadTsFileScheduler implements IScheduler {
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
- dataRegion.getDataRegionId());
+ dataRegion.getDataRegionId(),
+ Tag.TYPE.toString(),
+ Metric.LOAD_POINT_COUNT.toString());
if (!node.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
@@ -445,7 +473,9 @@ public class LoadTsFileScheduler implements IScheduler {
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
- dataRegion.getDataRegionId());
+ dataRegion.getDataRegionId(),
+ Tag.TYPE.toString(),
+ Metric.LOAD_POINT_COUNT.toString());
}
});
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
index ade06cb875f..e5e3f27f5c1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java
@@ -34,12 +34,13 @@ import org.apache.iotdb.db.pipe.metric.PipeDataNodeMetrics;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet;
-import org.apache.iotdb.db.queryengine.metric.LoadTsFileMemMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
+import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileCostMetricsSet;
+import org.apache.iotdb.db.queryengine.metric.load.LoadTsFileMemMetricSet;
import org.apache.iotdb.db.subscription.metric.SubscriptionMetrics;
import org.apache.iotdb.metrics.metricsets.UpTimeMetrics;
import org.apache.iotdb.metrics.metricsets.disk.DiskMetrics;
@@ -93,6 +94,9 @@ public class DataNodeMetricsHelper {
// bind subscription related metrics
MetricService.getInstance().addMetricSet(SubscriptionMetrics.getInstance());
+
+ // bind load related metrics
+
MetricService.getInstance().addMetricSet(LoadTsFileCostMetricsSet.getInstance());
}
private static void initSystemMetrics() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 76a40ce2e3d..219ead69451 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -176,7 +176,11 @@ public enum Metric {
SUBSCRIPTION_CURRENT_COMMIT_ID("subscription_current_commit_id"),
SUBSCRIPTION_EVENT_TRANSFER("subscription_event_transfer"),
// load related
- LOAD_MEM("load_mem");
+ LOAD_MEM("load_mem"),
+ LOAD_DISK_IO("load_disk_io"),
+ LOAD_TIME_COST("load_time_cost"),
+ LOAD_POINT_COUNT("load_point_count"),
+ ;
final String value;