This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5619156e871 Pipe: Report load tsfile points to flush point metric
timeseries in root.__system (#11976)
5619156e871 is described below
commit 5619156e87107747aaaea75b8f83fefc7f852529
Author: Caideyipi <[email protected]>
AuthorDate: Sun Jan 28 17:58:08 2024 +0800
Pipe: Report load tsfile points to flush point metric timeseries in
root.__system (#11976)
---
.../execution/load/LoadTsFileManager.java | 33 ++++++++++++++--------
.../plan/scheduler/load/LoadTsFileScheduler.java | 33 ++++++++++++++--------
.../db/storageengine/dataregion/DataRegion.java | 17 +++++++++++
.../dataregion/flush/MemTableFlushTask.java | 23 +++++++--------
4 files changed, 71 insertions(+), 35 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index bbd3d3a760d..d7a87a3b219 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
@@ -317,17 +318,27 @@ public class LoadTsFileManager {
DataRegion dataRegion = entry.getKey().getDataRegion();
dataRegion.loadNewTsFile(generateResource(writer, progressIndex), true, isGeneratedByPipe);
- MetricService.getInstance()
- .count(
- getTsFileWritePointCount(writer),
- Metric.QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- Metric.POINTS_IN.toString(),
- Tag.DATABASE.toString(),
- dataRegion.getDatabaseName(),
- Tag.REGION.toString(),
- dataRegion.getDataRegionId());
+ dataRegion
+ .getNonSystemDatabaseName()
+ .ifPresent(
+ databaseName -> {
+ long writePointCount = getTsFileWritePointCount(writer);
+ // Report load tsFile points to IoTDB flush metrics
+ MemTableFlushTask.recordFlushPointsMetricInternal(
+ writePointCount, databaseName, dataRegion.getDataRegionId());
+
+ MetricService.getInstance()
+ .count(
+ writePointCount,
+ Metric.QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ Metric.POINTS_IN.toString(),
+ Tag.DATABASE.toString(),
+ databaseName,
+ Tag.REGION.toString(),
+ dataRegion.getDataRegionId());
+ });
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 579fc2ad263..aa18590d3c5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
@@ -397,17 +398,27 @@ public class LoadTsFileScheduler implements IScheduler {
(DataRegionId)
ConsensusGroupId.Factory.createFromTConsensusGroupId(
node.getLocalRegionReplicaSet().getRegionId()));
- MetricService.getInstance()
- .count(
- node.getWritePointCount(),
- Metric.QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- Metric.POINTS_IN.toString(),
- Tag.DATABASE.toString(),
- dataRegion.getDatabaseName(),
- Tag.REGION.toString(),
- dataRegion.getDataRegionId());
+
+ dataRegion
+ .getNonSystemDatabaseName()
+ .ifPresent(
+ databaseName -> {
+ // Report load tsFile points to IoTDB flush metrics
+ MemTableFlushTask.recordFlushPointsMetricInternal(
+ node.getWritePointCount(), databaseName, dataRegion.getDataRegionId());
+
+ MetricService.getInstance()
+ .count(
+ node.getWritePointCount(),
+ Metric.QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ Metric.POINTS_IN.toString(),
+ Tag.DATABASE.toString(),
+ databaseName,
+ Tag.REGION.toString(),
+ dataRegion.getDataRegionId());
+ });
return true;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index c1a4ee9a7b8..5bda2c80bf3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
@@ -138,6 +139,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
@@ -2531,6 +2533,21 @@ public class DataRegion implements IDataRegionForQuery {
BloomFilterCache.getInstance().clear();
}
+ public static Optional<String> getNonSystemDatabaseName(String databaseName) {
+ if (databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
+ return Optional.empty();
+ }
+ int lastIndex = databaseName.lastIndexOf("-");
+ if (lastIndex == -1) {
+ lastIndex = databaseName.length();
+ }
+ return Optional.of(databaseName.substring(0, lastIndex));
+ }
+
+ public Optional<String> getNonSystemDatabaseName() {
+ return getNonSystemDatabaseName(databaseName);
+ }
+
/** merge file under this database processor */
public int compact() {
writeLock("merge");
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 46c919835a8..68ccba4bcec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.flush;
-import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -26,6 +25,7 @@ import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IDeviceID;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
@@ -271,20 +271,17 @@ public class MemTableFlushTask {
Thread.currentThread().interrupt();
}
- recordFlushPointsMetric();
+ DataRegion.getNonSystemDatabaseName(storageGroup)
+ .ifPresent(
+ databaseName ->
+ recordFlushPointsMetricInternal(
+ memTable.getTotalPointsNum(), databaseName, dataRegionId));
WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_ENCODING, memSerializeTime);
}
};
- private void recordFlushPointsMetric() {
- if (storageGroup.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
- return;
- }
- int lastIndex = storageGroup.lastIndexOf("-");
- if (lastIndex == -1) {
- lastIndex = storageGroup.length();
- }
- String storageGroupName = storageGroup.substring(0, lastIndex);
+ public static void recordFlushPointsMetricInternal(
+ long totalPointsNum, String storageGroupName, String dataRegionId) {
long currentTime = CommonDateTimeUtils.currentTime();
// compute the flush points
long writeTime =
@@ -300,12 +297,12 @@ public class MemTableFlushTask {
// record the flush points
MetricService.getInstance()
.gaugeWithInternalReportAsync(
- memTable.getTotalPointsNum(),
+ totalPointsNum,
Metric.POINTS.toString(),
MetricLevel.CORE,
writeTime,
Tag.DATABASE.toString(),
- storageGroup.substring(0, lastIndex),
+ storageGroupName,
Tag.TYPE.toString(),
"flush",
Tag.REGION.toString(),