This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_vgraft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_vgraft by this push:
new 5afd76a746 add param queueOrdered in LogDispatcher
5afd76a746 is described below
commit 5afd76a746e6987ceee9203a00c8e2bc8c829d25
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Oct 6 10:44:17 2022 +0800
add param queueOrdered in LogDispatcher
---
.../main/java/org/apache/iotdb/cluster/log/LogDispatcher.java | 9 ++++++++-
.../java/org/apache/iotdb/cluster/server/member/RaftMember.java | 4 ++--
2 files changed, 10 insertions(+), 3 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 12e9a2927e..10c27d52f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -80,6 +81,8 @@ public class LogDispatcher {
List<Pair<Node, BlockingQueue<SendLogRequest>>> nodesLogQueuesList = new
ArrayList<>();
Map<Node, Boolean> nodesEnabled;
Map<Node, ExecutorService> executorServices = new HashMap<>();
+ protected boolean queueOrdered =
+ !(clusterConfig.isUseFollowerSlidingWindow() &&
clusterConfig.isEnableWeakAcceptance());
public static int bindingThreadNum =
clusterConfig.getDispatcherBindingThreadNum();
public static int maxBatchSize = 10;
@@ -335,6 +338,10 @@ public class LogDispatcher {
}
Statistic.LOG_DISPATCHER_LOG_BATCH_SIZE.add(currBatch.size());
serializeEntries();
+ if (!queueOrdered) {
+ currBatch.sort(
+ Comparator.comparingLong(s ->
s.getVotingLog().getLog().getCurrLogIndex()));
+ }
sendBatchLogs(currBatch);
currBatch.clear();
}
@@ -461,7 +468,7 @@ public class LogDispatcher {
private void sendBatchLogs(List<SendLogRequest> currBatch) throws
TException {
if (currBatch.size() > 1) {
- if (useBatchInLogCatchUp) {
+ if (useBatchInLogCatchUp && queueOrdered) {
sendLogs(currBatch);
} else {
for (SendLogRequest batch : currBatch) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index c89aa376ac..6852499ff4 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1920,8 +1920,8 @@ public abstract class RaftMember implements
RaftMemberMBean {
return request;
}
- protected AppendEntryRequest
buildAppendEntryRequestExtended(AppendEntryRequest request, Log log,
- boolean serializeNow) {
+ protected AppendEntryRequest buildAppendEntryRequestExtended(
+ AppendEntryRequest request, Log log, boolean serializeNow) {
request.setIsFromLeader(true);
if (ClusterDescriptor.getInstance().getConfig().isUseVGRaft()) {
request.setLeaderSignature(KeyManager.INSTANCE.getNodeSignature());