This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b231a1c7076 Remove duplicate point calculate because of consensus and so on (#12459)
b231a1c7076 is described below

commit b231a1c7076edc84c90697f53ea51d58e5924305
Author: ZhangHongYin <[email protected]>
AuthorDate: Thu May 30 12:51:59 2024 +0800

    Remove duplicate point calculate because of consensus and so on (#12459)
---
 .../common/request/IConsensusRequest.java          |  4 ++
 .../consensus/iot/IoTConsensusServerImpl.java      |  1 +
 .../ratis/ApplicationStateMachineProxy.java        |  3 ++
 .../iotdb/consensus/ratis/RatisConsensus.java      |  2 +-
 .../IoTConsensusDataRegionStateMachine.java        |  3 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  1 -
 .../plan/planner/plan/node/PlanNode.java           | 10 +++++
 .../plan/node/write/InsertMultiTabletsNode.java    |  6 +++
 .../planner/plan/node/write/InsertRowsNode.java    |  6 +++
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  6 +++
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 14 ++++++
 .../dataregion/memtable/AbstractMemTable.java      | 52 ++++++++++++++++++++++
 .../iotdb/commons/service/metric/enums/Metric.java |  1 +
 13 files changed, 106 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
index daa2a7d7d00..6cd7f370a6b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -35,4 +35,8 @@ public interface IConsensusRequest {
    * changed or an error may occur
    */
   ByteBuffer serializeToByteBuffer();
+
+  default void markAsGeneratedByRemoteConsensusLeader() {
+    // do nothing by default
+  }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 63fdcbf25ae..4ac574d282a 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -1132,6 +1132,7 @@ public class IoTConsensusServerImpl {
         ioTConsensusServerMetrics.recordSortCost(sortTime - insertStartTime);
         List<TSStatus> subStatus = new LinkedList<>();
         for (IConsensusRequest insertNode : request.getInsertNodes()) {
+          insertNode.markAsGeneratedByRemoteConsensusLeader();
           subStatus.add(stateMachine.write(insertNode));
         }
         long applyTime = System.nanoTime();
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 06e7df52891..178650bb893 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -146,6 +146,9 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
     waitUntilSystemAllowApply();
     do {
       try {
+        if (!isLeader) {
+          deserializedRequest.markAsGeneratedByRemoteConsensusLeader();
+        }
         final TSStatus result = applicationStateMachine.write(deserializedRequest);
         ret = new ResponseMessage(result);
         break;
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 45c82332f55..90816ba6291 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -288,7 +288,7 @@ class RatisConsensus implements IConsensus {
     }
 
     // serialize request into Message
-    Message message = new RequestMessage(request);
+    RequestMessage message = new RequestMessage(request);
 
     // 1. first try the local server
     RaftClientRequest clientRequest;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
index a1901031ac3..775f1bf6f49 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
@@ -56,7 +56,8 @@ public class IoTConsensusDataRegionStateMachine extends DataRegionStateMachine {
         List<TSStatus> subStatus = new LinkedList<>();
         for (IConsensusRequest consensusRequest :
             ((DeserializedBatchIndexedConsensusRequest) request).getInsertNodes()) {
-          subStatus.add(write((PlanNode) consensusRequest));
+          PlanNode writeNode = (PlanNode) consensusRequest;
+          subStatus.add(write(writeNode));
         }
         return new TSStatus().setSubStatus(subStatus);
       } else {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 92eacb40519..614956e9616 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -425,7 +425,6 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       return createTLoadResp(
           new TSStatus(TSStatusCode.DESERIALIZE_PIECE_OF_TSFILE_ERROR.getStatusCode()));
     }
-
     TSStatus resultStatus =
         StorageEngine.getInstance()
             .writeLoadTsFileNode((DataRegionId) groupId, pieceNode, req.uuid);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
index 2bbf723fd30..29dd52df29a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
@@ -49,6 +49,7 @@ public abstract class PlanNode implements IConsensusRequest {
   protected PlanNodeId id;
 
   protected boolean isGeneratedByPipe = false;
+  protected boolean isGeneratedByRemoteConsensusLeader = false;
 
   protected PlanNode(PlanNodeId id) {
     requireNonNull(id, "id is null");
@@ -67,10 +68,19 @@ public abstract class PlanNode implements IConsensusRequest {
     return isGeneratedByPipe;
   }
 
+  public boolean isGeneratedByRemoteConsensusLeader() {
+    return isGeneratedByRemoteConsensusLeader;
+  }
+
   public void markAsGeneratedByPipe() {
     isGeneratedByPipe = true;
   }
 
+  @Override
+  public void markAsGeneratedByRemoteConsensusLeader() {
+    isGeneratedByRemoteConsensusLeader = true;
+  }
+
   public abstract List<PlanNode> getChildren();
 
   public abstract void addChild(PlanNode child);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 9f5dc8b79b1..683d6607ace 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -256,6 +256,12 @@ public class InsertMultiTabletsNode extends InsertNode {
     insertTabletNodeList.forEach(InsertTabletNode::markAsGeneratedByPipe);
   }
 
+  @Override
+  public void markAsGeneratedByRemoteConsensusLeader() {
+    super.markAsGeneratedByRemoteConsensusLeader();
+    insertTabletNodeList.forEach(InsertTabletNode::markAsGeneratedByRemoteConsensusLeader);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 976fd6bf334..215f38bb11f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -225,6 +225,12 @@ public class InsertRowsNode extends InsertNode implements WALEntryValue {
     insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByPipe);
   }
 
+  @Override
+  public void markAsGeneratedByRemoteConsensusLeader() {
+    super.markAsGeneratedByRemoteConsensusLeader();
+    insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByRemoteConsensusLeader);
+  }
+
   @Override
   public List<WritePlanNode> splitByPartition(Analysis analysis) {
     Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index fc060d30edb..6fd53ce6da2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -295,6 +295,12 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
     insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByPipe);
   }
 
+  @Override
+  public void markAsGeneratedByRemoteConsensusLeader() {
+    super.markAsGeneratedByRemoteConsensusLeader();
+    insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByRemoteConsensusLeader);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index e166fa93856..d28681c161a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -434,7 +434,21 @@ public class LoadTsFileScheduler implements IScheduler {
                       databaseName,
                       Tag.REGION.toString(),
                       dataRegion.getDataRegionId());
+              if (!node.isGeneratedByRemoteConsensusLeader()) {
+                MetricService.getInstance()
+                    .count(
+                        node.getWritePointCount(),
+                        Metric.LEADER_QUANTITY.toString(),
+                        MetricLevel.CORE,
+                        Tag.NAME.toString(),
+                        Metric.POINTS_IN.toString(),
+                        Tag.DATABASE.toString(),
+                        databaseName,
+                        Tag.REGION.toString(),
+                        dataRegion.getDataRegionId());
+              }
             });
+
     return true;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 550b35eeee2..39bbb33e830 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -236,6 +236,19 @@ public abstract class AbstractMemTable implements IMemTable {
             database,
             Tag.REGION.toString(),
             dataRegionId);
+    if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
+      MetricService.getInstance()
+          .count(
+              pointsInserted,
+              Metric.LEADER_QUANTITY.toString(),
+              MetricLevel.CORE,
+              Tag.NAME.toString(),
+              METRIC_POINT_IN,
+              Tag.DATABASE.toString(),
+              database,
+              Tag.REGION.toString(),
+              dataRegionId);
+    }
   }
 
   @Override
@@ -275,6 +288,19 @@ public abstract class AbstractMemTable implements IMemTable {
             database,
             Tag.REGION.toString(),
             dataRegionId);
+    if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
+      MetricService.getInstance()
+          .count(
+              pointsInserted,
+              Metric.LEADER_QUANTITY.toString(),
+              MetricLevel.CORE,
+              Tag.NAME.toString(),
+              METRIC_POINT_IN,
+              Tag.DATABASE.toString(),
+              database,
+              Tag.REGION.toString(),
+              dataRegionId);
+    }
   }
 
   @Override
@@ -298,6 +324,19 @@ public abstract class AbstractMemTable implements IMemTable {
               database,
               Tag.REGION.toString(),
               dataRegionId);
+      if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
+        MetricService.getInstance()
+            .count(
+                pointsInserted,
+                Metric.LEADER_QUANTITY.toString(),
+                MetricLevel.CORE,
+                Tag.NAME.toString(),
+                METRIC_POINT_IN,
+                Tag.DATABASE.toString(),
+                database,
+                Tag.REGION.toString(),
+                dataRegionId);
+      }
     } catch (RuntimeException e) {
       throw new WriteProcessException(e);
     }
@@ -324,6 +363,19 @@ public abstract class AbstractMemTable implements IMemTable {
               database,
               Tag.REGION.toString(),
               dataRegionId);
+      if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
+        MetricService.getInstance()
+            .count(
+                pointsInserted,
+                Metric.LEADER_QUANTITY.toString(),
+                MetricLevel.CORE,
+                Tag.NAME.toString(),
+                METRIC_POINT_IN,
+                Tag.DATABASE.toString(),
+                database,
+                Tag.REGION.toString(),
+                dataRegionId);
+      }
     } catch (RuntimeException e) {
       throw new WriteProcessException(e);
     }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 49f1e057625..c207435d3f1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -94,6 +94,7 @@ public enum Metric {
   CACHE("cache"),
   CACHE_HIT_RATE("cache_hit"),
   QUANTITY("quantity"),
+  LEADER_QUANTITY("leader_quantity"),
   SCHEMA_REGION("schema_region"),
   SCHEMA_ENGINE("schema_engine"),
   // query engine related

Reply via email to