This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch jira6022_cp in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9b2477009e6b8868c82ec9cedc81f54ca94d0749 Author: Potato <[email protected]> AuthorDate: Thu Jun 22 03:27:56 2023 +0200 [To rel/1.2][IOTDB-6022] Modify the default parameters of iotconsensus (#10267) --- .../iotdb/consensus/config/IoTConsensusConfig.java | 18 +++++++++++++++++- .../consensus/iot/logdispatcher/LogDispatcher.java | 5 +++++ .../assembly/resources/conf/iotdb-common.properties | 2 +- .../apache/iotdb/commons/concurrent/ThreadName.java | 7 +++++++ 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java index a4300d68b5a..91b4fb262ac 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java @@ -256,6 +256,7 @@ public class IoTConsensusConfig { private final int maxQueueLength; private final long maxWaitingTimeForWaitBatchInMs; + private final int maxWaitingTimeForAccumulatingBatchInMs; private final long basicRetryWaitTimeMs; private final long maxRetryWaitTimeMs; private final long walThrottleThreshold; @@ -270,6 +271,7 @@ public class IoTConsensusConfig { int maxPendingBatchesNum, int maxQueueLength, long maxWaitingTimeForWaitBatchInMs, + int maxWaitingTimeForAccumulatingBatchInMs, long basicRetryWaitTimeMs, long maxRetryWaitTimeMs, long walThrottleThreshold, @@ -282,6 +284,7 @@ public class IoTConsensusConfig { this.maxPendingBatchesNum = maxPendingBatchesNum; this.maxQueueLength = maxQueueLength; this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs; + this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs; this.basicRetryWaitTimeMs = basicRetryWaitTimeMs; this.maxRetryWaitTimeMs = maxRetryWaitTimeMs; this.walThrottleThreshold = walThrottleThreshold; @@ -311,6 +314,10 @@ public class IoTConsensusConfig { return maxWaitingTimeForWaitBatchInMs; } + public int getMaxWaitingTimeForAccumulatingBatchInMs() { + return maxWaitingTimeForAccumulatingBatchInMs; + } + public long getBasicRetryWaitTimeMs() { return basicRetryWaitTimeMs; } @@ -347,9 +354,11 @@ public class IoTConsensusConfig { private int maxLogEntriesNumPerBatch = 1024; private int maxSizePerBatch = 16 * 1024 * 1024; - private int maxPendingBatchesNum = 12; + private int maxPendingBatchesNum = 5; private int maxQueueLength = 4096; private long maxWaitingTimeForWaitBatchInMs = 10 * 1000L; + + private int maxWaitingTimeForAccumulatingBatchInMs = 500; private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100); private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20); private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L; @@ -384,6 +393,12 @@ public class IoTConsensusConfig { return this; } + public Builder setMaxWaitingTimeForAccumulatingBatchInMs( + int maxWaitingTimeForAccumulatingBatchInMs) { + this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs; + return this; + } + public Replication.Builder setBasicRetryWaitTimeMs(long basicRetryWaitTimeMs) { this.basicRetryWaitTimeMs = basicRetryWaitTimeMs; return this; @@ -426,6 +441,7 @@ public class IoTConsensusConfig { maxPendingBatchesNum, maxQueueLength, maxWaitingTimeForWaitBatchInMs, + maxWaitingTimeForAccumulatingBatchInMs, basicRetryWaitTimeMs, maxRetryWaitTimeMs, walThrottleThreshold, diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index 4332c88d2e1..42ac263071d 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -316,6 +316,11 @@ public class LogDispatcher { pendingEntries.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS); if (request != null) { bufferedEntries.add(request); + // If write pressure is low, we simply sleep a little to reduce the number of RPC + if (pendingEntries.size() <= config.getReplication().getMaxLogEntriesNumPerBatch() + && bufferedEntries.isEmpty()) { + Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs()); + } } } logDispatcherThreadMetrics.recordConstructBatchTime( diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties index ffd682c0e68..4e464f611d8 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -765,7 +765,7 @@ cluster_name=defaultCluster # The maximum pending batches num in IoTConsensus # Datatype: int -# data_region_iot_max_pending_batches_num = 12 +# data_region_iot_max_pending_batches_num = 5 # The maximum memory ratio for queue in IoTConsensus # Datatype: double diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index be9d1090de6..fbd56d6c82e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -18,6 +18,9 @@ */ package org.apache.iotdb.commons.concurrent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -133,6 +136,7 @@ public enum ThreadName { SYSTEM_SCHEDULE_METRICS("SystemScheduleMetrics"), RESOURCE_CONTROL_DISK_STATISTIC("ResourceControl-DataRegionDiskStatistics"), PROMETHEUS_REACTOR_HTTP_NIO("reactor-http-nio"), + PROMETHEUS_REACTOR_HTTP_EPOLL("reactor-http-epoll"), PROMETHEUS_BOUNDED_ELASTIC("boundedElastic-evictor"), // -------------------------- Other -------------------------- TTL_CHECK("TTL-CHECK"), @@ -147,6 +151,7 @@ public enum ThreadName { UNKOWN("UNKNOWN"); private final String name; + private static final Logger log = LoggerFactory.getLogger(ThreadName.class); private static Set<ThreadName> queryThreadNames = new HashSet<>( Arrays.asList( @@ -258,6 +263,7 @@ public enum ThreadName { SYSTEM_SCHEDULE_METRICS, RESOURCE_CONTROL_DISK_STATISTIC, PROMETHEUS_REACTOR_HTTP_NIO, + PROMETHEUS_REACTOR_HTTP_EPOLL, PROMETHEUS_BOUNDED_ELASTIC)); private static Set<ThreadName> otherThreadNames = new HashSet<>( @@ -352,6 +358,7 @@ public enum ThreadName { } } } + log.debug("Unknown thread name: {}", givenThreadName); return ThreadName.UNKOWN; } }
