This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/fix_wirte_issue_ml in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 40f0613ab76f97e429ff5509fa19d19ad364704f Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue Aug 30 13:17:44 2022 +0800 add the logic to process other PlanNode besides InsertNode --- .../statemachine/DataRegionStateMachine.java | 28 +++++++++++++++------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java index d5cf5e4533..c53bb069a7 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java @@ -196,8 +196,8 @@ public class DataRegionStateMachine extends BaseStateMachine { insertNodeWrapper.getStartSyncIndex(), insertNodeWrapper.getEndSyncIndex()); List<TSStatus> subStatus = new LinkedList<>(); - for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) { - subStatus.add(write(insertNode)); + for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) { + subStatus.add(write(planNode)); } queueSortCondition.signalAll(); return new TSStatus().setSubStatus(subStatus); @@ -209,7 +209,7 @@ public class DataRegionStateMachine extends BaseStateMachine { private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> { private final long startSyncIndex; private final long endSyncIndex; - private final List<InsertNode> insertNodes; + private final List<PlanNode> insertNodes; public InsertNodeWrapper(long startSyncIndex, long endSyncIndex) { this.startSyncIndex = startSyncIndex; @@ -222,7 +222,7 @@ public class DataRegionStateMachine extends BaseStateMachine { return Long.compare(startSyncIndex, o.startSyncIndex); } - public void add(InsertNode insertNode) { + public void add(PlanNode insertNode) { this.insertNodes.add(insertNode); } @@ -234,7 +234,7 @@ public class DataRegionStateMachine extends BaseStateMachine { return endSyncIndex; } - public List<InsertNode> getInsertNodes() { + public List<PlanNode> getInsertNodes() { return insertNodes; } } @@ -248,13 +248,23 @@ public class DataRegionStateMachine extends BaseStateMachine { return insertNodeWrapper; } - private InsertNode grabInsertNode(IndexedConsensusRequest indexedRequest) { + private PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) { List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size()); for (IConsensusRequest req : indexedRequest.getRequests()) { // PlanNode in IndexedConsensusRequest should always be InsertNode - InsertNode innerNode = (InsertNode) getPlanNode(req); - innerNode.setSearchIndex(indexedRequest.getSearchIndex()); - insertNodes.add(innerNode); + PlanNode planNode = getPlanNode(req); + if (planNode instanceof InsertNode) { + InsertNode innerNode = (InsertNode) planNode; + innerNode.setSearchIndex(indexedRequest.getSearchIndex()); + insertNodes.add(innerNode); + } else if (indexedRequest.getRequests().size() == 1) { + // If the planNode is not InsertNode, it is expected that the IndexedConsensusRequest only + // contains one request + return planNode; + } else { + throw new IllegalArgumentException( + "PlanNodes in IndexedConsensusRequest are not InsertNode and the size of requests are larger than 1"); + } } return mergeInsertNodes(insertNodes); }
