This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch iot_consensus in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b41680416316508a43fbff0baa48630419c21700 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Wed Jun 21 09:58:50 2023 +0200 change default parameter Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../iotdb/consensus/config/IoTConsensusConfig.java | 18 +++++++++++++++++- .../consensus/iot/logdispatcher/LogDispatcher.java | 5 +++++ .../assembly/resources/conf/iotdb-common.properties | 2 +- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java index a4300d68b5a..91b4fb262ac 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java @@ -256,6 +256,7 @@ public class IoTConsensusConfig { private final int maxQueueLength; private final long maxWaitingTimeForWaitBatchInMs; + private final int maxWaitingTimeForAccumulatingBatchInMs; private final long basicRetryWaitTimeMs; private final long maxRetryWaitTimeMs; private final long walThrottleThreshold; @@ -270,6 +271,7 @@ public class IoTConsensusConfig { int maxPendingBatchesNum, int maxQueueLength, long maxWaitingTimeForWaitBatchInMs, + int maxWaitingTimeForAccumulatingBatchInMs, long basicRetryWaitTimeMs, long maxRetryWaitTimeMs, long walThrottleThreshold, @@ -282,6 +284,7 @@ public class IoTConsensusConfig { this.maxPendingBatchesNum = maxPendingBatchesNum; this.maxQueueLength = maxQueueLength; this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs; + this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs; this.basicRetryWaitTimeMs = basicRetryWaitTimeMs; this.maxRetryWaitTimeMs = maxRetryWaitTimeMs; this.walThrottleThreshold = walThrottleThreshold; @@ -311,6 +314,10 @@ public class IoTConsensusConfig { return maxWaitingTimeForWaitBatchInMs; } + public int getMaxWaitingTimeForAccumulatingBatchInMs() { + return maxWaitingTimeForAccumulatingBatchInMs; + } + public long getBasicRetryWaitTimeMs() { return basicRetryWaitTimeMs; } @@ -347,9 +354,11 @@ public class IoTConsensusConfig { private int maxLogEntriesNumPerBatch = 1024; private int maxSizePerBatch = 16 * 1024 * 1024; - private int maxPendingBatchesNum = 12; + private int maxPendingBatchesNum = 5; private int maxQueueLength = 4096; private long maxWaitingTimeForWaitBatchInMs = 10 * 1000L; + + private int maxWaitingTimeForAccumulatingBatchInMs = 500; private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100); private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20); private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L; @@ -384,6 +393,12 @@ public class IoTConsensusConfig { return this; } + public Builder setMaxWaitingTimeForAccumulatingBatchInMs( + int maxWaitingTimeForAccumulatingBatchInMs) { + this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs; + return this; + } + public Replication.Builder setBasicRetryWaitTimeMs(long basicRetryWaitTimeMs) { this.basicRetryWaitTimeMs = basicRetryWaitTimeMs; return this; @@ -426,6 +441,7 @@ public class IoTConsensusConfig { maxPendingBatchesNum, maxQueueLength, maxWaitingTimeForWaitBatchInMs, + maxWaitingTimeForAccumulatingBatchInMs, basicRetryWaitTimeMs, maxRetryWaitTimeMs, walThrottleThreshold, diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index dbf9336043c..be7a5d83a1f 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -316,6 +316,11 @@ public class LogDispatcher { pendingEntries.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS); if (request != null) { bufferedEntries.add(request); + // If write pressure is low, we simply sleep a little to reduce the number of RPC + if (pendingEntries.size() <= config.getReplication().getMaxLogEntriesNumPerBatch() + && bufferedEntries.isEmpty()) { + Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs()); + } } } logDispatcherThreadMetrics.recordConstructBatchTime( diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties index aa9cae0552d..733289836aa 100644 --- a/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -761,7 +761,7 @@ cluster_name=defaultCluster # The maximum pending batches num in IoTConsensus # Datatype: int -# data_region_iot_max_pending_batches_num = 12 +# data_region_iot_max_pending_batches_num = 5 # The maximum memory ratio for queue in IoTConsensus # Datatype: double
