This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 44c5248969e fix Status returned by EmptyStateMachine; add applier 
thread num config; optimize appending order in SlidingWindowLogAppender; optimize 
buffer management in log serialization
44c5248969e is described below

commit 44c5248969e43f9fbafb688b36bd46a6f2d598bd
Author: Tian Jiang <[email protected]>
AuthorDate: Fri May 19 09:37:54 2023 +0800

    fix Status returned by EmptyStateMachine
    add applier thread num config
    optimize appending order in SlidingWindowLogAppender
    optimize buffer management in log serialization
---
 .../apache/iotdb/consensus/EmptyStateMachine.java  |  3 +-
 .../consensus/natraft/protocol/RaftConfig.java     | 14 +++++
 .../consensus/natraft/protocol/RaftMember.java     |  1 +
 .../log/appender/SlidingWindowLogAppender.java     | 71 ++++++++++++++++++----
 .../protocol/log/applier/AsyncLogApplier.java      |  8 +--
 .../protocol/log/dispatch/DispatcherGroup.java     | 20 +++---
 .../protocol/log/dispatch/DispatcherThread.java    | 23 +++----
 .../log/dispatch/flowcontrol/FlowBalancer.java     |  3 +-
 .../serialization/SyncLogDequeSerializer.java      | 55 +++++++++++------
 .../iotdb/consensus/natraft/utils/LogUtils.java    | 37 ++++++++---
 .../iotdb/consensus/natraft/utils/NodeReport.java  |  5 +-
 .../iotdb/consensus/natraft/utils/Timer.java       | 28 ++++++++-
 .../concurrent/dynamic/DynamicThreadGroup.java     | 14 ++++-
 13 files changed, 209 insertions(+), 73 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/EmptyStateMachine.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index daa8da958de..5111f820215 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/EmptyStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import java.io.File;
 
@@ -37,7 +38,7 @@ public class EmptyStateMachine implements IStateMachine, 
IStateMachine.EventApi
 
   @Override
   public TSStatus write(IConsensusRequest IConsensusRequest) {
-    return new TSStatus(0);
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   @Override
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index cc0a44f4cd9..76864ba5795 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -52,6 +52,7 @@ public class RaftConfig {
   private long writeOperationTimeoutMS = 20_000L;
   private int logNumInBatch = 100;
   private int dispatcherBindingThreadNum = 16;
+  private int applierThreadNum = 16;
   private int followerLoadBalanceWindowsToUse = 1;
   private double followerLoadBalanceOverestimateFactor = 1.1;
   private int flowMonitorMaxWindowNum = 1000;
@@ -454,6 +455,14 @@ public class RaftConfig {
     this.logPersistCompressor = logPersistCompressor;
   }
 
+  public int getApplierThreadNum() {
+    return applierThreadNum;
+  }
+
+  public void setApplierThreadNum(int applierThreadNum) {
+    this.applierThreadNum = applierThreadNum;
+  }
+
   public void loadProperties(Properties properties) {
     logger.debug("Loading properties: {}", properties);
 
@@ -607,6 +616,11 @@ public class RaftConfig {
                 "dispatcher_binding_thread_num",
                 String.valueOf(this.getDispatcherBindingThreadNum()))));
 
+    this.setApplierThreadNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "applier_thread_num", 
String.valueOf(this.getApplierThreadNum()))));
+
     this.setUseFollowerLoadBalance(
         Boolean.parseBoolean(
             properties.getProperty(
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 1855e2cde11..1db0d383440 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -377,6 +377,7 @@ public class RaftMember {
       return;
     }
     setStopped(true);
+    logger.info("Member {} stopping", name);
 
     closeLogManager();
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index 9702b4b461e..07486cb4edb 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -130,6 +130,7 @@ public class SlidingWindowLogAppender implements 
LogAppender {
       if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
           <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
         success = logManager.maybeAppend(logs);
+        Statistic.RAFT_RECEIVER_WINDOW_FLUSH_SIZE.add(logs.size());
         break;
       }
       try {
@@ -178,19 +179,8 @@ public class SlidingWindowLogAppender implements 
LogAppender {
           .setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
     }
 
-    AppendEntryResult result = null;
-    for (Entry entry : entries) {
-      long startTime = 
Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY.getOperationStartTime();
-      result = appendEntry(leaderCommit, entry);
-      
Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY.calOperationCostTimeFromStart(startTime);
-
-      if (result.status != Response.RESPONSE_AGREE
-          && result.status != Response.RESPONSE_STRONG_ACCEPT
-          && result.status != Response.RESPONSE_WEAK_ACCEPT) {
-        return result;
-      }
-    }
-
+    AppendEntryResult result;
+    result = appendEntries(leaderCommit, entries);
     return result;
   }
 
@@ -228,14 +218,69 @@ public class SlidingWindowLogAppender implements 
LogAppender {
         return result;
       }
     }
+    
Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY_SYNC.calOperationCostTimeFromStart(startTime);
+
+    if (appended) {
+      long startTime1 = 
Statistic.RAFT_RECEIVER_UPDATE_COMMIT_INDEX.getOperationStartTime();
+      result.setLastLogIndex(logManager.getLastEntryIndexUnsafe());
+      member.tryUpdateCommitIndex(member.getStatus().getTerm().get(), 
leaderCommit, -1);
+      
Statistic.RAFT_RECEIVER_UPDATE_COMMIT_INDEX.calOperationCostTimeFromStart(startTime1);
+    }
+    return result;
+  }
+
+  private AppendEntryResult appendEntries(long leaderCommit, List<Entry> 
entries) {
+    boolean appended = false;
+
+    AppendEntryResult result = new AppendEntryResult();
+    Collections.reverse(entries);
+    long startTime = 
Statistic.RAFT_RECEIVER_WAIT_FOR_WINDOW.getOperationStartTime();
+    synchronized (this) {
+      
Statistic.RAFT_RECEIVER_WAIT_FOR_WINDOW.calOperationCostTimeFromStart(startTime);
+      for (Entry entry : entries) {
+        long prevLogIndex = entry.getCurrLogIndex() - 1;
+        long entryStartTime = 
Statistic.RAFT_RECEIVER_WAIT_FOR_WINDOW.getOperationStartTime();
+        int windowPos = (int) (prevLogIndex - firstPosPrevIndex);
+        if (windowPos < 0) {
+          // the new entry may replace an appended entry
+          appended = logManager.maybeAppend(Collections.singletonList(entry));
+          moveWindowLeftward(-windowPos);
+          result.status = Response.RESPONSE_STRONG_ACCEPT;
+        } else if (windowPos < windowCapacity) {
+          // the new entry falls into the window
+          logWindow[windowPos] = entry;
+          if (windowLength < windowPos + 1) {
+            windowLength = windowPos + 1;
+          }
+          checkLog(windowPos);
+          if (windowPos == 0) {
+            appended = flushWindow(result);
+            
Statistic.RAFT_FOLLOWER_STRONG_ACCEPT.calOperationCostTimeFromStart(entryStartTime);
+          } else {
+            result.status = Response.RESPONSE_WEAK_ACCEPT;
+            
Statistic.RAFT_FOLLOWER_WEAK_ACCEPT.calOperationCostTimeFromStart(entryStartTime);
+          }
+        } else {
+          result.setStatus(Response.RESPONSE_OUT_OF_WINDOW);
+          
result.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
+          return result;
+        }
+        
Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY_SYNC.calOperationCostTimeFromStart(entryStartTime);
+      }
+    }
+    
Statistic.RAFT_RECEIVER_APPEND_ENTRIES.calOperationCostTimeFromStart(startTime);
 
     if (appended) {
+      long startTime1 = 
Statistic.RAFT_RECEIVER_UPDATE_COMMIT_INDEX.getOperationStartTime();
       result.setLastLogIndex(logManager.getLastEntryIndexUnsafe());
       member.tryUpdateCommitIndex(member.getStatus().getTerm().get(), 
leaderCommit, -1);
+      
Statistic.RAFT_RECEIVER_UPDATE_COMMIT_INDEX.calOperationCostTimeFromStart(startTime1);
     }
     return result;
   }
 
+  public void windowInsertion() {}
+
   @Override
   public void reset() {
     this.firstPosPrevIndex = logManager.getLastLogIndex();
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
index 91f8b6e1e97..10164b81f18 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
@@ -38,7 +38,6 @@ import java.util.function.Consumer;
 public class AsyncLogApplier implements LogApplier {
 
   private static final Logger logger = 
LoggerFactory.getLogger(AsyncLogApplier.class);
-  private static final int CONCURRENT_CONSUMER_NUM = 16;
   private LogApplier embeddedApplier;
   private DataLogConsumer[] consumers;
   private ExecutorService consumerPool;
@@ -52,9 +51,10 @@ public class AsyncLogApplier implements LogApplier {
 
   public AsyncLogApplier(LogApplier embeddedApplier, String name, RaftConfig 
config) {
     this.embeddedApplier = embeddedApplier;
-    consumers = new DataLogConsumer[CONCURRENT_CONSUMER_NUM];
+    consumers = new DataLogConsumer[config.getApplierThreadNum()];
     consumerPool =
-        IoTDBThreadPoolFactory.newFixedThreadPool(CONCURRENT_CONSUMER_NUM, 
"ApplierThread");
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            config.getApplierThreadNum(), "ApplierThread-" + name);
     for (int i = 0; i < consumers.length; i++) {
       consumers[i] = new DataLogConsumer(name + "-" + i, 
config.getMaxNumOfLogsInMem());
       consumerPool.submit(consumers[i]);
@@ -96,7 +96,7 @@ public class AsyncLogApplier implements LogApplier {
   }
 
   private void provideLogToConsumers(PartialPath planKey, Entry e) {
-    consumers[Math.abs(planKey.hashCode()) % 
CONCURRENT_CONSUMER_NUM].accept(e);
+    consumers[Math.abs(planKey.hashCode()) % consumers.length].accept(e);
   }
 
   private void drainConsumers() {
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index 23614011a29..85159c80290 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -39,11 +39,12 @@ public class DispatcherGroup {
   private final Peer peer;
   private final BlockingQueue<VotingEntry> entryQueue;
   private boolean nodeEnabled;
-  private final RateLimiter rateLimiter;
+  private RateLimiter rateLimiter;
   private final ExecutorService dispatcherThreadPool;
   private final LogDispatcher logDispatcher;
   private boolean delayed;
   private DynamicThreadGroup dynamicThreadGroup;
+  private String name;
 
   public DispatcherGroup(Peer peer, LogDispatcher logDispatcher, int 
maxBindingThreadNum) {
     this.logDispatcher = logDispatcher;
@@ -52,11 +53,12 @@ public class DispatcherGroup {
     this.nodeEnabled = true;
     this.rateLimiter = RateLimiter.create(Double.MAX_VALUE);
     this.dispatcherThreadPool = createPool(peer, 
logDispatcher.getMember().getName());
+    this.name = logDispatcher.member.getName() + "-" + peer;
     this.dynamicThreadGroup =
         new DynamicThreadGroup(
-            logDispatcher.member.getName() + "-" + peer,
+            name,
             dispatcherThreadPool::submit,
-            () -> newDispatcherThread(peer, entryQueue, rateLimiter),
+            () -> newDispatcherThread(peer, entryQueue),
             maxBindingThreadNum / 4,
             maxBindingThreadNum);
     this.dynamicThreadGroup.init();
@@ -71,14 +73,14 @@ public class DispatcherGroup {
     }
   }
 
-  DispatcherThread newDispatcherThread(
-      Peer node, BlockingQueue<VotingEntry> logBlockingQueue, RateLimiter 
rateLimiter) {
-    return new DispatcherThread(logDispatcher, node, logBlockingQueue, 
rateLimiter, this);
+  DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingEntry> 
logBlockingQueue) {
+    return new DispatcherThread(logDispatcher, node, logBlockingQueue, this);
   }
 
   public void updateRate(double rate) {
-    rateLimiter.setRate(rate);
+    rateLimiter = RateLimiter.create(rate);
     delayed = rate != Double.MAX_VALUE;
+    logger.info("{} is delayed: {}", name, delayed);
   }
 
   ExecutorService createPool(Peer node, String name) {
@@ -126,4 +128,8 @@ public class DispatcherGroup {
       entryQueue.notifyAll();
     }
   }
+
+  public RateLimiter getRateLimiter() {
+    return rateLimiter;
+  }
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index ec86100180d..3fdb6e1059d 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -31,8 +31,8 @@ import 
org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
-import 
org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
 
 class DispatcherThread extends DynamicThread {
 
@@ -51,33 +52,27 @@ class DispatcherThread extends DynamicThread {
   Peer receiver;
   private final BlockingQueue<VotingEntry> logBlockingDeque;
   protected List<VotingEntry> currBatch = new ArrayList<>();
-  private final String baseName;
-  private final RateLimiter rateLimiter;
   private final DispatcherGroup group;
   private long lastDispatchTime;
+  private PublicBAOS batchLogBuffer = new PublicBAOS(64 * 1024);
+  private AtomicReference<byte[]> compressionBuffer = new 
AtomicReference<>(new byte[64 * 1024]);
 
   protected DispatcherThread(
       LogDispatcher logDispatcher,
       Peer receiver,
       BlockingQueue<VotingEntry> logBlockingDeque,
-      RateLimiter rateLimiter,
       DispatcherGroup group) {
     super(group.getDynamicThreadGroup());
     this.logDispatcher = logDispatcher;
     this.receiver = receiver;
     this.logBlockingDeque = logBlockingDeque;
-    this.rateLimiter = rateLimiter;
     this.group = group;
-    this.baseName = "LogDispatcher-" + logDispatcher.member.getName() + "-" + 
receiver;
   }
 
   @Override
   public void runInternal() {
-    if (logger.isDebugEnabled()) {
-      Thread.currentThread().setName(baseName);
-    }
     try {
-      while (!Thread.interrupted()) {
+      while (!Thread.interrupted() && 
!group.getDynamicThreadGroup().isStopped()) {
         if (group.isDelayed()) {
           if (logBlockingDeque.size() < logDispatcher.maxBatchSize
               && System.nanoTime() - lastDispatchTime < 1_000_000_000L) {
@@ -97,7 +92,7 @@ class DispatcherThread extends DynamicThread {
             if (group.getLogDispatcher().getMember().isLeader()) {
               logBlockingDeque.wait(1000);
             } else {
-              logBlockingDeque.wait(0);
+              logBlockingDeque.wait(5000);
             }
             continue;
           }
@@ -208,7 +203,9 @@ class DispatcherThread extends DynamicThread {
     request.setLeaderId(logDispatcher.member.getThisNode().getNodeId());
     
request.setLeaderCommit(logDispatcher.member.getLogManager().getCommitLogIndex());
     request.setTerm(logDispatcher.member.getStatus().getTerm().get());
-    request.setEntryBytes(LogUtils.compressEntries(logList, 
logDispatcher.compressor, request));
+    request.setEntryBytes(
+        LogUtils.compressEntries(
+            logList, logDispatcher.compressor, request, batchLogBuffer, 
compressionBuffer));
     request.setCompressionType((byte) 
logDispatcher.compressor.getType().ordinal());
     return request;
   }
@@ -252,7 +249,7 @@ class DispatcherThread extends DynamicThread {
       if (logDispatcher.getConfig().isUseFollowerLoadBalance()) {
         FlowMonitorManager.INSTANCE.report(receiver.getEndpoint(), logSize);
       }
-      rateLimiter.acquire((int) logSize);
+      group.getRateLimiter().acquire((int) logSize);
     }
   }
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index a5b4296679f..75256a6030c 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -107,6 +107,7 @@ public class FlowBalancer {
         burstWindowNum++;
       }
     }
+
     double assumedFlow =
         latestWindows.stream().mapToLong(w -> w.sum).sum()
             * 1.0
@@ -131,7 +132,7 @@ public class FlowBalancer {
             entry.getValue().averageFlow(windowsToUse),
             inBurst);
       }
-    } else if (burstWindowNum < latestWindows.size() / 2 && inBurst) {
+    } else if (burstWindowNum <= latestWindows.size() / 2 && inBurst) {
       exitBurst(followerNum, nodesRate, followers);
       logDispatcher.updateRateLimiter();
       for (Entry<TEndPoint, FlowMonitor> entry : 
flowMonitorManager.getMonitorMap().entrySet()) {
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index f6ccca1ef1f..a3a2ea890c2 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
 import 
org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.SyncLogDequeSerializer.VersionController.SimpleFileVersionController;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 import org.apache.iotdb.tsfile.compress.ICompressor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -103,6 +104,7 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
   private ByteBuffer logIndexBuffer;
   private ByteBuffer flushingLogDataBuffer;
   private ByteBuffer flushingLogIndexBuffer;
+  private byte[] compressionBuffer;
 
   private long offsetOfTheCurrentLogDataOutputStream = 0;
 
@@ -164,7 +166,8 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
     logDataBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
     logIndexBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
     flushingLogDataBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
-    flushingLogIndexBuffer = 
ByteBuffer.allocate(config.getRaftLogBufferSize());
+    flushingLogIndexBuffer = ByteBuffer.allocate(16);
+    compressionBuffer = new byte[64 * 1024];
 
     maxNumberOfLogsPerFetchOnDisk = config.getMaxNumberOfLogsPerFetchOnDisk();
     maxRaftLogIndexSizeInMemory = config.getMaxRaftLogIndexSizeInMemory();
@@ -298,22 +301,26 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
    * @param entries logs to put to buffer
    */
   private void putLogs(List<Entry> entries) {
-    for (Entry log : entries) {
+    long startTime = Statistic.RAFT_PUT_LOG.getOperationStartTime();
+    for (Entry entry : entries) {
+      long entryStartTime = Statistic.RAFT_PUT_ENTRY.getOperationStartTime();
       logDataBuffer.mark();
       logIndexBuffer.mark();
-      ByteBuffer logData = log.serialize();
-      try {
-        logDataBuffer.put(logData);
-        lastLogIndex = log.getCurrLogIndex();
-      } catch (BufferOverflowException e) {
-        logger.debug("Raft log buffer overflow!");
-        logDataBuffer.reset();
-        logIndexBuffer.reset();
-        flushLogBuffer(true);
-        logDataBuffer.put(logData);
-        lastLogIndex = log.getCurrLogIndex();
-      }
+      putOneEntry(entry);
+      Statistic.RAFT_PUT_ENTRY.calOperationCostTimeFromStart(entryStartTime);
+    }
+    Statistic.RAFT_PUT_LOG.calOperationCostTimeFromStart(startTime);
+  }
+
+  private void putOneEntry(Entry entry) {
+    ByteBuffer logData = entry.serialize();
+    if (logData.remaining() <= logDataBuffer.remaining()) {
+      logDataBuffer.put(logData);
+    } else {
+      flushLogBuffer(true);
+      logDataBuffer.put(logData);
     }
+    lastLogIndex = entry.getCurrLogIndex();
   }
 
   private void checkCloseCurrentFile(long fileEndIndex) {
@@ -423,15 +430,24 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
     try {
       checkStream();
       // 1. write to the log data file
-      byte[] compressed =
-          compressor.compress(flushingLogDataBuffer.array(), 0, 
flushingLogDataBuffer.position());
-      ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
+      int maxBytesForCompression =
+          
compressor.getMaxBytesForCompression(flushingLogDataBuffer.position());
+      if (maxBytesForCompression > compressionBuffer.length) {
+        compressionBuffer = new byte[maxBytesForCompression];
+      }
+      int compressedLength =
+          compressor.compress(
+              flushingLogDataBuffer.array(),
+              0,
+              flushingLogDataBuffer.position(),
+              compressionBuffer);
+      ReadWriteIOUtils.write(compressedLength, currentLogDataOutputStream);
       logIndexOffsetList.add(new Pair<>(lastLogIndex, 
offsetOfTheCurrentLogDataOutputStream));
       flushingLogIndexBuffer.putLong(lastLogIndex);
       flushingLogIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
-      offsetOfTheCurrentLogDataOutputStream += Integer.BYTES + 
compressed.length;
+      offsetOfTheCurrentLogDataOutputStream += Integer.BYTES + 
compressedLength;
 
-      currentLogDataOutputStream.write(compressed);
+      currentLogDataOutputStream.write(compressionBuffer, 0, compressedLength);
       ReadWriteIOUtils.writeWithoutSize(
           logIndexBuffer, 0, logIndexBuffer.position(), 
currentLogIndexOutputStream);
       if (config.getFlushRaftLogThreshold() == 0) {
@@ -945,6 +961,7 @@ public class SyncLogDequeSerializer implements 
StableEntryManager {
       recoverMetaFile();
       meta = new LogManagerMeta();
       createNewLogFile(logDir, firstLogIndex);
+      this.persistedLogIndex = commitIndex;
       logger.info("{}, clean all logs success, the new firstLogIndex={}", 
this, firstLogIndex);
     } catch (IOException e) {
       logger.error("clear all logs failed,", e);
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
index fc4a576a426..6cfbb9f7931 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -41,6 +41,7 @@ import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class LogUtils {
 
@@ -79,9 +80,11 @@ public class LogUtils {
   public static ByteBuffer compressEntries(
       List<ByteBuffer> entryByteList,
       ICompressor compressor,
-      AppendCompressedEntriesRequest request) {
-    PublicBAOS baos = new PublicBAOS(entryByteList.size() * 16 * 1024);
-    DataOutputStream dataOutputStream = new DataOutputStream(baos);
+      AppendCompressedEntriesRequest request,
+      PublicBAOS batchLogBuffer,
+      AtomicReference<byte[]> compressionBuffer) {
+    batchLogBuffer.reset();
+    DataOutputStream dataOutputStream = new DataOutputStream(batchLogBuffer);
     try {
       dataOutputStream.writeInt(entryByteList.size());
       for (ByteBuffer byteBuffer : entryByteList) {
@@ -91,11 +94,29 @@ public class LogUtils {
             byteBuffer.arrayOffset() + byteBuffer.position(),
             byteBuffer.remaining());
       }
-      Statistic.LOG_DISPATCHER_RAW_SIZE.add(baos.size());
-      request.setUncompressedSize(baos.size());
-      byte[] compressed = compressor.compress(baos.getBuf(), 0, baos.size());
-      Statistic.LOG_DISPATCHER_COMPRESSED_SIZE.add(compressed.length);
-      return ByteBuffer.wrap(compressed);
+      Statistic.LOG_DISPATCHER_RAW_SIZE.add(batchLogBuffer.size());
+      request.setUncompressedSize(batchLogBuffer.size());
+
+      byte[] compressed;
+      int compressedSize;
+      if (compressionBuffer == null) {
+        compressed = compressor.compress(batchLogBuffer.getBuf(), 0, 
batchLogBuffer.size());
+        compressedSize = compressed.length;
+      } else {
+        compressed = compressionBuffer.get();
+        int maxBytesForCompression = 
compressor.getMaxBytesForCompression(batchLogBuffer.size());
+        if (compressed.length < maxBytesForCompression) {
+          compressed = new byte[maxBytesForCompression];
+          compressionBuffer.set(compressed);
+        }
+        compressedSize =
+            compressor.compress(batchLogBuffer.getBuf(), 0, 
batchLogBuffer.size(), compressed);
+      }
+
+      Statistic.LOG_DISPATCHER_COMPRESSED_SIZE.add(compressedSize);
+      ByteBuffer wrap = ByteBuffer.wrap(compressed);
+      wrap.limit(compressedSize);
+      return wrap;
     } catch (IOException e) {
       logger.warn("Failed to compress entries", e);
     }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
index ebbd5a61885..42393edab6b 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
@@ -52,6 +52,7 @@ public class NodeReport {
     for (RaftMemberReport memberReport : memberReports) {
       stringBuilder.append(memberReport).append(System.lineSeparator());
     }
+    stringBuilder.append(", \n timer: ").append(Timer.Statistic.getReport());
     return stringBuilder.toString();
   }
 
@@ -101,7 +102,6 @@ public class NodeReport {
 
     @Override
     public String toString() {
-      String transportCompressionReport = "";
       return "RaftReport {\n"
           + "character="
           + character
@@ -128,9 +128,6 @@ public class NodeReport {
           + "ms ago"
           + ", logIncrement="
           + (lastLogIndex - prevLastLogIndex)
-          + transportCompressionReport
-          + ", \n timer: "
-          + Timer.Statistic.getReport()
           + '}';
     }
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index b2f0b824d97..f05424d7066 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -220,18 +220,42 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_WINDOW_FLUSH_SIZE(
+        RAFT_MEMBER_RECEIVER,
+        "receiver window flush size",
+        1,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_RECEIVER_APPEND_INTERNAL(
         RAFT_MEMBER_RECEIVER,
         "append entry (internal)",
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_APPEND_ENTRIES(
+        RAFT_MEMBER_RECEIVER,
+        "receiver append entries",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_RECEIVER_APPEND_ONE_ENTRY(
         RAFT_MEMBER_RECEIVER,
-        "receiver append one entries",
+        "receiver append one entry",
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_APPEND_ONE_ENTRY_SYNC(
+        RAFT_MEMBER_RECEIVER,
+        "receiver append one entry (sync)",
+        TIME_SCALE,
+        true,
+        RAFT_RECEIVER_APPEND_ONE_ENTRY),
+    RAFT_RECEIVER_UPDATE_COMMIT_INDEX(
+        RAFT_MEMBER_RECEIVER,
+        "receiver update commit index",
+        TIME_SCALE,
+        true,
+        RAFT_RECEIVER_APPEND_ONE_ENTRY),
     RAFT_RECEIVER_APPEND_ENTRY(
         RAFT_MEMBER_RECEIVER,
         "append entrys",
@@ -420,6 +444,8 @@ public class Timer {
     RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
     RAFT_LEADER_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "leader weak accept", 1, true, 
ROOT),
     RAFT_FOLLOWER_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "follower weak accept", 
TIME_SCALE, true, ROOT),
+    RAFT_PUT_LOG(RAFT_MEMBER_SENDER, "put logs", TIME_SCALE, true, ROOT),
+    RAFT_PUT_ENTRY(RAFT_MEMBER_SENDER, "put one entry", TIME_SCALE, true, 
ROOT),
     RAFT_FOLLOWER_STRONG_ACCEPT(
         RAFT_MEMBER_SENDER, "follower strong accept", TIME_SCALE, true, ROOT),
     RAFT_CONCURRENT_SENDER(RAFT_MEMBER_SENDER, "concurrent sender", 1, true, 
ROOT),
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
index f326f830029..cc7434dac50 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -22,6 +22,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -39,6 +40,7 @@ public class DynamicThreadGroup {
   private int minThreadCnt;
   private int maxThreadCnt;
   private Map<DynamicThread, Future<?>> threadFutureMap = new 
ConcurrentHashMap<>();
+  private boolean stopped;
 
   public DynamicThreadGroup(
       String name,
@@ -101,16 +103,24 @@ public class DynamicThreadGroup {
 
   public void cancelAll() {
     threadFutureMap.forEach((t, f) -> f.cancel(true));
-    threadFutureMap.clear();
     threadCnt.set(0);
+    stopped = true;
   }
 
   public void join() throws ExecutionException, InterruptedException {
     for (Future<?> future : threadFutureMap.values()) {
-      future.get();
+      try {
+        future.get();
+      } catch (CancellationException e) {
+        // ignore
+      }
     }
   }
 
+  public boolean isStopped() {
+    return stopped;
+  }
+
   @Override
   public String toString() {
     return name;

Reply via email to