This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b13edf3d099 [IOTDB-6207] Load: Add Write Point Rate Metric (#11379)
b13edf3d099 is described below
commit b13edf3d09986899916d235dd3103ffc09561ad0
Author: Itami Sho <[email protected]>
AuthorDate: Mon Oct 30 10:01:13 2023 +0800
[IOTDB-6207] Load: Add Write Point Rate Metric (#11379)
Co-authored-by: yschengzi <[email protected]>
---
.../execution/load/LoadTsFileManager.java | 30 +++++++++++++++++++---
.../plan/analyze/LoadTsfileAnalyzer.java | 22 +++++++++++-----
.../plan/node/load/LoadSingleTsFileNode.java | 19 ++++++++------
.../planner/plan/node/load/LoadTsFileNode.java | 10 ++++++--
.../plan/scheduler/load/LoadTsFileScheduler.java | 27 +++++++++++++++++++
.../plan/statement/crud/LoadTsFileStatement.java | 11 ++++++++
.../crud/PipeEnrichedLoadTsFileStatement.java | 10 ++++++++
.../dataregion/memtable/AbstractMemTable.java | 2 +-
.../plan/plan/node/load/LoadTsFileNodeTest.java | 2 +-
.../iotdb/commons/service/metric/enums/Metric.java | 1 +
10 files changed, 112 insertions(+), 22 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 97542ceab24..00aa3bf9b52 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -23,6 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -33,6 +36,7 @@ import
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.L
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
@@ -260,10 +264,21 @@ public class LoadTsFileManager {
writer.endChunkGroup();
}
writer.endFile();
- entry
- .getKey()
- .getDataRegion()
- .loadNewTsFile(generateResource(writer), true, isGeneratedByPipe);
+
+ DataRegion dataRegion = entry.getKey().getDataRegion();
+ dataRegion.loadNewTsFile(generateResource(writer), true,
isGeneratedByPipe);
+
+ MetricService.getInstance()
+ .count(
+ getTsFileWritePointCount(writer),
+ Metric.QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ Metric.POINTS_IN.toString(),
+ Tag.DATABASE.toString(),
+ dataRegion.getDatabaseName(),
+ Tag.REGION.toString(),
+ dataRegion.getDataRegionId());
}
}
@@ -273,6 +288,13 @@ public class LoadTsFileManager {
return tsFileResource;
}
+ private long getTsFileWritePointCount(TsFileIOWriter writer) {
+ return writer.getChunkGroupMetadataList().stream()
+ .flatMap(chunkGroupMetadata ->
chunkGroupMetadata.getChunkMetadataList().stream())
+ .mapToLong(chunkMetadata -> chunkMetadata.getStatistics().getCount())
+ .sum();
+ }
+
private void close() {
if (isClosed) {
return;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index cbad758cc55..6f718c25383 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -174,13 +174,16 @@ public class LoadTsfileAnalyzer {
private void analyzeSingleTsFile(File tsFile) throws IOException,
AuthException {
try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
// can be reused when constructing tsfile resource
- Map<String, List<TimeseriesMetadata>> device2TimeseriesMetadata = null;
+ final Map<String, List<TimeseriesMetadata>> device2TimeseriesMetadata =
+ reader.getAllTimeseriesMetadata(true);
+ if (device2TimeseriesMetadata.isEmpty()) {
+ LOGGER.warn("device2TimeseriesMetadata is empty, because maybe the
tsfile is empty");
+ return;
+ }
// auto create or verify schema
if (IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
|| loadTsFileStatement.isVerifySchema()) {
- // cache timeseries metadata for the next step
- device2TimeseriesMetadata = reader.getAllTimeseriesMetadata(true);
final TimeSeriesIterator timeSeriesIterator =
new TimeSeriesIterator(tsFile, device2TimeseriesMetadata);
@@ -194,9 +197,6 @@ public class LoadTsfileAnalyzer {
// construct tsfile resource
final TsFileResource tsFileResource = new TsFileResource(tsFile);
if (!tsFileResource.resourceFileExists()) {
- if (device2TimeseriesMetadata == null) {
- device2TimeseriesMetadata = reader.getAllTimeseriesMetadata(true);
- }
// it will be serialized in LoadSingleTsFileNode
FileLoaderUtils.updateTsFileResource(device2TimeseriesMetadata,
tsFileResource);
tsFileResource.updatePlanIndexes(reader.getMinPlanIndex());
@@ -204,12 +204,22 @@ public class LoadTsfileAnalyzer {
} else {
tsFileResource.deserialize();
}
+
TimestampPrecisionUtils.checkTimestampPrecision(tsFileResource.getFileEndTime());
tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
loadTsFileStatement.addTsFileResource(tsFileResource);
+
loadTsFileStatement.addWritePointCount(getWritePointCount(device2TimeseriesMetadata));
}
}
+ private long getWritePointCount(Map<String, List<TimeseriesMetadata>>
device2TimeseriesMetadata) {
+ return device2TimeseriesMetadata.values().stream()
+ .flatMap(List::stream)
+ .mapToLong(t -> t.getStatistics().getCount())
+ .sum();
+ }
+
private static final class TimeSeriesIterator
implements Iterator<Pair<String, TimeseriesMetadata>> {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 897522afa04..c4ff3c34407 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -54,22 +54,21 @@ import java.util.function.Function;
public class LoadSingleTsFileNode extends WritePlanNode {
private static final Logger logger =
LoggerFactory.getLogger(LoadSingleTsFileNode.class);
- private File tsFile;
- private TsFileResource resource;
+ private final File tsFile;
+ private final TsFileResource resource;
+ private final boolean deleteAfterLoad;
+ private final long writePointCount;
private boolean needDecodeTsFile;
- private boolean deleteAfterLoad;
private TRegionReplicaSet localRegionReplicaSet;
- public LoadSingleTsFileNode(PlanNodeId id) {
- super(id);
- }
-
- public LoadSingleTsFileNode(PlanNodeId id, TsFileResource resource, boolean
deleteAfterLoad) {
+ public LoadSingleTsFileNode(
+ PlanNodeId id, TsFileResource resource, boolean deleteAfterLoad, long
writePointCount) {
super(id);
this.tsFile = resource.getTsFile();
this.resource = resource;
this.deleteAfterLoad = deleteAfterLoad;
+ this.writePointCount = writePointCount;
}
public boolean isTsFileEmpty() {
@@ -138,6 +137,10 @@ public class LoadSingleTsFileNode extends WritePlanNode {
return deleteAfterLoad;
}
+ public long getWritePointCount() {
+ return writePointCount;
+ }
+
/**
* only used for load locally.
*
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index 793e6139693..23a5620a611 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -89,8 +89,14 @@ public class LoadTsFileNode extends WritePlanNode {
public List<WritePlanNode> splitByPartition(Analysis analysis) {
List<WritePlanNode> res = new ArrayList<>();
LoadTsFileStatement statement = (LoadTsFileStatement)
analysis.getStatement();
- for (TsFileResource resource : resources) {
- res.add(new LoadSingleTsFileNode(getPlanNodeId(), resource,
statement.isDeleteAfterLoad()));
+
+ for (int i = 0; i < resources.size(); i++) {
+ res.add(
+ new LoadSingleTsFileNode(
+ getPlanNodeId(),
+ resources.get(i),
+ statement.isDeleteAfterLoad(),
+ statement.getWritePointCount(i)));
}
return res;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index a1cf6f83749..d960a86ba23 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -26,9 +26,14 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.StorageExecutor;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
@@ -47,6 +52,9 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsF
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -341,6 +349,25 @@ public class LoadTsFileScheduler implements IScheduler {
stateMachine.transitionToFailed(e.getFailureStatus());
return false;
}
+
+ // add metrics
+ DataRegion dataRegion =
+ StorageEngine.getInstance()
+ .getDataRegion(
+ (DataRegionId)
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(
+ node.getLocalRegionReplicaSet().getRegionId()));
+ MetricService.getInstance()
+ .count(
+ node.getWritePointCount(),
+ Metric.QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ Metric.POINTS_IN.toString(),
+ Tag.DATABASE.toString(),
+ dataRegion.getDatabaseName(),
+ Tag.REGION.toString(),
+ dataRegion.getDataRegionId());
return true;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index ca157c00fa3..68b676c7ca6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -46,6 +46,7 @@ public class LoadTsFileStatement extends Statement {
private final List<File> tsFiles;
private final List<TsFileResource> resources;
+ private final List<Long> writePointCountList;
public LoadTsFileStatement(String filePath) throws FileNotFoundException {
this.file = new File(filePath);
@@ -55,6 +56,7 @@ public class LoadTsFileStatement extends Statement {
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
this.tsFiles = new ArrayList<>();
this.resources = new ArrayList<>();
+ this.writePointCountList = new ArrayList<>();
this.statementType = StatementType.MULTI_BATCH_INSERT;
if (file.isFile()) {
@@ -79,6 +81,7 @@ public class LoadTsFileStatement extends Statement {
this.autoCreateDatabase =
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
this.tsFiles = new ArrayList<>();
this.resources = new ArrayList<>();
+ this.writePointCountList = new ArrayList<>();
this.statementType = StatementType.MULTI_BATCH_INSERT;
}
@@ -153,6 +156,14 @@ public class LoadTsFileStatement extends Statement {
return resources;
}
+ public void addWritePointCount(long writePointCount) {
+ writePointCountList.add(writePointCount);
+ }
+
+ public long getWritePointCount(int resourceIndex) {
+ return writePointCountList.get(resourceIndex);
+ }
+
@Override
public List<PartialPath> getPaths() {
return Collections.emptyList();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
index e7a15b400ea..c2d9f349052 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
@@ -110,6 +110,16 @@ public class PipeEnrichedLoadTsFileStatement extends
LoadTsFileStatement {
return loadTsFileStatement.getResources();
}
+ @Override
+ public void addWritePointCount(long writePointCount) {
+ loadTsFileStatement.addWritePointCount(writePointCount);
+ }
+
+ @Override
+ public long getWritePointCount(int resourceIndex) {
+ return loadTsFileStatement.getWritePointCount(resourceIndex);
+ }
+
@Override
public List<PartialPath> getPaths() {
return loadTsFileStatement.getPaths();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 71863d10997..48aa981e31b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -99,7 +99,7 @@ public abstract class AbstractMemTable implements IMemTable {
private String database;
private String dataRegionId;
- private static final String METRIC_POINT_IN = "pointsIn";
+ private static final String METRIC_POINT_IN = Metric.POINTS_IN.toString();
protected AbstractMemTable() {
this.database = null;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/load/LoadTsFileNodeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/load/LoadTsFileNodeTest.java
index 25d13fde459..2291a6a3543 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/load/LoadTsFileNodeTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/load/LoadTsFileNodeTest.java
@@ -39,7 +39,7 @@ public class LoadTsFileNodeTest {
@Test
public void testLoadSingleTsFileNode() {
TsFileResource resource = new TsFileResource(new File("1"));
- LoadSingleTsFileNode node = new LoadSingleTsFileNode(new PlanNodeId(""),
resource, true);
+ LoadSingleTsFileNode node = new LoadSingleTsFileNode(new PlanNodeId(""),
resource, true, 0L);
Assert.assertTrue(node.isDeleteAfterLoad());
Assert.assertEquals(resource, node.getTsFileResource());
Assert.assertNull(node.getLocalRegionReplicaSet());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 18cc8955a64..f8c1a9fe9dc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -52,6 +52,7 @@ public enum Metric {
RATIS_CONSENSUS_READ("ratis_consensus_read"),
// storage engine related
POINTS("points"),
+ POINTS_IN("points_in"),
COST_TASK("cost_task"),
QUEUE("queue"),
FLUSHING_MEM_TABLE_STATUS("flushing_mem_table_status"),