This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f2bc7156f0 merge insert nodes with same search index into one (#6727)
f2bc7156f0 is described below
commit f2bc7156f08ce2e66d854103055285b0d78335cc
Author: Alan Choo <[email protected]>
AuthorDate: Fri Jul 29 11:48:02 2022 +0800
merge insert nodes with same search index into one (#6727)
---
.../multileader/MultiLeaderServerImpl.java | 5 +-
.../multileader/logdispatcher/LogDispatcher.java | 5 +-
.../service/MultiLeaderRPCServiceProcessor.java | 34 ++++++---
.../statemachine/DataRegionStateMachine.java | 86 +++++++++++++++++-----
.../src/main/thrift/mutlileader.thrift | 3 +-
5 files changed, 101 insertions(+), 32 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 5573f6ba9a..3e8cbbfb3b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -186,9 +186,8 @@ public class MultiLeaderServerImpl {
}
public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
- IConsensusRequest request) {
- return new IndexedConsensusRequest(
- ConsensusReqReader.DEFAULT_SEARCH_INDEX,
Collections.singletonList(request));
+ List<IConsensusRequest> requests) {
+ return new
IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, requests);
}
/**
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 71ce7caa45..ed41ec5e16 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -344,7 +344,7 @@ public class LogDispatcher {
currentIndex = data.getSearchIndex();
iteratorIndex = currentIndex;
for (IConsensusRequest innerRequest : data.getRequests()) {
- logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(),
true));
+ logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(),
currentIndex, true));
}
if (currentIndex == maxIndex - 1) {
break;
@@ -356,7 +356,8 @@ public class LogDispatcher {
private void constructBatchIndexedFromConsensusRequest(
IndexedConsensusRequest request, List<TLogBatch> logBatches) {
for (IConsensusRequest innerRequest : request.getRequests()) {
- logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(),
false));
+ logBatches.add(
+ new TLogBatch(innerRequest.serializeToByteBuffer(),
request.getSearchIndex(), false));
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index b83c900892..3fc63c7d99 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.multileader.service;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -68,15 +69,30 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
}
List<TSStatus> statuses = new ArrayList<>();
// We use synchronized to ensure atomicity of executing multiple logs
- synchronized (impl.getStateMachine()) {
- for (TLogBatch batch : req.getBatches()) {
- statuses.add(
- impl.getStateMachine()
- .write(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- batch.isFromWAL()
- ? new MultiLeaderConsensusRequest(batch.data)
- : new ByteBufferConsensusRequest(batch.data))));
+ if (!req.getBatches().isEmpty()) {
+ synchronized (impl.getStateMachine()) {
+ List<IConsensusRequest> consensusRequests = new ArrayList<>();
+ long currentSearchIndex = req.getBatches().get(0).getSearchIndex();
+ for (TLogBatch batch : req.getBatches()) {
+ IConsensusRequest request =
+ batch.isFromWAL()
+ ? new MultiLeaderConsensusRequest(batch.data)
+ : new ByteBufferConsensusRequest(batch.data);
+ // merge TLogBatch with same search index into one request
+ if (batch.getSearchIndex() != currentSearchIndex) {
+ statuses.add(
+ impl.getStateMachine()
+
.write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+ consensusRequests = new ArrayList<>();
+ }
+ consensusRequests.add(request);
+ }
+ // write last request
+ if (!consensusRequests.isEmpty()) {
+ statuses.add(
+ impl.getStateMachine()
+
.write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+ }
}
}
logger.debug("Execute TSyncLogReq for {} with result {}",
req.consensusGroupId, statuses);
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index e5d9c2b6f4..285ad31d74 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
@@ -33,13 +34,20 @@ import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -98,36 +106,80 @@ public class DataRegionStateMachine extends BaseStateMachine {
@Override
public TSStatus write(IConsensusRequest request) {
- TSStatus status;
PlanNode planNode;
try {
if (request instanceof IndexedConsensusRequest) {
- status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- IndexedConsensusRequest indexedConsensusRequest =
(IndexedConsensusRequest) request;
- for (IConsensusRequest innerRequest :
indexedConsensusRequest.getRequests()) {
- planNode = getPlanNode(innerRequest);
- if (planNode instanceof InsertNode) {
- ((InsertNode) planNode)
- .setSearchIndex(((IndexedConsensusRequest)
request).getSearchIndex());
- }
- TSStatus subStatus = write(planNode);
- if (subStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- status.setCode(subStatus.getCode());
- status.setMessage(subStatus.getMessage());
- }
- status.addToSubStatus(subStatus);
+ IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest)
request;
+ List<InsertNode> insertNodes = new
ArrayList<>(indexedRequest.getRequests().size());
+ for (IConsensusRequest req : indexedRequest.getRequests()) {
+ // PlanNode in IndexedConsensusRequest should always be InsertNode
+ InsertNode innerNode = (InsertNode) getPlanNode(req);
+ innerNode.setSearchIndex(indexedRequest.getSearchIndex());
+ insertNodes.add(innerNode);
}
+ planNode = mergeInsertNodes(insertNodes);
} else {
planNode = getPlanNode(request);
- status = write(planNode);
}
- return status;
+ return write(planNode);
} catch (IllegalArgumentException e) {
logger.error(e.getMessage(), e);
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
}
+ /**
+ * Merge insert nodes sharing same search index ( e.g. tablet-100,
tablet-100, tablet-100 will be
+ * merged to one multi-tablet). <br>
+ * Notice: the continuity of insert nodes sharing same search index should
be protected by the
+ * upper layer.
+ */
+ private InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
+ int size = insertNodes.size();
+ if (size == 0) {
+ throw new RuntimeException();
+ }
+ if (size == 1) {
+ return insertNodes.get(0);
+ }
+
+ InsertNode result;
+ if (insertNodes.get(0) instanceof InsertTabletNode) { // merge to
InsertMultiTabletsNode
+ List<Integer> index = new ArrayList<>(size);
+ List<InsertTabletNode> insertTabletNodes = new ArrayList<>(size);
+ int i = 0;
+ for (InsertNode insertNode : insertNodes) {
+ insertTabletNodes.add((InsertTabletNode) insertNode);
+ index.add(i);
+ i++;
+ }
+ result =
+ new InsertMultiTabletsNode(insertNodes.get(0).getPlanNodeId(),
index, insertTabletNodes);
+ } else { // merge to InsertRowsNode or InsertRowsOfOneDeviceNode
+ boolean sameDevice = true;
+ PartialPath device = insertNodes.get(0).getDevicePath();
+ List<Integer> index = new ArrayList<>(size);
+ List<InsertRowNode> insertRowNodes = new ArrayList<>(size);
+ int i = 0;
+ for (InsertNode insertNode : insertNodes) {
+ if (sameDevice && !insertNode.getDevicePath().equals(device)) {
+ sameDevice = false;
+ }
+ insertRowNodes.add((InsertRowNode) insertNode);
+ index.add(i);
+ i++;
+ }
+ result =
+ sameDevice
+ ? new InsertRowsOfOneDeviceNode(
+ insertNodes.get(0).getPlanNodeId(), index, insertRowNodes)
+ : new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index,
insertRowNodes);
+ }
+ result.setSearchIndex(insertNodes.get(0).getSearchIndex());
+ result.setDevicePath(insertNodes.get(0).getDevicePath());
+ return result;
+ }
+
protected TSStatus write(PlanNode planNode) {
return planNode.accept(new DataExecutionVisitor(), region);
}
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index d36601be1c..218318163a 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -22,7 +22,8 @@ namespace java org.apache.iotdb.consensus.multileader.thrift
struct TLogBatch {
1: required binary data
- 2: required bool fromWAL
+ 2: required i64 searchIndex
+ 3: required bool fromWAL
}
struct TSyncLogReq {