This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8d20e48b09 Optimize multiLeaderConsensus performance (#6413)
8d20e48b09 is described below
commit 8d20e48b09bbd90a0e1c5eb92a28dbc63680e7d0
Author: Potato <[email protected]>
AuthorDate: Sat Jun 25 00:13:58 2022 +0800
Optimize multiLeaderConsensus performance (#6413)
* optimize multiLeaderConsensus performance
* fix UT
---
.../resources/conf/iotdb-confignode.properties | 4 +-
.../iotdb/consensus/config/MultiLeaderConfig.java | 8 ++--
.../multileader/MultiLeaderConsensus.java | 2 +-
.../multileader/logdispatcher/LogDispatcher.java | 34 +++++++++++------
.../multileader/MultiLeaderConsensusTest.java | 44 ++++++++++++----------
5 files changed, 53 insertions(+), 39 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index f7fb90f633..59803ee9ab 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -56,7 +56,7 @@ target_confignode=0.0.0.0:22277
# DataRegion consensus protocol type
# These consensus protocols are currently supported:
-# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(Consensus patterns optimized specifically for single replica)
# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
# 3. org.apache.iotdb.consensus.multileader.MultiLeaderConsensus(weak consistency, high performance)
# Datatype: String
@@ -64,7 +64,7 @@ target_confignode=0.0.0.0:22277
# SchemaRegion consensus protocol type
# These consensus protocols are currently supported:
-# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, only supports stand-alone machine)
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(Consensus patterns optimized specifically for single replica)
# 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
# Datatype: String
#
schema_region_consensus_protocol_class=org.apache.iotdb.consensus.standalone.StandAloneConsensus
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 7f4c0b4de9..119bd17b0a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -205,10 +205,10 @@ public class MultiLeaderConfig {
}
public static class Builder {
- private int maxPendingRequestNumPerNode = 1000;
- private int maxRequestPerBatch = 100;
- private int maxPendingBatch = 20;
- private int maxWaitingTimeForAccumulatingBatchInMs = 10;
+ private int maxPendingRequestNumPerNode = 200;
+ private int maxRequestPerBatch = 40;
+ private int maxPendingBatch = 6;
+ private int maxWaitingTimeForAccumulatingBatchInMs = 500;
private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 8a95c1288e..eecfa473c6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -124,9 +124,9 @@ public class MultiLeaderConsensus implements IConsensus {
@Override
public void stop() {
+ clientManager.close();
stateMachineMap.values().parallelStream().forEach(MultiLeaderServerImpl::stop);
registerManager.deregisterAll();
- clientManager.close();
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index a2add314ae..1d651a0ba5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -74,7 +74,8 @@ public class LogDispatcher {
.collect(Collectors.toList());
if (!threads.isEmpty()) {
this.executorService =
- IoTDBThreadPoolFactory.newFixedThreadPool(threads.size(), "LogDispatcher");
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ threads.size(), "LogDispatcher-" + impl.getThisNode().getGroupId());
}
}
@@ -107,8 +108,15 @@ public class LogDispatcher {
public void offer(IndexedConsensusRequest request) {
threads.forEach(
thread -> {
- if (!thread.offer(request)) {
- logger.debug("Log queue of {} is full, ignore the log to this node", thread.getPeer());
+ logger.debug(
+ "{}: Push a log to the queue, where the queue length is {}",
+ impl.getThisNode().getGroupId(),
+ thread.getPendingRequest().size());
+ if (!thread.getPendingRequest().offer(request)) {
+ logger.debug(
+ "{}: Log queue of {} is full, ignore the log to this node",
+ impl.getThisNode().getGroupId(),
+ thread.getPeer());
}
});
}
@@ -156,8 +164,8 @@ public class LogDispatcher {
return config;
}
- public boolean offer(IndexedConsensusRequest request) {
- return pendingRequest.offer(request);
+ public BlockingQueue<IndexedConsensusRequest> getPendingRequest() {
+ return pendingRequest;
}
public void stop() {
@@ -199,7 +207,7 @@ public class LogDispatcher {
PendingBatch batch;
List<TLogBatch> logBatches = new ArrayList<>();
long startIndex = syncStatus.getNextSendingIndex();
- long maxIndex = impl.getController().getCurrentIndex();
+ long maxIndex = impl.getController().getCurrentIndex() + 1;
long endIndex;
if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) {
// Use drainTo instead of poll to reduce lock overhead
@@ -211,7 +219,7 @@ public class LogDispatcher {
// only execute this after a restart
endIndex = constructBatchFromWAL(startIndex, maxIndex, logBatches);
batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug("accumulated a {} from wal", batch);
+ logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch);
} else {
Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
IndexedConsensusRequest prev = iterator.next();
@@ -220,7 +228,7 @@ public class LogDispatcher {
endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug("accumulated a {} from wal", batch);
+ logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch);
return batch;
}
constructBatchIndexedFromConsensusRequest(prev, logBatches);
@@ -236,7 +244,10 @@ public class LogDispatcher {
constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), logBatches);
if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) {
batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug("accumulated a {} from queue and wal", batch);
+ logger.debug(
+ "{} : accumulated a {} from queue and wal",
+ impl.getThisNode().getGroupId(),
+ batch);
return batch;
}
}
@@ -249,7 +260,8 @@ public class LogDispatcher {
iterator.remove();
}
batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug("accumulated a {} from queue and wal", batch);
+ logger.debug(
+ "{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batch);
}
return batch;
}
@@ -276,8 +288,6 @@ public class LogDispatcher {
// TODO iterator
IConsensusRequest data = reader.getReq(currentIndex++);
if (data != null) {
- // since WAL can no longer recover FragmentInstance, but only PlanNode, we need to give
- // special flags to use different deserialization methods in the dataRegion stateMachine
logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
}
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index fa86215466..5b3b8253f8 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -365,25 +365,27 @@ public class MultiLeaderConsensusTest {
public void stop() {}
@Override
- public synchronized TSStatus write(IConsensusRequest request) {
- IConsensusRequest innerRequest = ((IndexedConsensusRequest) request).getRequest();
- if (innerRequest instanceof ByteBufferConsensusRequest) {
- ByteBuffer buffer = innerRequest.serializeToByteBuffer();
- requestSet.add(
- new IndexedConsensusRequest(
- ((IndexedConsensusRequest) request).getSearchIndex(),
- -1,
- new TestEntry(buffer.getInt(), Peer.deserialize(buffer))));
- } else {
- requestSet.add(((IndexedConsensusRequest) request));
+ public TSStatus write(IConsensusRequest request) {
+ synchronized (requestSet) {
+ IConsensusRequest innerRequest = ((IndexedConsensusRequest) request).getRequest();
+ if (innerRequest instanceof ByteBufferConsensusRequest) {
+ ByteBuffer buffer = innerRequest.serializeToByteBuffer();
+ requestSet.add(
+ new IndexedConsensusRequest(
+ ((IndexedConsensusRequest) request).getSearchIndex(),
+ -1,
+ new TestEntry(buffer.getInt(), Peer.deserialize(buffer))));
+ } else {
+ requestSet.add(((IndexedConsensusRequest) request));
+ }
+ return new TSStatus();
}
- return new TSStatus();
}
@Override
public synchronized DataSet read(IConsensusRequest request) {
if (request instanceof GetConsensusReqReaderPlan) {
- return new FakeConsensusReqReader(new ArrayList<>(requestSet));
+ return new FakeConsensusReqReader(requestSet);
}
return null;
}
@@ -399,20 +401,22 @@ public class MultiLeaderConsensusTest {
public static class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
- private final List<IndexedConsensusRequest> requestList;
+ private final Set<IndexedConsensusRequest> requestSet;
- public FakeConsensusReqReader(List<IndexedConsensusRequest> requestList) {
- this.requestList = requestList;
+ public FakeConsensusReqReader(Set<IndexedConsensusRequest> requestSet) {
+ this.requestSet = requestSet;
}
@Override
public IConsensusRequest getReq(long index) {
- for (IndexedConsensusRequest indexedConsensusRequest : requestList) {
- if (indexedConsensusRequest.getSearchIndex() == index) {
- return indexedConsensusRequest;
+ synchronized (requestSet) {
+ for (IndexedConsensusRequest indexedConsensusRequest : requestSet) {
+ if (indexedConsensusRequest.getSearchIndex() == index) {
+ return indexedConsensusRequest;
+ }
}
+ return null;
}
- return null;
}
@Override