This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.1 by this push:
new 97cbc9bc42 improve (#9772)
97cbc9bc42 is described below
commit 97cbc9bc4210ded2623808a9bcbeb2c42cd797a1
Author: Potato <[email protected]>
AuthorDate: Mon May 8 10:15:29 2023 +0800
improve (#9772)
---
.../iotdb/consensus/config/IoTConsensusConfig.java | 36 ++++++++++------------
.../consensus/iot/logdispatcher/LogDispatcher.java | 8 ++---
.../logdispatcher/LogDispatcherThreadMetrics.java | 22 +++++++++++++
.../consensus/iot/logdispatcher/SyncStatus.java | 2 --
4 files changed, 41 insertions(+), 27 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 9c676fdaa9..53bb05d012 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -235,8 +235,9 @@ public class IoTConsensusConfig {
private final int maxLogEntriesNumPerBatch;
private final int maxSizePerBatch;
private final int maxPendingBatchesNum;
+
+ private final int maxQueueLength;
private final long maxWaitingTimeForWaitBatchInMs;
- private final int maxWaitingTimeForAccumulatingBatchInMs;
private final long basicRetryWaitTimeMs;
private final long maxRetryWaitTimeMs;
private final long walThrottleThreshold;
@@ -249,8 +250,8 @@ public class IoTConsensusConfig {
int maxLogEntriesNumPerBatch,
int maxSizePerBatch,
int maxPendingBatchesNum,
+ int maxQueueLength,
long maxWaitingTimeForWaitBatchInMs,
- int maxWaitingTimeForAccumulatingBatchInMs,
long basicRetryWaitTimeMs,
long maxRetryWaitTimeMs,
long walThrottleThreshold,
@@ -261,8 +262,8 @@ public class IoTConsensusConfig {
this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
this.maxSizePerBatch = maxSizePerBatch;
this.maxPendingBatchesNum = maxPendingBatchesNum;
+ this.maxQueueLength = maxQueueLength;
this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
- this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
this.walThrottleThreshold = walThrottleThreshold;
@@ -284,12 +285,12 @@ public class IoTConsensusConfig {
return maxPendingBatchesNum;
}
- public long getMaxWaitingTimeForWaitBatchInMs() {
- return maxWaitingTimeForWaitBatchInMs;
+ public int getMaxQueueLength() {
+ return maxQueueLength;
}
- public int getMaxWaitingTimeForAccumulatingBatchInMs() {
- return maxWaitingTimeForAccumulatingBatchInMs;
+ public long getMaxWaitingTimeForWaitBatchInMs() {
+ return maxWaitingTimeForWaitBatchInMs;
}
public long getBasicRetryWaitTimeMs() {
@@ -326,13 +327,11 @@ public class IoTConsensusConfig {
public static class Builder {
- private int maxLogEntriesNumPerBatch = 30;
+ private int maxLogEntriesNumPerBatch = 1024;
private int maxSizePerBatch = 16 * 1024 * 1024;
- // (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE
- // in DataRegionStateMachine
- private int maxPendingBatchesNum = 5;
+ private int maxPendingBatchesNum = 12;
+ private int maxQueueLength = 4096;
private long maxWaitingTimeForWaitBatchInMs = 10 * 1000L;
- private int maxWaitingTimeForAccumulatingBatchInMs = 500;
private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
@@ -356,15 +355,14 @@ public class IoTConsensusConfig {
return this;
}
- public Replication.Builder setMaxWaitingTimeForWaitBatchInMs(
- long maxWaitingTimeForWaitBatchInMs) {
- this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
+ public Builder setMaxQueueLength(int maxQueueLength) {
+ this.maxQueueLength = maxQueueLength;
return this;
}
- public Replication.Builder setMaxWaitingTimeForAccumulatingBatchInMs(
- int maxWaitingTimeForAccumulatingBatchInMs) {
- this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
+ public Replication.Builder setMaxWaitingTimeForWaitBatchInMs(
+ long maxWaitingTimeForWaitBatchInMs) {
+ this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
return this;
}
@@ -408,8 +406,8 @@ public class IoTConsensusConfig {
maxLogEntriesNumPerBatch,
maxSizePerBatch,
maxPendingBatchesNum,
+ maxQueueLength,
maxWaitingTimeForWaitBatchInMs,
- maxWaitingTimeForAccumulatingBatchInMs,
basicRetryWaitTimeMs,
maxRetryWaitTimeMs,
walThrottleThreshold,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index f6da0efd52..cd70efe655 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,9 +46,9 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -214,7 +214,7 @@ public class LogDispatcher {
public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long initialSyncIndex) {
this.peer = peer;
this.config = config;
- this.pendingEntries = new LinkedBlockingQueue<>();
+ this.pendingEntries = new ArrayBlockingQueue<>(config.getReplication().getMaxQueueLength());
this.controller =
new IndexController(
impl.getStorageDir(),
@@ -314,10 +314,6 @@ public class LogDispatcher {
pendingEntries.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
if (request != null) {
bufferedEntries.add(request);
- // If write pressure is low, we simply sleep a little to reduce the number of RPC
- if (pendingEntries.size() <= config.getReplication().getMaxLogEntriesNumPerBatch()) {
- Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
- }
}
}
MetricService.getInstance()
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java
index 2ca5b91275..24d0960dc0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java
@@ -48,6 +48,18 @@ public class LogDispatcherThreadMetrics implements IMetricSet {
logDispatcherThread.getPeer().getGroupId().toString(),
Tag.TYPE.toString(),
"currentSyncIndex");
+ MetricService.getInstance()
+ .createAutoGauge(
+ Metric.IOT_CONSENSUS.toString(),
+ MetricLevel.IMPORTANT,
+ logDispatcherThread,
+ x -> x.getSyncStatus().getPendingBatches().size(),
+ Tag.NAME.toString(),
+ formatName(),
+ Tag.REGION.toString(),
+ logDispatcherThread.getPeer().getGroupId().toString(),
+ Tag.TYPE.toString(),
+ "pipelineNum");
MetricService.getInstance()
.createAutoGauge(
Metric.IOT_CONSENSUS.toString(),
@@ -74,6 +86,16 @@ public class LogDispatcherThreadMetrics implements IMetricSet {
logDispatcherThread.getPeer().getGroupId().toString(),
Tag.TYPE.toString(),
"currentSyncIndex");
+ MetricService.getInstance()
+ .remove(
+ MetricType.AUTO_GAUGE,
+ Metric.IOT_CONSENSUS.toString(),
+ Tag.NAME.toString(),
+ formatName(),
+ Tag.REGION.toString(),
+ logDispatcherThread.getPeer().getGroupId().toString(),
+ Tag.TYPE.toString(),
+ "pipelineNum");
MetricService.getInstance()
.remove(
MetricType.AUTO_GAUGE,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index de6d0691dc..8e1f40db8f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.iot.logdispatcher;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import java.util.Iterator;
@@ -102,7 +101,6 @@ public class SyncStatus {
}
}
- @TestOnly
public List<Batch> getPendingBatches() {
return pendingBatches;
}