This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch native_raft in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a1006dc39d4f5bb2a73e678a1cdb7011a2d4880a Author: Tian Jiang <[email protected]> AuthorDate: Fri Apr 14 09:05:30 2023 +0800 add statistics --- .../common/request/ByteBufferConsensusRequest.java | 5 + .../iotdb/consensus/natraft/RaftConsensus.java | 31 ++++ .../consensus/natraft/protocol/RaftConfig.java | 55 +++++++ .../consensus/natraft/protocol/RaftMember.java | 83 +++++++--- .../protocol/heartbeat/HeartbeatThread.java | 4 + .../log/appender/SlidingWindowLogAppender.java | 6 +- .../log/dispatch/AppendNodeEntryHandler.java | 3 + .../protocol/log/dispatch/LogDispatcher.java | 19 ++- .../log/dispatch/flowcontrol/FlowBalancer.java | 60 +++++-- .../log/dispatch/flowcontrol/FlowMonitor.java | 86 ++++++---- .../dispatch/flowcontrol/FlowMonitorManager.java | 28 +++- .../log/dispatch/flowcontrol/FlowWindow.java} | 26 ++- .../protocol/log/manager/RaftLogManager.java | 3 + .../serialization/SyncLogDequeSerializer.java | 1 + .../iotdb/consensus/natraft/utils/NodeReport.java | 176 +++++++++++++++++++++ .../iotdb/consensus/natraft/utils/Timer.java | 23 ++- distribution/distribute-ecs.sh | 31 ++++ .../plan/planner/plan/node/write/InsertNode.java | 10 ++ .../planner/plan/node/write/InsertTabletNode.java | 43 +++++ .../java/org/apache/iotdb/session/Session.java | 5 +- 20 files changed, 599 insertions(+), 99 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java index 0310a4e3d3..b05a35a561 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java @@ -39,4 +39,9 @@ public class ByteBufferConsensusRequest implements IConsensusRequest { public ByteBuffer serializeToByteBuffer() { return byteBuffer; } + + @Override + public long estimateSize() { + return byteBuffer.remaining(); + } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java index ac1f88d2a1..2d4611b7be 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java @@ -22,6 +22,8 @@ package org.apache.iotdb.consensus.natraft; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.RegisterManager; @@ -48,6 +50,8 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftMember; import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager; import org.apache.iotdb.consensus.natraft.service.RaftRPCService; import org.apache.iotdb.consensus.natraft.service.RaftRPCServiceProcessor; +import org.apache.iotdb.consensus.natraft.utils.NodeReport; +import org.apache.iotdb.consensus.natraft.utils.NodeReport.RaftMemberReport; import org.apache.iotdb.consensus.natraft.utils.StatusUtils; import org.apache.iotdb.consensus.natraft.utils.Timer; import org.apache.iotdb.rpc.TSStatusCode; @@ -65,6 +69,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class RaftConsensus implements IConsensus { @@ -79,6 +85,7 @@ public class RaftConsensus implements IConsensus { private final RegisterManager registerManager = new RegisterManager(); private final RaftConfig config; private final IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager; + private ScheduledExecutorService reportThread; public RaftConsensus(ConsensusConfig config, Registry registry) { this.thisNode = config.getThisNodeEndPoint(); @@ -109,6 +116,9 @@ public class RaftConsensus implements IConsensus { () -> { logger.info(Timer.Statistic.getReport()); })); + reportThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread"); + ScheduledExecutorUtil.safelyScheduleAtFixedRate( + reportThread, this::generateNodeReport, 5, 5, TimeUnit.SECONDS); } private void initAndRecover() throws IOException { @@ -372,4 +382,25 @@ public class RaftConsensus implements IConsensus { public TEndPoint getThisNode() { return thisNode; } + + private void generateNodeReport() { + if (logger.isInfoEnabled()) { + try { + NodeReport report = new NodeReport(thisNode); + List<RaftMemberReport> reports = new ArrayList<>(); + for (RaftMember value : stateMachineMap.values()) { + RaftMemberReport raftMemberReport = value.genMemberReport(); + if (raftMemberReport.getPrevLastLogIndex() != raftMemberReport.getLastLogIndex()) { + reports.add(raftMemberReport); + } + } + if (!reports.isEmpty()) { + report.setMemberReports(reports); + logger.info(report.toString()); + } + } catch (Exception e) { + logger.error("exception occurred when generating node report", e); + } + } + } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java index 3d2991da3e..be3e56d231 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java @@ -63,6 +63,7 @@ public class RaftConfig { private int raftLogBufferSize = 64 * 1024 * 1024; private int maxNumberOfLogsPerFetchOnDisk = 1000; private int maxRaftLogIndexSizeInMemory = 64 * 1024; + private int maxRaftLogPersistDataSizePerFile = 1024 * 1024 * 1024; private int maxNumberOfPersistRaftLogFiles = 128; private int maxPersistRaftLogNumberOnDisk = 10_000_000; private int flushRaftLogThreshold = 100_000; @@ -71,6 +72,9 @@ public class RaftConfig { private boolean enableCompressedDispatching = true; private boolean ignoreStateMachine = false; private boolean onlyTestNetwork = false; + private boolean waitApply = true; + private double flowControlMinFlow = 10_000_000; + private double flowControlMaxFlow = 100_000_000; private CompressionType dispatchingCompressionType = CompressionType.SNAPPY; private ConsistencyLevel consistencyLevel = ConsistencyLevel.STRONG_CONSISTENCY; private RPCConfig rpcConfig; @@ -400,6 +404,38 @@ public class RaftConfig { this.ignoreStateMachine = ignoreStateMachine; } + public double getFlowControlMinFlow() { + return flowControlMinFlow; + } + + public void setFlowControlMinFlow(double flowControlMinFlow) { + this.flowControlMinFlow = flowControlMinFlow; + } + + public double getFlowControlMaxFlow() { + return flowControlMaxFlow; + } + + public void setFlowControlMaxFlow(double flowControlMaxFlow) { + this.flowControlMaxFlow = flowControlMaxFlow; + } + + public boolean isWaitApply() { + return waitApply; + } + + public void setWaitApply(boolean waitApply) { + this.waitApply = waitApply; + } + + public int getMaxRaftLogPersistDataSizePerFile() { + return maxRaftLogPersistDataSizePerFile; + } + + public void setMaxRaftLogPersistDataSizePerFile(int maxRaftLogPersistDataSizePerFile) { + this.maxRaftLogPersistDataSizePerFile = maxRaftLogPersistDataSizePerFile; + } + public void loadProperties(Properties properties) { logger.debug("Loading properties: {}", properties); @@ -476,6 +512,11 @@ public class RaftConfig { properties.getProperty( "raft_log_buffer_size", String.valueOf(this.getRaftLogBufferSize())))); + this.setMaxRaftLogPersistDataSizePerFile( + Integer.parseInt( + properties.getProperty( + "raft_log_file_size", String.valueOf(this.getMaxRaftLogPersistDataSizePerFile())))); + this.setLogNumInBatch( Integer.parseInt( properties.getProperty("log_batch_num", String.valueOf(this.getLogNumInBatch())))); @@ -592,6 +633,20 @@ public class RaftConfig { Boolean.parseBoolean( properties.getProperty("only_test_network", String.valueOf(this.isOnlyTestNetwork())))); + this.setWaitApply( + Boolean.parseBoolean( + properties.getProperty("wait_apply", String.valueOf(this.isWaitApply())))); + + this.setFlowControlMinFlow( + Double.parseDouble( + properties.getProperty( + "flow_control_min_flow", String.valueOf(this.getFlowControlMinFlow())))); + + this.setFlowControlMaxFlow( + Double.parseDouble( + properties.getProperty( + "flow_control_max_flow", String.valueOf(this.getFlowControlMaxFlow())))); + String consistencyLevel = properties.getProperty("consistency_level"); if (consistencyLevel != null) { this.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel)); diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java index 5271e793aa..8d84036592 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java @@ -73,6 +73,7 @@ import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.SynchronousSeq import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot; import org.apache.iotdb.consensus.natraft.utils.IOUtils; import org.apache.iotdb.consensus.natraft.utils.LogUtils; +import org.apache.iotdb.consensus.natraft.utils.NodeReport.RaftMemberReport; import org.apache.iotdb.consensus.natraft.utils.NodeUtils; import org.apache.iotdb.consensus.natraft.utils.Response; import org.apache.iotdb.consensus.natraft.utils.StatusUtils; @@ -176,6 +177,7 @@ public class RaftMember { private ExecutorService commitLogPool; private long lastCommitTaskTime; + private long lastReportIndex; /** * logDispatcher buff the logs orderly according to their log indexes and send them sequentially, @@ -512,7 +514,9 @@ public class RaftMember { } AppendEntryResult response; + long startTime = Statistic.RAFT_RECEIVER_PARSE_ENTRY.getOperationStartTime(); List<Entry> entries = LogUtils.parseEntries(request.entries, stateMachine); + Statistic.RAFT_RECEIVER_PARSE_ENTRY.calOperationCostTimeFromStart(startTime); response = logAppender.appendEntries(request.leaderCommit, request.term, entries); @@ -644,7 +648,7 @@ public class RaftMember { Statistic.LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE.add(entry.createTime - entry.receiveTime); if (config.isUseFollowerLoadBalance()) { - FlowMonitorManager.INSTANCE.report(thisNode, entry.estimateSize()); + FlowMonitorManager.INSTANCE.report(thisNode.getEndpoint(), entry.estimateSize()); } if (votingEntry == null) { @@ -677,15 +681,15 @@ public class RaftMember { } } - private TSStatus includeLogNumbersInStatus(TSStatus status, Entry entry) { + private TSStatus includeLogNumbersInStatus(TSStatus status, long index, long term) { return status.setMessage( getRaftGroupId().getType().ordinal() + "-" + getRaftGroupId().getId() + "-" - + entry.getCurrLogIndex() + + index + "-" - + entry.getCurrLogTerm()); + + term); } protected AppendLogResult waitAppendResult(VotingEntry votingEntry) { @@ -748,28 +752,32 @@ public class RaftMember { } long waitTime = 1; AcceptedType acceptedType = votingLogList.computeAcceptedType(log); - synchronized (log.getEntry()) { - while (acceptedType == AcceptedType.NOT_ACCEPTED - && alreadyWait < config.getWriteOperationTimeoutMS()) { + + while (acceptedType == AcceptedType.NOT_ACCEPTED + && alreadyWait < config.getWriteOperationTimeoutMS()) { + long startTime = Statistic.RAFT_SENDER_LOG_APPEND_WAIT.getOperationStartTime(); + synchronized (log.getEntry()) { try { log.getEntry().wait(waitTime); } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warn("Unexpected interruption when sending a log", e); } - acceptedType = votingLogList.computeAcceptedType(log); - - alreadyWait = (System.nanoTime() - waitStart) / 1000000; - if (alreadyWait > nextTimeToPrint) { - logger.info( - "Still not receive enough votes for {}, weakly " + "accepted {}, wait {}ms ", - log, - log.getWeaklyAcceptedNodes(), - alreadyWait); - nextTimeToPrint *= 2; - } + } + Statistic.RAFT_SENDER_LOG_APPEND_WAIT.calOperationCostTimeFromStart(startTime); + + acceptedType = votingLogList.computeAcceptedType(log); + alreadyWait = (System.nanoTime() - waitStart) / 1000000; + if (alreadyWait > nextTimeToPrint) { + logger.info( + "Still not receive enough votes for {}, weakly " + "accepted {}, wait {}ms ", + log, + log.getWeaklyAcceptedNodes(), + alreadyWait); + nextTimeToPrint *= 2; } } + if (logger.isDebugEnabled()) { Thread.currentThread().setName(threadBaseName); } @@ -973,7 +981,7 @@ public class RaftMember { logger.debug("{}: plan {} has no where to be forwarded", name, plan); return StatusUtils.NO_LEADER.deepCopy().setMessage("No leader to forward in: " + groupId); } - logger.debug("{}: Forward {} to node {}", name, plan, node); + logger.info("{}: Forward {} to node {}", name, plan, node); TSStatus status; status = forwardPlanAsync(plan, node, groupId); @@ -1244,11 +1252,25 @@ public class RaftMember { System.nanoTime() - votingEntry.getEntry().createTime); switch (appendLogResult) { case WEAK_ACCEPT: + Statistic.RAFT_LEADER_WEAK_ACCEPT.add(1); return includeLogNumbersInStatus( - StatusUtils.getStatus(TSStatusCode.WEAKLY_ACCEPTED), votingEntry.getEntry()); + StatusUtils.getStatus(TSStatusCode.SUCCESS_STATUS), + votingEntry.getEntry().getCurrLogIndex(), + votingEntry.getEntry().getCurrLogTerm()); case OK: - waitApply(votingEntry.getEntry()); - return includeLogNumbersInStatus(StatusUtils.OK.deepCopy(), votingEntry.getEntry()); + if (config.isWaitApply()) { + waitApply(votingEntry.getEntry()); + return includeLogNumbersInStatus( + StatusUtils.OK.deepCopy(), + votingEntry.getEntry().getCurrLogIndex(), + votingEntry.getEntry().getCurrLogTerm()); + } else { + return includeLogNumbersInStatus( + StatusUtils.OK.deepCopy(), + logManager.getAppliedIndex(), + logManager.getAppliedTerm()); + } + case TIME_OUT: logger.debug("{}: log {} timed out...", name, votingEntry.getEntry()); break; @@ -1345,4 +1367,21 @@ public class RaftMember { return StatusUtils.TIME_OUT; } } + + public RaftMemberReport genMemberReport() { + long prevLastLogIndex = lastReportIndex; + lastReportIndex = logManager.getLastLogIndex(); + return new RaftMemberReport( + status.role, + status.getLeader().get(), + status.getTerm().get(), + logManager.getLastLogTerm(), + lastReportIndex, + logManager.getCommitLogIndex(), + logManager.getCommitLogTerm(), + readOnly, + heartbeatThread.getLastHeartbeatReceivedTime(), + prevLastLogIndex, + logManager.getAppliedIndex()); + } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java index 660e6feff6..4f2bdc07b5 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java @@ -368,4 +368,8 @@ public class HeartbeatThread implements Runnable { public Object getElectionWaitObject() { return electionWaitObject; } + + public long getLastHeartbeatReceivedTime() { + return lastHeartbeatReceivedTime; + } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java index 39b58cd424..9702b4b461 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java @@ -82,7 +82,7 @@ public class SlidingWindowLogAppender implements LogAppender { // check the next entry Entry entry = logWindow[pos]; boolean nextMismatch = false; - if (logWindow[pos + 1] != null && pos < windowCapacity - 1) { + if (pos < windowCapacity - 1 && logWindow[pos + 1] != null) { long nextPrevTerm = logWindow[pos + 1].getPrevTerm(); if (nextPrevTerm != entry.getCurrLogTerm()) { nextMismatch = true; @@ -180,7 +180,9 @@ public class SlidingWindowLogAppender implements LogAppender { AppendEntryResult result = null; for (Entry entry : entries) { + long startTime = Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY.getOperationStartTime(); result = appendEntry(leaderCommit, entry); + Statistic.RAFT_RECEIVER_APPEND_ONE_ENTRY.calOperationCostTimeFromStart(startTime); if (result.status != Response.RESPONSE_AGREE && result.status != Response.RESPONSE_STRONG_ACCEPT @@ -215,8 +217,10 @@ public class SlidingWindowLogAppender implements LogAppender { checkLog(windowPos); if (windowPos == 0) { appended = flushWindow(result); + Statistic.RAFT_FOLLOWER_STRONG_ACCEPT.calOperationCostTimeFromStart(startTime); } else { result.status = Response.RESPONSE_WEAK_ACCEPT; + Statistic.RAFT_FOLLOWER_WEAK_ACCEPT.calOperationCostTimeFromStart(startTime); } } else { result.setStatus(Response.RESPONSE_OUT_OF_WINDOW); diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java index 3c98156091..7d169e6e66 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java @@ -76,6 +76,9 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) { member.getVotingLogList().onStronglyAccept(votingEntry, trueReceiver); member.getStatus().getPeerMap().get(trueReceiver).setMatchIndex(response.lastLogIndex); + synchronized (votingEntry.getEntry()) { + votingEntry.getEntry().notifyAll(); + } } else if (resp > 0) { // a response > 0 is the follower's term // the leadership is stale, wait for the new leader's heartbeat diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java index 463f9f0748..38ad5ff228 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java @@ -96,9 +96,10 @@ public class LogDispatcher { } public void updateRateLimiter() { - logger.info("TEndPoint rates: {}", nodesRate); + logger.info("{}: TEndPoint rates: {}", member.getName(), nodesRate); for (Entry<Peer, Double> nodeDoubleEntry : nodesRate.entrySet()) { - nodesRateLimiter.get(nodeDoubleEntry.getKey()).setRate(nodeDoubleEntry.getValue()); + nodesRateLimiter.put( + nodeDoubleEntry.getKey(), RateLimiter.create(nodeDoubleEntry.getValue())); } } @@ -106,6 +107,7 @@ public class LogDispatcher { BlockingQueue<VotingEntry> logBlockingQueue; logBlockingQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem()); nodesLogQueuesMap.put(node, logBlockingQueue); + nodesRate.put(node, Double.MAX_VALUE); nodesRateLimiter.put(node, RateLimiter.create(Double.MAX_VALUE)); for (int i = 0; i < bindingThreadNum; i++) { @@ -261,6 +263,13 @@ public class LogDispatcher { request.getAppendEntryRequest().entry = request.getEntry().serialize(); request.getEntry().setByteSize(request.getAppendEntryRequest().entry.limit()); + logger.debug( + "{}/{}={}", + request.getEntry().estimateSize(), + request.getAppendEntryRequest().entry.remaining(), + request.getEntry().estimateSize() + * 1.0 + / request.getAppendEntryRequest().entry.remaining()); } } @@ -346,12 +355,12 @@ public class LogDispatcher { for (; logIndex < currBatch.size(); logIndex++) { VotingEntry entry = currBatch.get(logIndex); - long curSize = entry.getAppendEntryRequest().entry.array().length; + long curSize = entry.getAppendEntryRequest().entry.remaining(); if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) { break; } logSize += curSize; - logList.add(entry.getAppendEntryRequest().entry); + logList.add(entry.getAppendEntryRequest().entry.slice()); Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart( entry.getEntry().createTime); } @@ -365,7 +374,7 @@ public class LogDispatcher { } if (config.isUseFollowerLoadBalance()) { - FlowMonitorManager.INSTANCE.report(receiver, logSize); + FlowMonitorManager.INSTANCE.report(receiver.getEndpoint(), logSize); } nodesRateLimiter.get(receiver).acquire((int) logSize); } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java index 3c838f81b7..bbc9fd66ee 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java @@ -19,6 +19,7 @@ package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.natraft.protocol.RaftConfig; @@ -34,6 +35,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -42,22 +44,26 @@ import java.util.concurrent.TimeUnit; public class FlowBalancer { private static final Logger logger = LoggerFactory.getLogger(FlowBalancer.class); - private double maxFlow = 900_000_000; - private double minFlow = 10_000_000; + private double maxFlow; + private double minFlow; private int windowsToUse; private double overestimateFactor; private int flowBalanceIntervalMS = 1000; private FlowMonitorManager flowMonitorManager = FlowMonitorManager.INSTANCE; private LogDispatcher logDispatcher; private RaftMember member; - private ScheduledExecutorService scheduledExecutorService; + private volatile boolean inBurst = false; + private RaftConfig config; public FlowBalancer(LogDispatcher logDispatcher, RaftMember member, RaftConfig config) { this.logDispatcher = logDispatcher; this.member = member; - windowsToUse = config.getFollowerLoadBalanceWindowsToUse(); - overestimateFactor = config.getFollowerLoadBalanceOverestimateFactor(); + this.windowsToUse = config.getFollowerLoadBalanceWindowsToUse(); + this.overestimateFactor = config.getFollowerLoadBalanceOverestimateFactor(); + this.minFlow = config.getFlowControlMinFlow(); + this.maxFlow = config.getFlowControlMaxFlow(); + this.config = config; } public void start() { @@ -84,25 +90,54 @@ public class FlowBalancer { int nodeNum = member.getAllNodes().size(); int followerNum = nodeNum - 1; + long flowMonitorWindowInterval = config.getFlowMonitorWindowInterval(); - double thisNodeFlow = flowMonitorManager.averageFlow(member.getThisNode(), windowsToUse); - double assumedFlow = thisNodeFlow * overestimateFactor; - logger.info("Flow of this node: {}", thisNodeFlow); + List<FlowWindow> latestWindows = + flowMonitorManager.getLatestWindows(member.getThisNode().getEndpoint(), windowsToUse); + if (latestWindows.size() < windowsToUse) { + return; + } + int burstWindowNum = 0; + for (FlowWindow latestWindow : latestWindows) { + double assumedFlow = + latestWindow.sum * 1.0 * flowMonitorWindowInterval / 1000 * overestimateFactor; + if (assumedFlow * followerNum > maxFlow) { + burstWindowNum++; + } + } + double assumedFlow = + latestWindows.stream().mapToLong(w -> w.sum).sum() + * 1.0 + / latestWindows.size() + * flowMonitorWindowInterval + * overestimateFactor; + + for (Entry<TEndPoint, FlowMonitor> entry : flowMonitorManager.getMonitorMap().entrySet()) { + logger.info( + "{}: Flow of {}: {}, {}, {}", + member.getName(), + entry.getKey(), + entry.getValue().getLatestWindows(windowsToUse), + entry.getValue().averageFlow(windowsToUse), + inBurst); + } Map<Peer, BlockingQueue<VotingEntry>> nodesLogQueuesMap = logDispatcher.getNodesLogQueuesMap(); Map<Peer, Double> nodesRate = logDispatcher.getNodesRate(); // sort followers according to their queue length followers.sort(Comparator.comparing(node -> nodesLogQueuesMap.get(node).size())); - if (assumedFlow * followerNum > maxFlow) { + if (burstWindowNum > latestWindows.size() / 2 && !inBurst) { enterBurst(nodesRate, nodeNum, assumedFlow, followers); - } else { + logDispatcher.updateRateLimiter(); + } else if (burstWindowNum < latestWindows.size() / 2 && inBurst) { exitBurst(followerNum, nodesRate, followers); + logDispatcher.updateRateLimiter(); } - logDispatcher.updateRateLimiter(); } private void enterBurst( Map<Peer, Double> nodesRate, int nodeNum, double assumedFlow, List<Peer> followers) { + logger.info("{}: entering burst", member.getName()); int followerNum = nodeNum - 1; int quorumFollowerNum = nodeNum / 2; double remainingFlow = maxFlow; @@ -123,13 +158,16 @@ public class FlowBalancer { Peer node = followers.get(i); nodesRate.put(node, flowToRemaining); } + inBurst = true; } private void exitBurst(int followerNum, Map<Peer, Double> nodesRate, List<Peer> followers) { + logger.info("{}: exiting burst", member.getName()); // lift flow limits for (int i = 0; i < followerNum; i++) { Peer node = followers.get(i); nodesRate.put(node, maxFlow); } + inBurst = false; } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java index 79ff88984b..5689a1effd 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java @@ -19,9 +19,8 @@ package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol; -import org.apache.iotdb.consensus.common.Peer; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.consensus.natraft.protocol.RaftConfig; -import org.apache.iotdb.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,24 +32,27 @@ import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Date; import java.util.Iterator; +import java.util.List; public class FlowMonitor { private static final Logger logger = LoggerFactory.getLogger(FlowMonitor.class); private static final String FILE_SUFFIX = ".flow"; - private ArrayDeque<Pair<Long, Long>> windows; + private ArrayDeque<FlowWindow> windows; private long currWindowStart; private long currWindowSum; private long windowInterval; - private Peer node; + private TEndPoint node; private int maxWindowSize; private BufferedWriter writer; private DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private RaftConfig config; + private boolean recordFlow = false; - public FlowMonitor(Peer node, RaftConfig config) throws IOException { + public FlowMonitor(TEndPoint node, RaftConfig config) throws IOException { this.maxWindowSize = config.getFlowMonitorMaxWindowNum(); this.windows = new ArrayDeque<>(maxWindowSize); this.windowInterval = config.getFlowMonitorWindowInterval(); @@ -61,17 +63,14 @@ public class FlowMonitor { private void initSerializer() throws IOException { String path = - config.getStorageDir() - + File.separator - + node.getEndpoint().getIp() - + "-" - + node.getEndpoint().getPort() - + FILE_SUFFIX; + config.getStorageDir() + File.separator + node.getIp() + "-" + node.getPort() + FILE_SUFFIX; File file = new File(path); file.delete(); - writer = new BufferedWriter(new FileWriter(file)); - writer.write("Time,FlowSum"); - writer.newLine(); + if (recordFlow) { + writer = new BufferedWriter(new FileWriter(file)); + writer.write("Time,FlowSum"); + writer.newLine(); + } } public void close() { @@ -79,11 +78,13 @@ public class FlowMonitor { while (windows.size() > 0) { serializeWindow(); } - try { - writer.close(); - logger.info("Flow monitor {} is closed", node); - } catch (IOException e) { - logger.warn("Cannot close serializer of {}", node, e); + if (writer != null) { + try { + writer.close(); + logger.info("Flow monitor {} is closed", node); + } catch (IOException e) { + logger.warn("Cannot close serializer of {}", node, e); + } } } @@ -93,15 +94,17 @@ public class FlowMonitor { } private void serializeWindow() { - Pair<Long, Long> window = windows.removeFirst(); - try { - String windowString = - String.format("%s,%d", dateFormat.format(new Date(window.left)), window.right); - logger.debug("New window {} serialized by {}", windowString, node); - writer.write(windowString); - writer.newLine(); - } catch (IOException e) { - logger.warn("Cannot serialize window {} of {}", window, node, e); + FlowWindow window = windows.removeFirst(); + if (writer != null) { + try { + String windowString = + String.format("%s,%d", dateFormat.format(new Date(window.start)), window.sum); + logger.debug("New window {} serialized by {}", windowString, node); + writer.write(windowString); + writer.newLine(); + } catch (IOException e) { + logger.warn("Cannot serialize window {} of {}", window, node, e); + } } } @@ -114,7 +117,7 @@ public class FlowMonitor { private void saveWindow() { if (currWindowSum != 0) { checkSize(); - windows.add(new Pair<>(currWindowStart, currWindowSum)); + windows.add(new FlowWindow(currWindowStart, currWindowSum)); logger.debug("New window {},{} generated by {}", currWindowStart, currWindowSum, node); } } @@ -134,11 +137,15 @@ public class FlowMonitor { public double averageFlow(int windowsToUse) { long flowSum = currWindowSum; long intervalSum = System.currentTimeMillis() - currWindowStart; - Iterator<Pair<Long, Long>> windowIterator = windows.descendingIterator(); + if (intervalSum > windowInterval) { + intervalSum = windowInterval; + } + + Iterator<FlowWindow> windowIterator = windows.descendingIterator(); for (int i = 1; i < windowsToUse; i++) { if (windowIterator.hasNext()) { - Pair<Long, Long> window = windowIterator.next(); - flowSum += window.right; + FlowWindow window = windowIterator.next(); + flowSum += window.sum; intervalSum += windowInterval; } else { break; @@ -146,4 +153,19 @@ public class FlowMonitor { } return flowSum * 1.0 / intervalSum * 1000; } + + public List<FlowWindow> getLatestWindows(int windowNum) { + List<FlowWindow> result = new ArrayList<>(); + result.add(new FlowWindow(currWindowStart, currWindowSum)); + Iterator<FlowWindow> windowIterator = windows.descendingIterator(); + for (int i = 1; i < windowNum; i++) { + if (windowIterator.hasNext()) { + FlowWindow window = windowIterator.next(); + result.add(window); + } else { + break; + } + } + return result; + } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java index ea7c0f19bf..726ef093db 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java @@ -19,13 +19,15 @@ package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol; -import org.apache.iotdb.consensus.common.Peer; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.consensus.natraft.protocol.RaftConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -34,7 +36,7 @@ public class FlowMonitorManager { private static final Logger logger = LoggerFactory.getLogger(FlowMonitorManager.class); public static final FlowMonitorManager INSTANCE = new FlowMonitorManager(); - private Map<Peer, FlowMonitor> monitorMap = new ConcurrentHashMap<>(); + private Map<TEndPoint, FlowMonitor> monitorMap = new ConcurrentHashMap<>(); private RaftConfig config; private FlowMonitorManager() {} @@ -50,26 +52,36 @@ public class FlowMonitorManager { monitorMap.clear(); } - public void report(Peer peer, long val) { + public void report(TEndPoint endPoint, long val) { FlowMonitor flowMonitor = monitorMap.computeIfAbsent( - peer, + endPoint, p -> { try { return new FlowMonitor(p, config); } catch (IOException e) { - logger.warn("Cannot register flow monitor for {}", peer, e); + logger.warn("Cannot register flow monitor for {}", endPoint, e); return null; } }); if (flowMonitor != null) { flowMonitor.report(val); } else { - logger.warn("Flow monitor {} is not registered", peer); + logger.warn("Flow monitor {} is not registered", endPoint); } } - public double averageFlow(Peer peer, int windowsToUse) { - return monitorMap.get(peer).averageFlow(windowsToUse); + public double averageFlow(TEndPoint endPoint, int windowsToUse) { + FlowMonitor flowMonitor = monitorMap.get(endPoint); + return flowMonitor != null ? flowMonitor.averageFlow(windowsToUse) : 0.0; + } + + public List<FlowWindow> getLatestWindows(TEndPoint endPoint, int windowNum) { + FlowMonitor flowMonitor = monitorMap.get(endPoint); + return flowMonitor != null ? flowMonitor.getLatestWindows(windowNum) : Collections.emptyList(); + } + + public Map<TEndPoint, FlowMonitor> getMonitorMap() { + return monitorMap; } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowWindow.java similarity index 55% copy from consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java copy to consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowWindow.java index 0310a4e3d3..157ecd398e 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowWindow.java @@ -16,27 +16,19 @@ * specific language governing permissions and limitations * under the License. */ +package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol; -package org.apache.iotdb.consensus.common.request; +public class FlowWindow { + public long start; + public long sum; -import java.nio.ByteBuffer; - -/* -In general, for the requests from the leader, we can directly strong-cast the class to reduce -the cost of deserialization during the execution of the leader state machine. For the requests -received by the followers, the responsibility of deserialization can generally be transferred -to the state machine layer -*/ -public class ByteBufferConsensusRequest implements IConsensusRequest { - - private final ByteBuffer byteBuffer; - - public ByteBufferConsensusRequest(ByteBuffer byteBuffer) { - this.byteBuffer = byteBuffer; + public FlowWindow(long start, long sum) { + this.start = start; + this.sum = sum; } @Override - public ByteBuffer serializeToByteBuffer() { - return byteBuffer; + public String toString() { + return "[" + +start + "," + sum + ']'; } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java index aee2fa76cc..ebb0a0f37a 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java @@ -632,6 +632,9 @@ public abstract class RaftLogManager { if (entry.createTime != 0) { entry.committedTime = System.nanoTime(); Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.add(entry.committedTime - entry.createTime); + synchronized (entry) { + entry.notify(); + } } entry.setSerializationCache(null); } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java index a9aaae7d83..24b6755771 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java @@ -157,6 +157,7 @@ public class SyncLogDequeSerializer implements StableEntryManager { maxRaftLogIndexSizeInMemory = config.getMaxRaftLogIndexSizeInMemory(); maxNumberOfPersistRaftLogFiles = config.getMaxNumberOfPersistRaftLogFiles(); maxPersistRaftLogNumberOnDisk = config.getMaxPersistRaftLogNumberOnDisk(); + maxRaftLogPersistDataSizePerFile = config.getMaxRaftLogPersistDataSizePerFile(); this.logDataFileList = new ArrayList<>(); this.logIndexFileList = new ArrayList<>(); diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java new file mode 100644 index 0000000000..e6220a188d --- /dev/null +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.consensus.natraft.utils; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.consensus.common.Peer; +import org.apache.iotdb.consensus.natraft.protocol.RaftRole; + +import java.util.ArrayList; +import java.util.List; + +/** + * A node report collects the current runtime information of the local node, which contains: 1. The + * MetaMemberReport of the meta member. 2. The DataMemberReports of each data member. + */ +@SuppressWarnings("java:S107") // reports need enough parameters +public class NodeReport { + + private TEndPoint thisNode; + private List<RaftMemberReport> memberReports; + + public NodeReport(TEndPoint thisNode) { + this.thisNode = thisNode; + memberReports = new ArrayList<>(); + } + + public void setMemberReports(List<RaftMemberReport> memberReports) { + this.memberReports = memberReports; + } + + @Override + public String toString() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("Report of ").append(thisNode).append(System.lineSeparator()); + for (RaftMemberReport memberReport : memberReports) { + stringBuilder.append(memberReport).append(System.lineSeparator()); + } + return stringBuilder.toString(); + } + + /** + * A RaftMemberReport contains the character, leader, term, last log term/index of a raft member. + */ + public static class RaftMemberReport { + RaftRole character; + Peer leader; + long term; + long lastLogTerm; + long lastLogIndex; + long commitIndex; + long commitTerm; + boolean isReadOnly; + long lastHeartbeatReceivedTime; + long prevLastLogIndex; + long maxAppliedLogIndex; + + public RaftMemberReport( + RaftRole character, + Peer leader, + long term, + long lastLogTerm, + long lastLogIndex, + long commitIndex, + long commitTerm, + boolean isReadOnly, + long lastHeartbeatReceivedTime, + long prevLastLogIndex, + long maxAppliedLogIndex) { + this.character = character; + this.leader = leader; + this.term = term; + this.lastLogTerm = lastLogTerm; + this.lastLogIndex = lastLogIndex; + this.commitIndex = commitIndex; + this.commitTerm = commitTerm; + this.isReadOnly = isReadOnly; + this.lastHeartbeatReceivedTime = lastHeartbeatReceivedTime; + this.prevLastLogIndex = prevLastLogIndex; + this.maxAppliedLogIndex = maxAppliedLogIndex; + } + + @Override + public String toString() { + String transportCompressionReport = ""; + return "RaftReport {\n" + + "character=" + + character + + ", Leader=" + + leader + + ", term=" + + term + + ", lastLogTerm=" + + lastLogTerm + + ", lastLogIndex=" + + lastLogIndex + + ", commitIndex=" + + commitIndex + + ", commitTerm=" + + commitTerm + + ", appliedLogIndex=" + + maxAppliedLogIndex + + ", readOnly=" + + isReadOnly + + ", lastHeartbeat=" + + (System.currentTimeMillis() - lastHeartbeatReceivedTime) + + "ms ago" + + ", logIncrement=" + + (lastLogIndex - prevLastLogIndex) + + transportCompressionReport + + ", \n timer: " + + Timer.Statistic.getReport() + + '}'; + } + + public RaftRole getCharacter() { + return character; + } + + public Peer getLeader() { + return leader; + } + + public long getTerm() { + return term; + } + + public long getLastLogTerm() { + return lastLogTerm; + } + + public long getLastLogIndex() { + return lastLogIndex; + } + + public long getCommitIndex() { + return commitIndex; + } + + public long getCommitTerm() { + return commitTerm; + } + + public boolean isReadOnly() { + return isReadOnly; + } + + public long getLastHeartbeatReceivedTime() { + return lastHeartbeatReceivedTime; + } + + public long getPrevLastLogIndex() { + return prevLastLogIndex; + } + + public long getMaxAppliedLogIndex() { + return maxAppliedLogIndex; + } + } +} diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java index 9631b892fd..de89657540 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java @@ -196,6 +196,12 @@ public class Timer { TIME_SCALE, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP), + RAFT_RECEIVER_PARSE_ENTRY( + RAFT_MEMBER_RECEIVER, + "receiver parse entries", + TIME_SCALE, + true, + META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP), RAFT_RECEIVER_WAIT_FOR_PREV_LOG( RAFT_MEMBER_RECEIVER, "receiver wait for prev log", @@ -220,6 +226,12 @@ public class Timer { TIME_SCALE, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP), + RAFT_RECEIVER_APPEND_ONE_ENTRY( + RAFT_MEMBER_RECEIVER, + "receiver append one entries", + TIME_SCALE, + true, + META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP), RAFT_RECEIVER_APPEND_ENTRY( RAFT_MEMBER_RECEIVER, "append entrys", @@ -344,6 +356,12 @@ public class Timer { TIME_SCALE, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP), + RAFT_SENDER_LOG_APPEND_WAIT( + LOG_DISPATCHER, + "wait for being appended", + TIME_SCALE, + true, + META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP), RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPEND_END( LOG_DISPATCHER, "from create to wait append end", @@ -382,7 +400,10 @@ public class Timer { RAFT_RELAYED_LEVEL1_NUM(RAFT_MEMBER_SENDER, "level 1 relay node number", 1, true, ROOT), RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT), RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT), - RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT), + RAFT_LEADER_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "leader weak accept", 1, true, ROOT), + RAFT_FOLLOWER_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "follower weak accept", TIME_SCALE, true, ROOT), + RAFT_FOLLOWER_STRONG_ACCEPT( + RAFT_MEMBER_SENDER, "follower strong accept", TIME_SCALE, true, ROOT), RAFT_CONCURRENT_SENDER(RAFT_MEMBER_SENDER, "concurrent sender", 1, true, ROOT), RAFT_INDEX_BLOCKER(RAFT_MEMBER_SENDER, "index blocker", 1, true, ROOT), RAFT_APPEND_BLOCKER(RAFT_MEMBER_SENDER, "append blocker", 1, true, ROOT), diff --git a/distribution/distribute-ecs.sh b/distribution/distribute-ecs.sh new file mode 100644 index 0000000000..434e485e31 --- /dev/null +++ b/distribution/distribute-ecs.sh @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +src_lib_path=/d/CodeRepo/iotdb2/distribution/target/apache-iotdb-1.1.0-SNAPSHOT-all-bin/apache-iotdb-1.1.0-SNAPSHOT-all-bin/lib/iotdb* + +ips=(ecs1 ecs2 ecs3 ecs4) +#ips=(dc11 dc12 dc13 dc14 dc11 dc12) +target_lib_path=/root/jt/iotdb_expr/apache-iotdb-1.1.0-SNAPSHOT-all-bin/lib/ + +for ip in ${ips[*]} + do + ssh root@$ip "mkdir $target_lib_path" + scp -r $src_lib_path root@$ip:$target_lib_path + done \ No newline at end of file diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java index ae890e6a32..4b40393b25 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java @@ -399,4 +399,14 @@ public abstract class InsertNode extends WritePlanNode { public PartialPath conflictKey() { return devicePath; } + + @Override + public long estimateSize() { + long size = 0; + size += devicePath.getFullPath().length() * 2L; + for (String measurement : measurements) { + size += measurement.length() * 2L; + } + return size; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java index 19e157493f..7fa6b8ba30 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java @@ -1159,4 +1159,47 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche throw new SemanticException(e); } } + + @Override + public long estimateSize() { + long size = super.estimateSize(); + + size += times.length * 8L; + + for (int measurementIndex = 0; measurementIndex < columns.length; measurementIndex++) { + switch (dataTypes[measurementIndex]) { + case INT32: + int[] intValues = (int[]) columns[measurementIndex]; + size += intValues.length * 4L; + break; + case INT64: + long[] longValues = (long[]) columns[measurementIndex]; + size += longValues.length * 8L; + break; + case FLOAT: + float[] floatValues = (float[]) columns[measurementIndex]; + size += floatValues.length * 4L; + break; + case DOUBLE: + double[] doubleValues = (double[]) columns[measurementIndex]; + size += doubleValues.length * 8L; + break; + case BOOLEAN: + boolean[] boolValues = (boolean[]) columns[measurementIndex]; + size += boolValues.length; + break; + case TEXT: + Binary[] binaryValues = (Binary[]) columns[measurementIndex]; + for (Binary binaryValue : binaryValues) { + size += binaryValue.getLength(); + } + break; + default: + throw new UnSupportedDataTypeException( + String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex])); + } + } + + return size; + } } diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index d3edee11d5..6010d8a87f 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -746,7 +746,7 @@ public class Session implements ISession { } catch (RedirectException e) { handleQueryRedirection(e.getEndPoint()); if (enableQueryRedirection) { - logger.debug( + logger.info( "{} redirect query {} to {}", defaultSessionConnection.getEndPoint(), sql, @@ -795,7 +795,7 @@ public class Session implements ISession { } catch (RedirectException e) { handleQueryRedirection(e.getEndPoint()); if (enableQueryRedirection) { - logger.debug("redirect query {} to {}", paths, e.getEndPoint()); + logger.info("redirect query {} to {}", paths, e.getEndPoint()); // retry try { return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime, timeOut); @@ -1089,6 +1089,7 @@ public class Session implements ISession { private void handleRedirection(String deviceId, TEndPoint endpoint) { if (enableRedirection) { + logger.info("Redirect to {}", endpoint); // no need to redirection if (endpoint.ip.equals("0.0.0.0")) { return;
