This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 3230b84075 optimize notifying and serialization
3230b84075 is described below
commit 3230b840754fbd8a40fbf4bfbefc461c843ccf61
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Apr 18 10:19:51 2023 +0800
optimize notifying and serialization
---
client-go | 2 +-
.../common/request/IConsensusRequest.java | 10 ++++
.../iotdb/consensus/natraft/RaftConsensus.java | 9 ++++
.../consensus/natraft/protocol/RaftConfig.java | 15 ++++++
.../consensus/natraft/protocol/RaftMember.java | 29 ++++++++----
.../consensus/natraft/protocol/log/Entry.java | 50 ++++++++++++++++----
.../natraft/protocol/log/VotingEntry.java | 28 +++++++++++
.../log/dispatch/AppendNodeEntryHandler.java | 31 +++++++++++--
.../protocol/log/dispatch/DispatcherGroup.java | 24 +++++-----
.../protocol/log/dispatch/DispatcherThread.java | 54 ++++++++++++++--------
.../protocol/log/dispatch/LogDispatcher.java | 26 ++++++-----
.../protocol/log/dispatch/VotingLogList.java | 13 +-----
.../log/dispatch/flowcontrol/FlowBalancer.java | 9 +---
.../natraft/protocol/log/logtype/RequestEntry.java | 27 +++++------
.../iotdb/consensus/natraft/utils/Timer.java | 18 ++++++++
.../db/mpp/plan/planner/plan/node/PlanNode.java | 5 ++
.../planner/plan/node/write/InsertTabletNode.java | 14 +++---
17 files changed, 255 insertions(+), 109 deletions(-)
diff --git a/client-go b/client-go
index a05323c73a..84b8d45829 160000
--- a/client-go
+++ b/client-go
@@ -1 +1 @@
-Subproject commit a05323c73a3d615efde25d4d3287fcee32ec1292
+Subproject commit 84b8d45829d846440a3246400e7bc5e39587dcb5
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
index f18fd7413d..8238caeb24 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.consensus.common.request;
import org.apache.iotdb.commons.path.PartialPath;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
public interface IConsensusRequest {
@@ -39,6 +41,14 @@ public interface IConsensusRequest {
*/
ByteBuffer serializeToByteBuffer();
+ default void serializeTo(DataOutputStream outputStream) throws IOException {
+ ByteBuffer byteBuffer = serializeToByteBuffer();
+ outputStream.write(
+ byteBuffer.array(),
+ byteBuffer.arrayOffset() + byteBuffer.position(),
+ byteBuffer.remaining());
+ }
+
default long estimateSize() {
return 0;
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 2d4611b7be..e7bc64c42f 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -47,6 +47,7 @@ import
org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
import org.apache.iotdb.consensus.natraft.exception.CheckConsistencyException;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import
org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
import org.apache.iotdb.consensus.natraft.service.RaftRPCService;
import org.apache.iotdb.consensus.natraft.service.RaftRPCServiceProcessor;
@@ -99,6 +100,7 @@ public class RaftConsensus implements IConsensus {
.createClientManager(new
AsyncRaftServiceClientPoolFactory(this.config));
FlowMonitorManager.INSTANCE.setConfig(this.config);
SyncClientAdaptor.setConfig(this.config);
+ Entry.DEFAULT_SERIALIZATION_BUFFER_SIZE =
this.config.getEntryDefaultSerializationBufferSize();
}
@Override
@@ -115,6 +117,11 @@ public class RaftConsensus implements IConsensus {
new Thread(
() -> {
logger.info(Timer.Statistic.getReport());
+ try {
+ stop();
+ } catch (IOException e) {
+ logger.error("Error during exiting", e);
+ }
}));
reportThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
ScheduledExecutorUtil.safelyScheduleAtFixedRate(
@@ -158,6 +165,7 @@ public class RaftConsensus implements IConsensus {
@Override
public void stop() throws IOException {
+ reportThread.shutdownNow();
clientManager.close();
stateMachineMap.values().parallelStream().forEach(RaftMember::stop);
registerManager.deregisterAll();
@@ -167,6 +175,7 @@ public class RaftConsensus implements IConsensus {
@Override
public ConsensusWriteResponse write(ConsensusGroupId groupId,
IConsensusRequest request) {
if (config.isOnlyTestNetwork()) {
+ request.serializeToByteBuffer();
return
ConsensusWriteResponse.newBuilder().setStatus(StatusUtils.OK).build();
}
RaftMember impl = stateMachineMap.get(groupId);
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index be3e56d231..b1218ce689 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -39,6 +39,7 @@ public class RaftConfig {
private boolean enableWeakAcceptance = false;
private int maxNumOfLogsInMem = 10000;
private int minNumOfLogsInMem = 1000;
+ private int entryDefaultSerializationBufferSize = 16 * 1024;
private long maxMemorySizeForRaftLog = 512 * 1024 * 1024L;
private int logDeleteCheckIntervalSecond = 1;
private boolean enableRaftLogPersistence = true;
@@ -436,6 +437,14 @@ public class RaftConfig {
this.maxRaftLogPersistDataSizePerFile = maxRaftLogPersistDataSizePerFile;
}
+ public int getEntryDefaultSerializationBufferSize() {
+ return entryDefaultSerializationBufferSize;
+ }
+
+ public void setEntryDefaultSerializationBufferSize(int
entryDefaultSerializationBufferSize) {
+ this.entryDefaultSerializationBufferSize =
entryDefaultSerializationBufferSize;
+ }
+
public void loadProperties(Properties properties) {
logger.debug("Loading properties: {}", properties);
@@ -647,6 +656,12 @@ public class RaftConfig {
properties.getProperty(
"flow_control_max_flow",
String.valueOf(this.getFlowControlMaxFlow()))));
+ this.setEntryDefaultSerializationBufferSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "entry_serialization_buffer_size",
+
String.valueOf(this.getEntryDefaultSerializationBufferSize()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
this.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 8d84036592..a7c00d3bc2 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -385,6 +385,7 @@ public class RaftMember {
return;
}
+ logDispatcher.stop();
heartbeatThread.stop();
catchUpManager.stop();
@@ -633,6 +634,7 @@ public class RaftMember {
logger.debug("{}: Processing request {}", name, request);
Entry entry = new RequestEntry(request);
+ entry.preSerialize();
entry.receiveTime = System.nanoTime();
// just like processPlanLocally, we need to check the size of log
@@ -655,19 +657,15 @@ public class RaftMember {
return StatusUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
}
- TSStatus tsStatus1 = waitForEntryResult(votingEntry);
- entry.waitEndTime = System.nanoTime();
- Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPLY_END.add(
- entry.waitEndTime - entry.createTime);
- return tsStatus1;
+ return waitForEntryResult(votingEntry);
}
protected void waitApply(Entry entry) throws LogExecutionException {
// when using async applier, the log here may not be applied. To return the execution
// result, we must wait until the log is applied.
- synchronized (entry) {
- while (!entry.isApplied()) {
- // wait until the log is applied
+ while (!entry.isApplied()) {
+ // wait until the log is applied
+ synchronized (entry) {
try {
entry.wait(1);
} catch (InterruptedException e) {
@@ -676,6 +674,7 @@ public class RaftMember {
}
}
}
+
if (entry.getException() != null) {
throw new LogExecutionException(entry.getException());
}
@@ -753,10 +752,17 @@ public class RaftMember {
long waitTime = 1;
AcceptedType acceptedType = votingLogList.computeAcceptedType(log);
+
Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPEND_START.calOperationCostTimeFromStart(
+ log.getEntry().createTime);
+ long startTime =
Statistic.RAFT_SENDER_LOG_APPEND_WAIT.getOperationStartTime();
while (acceptedType == AcceptedType.NOT_ACCEPTED
&& alreadyWait < config.getWriteOperationTimeoutMS()) {
- long startTime =
Statistic.RAFT_SENDER_LOG_APPEND_WAIT.getOperationStartTime();
synchronized (log.getEntry()) {
+ acceptedType = votingLogList.computeAcceptedType(log);
+ if (acceptedType != AcceptedType.NOT_ACCEPTED) {
+ break;
+ }
+
try {
log.getEntry().wait(waitTime);
} catch (InterruptedException e) {
@@ -764,7 +770,6 @@ public class RaftMember {
logger.warn("Unexpected interruption when sending a log", e);
}
}
-
Statistic.RAFT_SENDER_LOG_APPEND_WAIT.calOperationCostTimeFromStart(startTime);
acceptedType = votingLogList.computeAcceptedType(log);
alreadyWait = (System.nanoTime() - waitStart) / 1000000;
@@ -777,6 +782,7 @@ public class RaftMember {
nextTimeToPrint *= 2;
}
}
+
Statistic.RAFT_SENDER_LOG_APPEND_WAIT.calOperationCostTimeFromStart(startTime);
if (logger.isDebugEnabled()) {
Thread.currentThread().setName(threadBaseName);
@@ -1260,6 +1266,9 @@ public class RaftMember {
case OK:
if (config.isWaitApply()) {
waitApply(votingEntry.getEntry());
+ votingEntry.getEntry().waitEndTime = System.nanoTime();
+ Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPLY_END.add(
+ votingEntry.getEntry().waitEndTime -
votingEntry.getEntry().createTime);
return includeLogNumbersInStatus(
StatusUtils.OK.deepCopy(),
votingEntry.getEntry().getCurrLogIndex(),
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index 208678c6b2..9f509c93f2 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -35,7 +35,7 @@ public abstract class Entry implements Comparable<Entry> {
Comparator.comparingLong(Entry::getCurrLogIndex).thenComparing(Entry::getCurrLogTerm);
// make this configurable or adaptive
- protected static final int DEFAULT_SERIALIZATION_BUFFER_SIZE = 16 * 1024;
+ public static int DEFAULT_SERIALIZATION_BUFFER_SIZE = 16 * 1024;
private volatile long currLogIndex = Long.MIN_VALUE;
private long currLogTerm = -1;
private long prevTerm = -1;
@@ -55,6 +55,8 @@ public abstract class Entry implements Comparable<Entry> {
public long committedTime;
public long applyTime;
public long waitEndTime;
+
+ private ByteBuffer preSerializationCache;
private ByteBuffer serializationCache;
public int getDefaultSerializationBufferSize() {
@@ -63,22 +65,46 @@ public abstract class Entry implements Comparable<Entry> {
protected abstract ByteBuffer serializeInternal();
+ /**
+ * Perform serialization before indexing to avoid serializing while a lock is held. Note
+ * that at this point the index is not set yet, so when the final
+ * serialization is called, it must set the correct index, term, and prevTerm (starting from the
+ * second byte in the ByteBuffer).
+ */
+ public void preSerialize() {
+ if (preSerializationCache != null || serializationCache != null) {
+ return;
+ }
+ long startTime = Statistic.SERIALIZE_ENTRY.getOperationStartTime();
+ ByteBuffer byteBuffer = serializeInternal();
+ Statistic.SERIALIZE_ENTRY.calOperationCostTimeFromStart(startTime);
+ preSerializationCache = byteBuffer;
+ }
+
public ByteBuffer serialize() {
ByteBuffer cache = serializationCache;
if (cache != null) {
return cache.slice();
}
- ByteBuffer byteBuffer = serializeInternal();
- serializationCache = byteBuffer;
- return byteBuffer.slice();
- };
+ if (preSerializationCache != null) {
+ preSerializationCache.position(1);
+ preSerializationCache.putLong(getCurrLogIndex());
+ preSerializationCache.putLong(getCurrLogTerm());
+ preSerializationCache.putLong(getPrevTerm());
+ preSerializationCache.position(0);
+ serializationCache = preSerializationCache;
+ preSerializationCache = null;
+ } else {
+ long startTime = Statistic.SERIALIZE_ENTRY.getOperationStartTime();
+ ByteBuffer byteBuffer = serializeInternal();
+ Statistic.SERIALIZE_ENTRY.calOperationCostTimeFromStart(startTime);
+ serializationCache = byteBuffer;
+ }
+ return serializationCache.slice();
+ }
public abstract void deserialize(ByteBuffer buffer);
- public void serialize(ByteBuffer buffer) {
- buffer.put(serialize());
- }
-
public enum Types {
// DO CHECK LogParser when you add a new type of log
CLIENT_REQUEST,
@@ -152,6 +178,12 @@ public abstract class Entry implements Comparable<Entry> {
}
public long estimateSize() {
+ ByteBuffer cache;
+ if ((cache = serializationCache) != null) {
+ return cache.remaining();
+ } else if ((cache = preSerializationCache) != null) {
+ return cache.remaining();
+ }
return byteSize;
};
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
index b654a3a872..ee5c25c3c6 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
@@ -38,6 +38,8 @@ public class VotingEntry {
protected List<Peer> currNodes;
protected List<Peer> newNodes;
private boolean isStronglyAccepted;
+ private boolean isWeaklyAccepted;
+ private boolean notified;
public VotingEntry(
Entry entry,
@@ -108,6 +110,24 @@ public class VotingEntry {
return stronglyAcceptedByCurrNodes && stronglyAcceptedByNewNodes;
}
+ public boolean isWeaklyAccepted(Map<Peer, Long> stronglyAcceptedIndices) {
+ if (isWeaklyAccepted) {
+ return true;
+ }
+ int currNodeQuorumNum = currNodesQuorumNum();
+ int newNodeQuorumNum = newNodesQuorumNum();
+ int stronglyAcceptedNumByCurrNodes =
stronglyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
+ int stronglyAcceptedNumByNewNodes =
stronglyAcceptedNumByNewNodes(stronglyAcceptedIndices);
+ int weaklyAcceptedNumByCurrNodes =
weaklyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
+ int weaklyAcceptedNumByNewNodes =
weaklyAcceptedNumByNewNodes(stronglyAcceptedIndices);
+ if ((weaklyAcceptedNumByCurrNodes + stronglyAcceptedNumByCurrNodes) >=
currNodeQuorumNum
+ && (weaklyAcceptedNumByNewNodes + stronglyAcceptedNumByNewNodes) >=
newNodeQuorumNum) {
+ isWeaklyAccepted = true;
+ return true;
+ }
+ return false;
+ }
+
public int stronglyAcceptedNumByCurrNodes(Map<Peer, Long>
stronglyAcceptedIndices) {
int num = 0;
for (Peer node : currNodes) {
@@ -159,4 +179,12 @@ public class VotingEntry {
public boolean hasNewNodes() {
return newNodes != null;
}
+
+ public boolean isNotified() {
+ return notified;
+ }
+
+ public void setNotified(boolean notified) {
+ this.notified = notified;
+ }
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index 7d169e6e66..6cb53f71d2 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import
org.apache.iotdb.consensus.natraft.protocol.log.dispatch.VotingLogList.AcceptedType;
import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
@@ -76,8 +77,17 @@ public class AppendNodeEntryHandler implements
AsyncMethodCallback<AppendEntryRe
if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
member.getVotingLogList().onStronglyAccept(votingEntry, trueReceiver);
member.getStatus().getPeerMap().get(trueReceiver).setMatchIndex(response.lastLogIndex);
- synchronized (votingEntry.getEntry()) {
- votingEntry.getEntry().notifyAll();
+ if (!votingEntry.isNotified()) {
+ AcceptedType acceptedType =
member.getVotingLogList().computeAcceptedType(votingEntry);
+ if (acceptedType == AcceptedType.STRONGLY_ACCEPTED
+ || acceptedType == AcceptedType.WEAKLY_ACCEPTED) {
+ synchronized (votingEntry.getEntry()) {
+ votingEntry.getEntry().notifyAll();
+ votingEntry.setNotified(true);
+
Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_NOTIFIED.calOperationCostTimeFromStart(
+ votingEntry.getEntry().createTime);
+ }
+ }
}
} else if (resp > 0) {
// a response > 0 is the follower's term
@@ -91,14 +101,25 @@ public class AppendNodeEntryHandler implements
AsyncMethodCallback<AppendEntryRe
member.stepDown(resp, null);
synchronized (votingEntry.getEntry()) {
votingEntry.getEntry().notifyAll();
+ votingEntry.setNotified(true);
+
Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_NOTIFIED.calOperationCostTimeFromStart(
+ votingEntry.getEntry().createTime);
}
} else if (resp == RESPONSE_WEAK_ACCEPT) {
votingEntry.getEntry().acceptedTime = System.nanoTime();
Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.add(
votingEntry.getEntry().acceptedTime -
votingEntry.getEntry().createTime);
- synchronized (votingEntry.getEntry()) {
- votingEntry.addWeaklyAcceptedNodes(trueReceiver);
- votingEntry.getEntry().notifyAll();
+ votingEntry.addWeaklyAcceptedNodes(trueReceiver);
+ if (!votingEntry.isNotified()) {
+ AcceptedType acceptedType =
member.getVotingLogList().computeAcceptedType(votingEntry);
+ if (acceptedType == AcceptedType.WEAKLY_ACCEPTED) {
+ synchronized (votingEntry.getEntry()) {
+ votingEntry.getEntry().notifyAll();
+ votingEntry.setNotified(true);
+
Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_NOTIFIED.calOperationCostTimeFromStart(
+ votingEntry.getEntry().createTime);
+ }
+ }
}
} else {
// e.g., Response.RESPONSE_LOG_MISMATCH
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
index 28ab02c7c4..e953d1bc0a 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherGroup.java
@@ -19,18 +19,20 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+
import
org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class DispatcherGroup {
private static final Logger logger =
LoggerFactory.getLogger(DispatcherGroup.class);
private final Peer peer;
@@ -65,22 +67,22 @@ public class DispatcherGroup {
// ignore
}
if (!closeSucceeded) {
- logger.warn("Cannot shut down dispatcher pool of {}-{}",
logDispatcher.member.getName(),
- peer);
+ logger.warn(
+ "Cannot shut down dispatcher pool of {}-{}",
logDispatcher.member.getName(), peer);
}
}
+
public void addThread() {
int threadNum = groupThreadNum.incrementAndGet();
if (threadNum <= maxBindingThreadNum) {
- dispatcherThreadPool
- .submit(newDispatcherThread(peer, entryQueue, rateLimiter));
+ dispatcherThreadPool.submit(newDispatcherThread(peer, entryQueue,
rateLimiter));
} else {
groupThreadNum.decrementAndGet();
}
}
- DispatcherThread newDispatcherThread(Peer node, BlockingQueue<VotingEntry>
logBlockingQueue,
- RateLimiter rateLimiter) {
+ DispatcherThread newDispatcherThread(
+ Peer node, BlockingQueue<VotingEntry> logBlockingQueue, RateLimiter
rateLimiter) {
return new DispatcherThread(logDispatcher, node, logBlockingQueue,
rateLimiter, this);
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 7173249696..7e45c26f8a 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -19,11 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
@@ -35,12 +30,20 @@ import
org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
+
import
org.apache.ratis.thirdparty.com.google.common.util.concurrent.RateLimiter;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
class DispatcherThread implements Runnable {
+
private static final Logger logger =
LoggerFactory.getLogger(DispatcherThread.class);
private final LogDispatcher logDispatcher;
@@ -54,8 +57,11 @@ class DispatcherThread implements Runnable {
private long runningTimeSum;
private long lastDispatchTime;
- protected DispatcherThread(LogDispatcher logDispatcher, Peer receiver,
- BlockingQueue<VotingEntry> logBlockingDeque, RateLimiter rateLimiter,
+ protected DispatcherThread(
+ LogDispatcher logDispatcher,
+ Peer receiver,
+ BlockingQueue<VotingEntry> logBlockingDeque,
+ RateLimiter rateLimiter,
DispatcherGroup group) {
this.logDispatcher = logDispatcher;
this.receiver = receiver;
@@ -75,8 +81,8 @@ class DispatcherThread implements Runnable {
long runningStart = 0;
while (!Thread.interrupted()) {
if (group.isDelayed()) {
- if (logBlockingDeque.size() < logDispatcher.maxBatchSize &&
- System.nanoTime() - lastDispatchTime < 1_000_000_000L) {
+ if (logBlockingDeque.size() < logDispatcher.maxBatchSize
+ && System.nanoTime() - lastDispatchTime < 1_000_000_000L) {
// the follower is being delayed, if there are not enough requests, and it has
// dispatched recently, wait for a while to get a larger batch
Thread.sleep(100);
@@ -95,7 +101,7 @@ class DispatcherThread implements Runnable {
}
}
long currTime = System.nanoTime();
- idleTimeSum = currTime - idleStart;
+ idleTimeSum += currTime - idleStart;
runningStart = currTime;
if (logger.isDebugEnabled()) {
logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
@@ -110,22 +116,23 @@ class DispatcherThread implements Runnable {
currTime = System.nanoTime();
lastDispatchTime = currTime;
- runningTimeSum = currTime - runningStart;
+ runningTimeSum += currTime - runningStart;
idleStart = currTime;
// thread too idle
- if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) < 0.5 &&
- runningTimeSum > 10_000_000_000L) {
+ if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) > 0.5
+ && runningTimeSum > 10_000_000_000L) {
int remaining = group.getGroupThreadNum().decrementAndGet();
if (remaining > 1) {
logger.info("Dispatcher thread too idle");
+ group.getGroupThreadNum().incrementAndGet();
break;
} else {
group.getGroupThreadNum().incrementAndGet();
}
// thread too busy
- } else if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) > 0.5 &&
- runningTimeSum > 10_000_000_000L) {
+ } else if (idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum) < 0.1
+ && runningTimeSum > 10_000_000_000L) {
int groupThreadNum = group.getGroupThreadNum().get();
if (groupThreadNum < group.getMaxBindingThreadNum()) {
group.addThread();
@@ -140,11 +147,14 @@ class DispatcherThread implements Runnable {
} catch (Exception e) {
logger.error("Unexpected error in log dispatcher", e);
}
- logger.info("Dispatcher exits, idle ratio: {}", idleTimeSum * 1.0 /
(idleTimeSum + runningTimeSum));
+ logger.info(
+ "Dispatcher exits, idle ratio: {}, running time: {}ms, idle time:
{}ms",
+ idleTimeSum * 1.0 / (idleTimeSum + runningTimeSum),
+ runningTimeSum / 1_000_000L,
+ idleTimeSum / 1_000_000L);
group.getGroupThreadNum().decrementAndGet();
}
-
protected void serializeEntries() throws InterruptedException {
for (VotingEntry request : currBatch) {
@@ -167,7 +177,9 @@ class DispatcherThread implements Runnable {
}
if (logger.isDebugEnabled()) {
logger.debug(
- "{}: append entries {} with {} logs",
logDispatcher.member.getName(), receiver,
+ "{}: append entries {} with {} logs",
+ logDispatcher.member.getName(),
+ receiver,
logList.size());
}
}
@@ -189,7 +201,9 @@ class DispatcherThread implements Runnable {
}
if (logger.isDebugEnabled()) {
logger.debug(
- "{}: append entries {} with {} logs",
logDispatcher.member.getName(), receiver,
+ "{}: append entries {} with {} logs",
+ logDispatcher.member.getName(),
+ receiver,
logList.size());
}
}
@@ -256,7 +270,7 @@ class DispatcherThread implements Runnable {
}
if (logDispatcher.getConfig().isUseFollowerLoadBalance()) {
- FlowMonitorManager.INSTANCE.report(receiver, logSize);
+ FlowMonitorManager.INSTANCE.report(receiver.getEndpoint(), logSize);
}
rateLimiter.acquire((int) logSize);
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index d63b4f9649..6640528ec6 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -19,24 +19,26 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
-import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
import org.apache.iotdb.tsfile.compress.ICompressor;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+
+import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
+
/**
* A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
* followers and sending the logs in an ordered manner so that the followers will not wait for previous
@@ -138,8 +140,6 @@ public class LogDispatcher {
}
}
-
-
public void applyNewNodes() {
allNodes = newNodes;
newNodes = null;
@@ -180,4 +180,8 @@ public class LogDispatcher {
public RaftMember getMember() {
return member;
}
+
+ public void stop() {
+ dispatcherGroupMap.forEach((p, g) -> g.close());
+ }
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
index d19c6d33d3..6dd088ec28 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
@@ -124,18 +124,7 @@ public class VotingLogList {
}
if (enableWeakAcceptance) {
- int currNodeQuorumNum = votingEntry.currNodesQuorumNum();
- int newNodeQuorumNum = votingEntry.newNodesQuorumNum();
- int stronglyAcceptedNumByCurrNodes =
- votingEntry.stronglyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
- int stronglyAcceptedNumByNewNodes =
- votingEntry.stronglyAcceptedNumByNewNodes(stronglyAcceptedIndices);
- int weaklyAcceptedNumByCurrNodes =
- votingEntry.weaklyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
- int weaklyAcceptedNumByNewNodes =
- votingEntry.weaklyAcceptedNumByNewNodes(stronglyAcceptedIndices);
- if ((weaklyAcceptedNumByCurrNodes + stronglyAcceptedNumByCurrNodes) >=
currNodeQuorumNum
- && (weaklyAcceptedNumByNewNodes + stronglyAcceptedNumByNewNodes) >=
newNodeQuorumNum) {
+ if (votingEntry.isWeaklyAccepted(stronglyAcceptedIndices)) {
return AcceptedType.WEAKLY_ACCEPTED;
}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index eb2888d610..5a996c41e7 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -20,13 +20,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
@@ -34,6 +27,7 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
import
org.apache.iotdb.consensus.natraft.protocol.log.dispatch.DispatcherGroup;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.LogDispatcher;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +36,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
index 42f1a2659f..de8a9f6e57 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
@@ -49,6 +49,8 @@ public class RequestEntry extends Entry {
@Override
protected ByteBuffer serializeInternal() {
PublicBAOS byteArrayOutputStream = new
PublicBAOS(getDefaultSerializationBufferSize());
+ int requestSize = 0;
+ int requestPos = 0;
try (DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) CLIENT_REQUEST.ordinal());
@@ -56,25 +58,20 @@ public class RequestEntry extends Entry {
dataOutputStream.writeLong(getCurrLogTerm());
dataOutputStream.writeLong(getPrevTerm());
- ByteBuffer byteBuffer = request.serializeToByteBuffer();
- byteBuffer.rewind();
- dataOutputStream.writeInt(byteBuffer.remaining());
- dataOutputStream.write(byteBuffer.array(), byteBuffer.arrayOffset(),
byteBuffer.remaining());
+ requestPos = byteArrayOutputStream.size();
+ dataOutputStream.writeInt(0);
+ request.serializeTo(dataOutputStream);
+ requestSize = byteArrayOutputStream.size() - requestPos - 4;
} catch (IOException e) {
// unreachable
}
- return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
- }
-
- @Override
- public void serialize(ByteBuffer buffer) {
- buffer.put((byte) CLIENT_REQUEST.ordinal());
- buffer.putLong(getCurrLogIndex());
- buffer.putLong(getCurrLogTerm());
- ByteBuffer byteBuffer = request.serializeToByteBuffer();
- buffer.putInt(byteBuffer.remaining());
- buffer.put(byteBuffer);
+ ByteBuffer wrap =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ wrap.position(requestPos);
+ wrap.putInt(requestSize);
+ wrap.position(0);
+ return wrap;
}
@Override
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index de89657540..b2f0b824d9 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -278,6 +278,12 @@ public class Timer {
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
LOG_DISPATCHER_LOG_BATCH_SIZE(
LOG_DISPATCHER, "batch size", 1, true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ SERIALIZE_ENTRY(
+ LOG_DISPATCHER,
+ "serialize entry",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE(
LOG_DISPATCHER,
"from receive to create",
@@ -356,6 +362,18 @@ public class Timer {
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_SENDER_LOG_FROM_CREATE_TO_NOTIFIED(
+ LOG_DISPATCHER,
+ "from create to notified",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPEND_START(
+ LOG_DISPATCHER,
+ "from create to wait append start",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_SENDER_LOG_APPEND_WAIT(
LOG_DISPATCHER,
"wait for being appended",
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
index 0e85bd9461..55b869dc0d 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNode.java
@@ -160,6 +160,11 @@ public abstract class PlanNode implements
IConsensusRequest {
}
}
+ @Override
+ public void serializeTo(DataOutputStream outputStream) throws IOException {
+ serialize(outputStream);
+ }
+
protected abstract void serializeAttributes(ByteBuffer byteBuffer);
protected abstract void serializeAttributes(DataOutputStream stream) throws
IOException;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index 7fa6b8ba30..f801f9a290 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -460,8 +460,8 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue, ISche
/** Serialize measurements or measurement schemas, ignoring failed time
series */
private void writeMeasurementsOrSchemas(DataOutputStream stream) throws
IOException {
- ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(),
stream);
- ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0),
stream);
+ stream.writeInt(measurements.length - getFailedMeasurementNumber());
+ stream.write((byte) (measurementSchemas != null ? 1 : 0));
for (int i = 0; i < measurements.length; i++) {
// ignore failed partial insert
@@ -624,31 +624,31 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue, ISche
case INT32:
int[] intValues = (int[]) column;
for (int j = 0; j < rowCount; j++) {
- ReadWriteIOUtils.write(intValues[j], stream);
+ stream.writeInt(intValues[j]);
}
break;
case INT64:
long[] longValues = (long[]) column;
for (int j = 0; j < rowCount; j++) {
- ReadWriteIOUtils.write(longValues[j], stream);
+ stream.writeLong(longValues[j]);
}
break;
case FLOAT:
float[] floatValues = (float[]) column;
for (int j = 0; j < rowCount; j++) {
- ReadWriteIOUtils.write(floatValues[j], stream);
+ stream.writeFloat(floatValues[j]);
}
break;
case DOUBLE:
double[] doubleValues = (double[]) column;
for (int j = 0; j < rowCount; j++) {
- ReadWriteIOUtils.write(doubleValues[j], stream);
+ stream.writeDouble(doubleValues[j]);
}
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) column;
for (int j = 0; j < rowCount; j++) {
- ReadWriteIOUtils.write(BytesUtils.boolToByte(boolValues[j]), stream);
+ stream.write(BytesUtils.boolToByte(boolValues[j]));
}
break;
case TEXT: