This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch batch_memtable_metric13
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b16d27b10bc3009e6470215b862682c91187ae26
Author: HTHou <[email protected]>
AuthorDate: Tue Oct 29 10:07:45 2024 +0800

    [To 1.3] Batch update inserted point number metric
---
 .../dataregion/memtable/AbstractMemTable.java      | 140 +++++----------------
 .../dataregion/memtable/IMemTable.java             |  11 +-
 .../dataregion/memtable/TsFileProcessor.java       |  20 +--
 .../wal/recover/file/TsFilePlanRedoer.java         |  22 ++--
 4 files changed, 65 insertions(+), 128 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index ce85508914f..bbabf29ac18 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
@@ -198,7 +199,7 @@ public abstract class AbstractMemTable implements IMemTable {
   }
 
   @Override
-  public void insert(InsertRowNode insertRowNode) {
+  public int insert(InsertRowNode insertRowNode) {
 
     String[] measurements = insertRowNode.getMeasurements();
     Object[] values = insertRowNode.getValues();
@@ -228,39 +229,11 @@ public abstract class AbstractMemTable implements IMemTable {
             - nullPointsNumber;
 
     totalPointsNum += pointsInserted;
-
-    MetricService.getInstance()
-        .count(
-            pointsInserted,
-            Metric.QUANTITY.toString(),
-            MetricLevel.CORE,
-            Tag.NAME.toString(),
-            METRIC_POINT_IN,
-            Tag.DATABASE.toString(),
-            database,
-            Tag.REGION.toString(),
-            dataRegionId,
-            Tag.TYPE.toString(),
-            Metric.MEMTABLE_POINT_COUNT.toString());
-    if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
-      MetricService.getInstance()
-          .count(
-              pointsInserted,
-              Metric.LEADER_QUANTITY.toString(),
-              MetricLevel.CORE,
-              Tag.NAME.toString(),
-              METRIC_POINT_IN,
-              Tag.DATABASE.toString(),
-              database,
-              Tag.REGION.toString(),
-              dataRegionId,
-              Tag.TYPE.toString(),
-              Metric.MEMTABLE_POINT_COUNT.toString());
-    }
+    return pointsInserted;
   }
 
   @Override
-  public void insertAlignedRow(InsertRowNode insertRowNode) {
+  public int insertAlignedRow(InsertRowNode insertRowNode) {
 
     String[] measurements = insertRowNode.getMeasurements();
     Object[] values = insertRowNode.getValues();
@@ -277,46 +250,18 @@ public abstract class AbstractMemTable implements IMemTable {
       dataTypes.add(schema.getType());
     }
     if (schemaList.isEmpty()) {
-      return;
+      return 0;
     }
     memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values);
     writeAlignedRow(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values);
     int pointsInserted =
         insertRowNode.getMeasurements().length - insertRowNode.getFailedMeasurementNumber();
     totalPointsNum += pointsInserted;
-
-    MetricService.getInstance()
-        .count(
-            pointsInserted,
-            Metric.QUANTITY.toString(),
-            MetricLevel.CORE,
-            Tag.NAME.toString(),
-            METRIC_POINT_IN,
-            Tag.DATABASE.toString(),
-            database,
-            Tag.REGION.toString(),
-            dataRegionId,
-            Tag.TYPE.toString(),
-            Metric.MEMTABLE_POINT_COUNT.toString());
-    if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
-      MetricService.getInstance()
-          .count(
-              pointsInserted,
-              Metric.LEADER_QUANTITY.toString(),
-              MetricLevel.CORE,
-              Tag.NAME.toString(),
-              METRIC_POINT_IN,
-              Tag.DATABASE.toString(),
-              database,
-              Tag.REGION.toString(),
-              dataRegionId,
-              Tag.TYPE.toString(),
-              Metric.MEMTABLE_POINT_COUNT.toString());
-    }
+    return pointsInserted;
   }
 
   @Override
-  public void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
+  public int insertTablet(InsertTabletNode insertTabletNode, int start, int end)
       throws WriteProcessException {
     try {
       writeTabletNode(insertTabletNode, start, end);
@@ -325,41 +270,14 @@ public abstract class AbstractMemTable implements IMemTable {
           (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber())
               * (end - start);
       totalPointsNum += pointsInserted;
-      MetricService.getInstance()
-          .count(
-              pointsInserted,
-              Metric.QUANTITY.toString(),
-              MetricLevel.CORE,
-              Tag.NAME.toString(),
-              METRIC_POINT_IN,
-              Tag.DATABASE.toString(),
-              database,
-              Tag.REGION.toString(),
-              dataRegionId,
-              Tag.TYPE.toString(),
-              Metric.MEMTABLE_POINT_COUNT.toString());
-      if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
-        MetricService.getInstance()
-            .count(
-                pointsInserted,
-                Metric.LEADER_QUANTITY.toString(),
-                MetricLevel.CORE,
-                Tag.NAME.toString(),
-                METRIC_POINT_IN,
-                Tag.DATABASE.toString(),
-                database,
-                Tag.REGION.toString(),
-                dataRegionId,
-                Tag.TYPE.toString(),
-                Metric.MEMTABLE_POINT_COUNT.toString());
-      }
+      return pointsInserted;
     } catch (RuntimeException e) {
       throw new WriteProcessException(e);
     }
   }
 
   @Override
-  public void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int end)
+  public int insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int end)
       throws WriteProcessException {
     try {
       writeAlignedTablet(insertTabletNode, start, end);
@@ -368,10 +286,31 @@ public abstract class AbstractMemTable implements IMemTable {
           (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber())
               * (end - start);
       totalPointsNum += pointsInserted;
+      return pointsInserted;
+    } catch (RuntimeException e) {
+      throw new WriteProcessException(e);
+    }
+  }
+
+  public void updateMemtablePointCountMetric(InsertNode insertNode, int pointsInserted) {
+    MetricService.getInstance()
+        .count(
+            pointsInserted,
+            Metric.QUANTITY.toString(),
+            MetricLevel.CORE,
+            Tag.NAME.toString(),
+            METRIC_POINT_IN,
+            Tag.DATABASE.toString(),
+            database,
+            Tag.REGION.toString(),
+            dataRegionId,
+            Tag.TYPE.toString(),
+            Metric.MEMTABLE_POINT_COUNT.toString());
+    if (!insertNode.isGeneratedByRemoteConsensusLeader()) {
       MetricService.getInstance()
           .count(
               pointsInserted,
-              Metric.QUANTITY.toString(),
+              Metric.LEADER_QUANTITY.toString(),
               MetricLevel.CORE,
               Tag.NAME.toString(),
               METRIC_POINT_IN,
@@ -381,23 +320,6 @@ public abstract class AbstractMemTable implements IMemTable {
               dataRegionId,
               Tag.TYPE.toString(),
               Metric.MEMTABLE_POINT_COUNT.toString());
-      if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
-        MetricService.getInstance()
-            .count(
-                pointsInserted,
-                Metric.LEADER_QUANTITY.toString(),
-                MetricLevel.CORE,
-                Tag.NAME.toString(),
-                METRIC_POINT_IN,
-                Tag.DATABASE.toString(),
-                database,
-                Tag.REGION.toString(),
-                dataRegionId,
-                Tag.TYPE.toString(),
-                Metric.MEMTABLE_POINT_COUNT.toString());
-      }
-    } catch (RuntimeException e) {
-      throw new WriteProcessException(e);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 7918969d2e1..dc46cc8f33f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
@@ -93,9 +94,9 @@ public interface IMemTable extends WALEntryValue {
    *
    * @param insertRowNode insertRowNode
    */
-  void insert(InsertRowNode insertRowNode);
+  int insert(InsertRowNode insertRowNode);
 
-  void insertAlignedRow(InsertRowNode insertRowNode);
+  int insertAlignedRow(InsertRowNode insertRowNode);
 
   /**
    * insert tablet into this memtable. The rows to be inserted are in the range [start, end). Null
@@ -106,10 +107,10 @@ public interface IMemTable extends WALEntryValue {
    * @param start included
    * @param end excluded
    */
-  void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
+  int insertTablet(InsertTabletNode insertTabletNode, int start, int end)
       throws WriteProcessException;
 
-  void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int end)
+  int insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int end)
       throws WriteProcessException;
 
   ReadOnlyMemChunk query(
@@ -205,4 +206,6 @@ public interface IMemTable extends WALEntryValue {
   void markAsNotGeneratedByPipe();
 
   boolean isTotallyGeneratedByPipe();
+
+  void updateMemtablePointCountMetric(InsertNode insertNode, int pointsInserted);
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index f39cb9f0bb0..5b025847fa4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -321,10 +321,11 @@ public class TsFileProcessor {
             insertRowNode,
             tsFileResource);
 
+    int pointInserted;
     if (insertRowNode.isAligned()) {
-      workMemTable.insertAlignedRow(insertRowNode);
+      pointInserted = workMemTable.insertAlignedRow(insertRowNode);
     } else {
-      workMemTable.insert(insertRowNode);
+      pointInserted = workMemTable.insert(insertRowNode);
     }
 
     // Update start time of this memtable
@@ -334,6 +335,7 @@ public class TsFileProcessor {
     if (!sequence) {
       tsFileResource.updateEndTime(insertRowNode.getDeviceID(), insertRowNode.getTime());
     }
+    workMemTable.updateMemtablePointCountMetric(insertRowNode, pointInserted);
 
     tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex());
     // RecordScheduleMemTableCost
@@ -414,12 +416,13 @@ public class TsFileProcessor {
             walFlushListener.getWalEntryHandler(),
             insertRowsNode,
             tsFileResource);
-    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
 
+    int pointInserted = 0;
+    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
       if (insertRowNode.isAligned()) {
-        workMemTable.insertAlignedRow(insertRowNode);
+        pointInserted += workMemTable.insertAlignedRow(insertRowNode);
       } else {
-        workMemTable.insert(insertRowNode);
+        pointInserted += workMemTable.insert(insertRowNode);
       }
 
       // update start time of this memtable
@@ -430,6 +433,7 @@ public class TsFileProcessor {
         tsFileResource.updateEndTime(insertRowNode.getDeviceID(), insertRowNode.getTime());
       }
     }
+    workMemTable.updateMemtablePointCountMetric(insertRowsNode, pointInserted);
     tsFileResource.updateProgressIndex(insertRowsNode.getProgressIndex());
     // recordScheduleMemTableCost
     costsForMetrics[3] += System.nanoTime() - startTime;
@@ -526,11 +530,12 @@ public class TsFileProcessor {
             insertTabletNode,
             tsFileResource);
 
+    int pointInserted;
     try {
       if (insertTabletNode.isAligned()) {
-        workMemTable.insertAlignedTablet(insertTabletNode, start, end);
+        pointInserted = workMemTable.insertAlignedTablet(insertTabletNode, start, end);
       } else {
-        workMemTable.insertTablet(insertTabletNode, start, end);
+        pointInserted = workMemTable.insertTablet(insertTabletNode, start, end);
       }
     } catch (WriteProcessException e) {
       for (int i = start; i < end; i++) {
@@ -550,6 +555,7 @@ public class TsFileProcessor {
       tsFileResource.updateEndTime(
           insertTabletNode.getDeviceID(), insertTabletNode.getTimes()[end - 1]);
     }
+    workMemTable.updateMemtablePointCountMetric(insertTabletNode, pointInserted);
 
     tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex());
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
index 162b1ac02b2..55e385bf8a8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
@@ -91,24 +91,29 @@ public class TsFilePlanRedoer {
       }
     }
 
+    int pointsInserted;
     if (node instanceof InsertRowNode) {
       if (node.isAligned()) {
-        recoveryMemTable.insertAlignedRow((InsertRowNode) node);
+        pointsInserted = recoveryMemTable.insertAlignedRow((InsertRowNode) node);
       } else {
-        recoveryMemTable.insert((InsertRowNode) node);
+        pointsInserted = recoveryMemTable.insert((InsertRowNode) node);
       }
     } else {
       if (node.isAligned()) {
-        recoveryMemTable.insertAlignedTablet(
-            (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount());
+        pointsInserted =
+            recoveryMemTable.insertAlignedTablet(
+                (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount());
       } else {
-        recoveryMemTable.insertTablet(
-            (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount());
+        pointsInserted =
+            recoveryMemTable.insertTablet(
+                (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount());
       }
     }
+    recoveryMemTable.updateMemtablePointCountMetric(node, pointsInserted);
   }
 
   void redoInsertRows(InsertRowsNode insertRowsNode) {
+    int pointsInserted = 0;
     for (InsertRowNode node : insertRowsNode.getInsertRowNodeList()) {
       if (!node.hasValidMeasurements()) {
         continue;
@@ -125,11 +130,12 @@ public class TsFilePlanRedoer {
         }
       }
       if (node.isAligned()) {
-        recoveryMemTable.insertAlignedRow(node);
+        pointsInserted += recoveryMemTable.insertAlignedRow(node);
       } else {
-        recoveryMemTable.insert(node);
+        pointsInserted += recoveryMemTable.insert(node);
       }
     }
+    recoveryMemTable.updateMemtablePointCountMetric(insertRowsNode, pointsInserted);
   }
 
   void resetRecoveryMemTable(IMemTable memTable) {

Reply via email to