This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_vgraft by this push:
     new 5afd76a746 add param queueOrdered in LogDispatcher
5afd76a746 is described below

commit 5afd76a746e6987ceee9203a00c8e2bc8c829d25
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Oct 6 10:44:17 2022 +0800

    add param queueOrdered in LogDispatcher
---
 .../main/java/org/apache/iotdb/cluster/log/LogDispatcher.java    | 9 ++++++++-
 .../java/org/apache/iotdb/cluster/server/member/RaftMember.java  | 4 ++--
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 12e9a2927e..10c27d52f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -80,6 +81,8 @@ public class LogDispatcher {
   List<Pair<Node, BlockingQueue<SendLogRequest>>> nodesLogQueuesList = new 
ArrayList<>();
   Map<Node, Boolean> nodesEnabled;
   Map<Node, ExecutorService> executorServices = new HashMap<>();
+  protected boolean queueOrdered =
+      !(clusterConfig.isUseFollowerSlidingWindow() && 
clusterConfig.isEnableWeakAcceptance());
 
   public static int bindingThreadNum = 
clusterConfig.getDispatcherBindingThreadNum();
   public static int maxBatchSize = 10;
@@ -335,6 +338,10 @@ public class LogDispatcher {
           }
           Statistic.LOG_DISPATCHER_LOG_BATCH_SIZE.add(currBatch.size());
           serializeEntries();
+          if (!queueOrdered) {
+            currBatch.sort(
+                Comparator.comparingLong(s -> 
s.getVotingLog().getLog().getCurrLogIndex()));
+          }
           sendBatchLogs(currBatch);
           currBatch.clear();
         }
@@ -461,7 +468,7 @@ public class LogDispatcher {
 
     private void sendBatchLogs(List<SendLogRequest> currBatch) throws 
TException {
       if (currBatch.size() > 1) {
-        if (useBatchInLogCatchUp) {
+        if (useBatchInLogCatchUp && queueOrdered) {
           sendLogs(currBatch);
         } else {
           for (SendLogRequest batch : currBatch) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index c89aa376ac..6852499ff4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1920,8 +1920,8 @@ public abstract class RaftMember implements RaftMemberMBean {
     return request;
   }
 
-  protected AppendEntryRequest 
buildAppendEntryRequestExtended(AppendEntryRequest request, Log log,
-      boolean serializeNow) {
+  protected AppendEntryRequest buildAppendEntryRequestExtended(
+      AppendEntryRequest request, Log log, boolean serializeNow) {
     request.setIsFromLeader(true);
     if (ClusterDescriptor.getInstance().getConfig().isUseVGRaft()) {
       request.setLeaderSignature(KeyManager.INSTANCE.getNodeSignature());

Reply via email to