This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 44c5248969e fix Status returned by EmptyStateMachine; add applier
thread num config; optimize appending order in SlidingWindowLogAppender; optimize
buffer management in log serialization
44c5248969e is described below
commit 44c5248969e43f9fbafb688b36bd46a6f2d598bd
Author: Tian Jiang <[email protected]>
AuthorDate: Fri May 19 09:37:54 2023 +0800
fix Status returned by EmptyStateMachine
add applier thread num config
optimize appending order in SlidingWindowLogAppender
optimize buffer management in log serialization
---
.../apache/iotdb/consensus/EmptyStateMachine.java | 3 +-
.../consensus/natraft/protocol/RaftConfig.java | 14 +++++
.../consensus/natraft/protocol/RaftMember.java | 1 +
.../log/appender/SlidingWindowLogAppender.java | 71 ++++++++++++++++++----
.../protocol/log/applier/AsyncLogApplier.java | 8 +--
.../protocol/log/dispatch/DispatcherGroup.java | 20 +++---
.../protocol/log/dispatch/DispatcherThread.java | 23 +++----
.../log/dispatch/flowcontrol/FlowBalancer.java | 3 +-
.../serialization/SyncLogDequeSerializer.java | 55 +++++++++++------
.../iotdb/consensus/natraft/utils/LogUtils.java | 37 ++++++++---
.../iotdb/consensus/natraft/utils/NodeReport.java | 5 +-
.../iotdb/consensus/natraft/utils/Timer.java | 28 ++++++++-
.../concurrent/dynamic/DynamicThreadGroup.java | 14 ++++-
13 files changed, 209 insertions(+), 73 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/EmptyStateMachine.java
b/consensus/src/main/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index daa8da958de..5111f820215 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/EmptyStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.rpc.TSStatusCode;
import java.io.File;
@@ -37,7 +38,7 @@ public class EmptyStateMachine implements IStateMachine,
IStateMachine.EventApi
@Override
public TSStatus write(IConsensusRequest IConsensusRequest) {
- return new TSStatus(0);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index cc0a44f4cd9..76864ba5795 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -52,6 +52,7 @@ public class RaftConfig {
private long writeOperationTimeoutMS = 20_000L;
private int logNumInBatch = 100;
private int dispatcherBindingThreadNum = 16;
+ private int applierThreadNum = 16;
private int followerLoadBalanceWindowsToUse = 1;
private double followerLoadBalanceOverestimateFactor = 1.1;
private int flowMonitorMaxWindowNum = 1000;
@@ -454,6 +455,14 @@ public class RaftConfig {
this.logPersistCompressor = logPersistCompressor;
}
+ public int getApplierThreadNum() {
+ return applierThreadNum;
+ }
+
+ public void setApplierThreadNum(int applierThreadNum) {
+ this.applierThreadNum = applierThreadNum;
+ }
+
public void loadProperties(Properties properties) {
logger.debug("Loading properties: {}", properties);
@@ -607,6 +616,11 @@ public class RaftConfig {
"dispatcher_binding_thread_num",
String.valueOf(this.getDispatcherBindingThreadNum()))));
+ this.setApplierThreadNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "applier_thread_num",
String.valueOf(this.getApplierThreadNum()))));
+
this.setUseFollowerLoadBalance(
Boolean.parseBoolean(
properties.getProperty(
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 1855e2cde11..1db0d383440 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -377,6 +377,7 @@ public class RaftMember {
return;
}
setStopped(true);
+ logger.info("Member {} stopping", name);
closeLogManager();
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index 9702b4b461e..07486cb4edb 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -130,6 +130,7 @@ public class SlidingWindowLogAppender implements
LogAppender {
if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
<= config.getUnAppliedRaftLogNumForRejectThreshold()) {
success = logManager.maybeAppend(logs);
+ Statistic.RAFT_RECEIVER_WINDOW_FLUSH_SIZE.add(logs.size());
break;
}
try {
@@ -178,19 +179,8 @@ public class SlidingWindowLogAppender implements
LogAppender {
.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
}
- AppendEntryResult result = null;
- for (Entry entry : entries) {
- long startTime =
Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY.getOperationStartTime();
- result = appendEntry(leaderCommit, entry);
-
Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY.calOperationCostTimeFromStart(startTime);
-
- if (result.status != Response.RESPONSE_AGREE
- && result.status != Response.RESPONSE_STRONG_ACCEPT
- && result.status != Response.RESPONSE_WEAK_ACCEPT) {
- return result;
- }
- }
-
+ AppendEntryResult result;
+ result = appendEntries(leaderCommit, entries);
return result;
}
@@ -228,14 +218,69 @@ public class SlidingWindowLogAppender implements
LogAppender {
return result;
}
}
+
Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY_SYNC.calOperationCostTimeFromStart(startTime);
+
+ if (appended) {
+ long startTime1 =
Statistic.RAFT_RECEIVER_UPDATE_COMMIT_INDEX.getOperationStartTime();
+ result.setLastLogIndex(logManager.getLastEntryIndexUnsafe());
+ member.tryUpdateCommitIndex(member.getStatus().getTerm().get(),
leaderCommit, -1);
+
Statistic.RAFT_RECEIVER_UPDATE_COMMIT_INDEX.calOperationCostTimeFromStart(startTime1);
+ }
+ return result;
+ }
+
+ private AppendEntryResult appendEntries(long leaderCommit, List<Entry>
entries) {
+ boolean appended = false;
+
+ AppendEntryResult result = new AppendEntryResult();
+ Collections.reverse(entries);
+ long startTime =
Statistic.RAFT_RECEIVER_WAIT_FOR_WINDOW.getOperationStartTime();
+ synchronized (this) {
+
Statistic.RAFT_RECEIVER_WAIT_FOR_WINDOW.calOperationCostTimeFromStart(startTime);
+ for (Entry entry : entries) {
+ long prevLogIndex = entry.getCurrLogIndex() - 1;
+ long entryStartTime =
Statistic.RAFT_RECEIVER_WAIT_FOR_WINDOW.getOperationStartTime();
+ int windowPos = (int) (prevLogIndex - firstPosPrevIndex);
+ if (windowPos < 0) {
+ // the new entry may replace an appended entry
+ appended = logManager.maybeAppend(Collections.singletonList(entry));
+ moveWindowLeftward(-windowPos);
+ result.status = Response.RESPONSE_STRONG_ACCEPT;
+ } else if (windowPos < windowCapacity) {
+ // the new entry falls into the window
+ logWindow[windowPos] = entry;
+ if (windowLength < windowPos + 1) {
+ windowLength = windowPos + 1;
+ }
+ checkLog(windowPos);
+ if (windowPos == 0) {
+ appended = flushWindow(result);
+
Statistic.RAFT_FOLLOWER_STRONG_ACCEPT.calOperationCostTimeFromStart(entryStartTime);
+ } else {
+ result.status = Response.RESPONSE_WEAK_ACCEPT;
+
Statistic.RAFT_FOLLOWER_WEAK_ACCEPT.calOperationCostTimeFromStart(entryStartTime);
+ }
+ } else {
+ result.setStatus(Response.RESPONSE_OUT_OF_WINDOW);
+
result.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
+ return result;
+ }
+
Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY_SYNC.calOperationCostTimeFromStart(entryStartTime);
+ }
+ }
+
Statistic.RAFT_RECEIVER_APPEND_ENTRIES.calOperationCostTimeFromStart(startTime);
if (appended) {
+ long startTime1 =
Statistic.RAFT_RECEIVER_UPDATE_COMMIT_INDEX.getOperationStartTime();
result.setLastLogIndex(logManager.getLastEntryIndexUnsafe());
member.tryUpdateCommitIndex(member.getStatus().getTerm().get(),
leaderCommit, -1);
+
Statistic.RAFT_RECEIVER_UPDATE_COMMIT_INDEX.calOperationCostTimeFromStart(startTime1);
}
return result;
}
+ public void windowInsertion() {}
+
@Override
public void reset() {
this.firstPosPrevIndex = logManager.getLastLogIndex();
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
index 91f8b6e1e97..10164b81f18 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
@@ -38,7 +38,6 @@ import java.util.function.Consumer;
public class AsyncLogApplier implements LogApplier {
private static final Logger logger =
LoggerFactory.getLogger(AsyncLogApplier.class);
- private static final int CONCURRENT_CONSUMER_NUM = 16;
private LogApplier embeddedApplier;
private DataLogConsumer[] consumers;
private ExecutorService consumerPool;
@@ -52,9 +51,10 @@ public class AsyncLogApplier implements LogApplier {
public AsyncLogApplier(LogApplier embeddedApplier, String name, RaftConfig
config) {
this.embeddedApplier = embeddedApplier;
- consumers = new DataLogConsumer[CONCURRENT_CONSUMER_NUM];
+ consumers = new DataLogConsumer[config.getApplierThreadNum()];
consumerPool =
- IoTDBThreadPoolFactory.newFixedThreadPool(CONCURRENT_CONSUMER_NUM,
"ApplierThread");
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ config.getApplierThreadNum(), "ApplierThread-" + name);
for (int i = 0; i < consumers.length; i++) {
consumers[i] = new DataLogConsumer(name + "-" + i,
config.getMaxNumOfLogsInMem());
consumerPool.submit(consumers[i]);
@@ -96,7 +96,7 @@ public class AsyncLogApplier implements LogApplier {
}
private void provideLogToConsumers(PartialPath planKey, Entry e) {
- consumers[Math.abs(planKey.hashCode()) %
CONCURRENT_CONSUMER_NUM].accept(e);
+ consumers[Math.abs(planKey.hashCode()) % consumers.length].accept(e);
}
private void drainConsumers() {
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index 23614011a29..85159c80290 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -39,11 +39,12 @@ public class DispatcherGroup {
private final Peer peer;
private final BlockingQueue<VotingEntry> entryQueue;
private boolean nodeEnabled;
- private final RateLimiter rateLimiter;
+ private RateLimiter rateLimiter;
private final ExecutorService dispatcherThreadPool;
private final LogDispatcher logDispatcher;
private boolean delayed;
private DynamicThreadGroup dynamicThreadGroup;
+ private String name;
public DispatcherGroup(Peer peer, LogDispatcher logDispatcher, int
maxBindingThreadNum) {
this.logDispatcher = logDispatcher;
@@ -52,11 +53,12 @@ public class DispatcherGroup {
this.nodeEnabled = true;
this.rateLimiter = RateLimiter.create(Double.MAX_VALUE);
this.dispatcherThreadPool = createPool(peer,
logDispatcher.getMember().getName());
+ this.name = logDispatcher.member.getName() + "-" + peer;
this.dynamicThreadGroup =
new DynamicThreadGroup(
- logDispatcher.member.getName() + "-" + peer,
+ name,
dispatcherThreadPool::submit,
- () -> newDispatcherThread(peer, entryQueue, rateLimiter),
+ () -> newDispatcherThread(peer, entryQueue),
maxBindingThreadNum / 4,
maxBindingThreadNum);
this.dynamicThreadGroup.init();
@@ -71,14 +73,14 @@ public class DispatcherGroup {
}
}
- DispatcherThread newDispatcherThread(
- Peer node, BlockingQueue<VotingEntry> logBlockingQueue, RateLimiter
rateLimiter) {
- return new DispatcherThread(logDispatcher, node, logBlockingQueue,
rateLimiter, this);
+ DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingEntry>
logBlockingQueue) {
+ return new DispatcherThread(logDispatcher, node, logBlockingQueue, this);
}
public void updateRate(double rate) {
- rateLimiter.setRate(rate);
+ rateLimiter = RateLimiter.create(rate);
delayed = rate != Double.MAX_VALUE;
+ logger.info("{} is delayed: {}", name, delayed);
}
ExecutorService createPool(Peer node, String name) {
@@ -126,4 +128,8 @@ public class DispatcherGroup {
entryQueue.notifyAll();
}
}
+
+ public RateLimiter getRateLimiter() {
+ return rateLimiter;
+ }
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index ec86100180d..3fdb6e1059d 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -31,8 +31,8 @@ import
org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
-import
org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
class DispatcherThread extends DynamicThread {
@@ -51,33 +52,27 @@ class DispatcherThread extends DynamicThread {
Peer receiver;
private final BlockingQueue<VotingEntry> logBlockingDeque;
protected List<VotingEntry> currBatch = new ArrayList<>();
- private final String baseName;
- private final RateLimiter rateLimiter;
private final DispatcherGroup group;
private long lastDispatchTime;
+ private PublicBAOS batchLogBuffer = new PublicBAOS(64 * 1024);
+ private AtomicReference<byte[]> compressionBuffer = new
AtomicReference<>(new byte[64 * 1024]);
protected DispatcherThread(
LogDispatcher logDispatcher,
Peer receiver,
BlockingQueue<VotingEntry> logBlockingDeque,
- RateLimiter rateLimiter,
DispatcherGroup group) {
super(group.getDynamicThreadGroup());
this.logDispatcher = logDispatcher;
this.receiver = receiver;
this.logBlockingDeque = logBlockingDeque;
- this.rateLimiter = rateLimiter;
this.group = group;
- this.baseName = "LogDispatcher-" + logDispatcher.member.getName() + "-" +
receiver;
}
@Override
public void runInternal() {
- if (logger.isDebugEnabled()) {
- Thread.currentThread().setName(baseName);
- }
try {
- while (!Thread.interrupted()) {
+ while (!Thread.interrupted() &&
!group.getDynamicThreadGroup().isStopped()) {
if (group.isDelayed()) {
if (logBlockingDeque.size() < logDispatcher.maxBatchSize
&& System.nanoTime() - lastDispatchTime < 1_000_000_000L) {
@@ -97,7 +92,7 @@ class DispatcherThread extends DynamicThread {
if (group.getLogDispatcher().getMember().isLeader()) {
logBlockingDeque.wait(1000);
} else {
- logBlockingDeque.wait(0);
+ logBlockingDeque.wait(5000);
}
continue;
}
@@ -208,7 +203,9 @@ class DispatcherThread extends DynamicThread {
request.setLeaderId(logDispatcher.member.getThisNode().getNodeId());
request.setLeaderCommit(logDispatcher.member.getLogManager().getCommitLogIndex());
request.setTerm(logDispatcher.member.getStatus().getTerm().get());
- request.setEntryBytes(LogUtils.compressEntries(logList,
logDispatcher.compressor, request));
+ request.setEntryBytes(
+ LogUtils.compressEntries(
+ logList, logDispatcher.compressor, request, batchLogBuffer,
compressionBuffer));
request.setCompressionType((byte)
logDispatcher.compressor.getType().ordinal());
return request;
}
@@ -252,7 +249,7 @@ class DispatcherThread extends DynamicThread {
if (logDispatcher.getConfig().isUseFollowerLoadBalance()) {
FlowMonitorManager.INSTANCE.report(receiver.getEndpoint(), logSize);
}
- rateLimiter.acquire((int) logSize);
+ group.getRateLimiter().acquire((int) logSize);
}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index a5b4296679f..75256a6030c 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -107,6 +107,7 @@ public class FlowBalancer {
burstWindowNum++;
}
}
+
double assumedFlow =
latestWindows.stream().mapToLong(w -> w.sum).sum()
* 1.0
@@ -131,7 +132,7 @@ public class FlowBalancer {
entry.getValue().averageFlow(windowsToUse),
inBurst);
}
- } else if (burstWindowNum < latestWindows.size() / 2 && inBurst) {
+ } else if (burstWindowNum <= latestWindows.size() / 2 && inBurst) {
exitBurst(followerNum, nodesRate, followers);
logDispatcher.updateRateLimiter();
for (Entry<TEndPoint, FlowMonitor> entry :
flowMonitorManager.getMonitorMap().entrySet()) {
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index f6ccca1ef1f..a3a2ea890c2 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
import
org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.SyncLogDequeSerializer.VersionController.SimpleFileVersionController;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -103,6 +104,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
private ByteBuffer logIndexBuffer;
private ByteBuffer flushingLogDataBuffer;
private ByteBuffer flushingLogIndexBuffer;
+ private byte[] compressionBuffer;
private long offsetOfTheCurrentLogDataOutputStream = 0;
@@ -164,7 +166,8 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
logDataBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
logIndexBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
flushingLogDataBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
- flushingLogIndexBuffer =
ByteBuffer.allocate(config.getRaftLogBufferSize());
+ flushingLogIndexBuffer = ByteBuffer.allocate(16);
+ compressionBuffer = new byte[64 * 1024];
maxNumberOfLogsPerFetchOnDisk = config.getMaxNumberOfLogsPerFetchOnDisk();
maxRaftLogIndexSizeInMemory = config.getMaxRaftLogIndexSizeInMemory();
@@ -298,22 +301,26 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
* @param entries logs to put to buffer
*/
private void putLogs(List<Entry> entries) {
- for (Entry log : entries) {
+ long startTime = Statistic.RAFT_PUT_LOG.getOperationStartTime();
+ for (Entry entry : entries) {
+ long entryStartTime = Statistic.RAFT_PUT_ENTRY.getOperationStartTime();
logDataBuffer.mark();
logIndexBuffer.mark();
- ByteBuffer logData = log.serialize();
- try {
- logDataBuffer.put(logData);
- lastLogIndex = log.getCurrLogIndex();
- } catch (BufferOverflowException e) {
- logger.debug("Raft log buffer overflow!");
- logDataBuffer.reset();
- logIndexBuffer.reset();
- flushLogBuffer(true);
- logDataBuffer.put(logData);
- lastLogIndex = log.getCurrLogIndex();
- }
+ putOneEntry(entry);
+ Statistic.RAFT_PUT_ENTRY.calOperationCostTimeFromStart(entryStartTime);
+ }
+ Statistic.RAFT_PUT_LOG.calOperationCostTimeFromStart(startTime);
+ }
+
+ private void putOneEntry(Entry entry) {
+ ByteBuffer logData = entry.serialize();
+ if (logData.remaining() <= logDataBuffer.remaining()) {
+ logDataBuffer.put(logData);
+ } else {
+ flushLogBuffer(true);
+ logDataBuffer.put(logData);
}
+ lastLogIndex = entry.getCurrLogIndex();
}
private void checkCloseCurrentFile(long fileEndIndex) {
@@ -423,15 +430,24 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
try {
checkStream();
// 1. write to the log data file
- byte[] compressed =
- compressor.compress(flushingLogDataBuffer.array(), 0,
flushingLogDataBuffer.position());
- ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
+ int maxBytesForCompression =
+
compressor.getMaxBytesForCompression(flushingLogDataBuffer.position());
+ if (maxBytesForCompression > compressionBuffer.length) {
+ compressionBuffer = new byte[maxBytesForCompression];
+ }
+ int compressedLength =
+ compressor.compress(
+ flushingLogDataBuffer.array(),
+ 0,
+ flushingLogDataBuffer.position(),
+ compressionBuffer);
+ ReadWriteIOUtils.write(compressedLength, currentLogDataOutputStream);
logIndexOffsetList.add(new Pair<>(lastLogIndex,
offsetOfTheCurrentLogDataOutputStream));
flushingLogIndexBuffer.putLong(lastLogIndex);
flushingLogIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
- offsetOfTheCurrentLogDataOutputStream += Integer.BYTES +
compressed.length;
+ offsetOfTheCurrentLogDataOutputStream += Integer.BYTES +
compressedLength;
- currentLogDataOutputStream.write(compressed);
+ currentLogDataOutputStream.write(compressionBuffer, 0, compressedLength);
ReadWriteIOUtils.writeWithoutSize(
logIndexBuffer, 0, logIndexBuffer.position(),
currentLogIndexOutputStream);
if (config.getFlushRaftLogThreshold() == 0) {
@@ -945,6 +961,7 @@ public class SyncLogDequeSerializer implements
StableEntryManager {
recoverMetaFile();
meta = new LogManagerMeta();
createNewLogFile(logDir, firstLogIndex);
+ this.persistedLogIndex = commitIndex;
logger.info("{}, clean all logs success, the new firstLogIndex={}",
this, firstLogIndex);
} catch (IOException e) {
logger.error("clear all logs failed,", e);
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
index fc4a576a426..6cfbb9f7931 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -41,6 +41,7 @@ import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
public class LogUtils {
@@ -79,9 +80,11 @@ public class LogUtils {
public static ByteBuffer compressEntries(
List<ByteBuffer> entryByteList,
ICompressor compressor,
- AppendCompressedEntriesRequest request) {
- PublicBAOS baos = new PublicBAOS(entryByteList.size() * 16 * 1024);
- DataOutputStream dataOutputStream = new DataOutputStream(baos);
+ AppendCompressedEntriesRequest request,
+ PublicBAOS batchLogBuffer,
+ AtomicReference<byte[]> compressionBuffer) {
+ batchLogBuffer.reset();
+ DataOutputStream dataOutputStream = new DataOutputStream(batchLogBuffer);
try {
dataOutputStream.writeInt(entryByteList.size());
for (ByteBuffer byteBuffer : entryByteList) {
@@ -91,11 +94,29 @@ public class LogUtils {
byteBuffer.arrayOffset() + byteBuffer.position(),
byteBuffer.remaining());
}
- Statistic.LOG_DISPATCHER_RAW_SIZE.add(baos.size());
- request.setUncompressedSize(baos.size());
- byte[] compressed = compressor.compress(baos.getBuf(), 0, baos.size());
- Statistic.LOG_DISPATCHER_COMPRESSED_SIZE.add(compressed.length);
- return ByteBuffer.wrap(compressed);
+ Statistic.LOG_DISPATCHER_RAW_SIZE.add(batchLogBuffer.size());
+ request.setUncompressedSize(batchLogBuffer.size());
+
+ byte[] compressed;
+ int compressedSize;
+ if (compressionBuffer == null) {
+ compressed = compressor.compress(batchLogBuffer.getBuf(), 0,
batchLogBuffer.size());
+ compressedSize = compressed.length;
+ } else {
+ compressed = compressionBuffer.get();
+ int maxBytesForCompression =
compressor.getMaxBytesForCompression(batchLogBuffer.size());
+ if (compressed.length < maxBytesForCompression) {
+ compressed = new byte[maxBytesForCompression];
+ compressionBuffer.set(compressed);
+ }
+ compressedSize =
+ compressor.compress(batchLogBuffer.getBuf(), 0,
batchLogBuffer.size(), compressed);
+ }
+
+ Statistic.LOG_DISPATCHER_COMPRESSED_SIZE.add(compressedSize);
+ ByteBuffer wrap = ByteBuffer.wrap(compressed);
+ wrap.limit(compressedSize);
+ return wrap;
} catch (IOException e) {
logger.warn("Failed to compress entries", e);
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
index ebbd5a61885..42393edab6b 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
@@ -52,6 +52,7 @@ public class NodeReport {
for (RaftMemberReport memberReport : memberReports) {
stringBuilder.append(memberReport).append(System.lineSeparator());
}
+ stringBuilder.append(", \n timer: ").append(Timer.Statistic.getReport());
return stringBuilder.toString();
}
@@ -101,7 +102,6 @@ public class NodeReport {
@Override
public String toString() {
- String transportCompressionReport = "";
return "RaftReport {\n"
+ "character="
+ character
@@ -128,9 +128,6 @@ public class NodeReport {
+ "ms ago"
+ ", logIncrement="
+ (lastLogIndex - prevLastLogIndex)
- + transportCompressionReport
- + ", \n timer: "
- + Timer.Statistic.getReport()
+ '}';
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index b2f0b824d97..f05424d7066 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -220,18 +220,42 @@ public class Timer {
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_RECEIVER_WINDOW_FLUSH_SIZE(
+ RAFT_MEMBER_RECEIVER,
+ "receiver window flush size",
+ 1,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_RECEIVER_APPEND_INTERNAL(
RAFT_MEMBER_RECEIVER,
"append entry (internal)",
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_RECEIVER_APPEND_ENTRIES(
+ RAFT_MEMBER_RECEIVER,
+ "receiver append entries",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_RECEIVER_APPEND_ONE_ENTRY(
RAFT_MEMBER_RECEIVER,
- "receiver append one entries",
+ "receiver append one entry",
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_RECEIVER_APPEND_ONE_ENTRY_SYNC(
+ RAFT_MEMBER_RECEIVER,
+ "receiver append one entry (sync)",
+ TIME_SCALE,
+ true,
+ RAFT_RECEIVER_APPEND_ONE_ENTRY),
+ RAFT_RECEIVER_UPDATE_COMMIT_INDEX(
+ RAFT_MEMBER_RECEIVER,
+ "receiver update commit index",
+ TIME_SCALE,
+ true,
+ RAFT_RECEIVER_APPEND_ONE_ENTRY),
RAFT_RECEIVER_APPEND_ENTRY(
RAFT_MEMBER_RECEIVER,
"append entrys",
@@ -420,6 +444,8 @@ public class Timer {
RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
RAFT_LEADER_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "leader weak accept", 1, true,
ROOT),
RAFT_FOLLOWER_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "follower weak accept",
TIME_SCALE, true, ROOT),
+ RAFT_PUT_LOG(RAFT_MEMBER_SENDER, "put logs", TIME_SCALE, true, ROOT),
+ RAFT_PUT_ENTRY(RAFT_MEMBER_SENDER, "put one entry", TIME_SCALE, true,
ROOT),
RAFT_FOLLOWER_STRONG_ACCEPT(
RAFT_MEMBER_SENDER, "follower strong accept", TIME_SCALE, true, ROOT),
RAFT_CONCURRENT_SENDER(RAFT_MEMBER_SENDER, "concurrent sender", 1, true,
ROOT),
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
index f326f830029..cc7434dac50 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThreadGroup.java
@@ -22,6 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -39,6 +40,7 @@ public class DynamicThreadGroup {
private int minThreadCnt;
private int maxThreadCnt;
private Map<DynamicThread, Future<?>> threadFutureMap = new
ConcurrentHashMap<>();
+ private boolean stopped;
public DynamicThreadGroup(
String name,
@@ -101,16 +103,24 @@ public class DynamicThreadGroup {
public void cancelAll() {
threadFutureMap.forEach((t, f) -> f.cancel(true));
- threadFutureMap.clear();
threadCnt.set(0);
+ stopped = true;
}
public void join() throws ExecutionException, InterruptedException {
for (Future<?> future : threadFutureMap.values()) {
- future.get();
+ try {
+ future.get();
+ } catch (CancellationException e) {
+ // ignore
+ }
}
}
+ public boolean isStopped() {
+ return stopped;
+ }
+
@Override
public String toString() {
return name;