This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f2bc7156f0 merge insert nodes with same search index into one (#6727)
f2bc7156f0 is described below

commit f2bc7156f08ce2e66d854103055285b0d78335cc
Author: Alan Choo <[email protected]>
AuthorDate: Fri Jul 29 11:48:02 2022 +0800

    merge insert nodes with same search index into one (#6727)
---
 .../multileader/MultiLeaderServerImpl.java         |  5 +-
 .../multileader/logdispatcher/LogDispatcher.java   |  5 +-
 .../service/MultiLeaderRPCServiceProcessor.java    | 34 ++++++---
 .../statemachine/DataRegionStateMachine.java       | 86 +++++++++++++++++-----
 .../src/main/thrift/mutlileader.thrift             |  3 +-
 5 files changed, 101 insertions(+), 32 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 5573f6ba9a..3e8cbbfb3b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -186,9 +186,8 @@ public class MultiLeaderServerImpl {
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
-      IConsensusRequest request) {
-    return new IndexedConsensusRequest(
-        ConsensusReqReader.DEFAULT_SEARCH_INDEX, 
Collections.singletonList(request));
+      List<IConsensusRequest> requests) {
+    return new 
IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, requests);
   }
 
   /**
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 71ce7caa45..ed41ec5e16 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -344,7 +344,7 @@ public class LogDispatcher {
         currentIndex = data.getSearchIndex();
         iteratorIndex = currentIndex;
         for (IConsensusRequest innerRequest : data.getRequests()) {
-          logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), 
true));
+          logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), 
currentIndex, true));
         }
         if (currentIndex == maxIndex - 1) {
           break;
@@ -356,7 +356,8 @@ public class LogDispatcher {
     private void constructBatchIndexedFromConsensusRequest(
         IndexedConsensusRequest request, List<TLogBatch> logBatches) {
       for (IConsensusRequest innerRequest : request.getRequests()) {
-        logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), 
false));
+        logBatches.add(
+            new TLogBatch(innerRequest.serializeToByteBuffer(), 
request.getSearchIndex(), false));
       }
     }
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index b83c900892..3fc63c7d99 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.multileader.service;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
@@ -68,15 +69,30 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
       }
       List<TSStatus> statuses = new ArrayList<>();
       // We use synchronized to ensure atomicity of executing multiple logs
-      synchronized (impl.getStateMachine()) {
-        for (TLogBatch batch : req.getBatches()) {
-          statuses.add(
-              impl.getStateMachine()
-                  .write(
-                      impl.buildIndexedConsensusRequestForRemoteRequest(
-                          batch.isFromWAL()
-                              ? new MultiLeaderConsensusRequest(batch.data)
-                              : new ByteBufferConsensusRequest(batch.data))));
+      if (!req.getBatches().isEmpty()) {
+        synchronized (impl.getStateMachine()) {
+          List<IConsensusRequest> consensusRequests = new ArrayList<>();
+          long currentSearchIndex = req.getBatches().get(0).getSearchIndex();
+          for (TLogBatch batch : req.getBatches()) {
+            IConsensusRequest request =
+                batch.isFromWAL()
+                    ? new MultiLeaderConsensusRequest(batch.data)
+                    : new ByteBufferConsensusRequest(batch.data);
+            // merge TLogBatch with same search index into one request
+            if (batch.getSearchIndex() != currentSearchIndex) {
+              statuses.add(
+                  impl.getStateMachine()
+                      
.write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+              consensusRequests = new ArrayList<>();
+            }
+            consensusRequests.add(request);
+          }
+          // write last request
+          if (!consensusRequests.isEmpty()) {
+            statuses.add(
+                impl.getStateMachine()
+                    
.write(impl.buildIndexedConsensusRequestForRemoteRequest(consensusRequests)));
+          }
         }
       }
       logger.debug("Execute TSyncLogReq for {} with result {}", 
req.consensusGroupId, statuses);
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index e5d9c2b6f4..285ad31d74 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
@@ -33,13 +34,20 @@ import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
 
 public class DataRegionStateMachine extends BaseStateMachine {
 
@@ -98,36 +106,80 @@ public class DataRegionStateMachine extends BaseStateMachine {
 
   @Override
   public TSStatus write(IConsensusRequest request) {
-    TSStatus status;
     PlanNode planNode;
     try {
       if (request instanceof IndexedConsensusRequest) {
-        status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-        IndexedConsensusRequest indexedConsensusRequest = 
(IndexedConsensusRequest) request;
-        for (IConsensusRequest innerRequest : 
indexedConsensusRequest.getRequests()) {
-          planNode = getPlanNode(innerRequest);
-          if (planNode instanceof InsertNode) {
-            ((InsertNode) planNode)
-                .setSearchIndex(((IndexedConsensusRequest) 
request).getSearchIndex());
-          }
-          TSStatus subStatus = write(planNode);
-          if (subStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-            status.setCode(subStatus.getCode());
-            status.setMessage(subStatus.getMessage());
-          }
-          status.addToSubStatus(subStatus);
+        IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) 
request;
+        List<InsertNode> insertNodes = new 
ArrayList<>(indexedRequest.getRequests().size());
+        for (IConsensusRequest req : indexedRequest.getRequests()) {
+          // PlanNode in IndexedConsensusRequest should always be InsertNode
+          InsertNode innerNode = (InsertNode) getPlanNode(req);
+          innerNode.setSearchIndex(indexedRequest.getSearchIndex());
+          insertNodes.add(innerNode);
         }
+        planNode = mergeInsertNodes(insertNodes);
       } else {
         planNode = getPlanNode(request);
-        status = write(planNode);
       }
-      return status;
+      return write(planNode);
     } catch (IllegalArgumentException e) {
       logger.error(e.getMessage(), e);
       return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
     }
   }
 
+  /**
+   * Merge insert nodes sharing same search index ( e.g. tablet-100, tablet-100, tablet-100 will be
+   * merged to one multi-tablet). <br>
+   * Notice: the continuity of insert nodes sharing same search index should be protected by the
+   * upper layer.
+   */
+  private InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
+    int size = insertNodes.size();
+    if (size == 0) {
+      throw new RuntimeException();
+    }
+    if (size == 1) {
+      return insertNodes.get(0);
+    }
+
+    InsertNode result;
+    if (insertNodes.get(0) instanceof InsertTabletNode) { // merge to 
InsertMultiTabletsNode
+      List<Integer> index = new ArrayList<>(size);
+      List<InsertTabletNode> insertTabletNodes = new ArrayList<>(size);
+      int i = 0;
+      for (InsertNode insertNode : insertNodes) {
+        insertTabletNodes.add((InsertTabletNode) insertNode);
+        index.add(i);
+        i++;
+      }
+      result =
+          new InsertMultiTabletsNode(insertNodes.get(0).getPlanNodeId(), 
index, insertTabletNodes);
+    } else { // merge to InsertRowsNode or InsertRowsOfOneDeviceNode
+      boolean sameDevice = true;
+      PartialPath device = insertNodes.get(0).getDevicePath();
+      List<Integer> index = new ArrayList<>(size);
+      List<InsertRowNode> insertRowNodes = new ArrayList<>(size);
+      int i = 0;
+      for (InsertNode insertNode : insertNodes) {
+        if (sameDevice && !insertNode.getDevicePath().equals(device)) {
+          sameDevice = false;
+        }
+        insertRowNodes.add((InsertRowNode) insertNode);
+        index.add(i);
+        i++;
+      }
+      result =
+          sameDevice
+              ? new InsertRowsOfOneDeviceNode(
+                  insertNodes.get(0).getPlanNodeId(), index, insertRowNodes)
+              : new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index, 
insertRowNodes);
+    }
+    result.setSearchIndex(insertNodes.get(0).getSearchIndex());
+    result.setDevicePath(insertNodes.get(0).getDevicePath());
+    return result;
+  }
+
   protected TSStatus write(PlanNode planNode) {
     return planNode.accept(new DataExecutionVisitor(), region);
   }
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index d36601be1c..218318163a 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -22,7 +22,8 @@ namespace java org.apache.iotdb.consensus.multileader.thrift
 
 struct TLogBatch {
   1: required binary data
-  2: required bool fromWAL
+  2: required i64 searchIndex
+  3: required bool fromWAL
 }
 
 struct TSyncLogReq {

Reply via email to