This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b13edf3d099 [IOTDB-6207] Load: Add Write Point Rate Metric (#11379)
b13edf3d099 is described below

commit b13edf3d09986899916d235dd3103ffc09561ad0
Author: Itami Sho <[email protected]>
AuthorDate: Mon Oct 30 10:01:13 2023 +0800

    [IOTDB-6207] Load: Add Write Point Rate Metric (#11379)
    
    Co-authored-by: yschengzi <[email protected]>
---
 .../execution/load/LoadTsFileManager.java          | 30 +++++++++++++++++++---
 .../plan/analyze/LoadTsfileAnalyzer.java           | 22 +++++++++++-----
 .../plan/node/load/LoadSingleTsFileNode.java       | 19 ++++++++------
 .../planner/plan/node/load/LoadTsFileNode.java     | 10 ++++++--
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 27 +++++++++++++++++++
 .../plan/statement/crud/LoadTsFileStatement.java   | 11 ++++++++
 .../crud/PipeEnrichedLoadTsFileStatement.java      | 10 ++++++++
 .../dataregion/memtable/AbstractMemTable.java      |  2 +-
 .../plan/plan/node/load/LoadTsFileNodeTest.java    |  2 +-
 .../iotdb/commons/service/metric/enums/Metric.java |  1 +
 10 files changed, 112 insertions(+), 22 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 97542ceab24..00aa3bf9b52 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -23,6 +23,9 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -33,6 +36,7 @@ import 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.L
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
+import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 
@@ -260,10 +264,21 @@ public class LoadTsFileManager {
           writer.endChunkGroup();
         }
         writer.endFile();
-        entry
-            .getKey()
-            .getDataRegion()
-            .loadNewTsFile(generateResource(writer), true, isGeneratedByPipe);
+
+        DataRegion dataRegion = entry.getKey().getDataRegion();
+        dataRegion.loadNewTsFile(generateResource(writer), true, 
isGeneratedByPipe);
+
+        MetricService.getInstance()
+            .count(
+                getTsFileWritePointCount(writer),
+                Metric.QUANTITY.toString(),
+                MetricLevel.CORE,
+                Tag.NAME.toString(),
+                Metric.POINTS_IN.toString(),
+                Tag.DATABASE.toString(),
+                dataRegion.getDatabaseName(),
+                Tag.REGION.toString(),
+                dataRegion.getDataRegionId());
       }
     }
 
@@ -273,6 +288,13 @@ public class LoadTsFileManager {
       return tsFileResource;
     }
 
+    private long getTsFileWritePointCount(TsFileIOWriter writer) {
+      return writer.getChunkGroupMetadataList().stream()
+          .flatMap(chunkGroupMetadata -> 
chunkGroupMetadata.getChunkMetadataList().stream())
+          .mapToLong(chunkMetadata -> chunkMetadata.getStatistics().getCount())
+          .sum();
+    }
+
     private void close() {
       if (isClosed) {
         return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index cbad758cc55..6f718c25383 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -174,13 +174,16 @@ public class LoadTsfileAnalyzer {
   private void analyzeSingleTsFile(File tsFile) throws IOException, 
AuthException {
     try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
       // can be reused when constructing tsfile resource
-      Map<String, List<TimeseriesMetadata>> device2TimeseriesMetadata = null;
+      final Map<String, List<TimeseriesMetadata>> device2TimeseriesMetadata =
+          reader.getAllTimeseriesMetadata(true);
+      if (device2TimeseriesMetadata.isEmpty()) {
+        LOGGER.warn("device2TimeseriesMetadata is empty, because maybe the 
tsfile is empty");
+        return;
+      }
 
       // auto create or verify schema
       if (IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
           || loadTsFileStatement.isVerifySchema()) {
-        // cache timeseries metadata for the next step
-        device2TimeseriesMetadata = reader.getAllTimeseriesMetadata(true);
 
         final TimeSeriesIterator timeSeriesIterator =
             new TimeSeriesIterator(tsFile, device2TimeseriesMetadata);
@@ -194,9 +197,6 @@ public class LoadTsfileAnalyzer {
       // construct tsfile resource
       final TsFileResource tsFileResource = new TsFileResource(tsFile);
       if (!tsFileResource.resourceFileExists()) {
-        if (device2TimeseriesMetadata == null) {
-          device2TimeseriesMetadata = reader.getAllTimeseriesMetadata(true);
-        }
         // it will be serialized in LoadSingleTsFileNode
         FileLoaderUtils.updateTsFileResource(device2TimeseriesMetadata, 
tsFileResource);
         tsFileResource.updatePlanIndexes(reader.getMinPlanIndex());
@@ -204,12 +204,22 @@ public class LoadTsfileAnalyzer {
       } else {
         tsFileResource.deserialize();
       }
+
       
TimestampPrecisionUtils.checkTimestampPrecision(tsFileResource.getFileEndTime());
       tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
       loadTsFileStatement.addTsFileResource(tsFileResource);
+      
loadTsFileStatement.addWritePointCount(getWritePointCount(device2TimeseriesMetadata));
     }
   }
 
+  private long getWritePointCount(Map<String, List<TimeseriesMetadata>> 
device2TimeseriesMetadata) {
+    return device2TimeseriesMetadata.values().stream()
+        .flatMap(List::stream)
+        .mapToLong(t -> t.getStatistics().getCount())
+        .sum();
+  }
+
   private static final class TimeSeriesIterator
       implements Iterator<Pair<String, TimeseriesMetadata>> {
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index 897522afa04..c4ff3c34407 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -54,22 +54,21 @@ import java.util.function.Function;
 public class LoadSingleTsFileNode extends WritePlanNode {
   private static final Logger logger = 
LoggerFactory.getLogger(LoadSingleTsFileNode.class);
 
-  private File tsFile;
-  private TsFileResource resource;
+  private final File tsFile;
+  private final TsFileResource resource;
+  private final boolean deleteAfterLoad;
+  private final long writePointCount;
   private boolean needDecodeTsFile;
-  private boolean deleteAfterLoad;
 
   private TRegionReplicaSet localRegionReplicaSet;
 
-  public LoadSingleTsFileNode(PlanNodeId id) {
-    super(id);
-  }
-
-  public LoadSingleTsFileNode(PlanNodeId id, TsFileResource resource, boolean 
deleteAfterLoad) {
+  public LoadSingleTsFileNode(
+      PlanNodeId id, TsFileResource resource, boolean deleteAfterLoad, long 
writePointCount) {
     super(id);
     this.tsFile = resource.getTsFile();
     this.resource = resource;
     this.deleteAfterLoad = deleteAfterLoad;
+    this.writePointCount = writePointCount;
   }
 
   public boolean isTsFileEmpty() {
@@ -138,6 +137,10 @@ public class LoadSingleTsFileNode extends WritePlanNode {
     return deleteAfterLoad;
   }
 
+  public long getWritePointCount() {
+    return writePointCount;
+  }
+
   /**
    * only used for load locally.
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
index 793e6139693..23a5620a611 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java
@@ -89,8 +89,14 @@ public class LoadTsFileNode extends WritePlanNode {
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     List<WritePlanNode> res = new ArrayList<>();
     LoadTsFileStatement statement = (LoadTsFileStatement) 
analysis.getStatement();
-    for (TsFileResource resource : resources) {
-      res.add(new LoadSingleTsFileNode(getPlanNodeId(), resource, 
statement.isDeleteAfterLoad()));
+
+    for (int i = 0; i < resources.size(); i++) {
+      res.add(
+          new LoadSingleTsFileNode(
+              getPlanNodeId(),
+              resources.get(i),
+              statement.isDeleteAfterLoad(),
+              statement.getWritePointCount(i)));
     }
     return res;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index a1cf6f83749..d960a86ba23 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -26,9 +26,14 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.StorageExecutor;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
@@ -47,6 +52,9 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsF
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
 import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -341,6 +349,25 @@ public class LoadTsFileScheduler implements IScheduler {
       stateMachine.transitionToFailed(e.getFailureStatus());
       return false;
     }
+
+    // add metrics
+    DataRegion dataRegion =
+        StorageEngine.getInstance()
+            .getDataRegion(
+                (DataRegionId)
+                    ConsensusGroupId.Factory.createFromTConsensusGroupId(
+                        node.getLocalRegionReplicaSet().getRegionId()));
+    MetricService.getInstance()
+        .count(
+            node.getWritePointCount(),
+            Metric.QUANTITY.toString(),
+            MetricLevel.CORE,
+            Tag.NAME.toString(),
+            Metric.POINTS_IN.toString(),
+            Tag.DATABASE.toString(),
+            dataRegion.getDatabaseName(),
+            Tag.REGION.toString(),
+            dataRegion.getDataRegionId());
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index ca157c00fa3..68b676c7ca6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -46,6 +46,7 @@ public class LoadTsFileStatement extends Statement {
 
   private final List<File> tsFiles;
   private final List<TsFileResource> resources;
+  private final List<Long> writePointCountList;
 
   public LoadTsFileStatement(String filePath) throws FileNotFoundException {
     this.file = new File(filePath);
@@ -55,6 +56,7 @@ public class LoadTsFileStatement extends Statement {
     this.autoCreateDatabase = 
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
     this.tsFiles = new ArrayList<>();
     this.resources = new ArrayList<>();
+    this.writePointCountList = new ArrayList<>();
     this.statementType = StatementType.MULTI_BATCH_INSERT;
 
     if (file.isFile()) {
@@ -79,6 +81,7 @@ public class LoadTsFileStatement extends Statement {
     this.autoCreateDatabase = 
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled();
     this.tsFiles = new ArrayList<>();
     this.resources = new ArrayList<>();
+    this.writePointCountList = new ArrayList<>();
     this.statementType = StatementType.MULTI_BATCH_INSERT;
   }
 
@@ -153,6 +156,14 @@ public class LoadTsFileStatement extends Statement {
     return resources;
   }
 
+  public void addWritePointCount(long writePointCount) {
+    writePointCountList.add(writePointCount);
+  }
+
+  public long getWritePointCount(int resourceIndex) {
+    return writePointCountList.get(resourceIndex);
+  }
+
   @Override
   public List<PartialPath> getPaths() {
     return Collections.emptyList();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
index e7a15b400ea..c2d9f349052 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedLoadTsFileStatement.java
@@ -110,6 +110,16 @@ public class PipeEnrichedLoadTsFileStatement extends 
LoadTsFileStatement {
     return loadTsFileStatement.getResources();
   }
 
+  @Override
+  public void addWritePointCount(long writePointCount) {
+    loadTsFileStatement.addWritePointCount(writePointCount);
+  }
+
+  @Override
+  public long getWritePointCount(int resourceIndex) {
+    return loadTsFileStatement.getWritePointCount(resourceIndex);
+  }
+
   @Override
   public List<PartialPath> getPaths() {
     return loadTsFileStatement.getPaths();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 71863d10997..48aa981e31b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -99,7 +99,7 @@ public abstract class AbstractMemTable implements IMemTable {
   private String database;
   private String dataRegionId;
 
-  private static final String METRIC_POINT_IN = "pointsIn";
+  private static final String METRIC_POINT_IN = Metric.POINTS_IN.toString();
 
   protected AbstractMemTable() {
     this.database = null;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/load/LoadTsFileNodeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/load/LoadTsFileNodeTest.java
index 25d13fde459..2291a6a3543 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/load/LoadTsFileNodeTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/node/load/LoadTsFileNodeTest.java
@@ -39,7 +39,7 @@ public class LoadTsFileNodeTest {
   @Test
   public void testLoadSingleTsFileNode() {
     TsFileResource resource = new TsFileResource(new File("1"));
-    LoadSingleTsFileNode node = new LoadSingleTsFileNode(new PlanNodeId(""), 
resource, true);
+    LoadSingleTsFileNode node = new LoadSingleTsFileNode(new PlanNodeId(""), 
resource, true, 0L);
     Assert.assertTrue(node.isDeleteAfterLoad());
     Assert.assertEquals(resource, node.getTsFileResource());
     Assert.assertNull(node.getLocalRegionReplicaSet());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 18cc8955a64..f8c1a9fe9dc 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -52,6 +52,7 @@ public enum Metric {
   RATIS_CONSENSUS_READ("ratis_consensus_read"),
   // storage engine related
   POINTS("points"),
+  POINTS_IN("points_in"),
   COST_TASK("cost_task"),
   QUEUE("queue"),
   FLUSHING_MEM_TABLE_STATUS("flushing_mem_table_status"),

Reply via email to