This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 71916f63916bc2738c809787f887843fb04c0245
Merge: a1006dc39d 0ec0351658
Author: Tian Jiang <[email protected]>
AuthorDate: Fri Apr 14 09:13:47 2023 +0800

    Merge branch 'native_raft' of github.com:apache/iotdb into native_raft
    
    # Conflicts:
    #       consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
    #       consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java

 consensus/pom.xml                                  |   6 -
 .../protocol/log/dispatch/DispatcherGroup.java     | 131 ++++++++
 .../protocol/log/dispatch/DispatcherThread.java    | 299 +++++++++++++++++++
 .../protocol/log/dispatch/LogDispatcher.java       | 330 +++------------------
 .../log/dispatch/flowcontrol/FlowBalancer.java     |  18 +-
 5 files changed, 481 insertions(+), 303 deletions(-)

diff --cc consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 38ad5ff228,e3a6459104..d63b4f9649
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@@ -96,24 -73,11 +73,11 @@@ public class LogDispatcher 
    }
  
    public void updateRateLimiter() {
 -    logger.info("TEndPoint rates: {}", nodesRate);
 +    logger.info("{}: TEndPoint rates: {}", member.getName(), nodesRate);
      for (Entry<Peer, Double> nodeDoubleEntry : nodesRate.entrySet()) {
-       nodesRateLimiter.put(
-           nodeDoubleEntry.getKey(), 
RateLimiter.create(nodeDoubleEntry.getValue()));
-     }
-   }
- 
-   void createQueue(Peer node) {
-     BlockingQueue<VotingEntry> logBlockingQueue;
-     logBlockingQueue = new 
ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
-     nodesLogQueuesMap.put(node, logBlockingQueue);
-     nodesRate.put(node, Double.MAX_VALUE);
-     nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE));
- 
-     for (int i = 0; i < bindingThreadNum; i++) {
-       executorServices
-           .computeIfAbsent(node, n -> createPool(node))
-           .submit(newDispatcherThread(node, logBlockingQueue));
+       Peer peer = nodeDoubleEntry.getKey();
+       Double rate = nodeDoubleEntry.getValue();
+       dispatcherGroupMap.get(peer).updateRate(rate);
      }
    }
  
diff --cc consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index bbc9fd66ee,36b692b76a..eb2888d610
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@@ -19,7 -19,13 +19,14 @@@
  
  package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
  
 +import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+ import java.util.ArrayList;
+ import java.util.Comparator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.TimeUnit;
  import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
  import org.apache.iotdb.consensus.common.Peer;
  import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
@@@ -90,49 -81,21 +96,49 @@@ public class FlowBalancer 
  
      int nodeNum = member.getAllNodes().size();
      int followerNum = nodeNum - 1;
 +    long flowMonitorWindowInterval = config.getFlowMonitorWindowInterval();
  
 -    double thisNodeFlow = 
flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse);
 -    double assumedFlow = thisNodeFlow * overestimateFactor;
 -    logger.info("Flow of this node: {}", thisNodeFlow);
 +    List<FlowWindow> latestWindows =
 +        
flowMonitorManager.getLatestWindows(member.getThisNode().getEndpoint(), 
windowsToUse);
 +    if (latestWindows.size() < windowsToUse) {
 +      return;
 +    }
 +    int burstWindowNum = 0;
 +    for (FlowWindow latestWindow : latestWindows) {
 +      double assumedFlow =
 +          latestWindow.sum * 1.0 * flowMonitorWindowInterval / 1000 * 
overestimateFactor;
 +      if (assumedFlow * followerNum > maxFlow) {
 +        burstWindowNum++;
 +      }
 +    }
 +    double assumedFlow =
 +        latestWindows.stream().mapToLong(w -> w.sum).sum()
 +            * 1.0
 +            / latestWindows.size()
 +            * flowMonitorWindowInterval
 +            * overestimateFactor;
 +
 +    for (Entry<TEndPoint, FlowMonitor> entry : 
flowMonitorManager.getMonitorMap().entrySet()) {
 +      logger.info(
 +          "{}: Flow of {}: {}, {}, {}",
 +          member.getName(),
 +          entry.getKey(),
 +          entry.getValue().getLatestWindows(windowsToUse),
 +          entry.getValue().averageFlow(windowsToUse),
 +          inBurst);
 +    }
-     Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = 
logDispatcher.getNodesLogQueuesMap();
+     Map<Peer, DispatcherGroup> dispatcherGroupMap = 
logDispatcher.getDispatcherGroupMap();
      Map<Peer, Double> nodesRate = logDispatcher.getNodesRate();
  
      // sort followers according to their queue length
-     followers.sort(Comparator.comparing(node -> 
nodesLogQueuesMap.get(node).size()));
+     followers.sort(Comparator.comparing(node -> 
dispatcherGroupMap.get(node).getQueueSize()));
 -    if (assumedFlow * followerNum > maxFlow) {
 +    if (burstWindowNum > latestWindows.size() / 2 && !inBurst) {
        enterBurst(nodesRate, nodeNum, assumedFlow, followers);
 -    } else {
 +      logDispatcher.updateRateLimiter();
 +    } else if (burstWindowNum < latestWindows.size() / 2 && inBurst) {
        exitBurst(followerNum, nodesRate, followers);
 +      logDispatcher.updateRateLimiter();
      }
 -    logDispatcher.updateRateLimiter();
    }
  
    private void enterBurst(
@@@ -166,8 -126,7 +172,8 @@@
      // lift flow limits
      for (int i = 0; i < followerNum; i++) {
        Peer node = followers.get(i);
-       nodesRate.put(node, maxFlow);
+       nodesRate.put(node, Double.MAX_VALUE);
      }
 +    inBurst = false;
    }
  }

Reply via email to