This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bbab64fe9a [IOTDB-4267]Add the logic to process other PlanNode besides
InsertNode in MultiLeaderConsensus (#7170)
bbab64fe9a is described below
commit bbab64fe9a625e5e2cebea0c300780c1dfcca711
Author: Zhang.Jinrui <[email protected]>
AuthorDate: Tue Aug 30 16:34:18 2022 +0800
[IOTDB-4267]Add the logic to process other PlanNode besides InsertNode in
MultiLeaderConsensus (#7170)
---
.../statemachine/DataRegionStateMachine.java | 28 +++++++++++++++-------
1 file changed, 19 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index d5cf5e4533..c53bb069a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -196,8 +196,8 @@ public class DataRegionStateMachine extends BaseStateMachine {
insertNodeWrapper.getStartSyncIndex(),
insertNodeWrapper.getEndSyncIndex());
List<TSStatus> subStatus = new LinkedList<>();
- for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
- subStatus.add(write(insertNode));
+ for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
+ subStatus.add(write(planNode));
}
queueSortCondition.signalAll();
return new TSStatus().setSubStatus(subStatus);
@@ -209,7 +209,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
private final long startSyncIndex;
private final long endSyncIndex;
- private final List<InsertNode> insertNodes;
+ private final List<PlanNode> insertNodes;
public InsertNodeWrapper(long startSyncIndex, long endSyncIndex) {
this.startSyncIndex = startSyncIndex;
@@ -222,7 +222,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
return Long.compare(startSyncIndex, o.startSyncIndex);
}
- public void add(InsertNode insertNode) {
+ public void add(PlanNode insertNode) {
this.insertNodes.add(insertNode);
}
@@ -234,7 +234,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
return endSyncIndex;
}
- public List<InsertNode> getInsertNodes() {
+ public List<PlanNode> getInsertNodes() {
return insertNodes;
}
}
@@ -248,13 +248,23 @@ public class DataRegionStateMachine extends BaseStateMachine {
return insertNodeWrapper;
}
- private InsertNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
+ private PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
for (IConsensusRequest req : indexedRequest.getRequests()) {
// PlanNode in IndexedConsensusRequest should always be InsertNode
- InsertNode innerNode = (InsertNode) getPlanNode(req);
- innerNode.setSearchIndex(indexedRequest.getSearchIndex());
- insertNodes.add(innerNode);
+ PlanNode planNode = getPlanNode(req);
+ if (planNode instanceof InsertNode) {
+ InsertNode innerNode = (InsertNode) planNode;
+ innerNode.setSearchIndex(indexedRequest.getSearchIndex());
+ insertNodes.add(innerNode);
+ } else if (indexedRequest.getRequests().size() == 1) {
+ // If the planNode is not InsertNode, it is expected that the IndexedConsensusRequest only
+ // contains one request
+ return planNode;
+ } else {
+ throw new IllegalArgumentException(
+ "PlanNodes in IndexedConsensusRequest are not InsertNode and the size of requests are larger than 1");
+ }
}
return mergeInsertNodes(insertNodes);
}