This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b231a1c7076 Remove duplicate point calculate because of consensus and so on (#12459)
b231a1c7076 is described below
commit b231a1c7076edc84c90697f53ea51d58e5924305
Author: ZhangHongYin <[email protected]>
AuthorDate: Thu May 30 12:51:59 2024 +0800
Remove duplicate point calculate because of consensus and so on (#12459)
---
.../common/request/IConsensusRequest.java | 4 ++
.../consensus/iot/IoTConsensusServerImpl.java | 1 +
.../ratis/ApplicationStateMachineProxy.java | 3 ++
.../iotdb/consensus/ratis/RatisConsensus.java | 2 +-
.../IoTConsensusDataRegionStateMachine.java | 3 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 1 -
.../plan/planner/plan/node/PlanNode.java | 10 +++++
.../plan/node/write/InsertMultiTabletsNode.java | 6 +++
.../planner/plan/node/write/InsertRowsNode.java | 6 +++
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 6 +++
.../plan/scheduler/load/LoadTsFileScheduler.java | 14 ++++++
.../dataregion/memtable/AbstractMemTable.java | 52 ++++++++++++++++++++++
.../iotdb/commons/service/metric/enums/Metric.java | 1 +
13 files changed, 106 insertions(+), 3 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
index daa2a7d7d00..6cd7f370a6b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -35,4 +35,8 @@ public interface IConsensusRequest {
* changed or an error may occur
*/
ByteBuffer serializeToByteBuffer();
+
+ default void markAsGeneratedByRemoteConsensusLeader() {
+ // do nothing by default
+ }
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 63fdcbf25ae..4ac574d282a 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -1132,6 +1132,7 @@ public class IoTConsensusServerImpl {
ioTConsensusServerMetrics.recordSortCost(sortTime - insertStartTime);
List<TSStatus> subStatus = new LinkedList<>();
for (IConsensusRequest insertNode : request.getInsertNodes()) {
+ insertNode.markAsGeneratedByRemoteConsensusLeader();
subStatus.add(stateMachine.write(insertNode));
}
long applyTime = System.nanoTime();
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 06e7df52891..178650bb893 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -146,6 +146,9 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
waitUntilSystemAllowApply();
do {
try {
+ if (!isLeader) {
+ deserializedRequest.markAsGeneratedByRemoteConsensusLeader();
+ }
final TSStatus result = applicationStateMachine.write(deserializedRequest);
ret = new ResponseMessage(result);
break;
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 45c82332f55..90816ba6291 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -288,7 +288,7 @@ class RatisConsensus implements IConsensus {
}
// serialize request into Message
- Message message = new RequestMessage(request);
+ RequestMessage message = new RequestMessage(request);
// 1. first try the local server
RaftClientRequest clientRequest;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
index a1901031ac3..775f1bf6f49 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
@@ -56,7 +56,8 @@ public class IoTConsensusDataRegionStateMachine extends DataRegionStateMachine {
List<TSStatus> subStatus = new LinkedList<>();
for (IConsensusRequest consensusRequest :
((DeserializedBatchIndexedConsensusRequest) request).getInsertNodes()) {
- subStatus.add(write((PlanNode) consensusRequest));
+ PlanNode writeNode = (PlanNode) consensusRequest;
+ subStatus.add(write(writeNode));
}
return new TSStatus().setSubStatus(subStatus);
} else {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 92eacb40519..614956e9616 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -425,7 +425,6 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
return createTLoadResp(
new TSStatus(TSStatusCode.DESERIALIZE_PIECE_OF_TSFILE_ERROR.getStatusCode()));
}
-
TSStatus resultStatus =
StorageEngine.getInstance()
.writeLoadTsFileNode((DataRegionId) groupId, pieceNode, req.uuid);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
index 2bbf723fd30..29dd52df29a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
@@ -49,6 +49,7 @@ public abstract class PlanNode implements IConsensusRequest {
protected PlanNodeId id;
protected boolean isGeneratedByPipe = false;
+ protected boolean isGeneratedByRemoteConsensusLeader = false;
protected PlanNode(PlanNodeId id) {
requireNonNull(id, "id is null");
@@ -67,10 +68,19 @@ public abstract class PlanNode implements IConsensusRequest {
return isGeneratedByPipe;
}
+ public boolean isGeneratedByRemoteConsensusLeader() {
+ return isGeneratedByRemoteConsensusLeader;
+ }
+
public void markAsGeneratedByPipe() {
isGeneratedByPipe = true;
}
+ @Override
+ public void markAsGeneratedByRemoteConsensusLeader() {
+ isGeneratedByRemoteConsensusLeader = true;
+ }
+
public abstract List<PlanNode> getChildren();
public abstract void addChild(PlanNode child);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 9f5dc8b79b1..683d6607ace 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -256,6 +256,12 @@ public class InsertMultiTabletsNode extends InsertNode {
insertTabletNodeList.forEach(InsertTabletNode::markAsGeneratedByPipe);
}
+ @Override
+ public void markAsGeneratedByRemoteConsensusLeader() {
+ super.markAsGeneratedByRemoteConsensusLeader();
+ insertTabletNodeList.forEach(InsertTabletNode::markAsGeneratedByRemoteConsensusLeader);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 976fd6bf334..215f38bb11f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -225,6 +225,12 @@ public class InsertRowsNode extends InsertNode implements WALEntryValue {
insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByPipe);
}
+ @Override
+ public void markAsGeneratedByRemoteConsensusLeader() {
+ super.markAsGeneratedByRemoteConsensusLeader();
+ insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByRemoteConsensusLeader);
+ }
+
@Override
public List<WritePlanNode> splitByPartition(Analysis analysis) {
Map<TRegionReplicaSet, InsertRowsNode> splitMap = new HashMap<>();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index fc060d30edb..6fd53ce6da2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -295,6 +295,12 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByPipe);
}
+ @Override
+ public void markAsGeneratedByRemoteConsensusLeader() {
+ super.markAsGeneratedByRemoteConsensusLeader();
+ insertRowNodeList.forEach(InsertRowNode::markAsGeneratedByRemoteConsensusLeader);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index e166fa93856..d28681c161a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -434,7 +434,21 @@ public class LoadTsFileScheduler implements IScheduler {
databaseName,
Tag.REGION.toString(),
dataRegion.getDataRegionId());
+ if (!node.isGeneratedByRemoteConsensusLeader()) {
+ MetricService.getInstance()
+ .count(
+ node.getWritePointCount(),
+ Metric.LEADER_QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ Metric.POINTS_IN.toString(),
+ Tag.DATABASE.toString(),
+ databaseName,
+ Tag.REGION.toString(),
+ dataRegion.getDataRegionId());
+ }
});
+
return true;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 550b35eeee2..39bbb33e830 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -236,6 +236,19 @@ public abstract class AbstractMemTable implements IMemTable {
database,
Tag.REGION.toString(),
dataRegionId);
+ if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
+ MetricService.getInstance()
+ .count(
+ pointsInserted,
+ Metric.LEADER_QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ METRIC_POINT_IN,
+ Tag.DATABASE.toString(),
+ database,
+ Tag.REGION.toString(),
+ dataRegionId);
+ }
}
@Override
@@ -275,6 +288,19 @@ public abstract class AbstractMemTable implements IMemTable {
database,
Tag.REGION.toString(),
dataRegionId);
+ if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
+ MetricService.getInstance()
+ .count(
+ pointsInserted,
+ Metric.LEADER_QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ METRIC_POINT_IN,
+ Tag.DATABASE.toString(),
+ database,
+ Tag.REGION.toString(),
+ dataRegionId);
+ }
}
@Override
@@ -298,6 +324,19 @@ public abstract class AbstractMemTable implements IMemTable {
database,
Tag.REGION.toString(),
dataRegionId);
+ if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
+ MetricService.getInstance()
+ .count(
+ pointsInserted,
+ Metric.LEADER_QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ METRIC_POINT_IN,
+ Tag.DATABASE.toString(),
+ database,
+ Tag.REGION.toString(),
+ dataRegionId);
+ }
} catch (RuntimeException e) {
throw new WriteProcessException(e);
}
@@ -324,6 +363,19 @@ public abstract class AbstractMemTable implements IMemTable {
database,
Tag.REGION.toString(),
dataRegionId);
+ if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
+ MetricService.getInstance()
+ .count(
+ pointsInserted,
+ Metric.LEADER_QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ METRIC_POINT_IN,
+ Tag.DATABASE.toString(),
+ database,
+ Tag.REGION.toString(),
+ dataRegionId);
+ }
} catch (RuntimeException e) {
throw new WriteProcessException(e);
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 49f1e057625..c207435d3f1 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -94,6 +94,7 @@ public enum Metric {
CACHE("cache"),
CACHE_HIT_RATE("cache_hit"),
QUANTITY("quantity"),
+ LEADER_QUANTITY("leader_quantity"),
SCHEMA_REGION("schema_region"),
SCHEMA_ENGINE("schema_engine"),
// query engine related