This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch native_raft in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 71916f63916bc2738c809787f887843fb04c0245 Merge: a1006dc39d 0ec0351658 Author: Tian Jiang <[email protected]> AuthorDate: Fri Apr 14 09:13:47 2023 +0800 Merge branch 'native_raft' of github.com:apache/iotdb into native_raft # Conflicts: # consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java # consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java consensus/pom.xml | 6 - .../protocol/log/dispatch/DispatcherGroup.java | 131 ++++++++ .../protocol/log/dispatch/DispatcherThread.java | 299 +++++++++++++++++++ .../protocol/log/dispatch/LogDispatcher.java | 330 +++------------------ .../log/dispatch/flowcontrol/FlowBalancer.java | 18 +- 5 files changed, 481 insertions(+), 303 deletions(-) diff --cc consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java index 38ad5ff228,e3a6459104..d63b4f9649 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java @@@ -96,24 -73,11 +73,11 @@@ public class LogDispatcher } public void updateRateLimiter() { - logger.info("TEndPoint rates: {}", nodesRate); + logger.info("{}: TEndPoint rates: {}", member.getName(), nodesRate); for (Entry<Peer, Double> nodeDoubleEntry : nodesRate.entrySet()) { - nodesRateLimiter.put( - nodeDoubleEntry.getKey(), RateLimiter.create(nodeDoubleEntry.getValue())); - } - } - - void createQueue(Peer node) { - BlockingQueue<VotingEntry> logBlockingQueue; - logBlockingQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem()); - nodesLogQueuesMap.put(node, logBlockingQueue); - nodesRate.put(node, Double.MAX_VALUE); - nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE)); - - for (int i = 0; i < bindingThreadNum; i++) { - executorServices - .computeIfAbsent(node, n -> createPool(node)) - .submit(newDispatcherThread(node, logBlockingQueue)); + Peer peer = nodeDoubleEntry.getKey(); + Double rate = nodeDoubleEntry.getValue(); + dispatcherGroupMap.get(peer).updateRate(rate); } } diff --cc consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java index bbc9fd66ee,36b692b76a..eb2888d610 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java @@@ -19,7 -19,13 +19,14 @@@ package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; + import java.util.ArrayList; + import java.util.Comparator; + import java.util.List; + import java.util.Map; + import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.TimeUnit; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.natraft.protocol.RaftConfig; @@@ -90,49 -81,21 +96,49 @@@ public class FlowBalancer int nodeNum = member.getAllNodes().size(); int followerNum = nodeNum - 1; + long flowMonitorWindowInterval = config.getFlowMonitorWindowInterval(); - double thisNodeFlow = flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse); - double assumedFlow = thisNodeFlow * overestimateFactor; - logger.info("Flow of this node: {}", thisNodeFlow); + List<FlowWindow> latestWindows = + flowMonitorManager.getLatestWindows(member.getThisNode().getEndpoint(), windowsToUse); + if (latestWindows.size() < windowsToUse) { + return; + } + int burstWindowNum = 0; + for (FlowWindow latestWindow : latestWindows) { + double assumedFlow = + latestWindow.sum * 1.0 * flowMonitorWindowInterval / 1000 * overestimateFactor; + if (assumedFlow * followerNum > maxFlow) { + burstWindowNum++; + } + } + double assumedFlow = + latestWindows.stream().mapToLong(w -> w.sum).sum() + * 1.0 + / latestWindows.size() + * flowMonitorWindowInterval + * overestimateFactor; + + for (Entry<TEndPoint, FlowMonitor> entry : flowMonitorManager.getMonitorMap().entrySet()) { + logger.info( + "{}: Flow of {}: {}, {}, {}", + member.getName(), + entry.getKey(), + entry.getValue().getLatestWindows(windowsToUse), + entry.getValue().averageFlow(windowsToUse), + inBurst); + } - Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = logDispatcher.getNodesLogQueuesMap(); + Map<Peer, DispatcherGroup> dispatcherGroupMap = logDispatcher.getDispatcherGroupMap(); Map<Peer, Double> nodesRate = logDispatcher.getNodesRate(); // sort followers according to their queue length - followers.sort(Comparator.comparing(node -> nodesLogQueuesMap.get(node).size())); + followers.sort(Comparator.comparing(node -> dispatcherGroupMap.get(node).getQueueSize())); - if (assumedFlow * followerNum > maxFlow) { + if (burstWindowNum > latestWindows.size() / 2 && !inBurst) { enterBurst(nodesRate, nodeNum, assumedFlow, followers); - } else { + logDispatcher.updateRateLimiter(); + } else if (burstWindowNum < latestWindows.size() / 2 && inBurst) { exitBurst(followerNum, nodesRate, followers); + logDispatcher.updateRateLimiter(); } - logDispatcher.updateRateLimiter(); } private void enterBurst( @@@ -166,8 -126,7 +172,8 @@@ // lift flow limits for (int i = 0; i < followerNum; i++) { Peer node = followers.get(i); - nodesRate.put(node, maxFlow); + nodesRate.put(node, Double.MAX_VALUE); } + inBurst = false; } }
