This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 3230b84075 optimize notifying and serialization
3230b84075 is described below

commit 3230b840754fbd8a40fbf4bfbefc461c843ccf61
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Apr 18 10:19:51 2023 +0800

    optimize notifying and serialization
---
 client-go                                          |  2 +-
 .../common/request/IConsensusRequest.java          | 10 ++++
 .../iotdb/consensus/natraft/RaftConsensus.java     |  9 ++++
 .../consensus/natraft/protocol/RaftConfig.java     | 15 ++++++
 .../consensus/natraft/protocol/RaftMember.java     | 29 ++++++++----
 .../consensus/natraft/protocol/log/Entry.java      | 50 ++++++++++++++++----
 .../natraft/protocol/log/VotingEntry.java          | 28 +++++++++++
 .../log/dispatch/AppendNodeEntryHandler.java       | 31 +++++++++++--
 .../protocol/log/dispatch/DispatcherGroup.java     | 24 +++++-----
 .../protocol/log/dispatch/DispatcherThread.java    | 54 ++++++++++++++--------
 .../protocol/log/dispatch/LogDispatcher.java       | 26 ++++++-----
 .../protocol/log/dispatch/VotingLogList.java       | 13 +-----
 .../log/dispatch/flowcontrol/FlowBalancer.java     |  9 +---
 .../natraft/protocol/log/logtype/RequestEntry.java | 27 +++++------
 .../iotdb/consensus/natraft/utils/Timer.java       | 18 ++++++++
 .../db/mpp/plan/planner/plan/node/PlanNode.java    |  5 ++
 .../planner/plan/node/write/InsertTabletNode.java  | 14 +++---
 17 files changed, 255 insertions(+), 109 deletions(-)

diff --git a/client-go b/client-go
index a05323c73a..84b8d45829 160000
--- a/client-go
+++ b/client-go
@@ -1 +1 @@
-Subproject commit a05323c73a3d615efde25d4d3287fcee32ec1292
+Subproject commit 84b8d45829d846440a3246400e7bc5e39587dcb5
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
index f18fd7413d..8238caeb24 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.consensus.common.request;
 
 import org.apache.iotdb.commons.path.PartialPath;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public interface IConsensusRequest {
@@ -39,6 +41,14 @@ public interface IConsensusRequest {
    */
   ByteBuffer serializeToByteBuffer();
 
+  default void serializeTo(DataOutputStream outputStream) throws IOException {
+    ByteBuffer byteBuffer = serializeToByteBuffer();
+    outputStream.write(
+        byteBuffer.array(),
+        byteBuffer.arrayOffset() + byteBuffer.position(),
+        byteBuffer.remaining());
+  }
+
   default long estimateSize() {
     return 0;
   }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 2d4611b7be..e7bc64c42f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
 import org.apache.iotdb.consensus.natraft.exception.CheckConsistencyException;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
 import org.apache.iotdb.consensus.natraft.service.RaftRPCService;
 import org.apache.iotdb.consensus.natraft.service.RaftRPCServiceProcessor;
@@ -99,6 +100,7 @@ public class RaftConsensus implements IConsensus {
             .createClientManager(new AsyncRaftServiceClientPoolFactory(this.config));
     FlowMonitorManager.INSTANCE.setConfig(this.config);
     SyncClientAdaptor.setConfig(this.config);
+    Entry.DEFAULT_SERIALIZATION_BUFFER_SIZE = this.config.getEntryDefaultSerializationBufferSize();
   }
 
   @Override
@@ -115,6 +117,11 @@ public class RaftConsensus implements IConsensus {
             new Thread(
                 () -> {
                   logger.info(Timer.Statistic.getReport());
+                  try {
+                    stop();
+                  } catch (IOException e) {
+                    logger.error("Error during exiting", e);
+                  }
                 }));
     reportThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
     ScheduledExecutorUtil.safelyScheduleAtFixedRate(
@@ -158,6 +165,7 @@ public class RaftConsensus implements IConsensus {
 
   @Override
   public void stop() throws IOException {
+    reportThread.shutdownNow();
     clientManager.close();
     stateMachineMap.values().parallelStream().forEach(RaftMember::stop);
     registerManager.deregisterAll();
@@ -167,6 +175,7 @@ public class RaftConsensus implements IConsensus {
   @Override
   public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) {
     if (config.isOnlyTestNetwork()) {
+      request.serializeToByteBuffer();
       return ConsensusWriteResponse.newBuilder().setStatus(StatusUtils.OK).build();
     }
     RaftMember impl = stateMachineMap.get(groupId);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index be3e56d231..b1218ce689 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -39,6 +39,7 @@ public class RaftConfig {
   private boolean enableWeakAcceptance = false;
   private int maxNumOfLogsInMem = 10000;
   private int minNumOfLogsInMem = 1000;
+  private int entryDefaultSerializationBufferSize = 16 * 1024;
   private long maxMemorySizeForRaftLog = 512 * 1024 * 1024L;
   private int logDeleteCheckIntervalSecond = 1;
   private boolean enableRaftLogPersistence = true;
@@ -436,6 +437,14 @@ public class RaftConfig {
     this.maxRaftLogPersistDataSizePerFile = maxRaftLogPersistDataSizePerFile;
   }
 
+  public int getEntryDefaultSerializationBufferSize() {
+    return entryDefaultSerializationBufferSize;
+  }
+
+  public void setEntryDefaultSerializationBufferSize(int entryDefaultSerializationBufferSize) {
+    this.entryDefaultSerializationBufferSize = entryDefaultSerializationBufferSize;
+  }
+
   public void loadProperties(Properties properties) {
     logger.debug("Loading properties: {}", properties);
 
@@ -647,6 +656,12 @@ public class RaftConfig {
             properties.getProperty(
                 "flow_control_max_flow", String.valueOf(this.getFlowControlMaxFlow()))));
 
+    this.setEntryDefaultSerializationBufferSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "entry_serialization_buffer_size",
+                String.valueOf(this.getEntryDefaultSerializationBufferSize()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       this.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 8d84036592..a7c00d3bc2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -385,6 +385,7 @@ public class RaftMember {
       return;
     }
 
+    logDispatcher.stop();
     heartbeatThread.stop();
     catchUpManager.stop();
 
@@ -633,6 +634,7 @@ public class RaftMember {
 
     logger.debug("{}: Processing request {}", name, request);
     Entry entry = new RequestEntry(request);
+    entry.preSerialize();
     entry.receiveTime = System.nanoTime();
 
     // just like processPlanLocally,we need to check the size of log
@@ -655,19 +657,15 @@ public class RaftMember {
       return StatusUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
     }
 
-    TSStatus tsStatus1 = waitForEntryResult(votingEntry);
-    entry.waitEndTime = System.nanoTime();
-    Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPLY_END.add(
-        entry.waitEndTime - entry.createTime);
-    return tsStatus1;
+    return waitForEntryResult(votingEntry);
   }
 
   protected void waitApply(Entry entry) throws LogExecutionException {
     // when using async applier, the log here may not be applied. To return the execution
     // result, we must wait until the log is applied.
-    synchronized (entry) {
-      while (!entry.isApplied()) {
-        // wait until the log is applied
+    while (!entry.isApplied()) {
+      // wait until the log is applied
+      synchronized (entry) {
         try {
           entry.wait(1);
         } catch (InterruptedException e) {
@@ -676,6 +674,7 @@ public class RaftMember {
         }
       }
     }
+
     if (entry.getException() != null) {
       throw new LogExecutionException(entry.getException());
     }
@@ -753,10 +752,17 @@ public class RaftMember {
     long waitTime = 1;
     AcceptedType acceptedType = votingLogList.computeAcceptedType(log);
 
+    Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPEND_START.calOperationCostTimeFromStart(
+        log.getEntry().createTime);
+    long startTime = Statistic.RAFT_SENDER_LOG_APPEND_WAIT.getOperationStartTime();
     while (acceptedType == AcceptedType.NOT_ACCEPTED
         && alreadyWait < config.getWriteOperationTimeoutMS()) {
-      long startTime = Statistic.RAFT_SENDER_LOG_APPEND_WAIT.getOperationStartTime();
       synchronized (log.getEntry()) {
+        acceptedType = votingLogList.computeAcceptedType(log);
+        if (acceptedType != AcceptedType.NOT_ACCEPTED) {
+          break;
+        }
+
         try {
           log.getEntry().wait(waitTime);
         } catch (InterruptedException e) {
@@ -764,7 +770,6 @@ public class RaftMember {
           logger.warn("Unexpected interruption when sending a log", e);
         }
       }
-      Statistic.RAFT_SENDER_LOG_APPEND_WAIT.calOperationCostTimeFromStart(startTime);
 
       acceptedType = votingLogList.computeAcceptedType(log);
       alreadyWait = (System.nanoTime() - waitStart) / 1000000;
@@ -777,6 +782,7 @@ public class RaftMember {
         nextTimeToPrint *= 2;
       }
     }
+    Statistic.RAFT_SENDER_LOG_APPEND_WAIT.calOperationCostTimeFromStart(startTime);
 
     if (logger.isDebugEnabled()) {
       Thread.currentThread().setName(threadBaseName);
@@ -1260,6 +1266,9 @@ public class RaftMember {
         case OK:
           if (config.isWaitApply()) {
             waitApply(votingEntry.getEntry());
+            votingEntry.getEntry().waitEndTime = System.nanoTime();
+            Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPLY_END.add(
+                votingEntry.getEntry().waitEndTime - votingEntry.getEntry().createTime);
             return includeLogNumbersInStatus(
                 StatusUtils.OK.deepCopy(),
                 votingEntry.getEntry().getCurrLogIndex(),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index 208678c6b2..9f509c93f2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -35,7 +35,7 @@ public abstract class Entry implements Comparable<Entry> {
       Comparator.comparingLong(Entry::getCurrLogIndex).thenComparing(Entry::getCurrLogTerm);
 
   // make this configurable or adaptive
-  protected static final int DEFAULT_SERIALIZATION_BUFFER_SIZE = 16 * 1024;
+  public static int DEFAULT_SERIALIZATION_BUFFER_SIZE = 16 * 1024;
   private volatile long currLogIndex = Long.MIN_VALUE;
   private long currLogTerm = -1;
   private long prevTerm = -1;
@@ -55,6 +55,8 @@ public abstract class Entry implements Comparable<Entry> {
   public long committedTime;
   public long applyTime;
   public long waitEndTime;
+
+  private ByteBuffer preSerializationCache;
   private ByteBuffer serializationCache;
 
   public int getDefaultSerializationBufferSize() {
@@ -63,22 +65,46 @@ public abstract class Entry implements Comparable<Entry> {
 
   protected abstract ByteBuffer serializeInternal();
 
+  /**
+   * Perform serialization before indexing to avoid serialization under locked environment. It
+   * should be noticed that at this time point, the index is not set yet, so when the final
+   * serialization is called, it must set the correct index, term, and prevTerm (starting from the
+   * second byte in the ByteBuffer).
+   */
+  public void preSerialize() {
+    if (preSerializationCache != null || serializationCache != null) {
+      return;
+    }
+    long startTime = Statistic.SERIALIZE_ENTRY.getOperationStartTime();
+    ByteBuffer byteBuffer = serializeInternal();
+    Statistic.SERIALIZE_ENTRY.calOperationCostTimeFromStart(startTime);
+    preSerializationCache = byteBuffer;
+  }
+
   public ByteBuffer serialize() {
     ByteBuffer cache = serializationCache;
     if (cache != null) {
       return cache.slice();
     }
-    ByteBuffer byteBuffer = serializeInternal();
-    serializationCache = byteBuffer;
-    return byteBuffer.slice();
-  };
+    if (preSerializationCache != null) {
+      preSerializationCache.position(1);
+      preSerializationCache.putLong(getCurrLogIndex());
+      preSerializationCache.putLong(getCurrLogTerm());
+      preSerializationCache.putLong(getPrevTerm());
+      preSerializationCache.position(0);
+      serializationCache = preSerializationCache;
+      preSerializationCache = null;
+    } else {
+      long startTime = Statistic.SERIALIZE_ENTRY.getOperationStartTime();
+      ByteBuffer byteBuffer = serializeInternal();
+      Statistic.SERIALIZE_ENTRY.calOperationCostTimeFromStart(startTime);
+      serializationCache = byteBuffer;
+    }
+    return serializationCache.slice();
+  }
 
   public abstract void deserialize(ByteBuffer buffer);
 
-  public void serialize(ByteBuffer buffer) {
-    buffer.put(serialize());
-  }
-
   public enum Types {
     // DO CHECK LogParser when you add a new type of log
     CLIENT_REQUEST,
@@ -152,6 +178,12 @@ public abstract class Entry implements Comparable<Entry> {
   }
 
   public long estimateSize() {
+    ByteBuffer cache;
+    if ((cache = serializationCache) != null) {
+      return cache.remaining();
+    } else if ((cache = preSerializationCache) != null) {
+      return cache.remaining();
+    }
     return byteSize;
   };
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
index b654a3a872..ee5c25c3c6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
@@ -38,6 +38,8 @@ public class VotingEntry {
   protected List<Peer> currNodes;
   protected List<Peer> newNodes;
   private boolean isStronglyAccepted;
+  private boolean isWeaklyAccepted;
+  private boolean notified;
 
   public VotingEntry(
       Entry entry,
@@ -108,6 +110,24 @@ public class VotingEntry {
     return stronglyAcceptedByCurrNodes && stronglyAcceptedByNewNodes;
   }
 
+  public boolean isWeaklyAccepted(Map<Peer, Long> stronglyAcceptedIndices) {
+    if (isWeaklyAccepted) {
+      return true;
+    }
+    int currNodeQuorumNum = currNodesQuorumNum();
+    int newNodeQuorumNum = newNodesQuorumNum();
+    int stronglyAcceptedNumByCurrNodes = stronglyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
+    int stronglyAcceptedNumByNewNodes = stronglyAcceptedNumByNewNodes(stronglyAcceptedIndices);
+    int weaklyAcceptedNumByCurrNodes = weaklyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
+    int weaklyAcceptedNumByNewNodes = weaklyAcceptedNumByNewNodes(stronglyAcceptedIndices);
+    if ((weaklyAcceptedNumByCurrNodes + stronglyAcceptedNumByCurrNodes) >= currNodeQuorumNum
+        && (weaklyAcceptedNumByNewNodes + stronglyAcceptedNumByNewNodes) >= newNodeQuorumNum) {
+      isWeaklyAccepted = true;
+      return true;
+    }
+    return false;
+  }
+
   public int stronglyAcceptedNumByCurrNodes(Map<Peer, Long> stronglyAcceptedIndices) {
     int num = 0;
     for (Peer node : currNodes) {
@@ -159,4 +179,12 @@ public class VotingEntry {
   public boolean hasNewNodes() {
     return newNodes != null;
   }
+
+  public boolean isNotified() {
+    return notified;
+  }
+
+  public void setNotified(boolean notified) {
+    this.notified = notified;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index 7d169e6e66..6cb53f71d2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.VotingLogList.AcceptedType;
 import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
 
@@ -76,8 +77,17 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
     if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
       member.getVotingLogList().onStronglyAccept(votingEntry, trueReceiver);
       member.getStatus().getPeerMap().get(trueReceiver).setMatchIndex(response.lastLogIndex);
-      synchronized (votingEntry.getEntry()) {
-        votingEntry.getEntry().notifyAll();
+      if (!votingEntry.isNotified()) {
+        AcceptedType acceptedType = member.getVotingLogList().computeAcceptedType(votingEntry);
+        if (acceptedType == AcceptedType.STRONGLY_ACCEPTED
+            || acceptedType == AcceptedType.WEAKLY_ACCEPTED) {
+          synchronized (votingEntry.getEntry()) {
+            votingEntry.getEntry().notifyAll();
+            votingEntry.setNotified(true);
+            Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_NOTIFIED.calOperationCostTimeFromStart(
+                votingEntry.getEntry().createTime);
+          }
+        }
       }
     } else if (resp > 0) {
       // a response > 0 is the follower's term
@@ -91,14 +101,25 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
       member.stepDown(resp, null);
       synchronized (votingEntry.getEntry()) {
         votingEntry.getEntry().notifyAll();
+        votingEntry.setNotified(true);
+        Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_NOTIFIED.calOperationCostTimeFromStart(
+            votingEntry.getEntry().createTime);
       }
     } else if (resp == RESPONSE_WEAK_ACCEPT) {
       votingEntry.getEntry().acceptedTime = System.nanoTime();
       Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.add(
           votingEntry.getEntry().acceptedTime - votingEntry.getEntry().createTime);
-      synchronized (votingEntry.getEntry()) {
-        votingEntry.addWeaklyAcceptedNodes(trueReceiver);
-        votingEntry.getEntry().notifyAll();
+      votingEntry.addWeaklyAcceptedNodes(trueReceiver);
+      if (!votingEntry.isNotified()) {
+        AcceptedType acceptedType = member.getVotingLogList().computeAcceptedType(votingEntry);
+        if (acceptedType == AcceptedType.WEAKLY_ACCEPTED) {
+          synchronized (votingEntry.getEntry()) {
+            votingEntry.getEntry().notifyAll();
+            votingEntry.setNotified(true);
+            Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_NOTIFIED.calOperationCostTimeFromStart(
+                votingEntry.getEntry().createTime);
+          }
+        }
       }
     } else {
       // e.g., Response.RESPONSE_LOG_MISMATCH
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index 28ab02c7c4..e953d1bc0a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -19,18 +19,20 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+
 import org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class DispatcherGroup {
   private static final Logger logger = LoggerFactory.getLogger(DispatcherGroup.class);
   private final Peer peer;
@@ -65,22 +67,22 @@ public class DispatcherGroup {
       // ignore
     }
     if (!closeSucceeded) {
-      logger.warn("Cannot shut down dispatcher pool of {}-{}", logDispatcher.member.getName(),
-          peer);
+      logger.warn(
+          "Cannot shut down dispatcher pool of {}-{}", logDispatcher.member.getName(), peer);
     }
   }
+
   public void addThread() {
     int threadNum = groupThreadNum.incrementAndGet();
     if (threadNum <= maxBindingThreadNum) {
-      dispatcherThreadPool
-          .submit(newDispatcherThread(peer, entryQueue, rateLimiter));
+      dispatcherThreadPool.submit(newDispatcherThread(peer, entryQueue, rateLimiter));
     } else {
       groupThreadNum.decrementAndGet();
     }
   }
 
-  DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingEntry> logBlockingQueue,
-      RateLimiter rateLimiter) {
+  DispatcherThread newDispatcherThread(
+      Peer node, BlockingQueue<VotingEntry> logBlockingQueue, RateLimiter rateLimiter) {
     return new DispatcherThread(logDispatcher, node, logBlockingQueue, rateLimiter, this);
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 7173249696..7e45c26f8a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -19,11 +19,6 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
@@ -35,12 +30,20 @@ import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
+
 import org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
 class DispatcherThread implements Runnable {
+
   private static final Logger logger = LoggerFactory.getLogger(DispatcherThread.class);
 
   private final LogDispatcher logDispatcher;
@@ -54,8 +57,11 @@ class DispatcherThread implements Runnable {
   private long runningTimeSum;
   private long lastDispatchTime;
 
-  protected DispatcherThread(LogDispatcher logDispatcher, Peer receiver,
-      BlockingQueue<VotingEntry> logBlockingDeque, RateLimiter rateLimiter,
+  protected DispatcherThread(
+      LogDispatcher logDispatcher,
+      Peer receiver,
+      BlockingQueue<VotingEntry> logBlockingDeque,
+      RateLimiter rateLimiter,
       DispatcherGroup group) {
     this.logDispatcher = logDispatcher;
     this.receiver = receiver;
@@ -75,8 +81,8 @@ class DispatcherThread implements Runnable {
       long runningStart = 0;
       while (!Thread.interrupted()) {
         if (group.isDelayed()) {
-          if (logBlockingDeque.size() < logDispatcher.maxBatchSize &&
-          System.nanoTime() - lastDispatchTime < 1_000_000_000L) {
+          if (logBlockingDeque.size() < logDispatcher.maxBatchSize
+              && System.nanoTime() - lastDispatchTime < 1_000_000_000L) {
             // the follower is being delayed, if there is not enough requests, and it has
             // dispatched recently, wait for a while to get a larger batch
             Thread.sleep(100);
@@ -95,7 +101,7 @@ class DispatcherThread implements Runnable {
           }
         }
         long currTime = System.nanoTime();
-        idleTimeSum = currTime - idleStart;
+        idleTimeSum += currTime - idleStart;
         runningStart = currTime;
         if (logger.isDebugEnabled()) {
           logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
@@ -110,22 +116,23 @@ class DispatcherThread implements Runnable {
 
         currTime = System.nanoTime();
         lastDispatchTime = currTime;
-        runningTimeSum = currTime - runningStart;
+        runningTimeSum += currTime - runningStart;
         idleStart = currTime;
 
         // thread too idle
-        if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) < 0.5 &&
-        runningTimeSum > 10_000_000_000L) {
+        if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) > 0.5
+            && runningTimeSum > 10_000_000_000L) {
           int remaining = group.getGroupThreadNum().decrementAndGet();
           if (remaining > 1) {
             logger.info("Dispatcher thread too idle");
+            group.getGroupThreadNum().incrementAndGet();
             break;
           } else {
             group.getGroupThreadNum().incrementAndGet();
           }
           // thread too busy
-        } else if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) > 0.5 &&
-            runningTimeSum > 10_000_000_000L) {
+        } else if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) < 0.1
+            && runningTimeSum > 10_000_000_000L) {
           int groupThreadNum = group.getGroupThreadNum().get();
           if (groupThreadNum < group.getMaxBindingThreadNum()) {
             group.addThread();
@@ -140,11 +147,14 @@ class DispatcherThread implements Runnable {
     } catch (Exception e) {
       logger.error("Unexpected error in log dispatcher", e);
     }
-    logger.info("Dispatcher exits, idle ratio: {}", idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum));
+    logger.info(
+        "Dispatcher exits, idle ratio: {}, running time: {}ms, idle time: {}ms",
+        idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum),
+        runningTimeSum / 1_000_000L,
+        idleTimeSum / 1_000_000L);
     group.getGroupThreadNum().decrementAndGet();
   }
 
-
   protected void serializeEntries() throws InterruptedException {
     for (VotingEntry request : currBatch) {
 
@@ -167,7 +177,9 @@ class DispatcherThread implements Runnable {
     }
     if (logger.isDebugEnabled()) {
       logger.debug(
-          "{}: append entries {} with {} logs", logDispatcher.member.getName(), receiver,
+          "{}: append entries {} with {} logs",
+          logDispatcher.member.getName(),
+          receiver,
           logList.size());
     }
   }
@@ -189,7 +201,9 @@ class DispatcherThread implements Runnable {
     }
     if (logger.isDebugEnabled()) {
       logger.debug(
-          "{}: append entries {} with {} logs", logDispatcher.member.getName(), receiver,
+          "{}: append entries {} with {} logs",
+          logDispatcher.member.getName(),
+          receiver,
           logList.size());
     }
   }
@@ -256,7 +270,7 @@ class DispatcherThread implements Runnable {
       }
 
       if (logDispatcher.getConfig().isUseFollowerLoadBalance()) {
-        FlowMonitorManager.INSTANCE.report(receiver, logSize);
+        FlowMonitorManager.INSTANCE.report(receiver.getEndpoint(), logSize);
       }
       rateLimiter.acquire((int) logSize);
     }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index d63b4f9649..6640528ec6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -19,24 +19,26 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
-import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
 import org.apache.iotdb.tsfile.compress.ICompressor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+
+import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
+
 /**
  * A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
  * followers and send the logs in an ordered manner so that the followers will not wait for previous
@@ -138,8 +140,6 @@ public class LogDispatcher {
     }
   }
 
-
-
   public void applyNewNodes() {
     allNodes = newNodes;
     newNodes = null;
@@ -180,4 +180,8 @@ public class LogDispatcher {
   public RaftMember getMember() {
     return member;
   }
+
+  public void stop() {
+    dispatcherGroupMap.forEach((p, g) -> g.close());
+  }
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
index d19c6d33d3..6dd088ec28 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
@@ -124,18 +124,7 @@ public class VotingLogList {
     }
 
     if (enableWeakAcceptance) {
-      int currNodeQuorumNum = votingEntry.currNodesQuorumNum();
-      int newNodeQuorumNum = votingEntry.newNodesQuorumNum();
-      int stronglyAcceptedNumByCurrNodes =
-          votingEntry.stronglyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
-      int stronglyAcceptedNumByNewNodes =
-          votingEntry.stronglyAcceptedNumByNewNodes(stronglyAcceptedIndices);
-      int weaklyAcceptedNumByCurrNodes =
-          votingEntry.weaklyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
-      int weaklyAcceptedNumByNewNodes =
-          votingEntry.weaklyAcceptedNumByNewNodes(stronglyAcceptedIndices);
-      if ((weaklyAcceptedNumByCurrNodes + stronglyAcceptedNumByCurrNodes) >= 
currNodeQuorumNum
-          && (weaklyAcceptedNumByNewNodes + stronglyAcceptedNumByNewNodes) >= 
newNodeQuorumNum) {
+      if (votingEntry.isWeaklyAccepted(stronglyAcceptedIndices)) {
         return AcceptedType.WEAKLY_ACCEPTED;
       }
     }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index eb2888d610..5a996c41e7 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -20,13 +20,6 @@
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
@@ -34,6 +27,7 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
 import 
org.apache.iotdb.consensus.natraft.protocol.log.dispatch.DispatcherGroup;
 import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.LogDispatcher;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +36,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
index 42f1a2659f..de8a9f6e57 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
@@ -49,6 +49,8 @@ public class RequestEntry extends Entry {
   @Override
   protected ByteBuffer serializeInternal() {
     PublicBAOS byteArrayOutputStream = new 
PublicBAOS(getDefaultSerializationBufferSize());
+    int requestSize = 0;
+    int requestPos = 0;
     try (DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) CLIENT_REQUEST.ordinal());
 
@@ -56,25 +58,20 @@ public class RequestEntry extends Entry {
       dataOutputStream.writeLong(getCurrLogTerm());
       dataOutputStream.writeLong(getPrevTerm());
 
-      ByteBuffer byteBuffer = request.serializeToByteBuffer();
-      byteBuffer.rewind();
-      dataOutputStream.writeInt(byteBuffer.remaining());
-      dataOutputStream.write(byteBuffer.array(), byteBuffer.arrayOffset(), 
byteBuffer.remaining());
+      requestPos = byteArrayOutputStream.size();
+      dataOutputStream.writeInt(0);
+      request.serializeTo(dataOutputStream);
+      requestSize = byteArrayOutputStream.size() - requestPos - 4;
     } catch (IOException e) {
       // unreachable
     }
 
-    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
-  }
-
-  @Override
-  public void serialize(ByteBuffer buffer) {
-    buffer.put((byte) CLIENT_REQUEST.ordinal());
-    buffer.putLong(getCurrLogIndex());
-    buffer.putLong(getCurrLogTerm());
-    ByteBuffer byteBuffer = request.serializeToByteBuffer();
-    buffer.putInt(byteBuffer.remaining());
-    buffer.put(byteBuffer);
+    ByteBuffer wrap =
+        ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    wrap.position(requestPos);
+    wrap.putInt(requestSize);
+    wrap.position(0);
+    return wrap;
   }
 
   @Override
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index de89657540..b2f0b824d9 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -278,6 +278,12 @@ public class Timer {
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_LOG_BATCH_SIZE(
         LOG_DISPATCHER, "batch size", 1, true, 
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    SERIALIZE_ENTRY(
+        LOG_DISPATCHER,
+        "serialize entry",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE(
         LOG_DISPATCHER,
         "from receive to create",
@@ -356,6 +362,18 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_NOTIFIED(
+        LOG_DISPATCHER,
+        "from create to notified",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPEND_START(
+        LOG_DISPATCHER,
+        "from create to wait append start",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_SENDER_LOG_APPEND_WAIT(
         LOG_DISPATCHER,
         "wait for being appended",
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
index 0e85bd9461..55b869dc0d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
@@ -160,6 +160,11 @@ public abstract class PlanNode implements 
IConsensusRequest {
     }
   }
 
+  @Override
+  public void serializeTo(DataOutputStream outputStream) throws IOException {
+    serialize(outputStream);
+  }
+
   protected abstract void serializeAttributes(ByteBuffer byteBuffer);
 
   protected abstract void serializeAttributes(DataOutputStream stream) throws 
IOException;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index 7fa6b8ba30..f801f9a290 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -460,8 +460,8 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue, ISche
 
   /** Serialize measurements or measurement schemas, ignoring failed time 
series */
   private void writeMeasurementsOrSchemas(DataOutputStream stream) throws 
IOException {
-    ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), 
stream);
-    ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), 
stream);
+    stream.writeInt(measurements.length - getFailedMeasurementNumber());
+    stream.write((byte) (measurementSchemas != null ? 1 : 0));
 
     for (int i = 0; i < measurements.length; i++) {
       // ignore failed partial insert
@@ -624,31 +624,31 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue, ISche
       case INT32:
         int[] intValues = (int[]) column;
         for (int j = 0; j < rowCount; j++) {
-          ReadWriteIOUtils.write(intValues[j], stream);
+          stream.writeInt(intValues[j]);
         }
         break;
       case INT64:
         long[] longValues = (long[]) column;
         for (int j = 0; j < rowCount; j++) {
-          ReadWriteIOUtils.write(longValues[j], stream);
+          stream.writeLong(longValues[j]);
         }
         break;
       case FLOAT:
         float[] floatValues = (float[]) column;
         for (int j = 0; j < rowCount; j++) {
-          ReadWriteIOUtils.write(floatValues[j], stream);
+          stream.writeFloat(floatValues[j]);
         }
         break;
       case DOUBLE:
         double[] doubleValues = (double[]) column;
         for (int j = 0; j < rowCount; j++) {
-          ReadWriteIOUtils.write(doubleValues[j], stream);
+          stream.writeDouble(doubleValues[j]);
         }
         break;
       case BOOLEAN:
         boolean[] boolValues = (boolean[]) column;
         for (int j = 0; j < rowCount; j++) {
-          ReadWriteIOUtils.write(BytesUtils.boolToByte(boolValues[j]), stream);
+          stream.write(BytesUtils.boolToByte(boolValues[j]));
         }
         break;
       case TEXT:


Reply via email to