This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new 97cbc9bc42 improve (#9772)
97cbc9bc42 is described below

commit 97cbc9bc4210ded2623808a9bcbeb2c42cd797a1
Author: Potato <[email protected]>
AuthorDate: Mon May 8 10:15:29 2023 +0800

    improve (#9772)
---
 .../iotdb/consensus/config/IoTConsensusConfig.java | 36 ++++++++++------------
 .../consensus/iot/logdispatcher/LogDispatcher.java |  8 ++---
 .../logdispatcher/LogDispatcherThreadMetrics.java  | 22 +++++++++++++
 .../consensus/iot/logdispatcher/SyncStatus.java    |  2 --
 4 files changed, 41 insertions(+), 27 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index 9c676fdaa9..53bb05d012 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -235,8 +235,9 @@ public class IoTConsensusConfig {
     private final int maxLogEntriesNumPerBatch;
     private final int maxSizePerBatch;
     private final int maxPendingBatchesNum;
+
+    private final int maxQueueLength;
     private final long maxWaitingTimeForWaitBatchInMs;
-    private final int maxWaitingTimeForAccumulatingBatchInMs;
     private final long basicRetryWaitTimeMs;
     private final long maxRetryWaitTimeMs;
     private final long walThrottleThreshold;
@@ -249,8 +250,8 @@ public class IoTConsensusConfig {
         int maxLogEntriesNumPerBatch,
         int maxSizePerBatch,
         int maxPendingBatchesNum,
+        int maxQueueLength,
         long maxWaitingTimeForWaitBatchInMs,
-        int maxWaitingTimeForAccumulatingBatchInMs,
         long basicRetryWaitTimeMs,
         long maxRetryWaitTimeMs,
         long walThrottleThreshold,
@@ -261,8 +262,8 @@ public class IoTConsensusConfig {
       this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
       this.maxSizePerBatch = maxSizePerBatch;
       this.maxPendingBatchesNum = maxPendingBatchesNum;
+      this.maxQueueLength = maxQueueLength;
       this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
-      this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
       this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
       this.maxRetryWaitTimeMs = maxRetryWaitTimeMs;
       this.walThrottleThreshold = walThrottleThreshold;
@@ -284,12 +285,12 @@ public class IoTConsensusConfig {
       return maxPendingBatchesNum;
     }
 
-    public long getMaxWaitingTimeForWaitBatchInMs() {
-      return maxWaitingTimeForWaitBatchInMs;
+    public int getMaxQueueLength() {
+      return maxQueueLength;
     }
 
-    public int getMaxWaitingTimeForAccumulatingBatchInMs() {
-      return maxWaitingTimeForAccumulatingBatchInMs;
+    public long getMaxWaitingTimeForWaitBatchInMs() {
+      return maxWaitingTimeForWaitBatchInMs;
     }
 
     public long getBasicRetryWaitTimeMs() {
@@ -326,13 +327,11 @@ public class IoTConsensusConfig {
 
     public static class Builder {
 
-      private int maxLogEntriesNumPerBatch = 30;
+      private int maxLogEntriesNumPerBatch = 1024;
       private int maxSizePerBatch = 16 * 1024 * 1024;
-      // (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE
-      // in DataRegionStateMachine
-      private int maxPendingBatchesNum = 5;
+      private int maxPendingBatchesNum = 12;
+      private int maxQueueLength = 4096;
       private long maxWaitingTimeForWaitBatchInMs = 10 * 1000L;
-      private int maxWaitingTimeForAccumulatingBatchInMs = 500;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
       private long walThrottleThreshold = 50 * 1024 * 1024 * 1024L;
@@ -356,15 +355,14 @@ public class IoTConsensusConfig {
         return this;
       }
 
-      public Replication.Builder setMaxWaitingTimeForWaitBatchInMs(
-          long maxWaitingTimeForWaitBatchInMs) {
-        this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
+      public Builder setMaxQueueLength(int maxQueueLength) {
+        this.maxQueueLength = maxQueueLength;
         return this;
       }
 
-      public Replication.Builder setMaxWaitingTimeForAccumulatingBatchInMs(
-          int maxWaitingTimeForAccumulatingBatchInMs) {
-        this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs;
+      public Replication.Builder setMaxWaitingTimeForWaitBatchInMs(
+          long maxWaitingTimeForWaitBatchInMs) {
+        this.maxWaitingTimeForWaitBatchInMs = maxWaitingTimeForWaitBatchInMs;
         return this;
       }
 
@@ -408,8 +406,8 @@ public class IoTConsensusConfig {
             maxLogEntriesNumPerBatch,
             maxSizePerBatch,
             maxPendingBatchesNum,
+            maxQueueLength,
             maxWaitingTimeForWaitBatchInMs,
-            maxWaitingTimeForAccumulatingBatchInMs,
             basicRetryWaitTimeMs,
             maxRetryWaitTimeMs,
             walThrottleThreshold,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index f6da0efd52..cd70efe655 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,9 +46,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.OptionalLong;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -214,7 +214,7 @@ public class LogDispatcher {
     public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long initialSyncIndex) {
       this.peer = peer;
       this.config = config;
-      this.pendingEntries = new LinkedBlockingQueue<>();
+      this.pendingEntries = new ArrayBlockingQueue<>(config.getReplication().getMaxQueueLength());
       this.controller =
           new IndexController(
               impl.getStorageDir(),
@@ -314,10 +314,6 @@ public class LogDispatcher {
                 pendingEntries.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
             if (request != null) {
               bufferedEntries.add(request);
-              // If write pressure is low, we simply sleep a little to reduce the number of RPC
-              if (pendingEntries.size() <= config.getReplication().getMaxLogEntriesNumPerBatch()) {
-                Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
-              }
             }
           }
           MetricService.getInstance()
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java
index 2ca5b91275..24d0960dc0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcherThreadMetrics.java
@@ -48,6 +48,18 @@ public class LogDispatcherThreadMetrics implements IMetricSet {
             logDispatcherThread.getPeer().getGroupId().toString(),
             Tag.TYPE.toString(),
             "currentSyncIndex");
+    MetricService.getInstance()
+        .createAutoGauge(
+            Metric.IOT_CONSENSUS.toString(),
+            MetricLevel.IMPORTANT,
+            logDispatcherThread,
+            x -> x.getSyncStatus().getPendingBatches().size(),
+            Tag.NAME.toString(),
+            formatName(),
+            Tag.REGION.toString(),
+            logDispatcherThread.getPeer().getGroupId().toString(),
+            Tag.TYPE.toString(),
+            "pipelineNum");
     MetricService.getInstance()
         .createAutoGauge(
             Metric.IOT_CONSENSUS.toString(),
@@ -74,6 +86,16 @@ public class LogDispatcherThreadMetrics implements IMetricSet {
             logDispatcherThread.getPeer().getGroupId().toString(),
             Tag.TYPE.toString(),
             "currentSyncIndex");
+    MetricService.getInstance()
+        .remove(
+            MetricType.AUTO_GAUGE,
+            Metric.IOT_CONSENSUS.toString(),
+            Tag.NAME.toString(),
+            formatName(),
+            Tag.REGION.toString(),
+            logDispatcherThread.getPeer().getGroupId().toString(),
+            Tag.TYPE.toString(),
+            "pipelineNum");
     MetricService.getInstance()
         .remove(
             MetricType.AUTO_GAUGE,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index de6d0691dc..8e1f40db8f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 
 import java.util.Iterator;
@@ -102,7 +101,6 @@ public class SyncStatus {
     }
   }
 
-  @TestOnly
   public List<Batch> getPendingBatches() {
     return pendingBatches;
   }

Reply via email to