This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5619156e871 Pipe: Report load tsfile points to flush point metric 
timeseries in root.__system  (#11976)
5619156e871 is described below

commit 5619156e87107747aaaea75b8f83fefc7f852529
Author: Caideyipi <[email protected]>
AuthorDate: Sun Jan 28 17:58:08 2024 +0800

    Pipe: Report load tsfile points to flush point metric timeseries in 
root.__system  (#11976)
---
 .../execution/load/LoadTsFileManager.java          | 33 ++++++++++++++--------
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 33 ++++++++++++++--------
 .../db/storageengine/dataregion/DataRegion.java    | 17 +++++++++++
 .../dataregion/flush/MemTableFlushTask.java        | 23 +++++++--------
 4 files changed, 71 insertions(+), 35 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index bbd3d3a760d..d7a87a3b219 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
@@ -317,17 +318,27 @@ public class LoadTsFileManager {
         DataRegion dataRegion = entry.getKey().getDataRegion();
         dataRegion.loadNewTsFile(generateResource(writer, progressIndex), 
true, isGeneratedByPipe);
 
-        MetricService.getInstance()
-            .count(
-                getTsFileWritePointCount(writer),
-                Metric.QUANTITY.toString(),
-                MetricLevel.CORE,
-                Tag.NAME.toString(),
-                Metric.POINTS_IN.toString(),
-                Tag.DATABASE.toString(),
-                dataRegion.getDatabaseName(),
-                Tag.REGION.toString(),
-                dataRegion.getDataRegionId());
+        dataRegion
+            .getNonSystemDatabaseName()
+            .ifPresent(
+                databaseName -> {
+                  long writePointCount = getTsFileWritePointCount(writer);
+                  // Report load tsFile points to IoTDB flush metrics
+                  MemTableFlushTask.recordFlushPointsMetricInternal(
+                      writePointCount, databaseName, 
dataRegion.getDataRegionId());
+
+                  MetricService.getInstance()
+                      .count(
+                          writePointCount,
+                          Metric.QUANTITY.toString(),
+                          MetricLevel.CORE,
+                          Tag.NAME.toString(),
+                          Metric.POINTS_IN.toString(),
+                          Tag.DATABASE.toString(),
+                          databaseName,
+                          Tag.REGION.toString(),
+                          dataRegion.getDataRegionId());
+                });
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 579fc2ad263..aa18590d3c5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -59,6 +59,7 @@ import 
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult
 import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
@@ -397,17 +398,27 @@ public class LoadTsFileScheduler implements IScheduler {
                 (DataRegionId)
                     ConsensusGroupId.Factory.createFromTConsensusGroupId(
                         node.getLocalRegionReplicaSet().getRegionId()));
-    MetricService.getInstance()
-        .count(
-            node.getWritePointCount(),
-            Metric.QUANTITY.toString(),
-            MetricLevel.CORE,
-            Tag.NAME.toString(),
-            Metric.POINTS_IN.toString(),
-            Tag.DATABASE.toString(),
-            dataRegion.getDatabaseName(),
-            Tag.REGION.toString(),
-            dataRegion.getDataRegionId());
+
+    dataRegion
+        .getNonSystemDatabaseName()
+        .ifPresent(
+            databaseName -> {
+              // Report load tsFile points to IoTDB flush metrics
+              MemTableFlushTask.recordFlushPointsMetricInternal(
+                  node.getWritePointCount(), databaseName, 
dataRegion.getDataRegionId());
+
+              MetricService.getInstance()
+                  .count(
+                      node.getWritePointCount(),
+                      Metric.QUANTITY.toString(),
+                      MetricLevel.CORE,
+                      Tag.NAME.toString(),
+                      Metric.POINTS_IN.toString(),
+                      Tag.DATABASE.toString(),
+                      databaseName,
+                      Tag.REGION.toString(),
+                      dataRegion.getDataRegionId());
+            });
     return true;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index c1a4ee9a7b8..5bda2c80bf3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
@@ -138,6 +139,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
@@ -2531,6 +2533,21 @@ public class DataRegion implements IDataRegionForQuery {
     BloomFilterCache.getInstance().clear();
   }
 
+  public static Optional<String> getNonSystemDatabaseName(String databaseName) 
{
+    if (databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
+      return Optional.empty();
+    }
+    int lastIndex = databaseName.lastIndexOf("-");
+    if (lastIndex == -1) {
+      lastIndex = databaseName.length();
+    }
+    return Optional.of(databaseName.substring(0, lastIndex));
+  }
+
+  public Optional<String> getNonSystemDatabaseName() {
+    return getNonSystemDatabaseName(databaseName);
+  }
+
   /** merge file under this database processor */
   public int compact() {
     writeLock("merge");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 46c919835a8..68ccba4bcec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.flush;
 
-import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -26,6 +25,7 @@ import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import 
org.apache.iotdb.db.storageengine.dataregion.flush.pool.FlushSubTaskPoolManager;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IDeviceID;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
@@ -271,20 +271,17 @@ public class MemTableFlushTask {
             Thread.currentThread().interrupt();
           }
 
-          recordFlushPointsMetric();
+          DataRegion.getNonSystemDatabaseName(storageGroup)
+              .ifPresent(
+                  databaseName ->
+                      recordFlushPointsMetricInternal(
+                          memTable.getTotalPointsNum(), databaseName, 
dataRegionId));
           WRITING_METRICS.recordFlushCost(WritingMetrics.FLUSH_STAGE_ENCODING, 
memSerializeTime);
         }
       };
 
-  private void recordFlushPointsMetric() {
-    if (storageGroup.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
-      return;
-    }
-    int lastIndex = storageGroup.lastIndexOf("-");
-    if (lastIndex == -1) {
-      lastIndex = storageGroup.length();
-    }
-    String storageGroupName = storageGroup.substring(0, lastIndex);
+  public static void recordFlushPointsMetricInternal(
+      long totalPointsNum, String storageGroupName, String dataRegionId) {
     long currentTime = CommonDateTimeUtils.currentTime();
     // compute the flush points
     long writeTime =
@@ -300,12 +297,12 @@ public class MemTableFlushTask {
     // record the flush points
     MetricService.getInstance()
         .gaugeWithInternalReportAsync(
-            memTable.getTotalPointsNum(),
+            totalPointsNum,
             Metric.POINTS.toString(),
             MetricLevel.CORE,
             writeTime,
             Tag.DATABASE.toString(),
-            storageGroup.substring(0, lastIndex),
+            storageGroupName,
             Tag.TYPE.toString(),
             "flush",
             Tag.REGION.toString(),

Reply via email to