This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch nbraft in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 44427a98301e6dd02e60dfc51fd359e932e97e3b Author: jt <[email protected]> AuthorDate: Fri Jul 8 21:44:26 2022 +0800 nbraft snapshot --- .../org/apache/iotdb/cluster/config/ClusterConfig.java | 9 +++++++++ .../apache/iotdb/cluster/config/ClusterDescriptor.java | 4 ++++ .../iotdb/cluster/log/FragmentedLogDispatcher.java | 6 +++--- .../org/apache/iotdb/cluster/log/LogDispatcher.java | 2 +- .../java/org/apache/iotdb/cluster/log/VotingLog.java | 6 +++--- .../org/apache/iotdb/cluster/log/VotingLogList.java | 6 ++---- .../cluster/log/appender/SlidingWindowLogAppender.java | 16 ++++++++++++++-- .../iotdb/cluster/log/applier/DataLogApplier.java | 3 ++- .../iotdb/cluster/log/logtypes/PhysicalPlanLog.java | 14 +++++++++++++- .../server/handlers/caller/AppendNodeEntryHandler.java | 16 +++++++++++----- .../apache/iotdb/cluster/server/member/RaftMember.java | 17 +++++++++-------- .../org/apache/iotdb/cluster/server/monitor/Timer.java | 12 ++++++++++++ 12 files changed, 83 insertions(+), 28 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java index 187509b2ef..fb2bbcb8e9 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java @@ -185,6 +185,7 @@ public class ClusterConfig { private boolean useAsyncSequencing = true; private boolean useFollowerSlidingWindow = true; + private int slidingWindowSize = 10000; private boolean enableWeakAcceptance = true; @@ -600,4 +601,12 @@ public class ClusterConfig { public void setOptimizeIndirectBroadcasting(boolean optimizeIndirectBroadcasting) { this.optimizeIndirectBroadcasting = optimizeIndirectBroadcasting; } + + public int getSlidingWindowSize() { + return slidingWindowSize; + } + + public void setSlidingWindowSize(int slidingWindowSize) { + this.slidingWindowSize = slidingWindowSize; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java index a089e2d75a..765ddbdc1b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java @@ -323,6 +323,10 @@ public class ClusterDescriptor { properties.getProperty( "use_follower_sliding_window", String.valueOf(config.isUseFollowerSlidingWindow())))); + config.setSlidingWindowSize( + Integer.parseInt( + properties.getProperty( + "sliding_window_size", String.valueOf(config.getSlidingWindowSize())))); config.setEnableWeakAcceptance( Boolean.parseBoolean( diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java index f8954effdc..c6f3c8498a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/FragmentedLogDispatcher.java @@ -45,13 +45,13 @@ public class FragmentedLogDispatcher extends LogDispatcher { long startTime = Statistic.LOG_DISPATCHER_LOG_ENQUEUE.getOperationStartTime(); request.getVotingLog().getLog().setEnqueueTime(System.nanoTime()); - for (int i = 0; i < nodesLogQueues.size(); i++) { - BlockingQueue<SendLogRequest> nodeLogQueue = nodesLogQueues.get(i); + int i = 0; + for (BlockingQueue<SendLogRequest> nodeLogQueue : nodesLogQueues.values()) { SendLogRequest fragmentedRequest = new SendLogRequest(request); fragmentedRequest.setVotingLog(new VotingLog(request.getVotingLog())); fragmentedRequest .getVotingLog() - .setLog(new FragmentedLog((FragmentedLog) request.getVotingLog().getLog(), i)); + .setLog(new FragmentedLog((FragmentedLog) request.getVotingLog().getLog(), i++)); try { boolean addSucceeded; if (ClusterDescriptor.getInstance().getConfig().isWaitForSlowNode()) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java index 3e659a1468..23729063ff 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java @@ -80,7 +80,7 @@ public class LogDispatcher { Map<Node, Boolean> nodesEnabled = new HashMap<>(); ExecutorService executorService; private static ExecutorService serializationService = - IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread( + IoTDBThreadPoolFactory.newFixedThreadPool( Runtime.getRuntime().availableProcessors(), "DispatcherEncoder"); public static int bindingThreadNum = clusterConfig.getDispatcherBindingThreadNum(); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java index ebfdccc999..447ff36363 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java @@ -32,10 +32,10 @@ public class VotingLog { public VotingLog(Log log, int groupSize) { this.log = log; - stronglyAcceptedNodeIds = new HashSet<>(groupSize); - weaklyAcceptedNodeIds = new HashSet<>(groupSize); + stronglyAcceptedNodeIds = new HashSet<>(groupSize * 2); + weaklyAcceptedNodeIds = new HashSet<>(groupSize * 2); acceptedTime = new AtomicLong(); - failedNodeIds = new HashSet<>(groupSize); + failedNodeIds = new HashSet<>(groupSize * 2); } public VotingLog(VotingLog another) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java index 7b1dde2d3e..86721ed8dd 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLogList.java @@ -78,9 +78,7 @@ public class VotingLogList { VotingLog votingLog = logList.get(i); if (votingLog.getLog().getCurrLogIndex() <= index && votingLog.getLog().getCurrLogTerm() == term) { - synchronized (votingLog) { - votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId); - } + votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId); if (votingLog.getStronglyAcceptedNodeIds().size() >= quorumSize) { lastEntryIndexToCommit = i; } @@ -110,8 +108,8 @@ public class VotingLogList { } for (VotingLog acceptedLog : acceptedLogs) { + acceptedLog.acceptedTime.set(System.nanoTime()); synchronized (acceptedLog) { - acceptedLog.acceptedTime.set(System.nanoTime()); acceptedLog.notifyAll(); } if (ClusterDescriptor.getInstance().getConfig().isUseIndirectBroadcasting()) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java index 1ab08449bb..c3b2339465 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/appender/SlidingWindowLogAppender.java @@ -40,7 +40,7 @@ public class SlidingWindowLogAppender implements LogAppender { private static final Logger logger = LoggerFactory.getLogger(SlidingWindowLogAppender.class); - private int windowCapacity = ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem(); + private int windowCapacity = ClusterDescriptor.getInstance().getConfig().getSlidingWindowSize(); private int windowLength = 0; private Log[] logWindow = new Log[windowCapacity]; private long firstPosPrevIndex; @@ -48,6 +48,7 @@ public class SlidingWindowLogAppender implements LogAppender { private RaftMember member; private RaftLogManager logManager; + private Object oowWaitCond = new Object(); public SlidingWindowLogAppender(RaftMember member) { this.member = member; @@ -144,6 +145,7 @@ public class SlidingWindowLogAppender implements LogAppender { for (int i = 1; i <= step; i++) { logWindow[windowCapacity - i] = null; } + windowLength -= step; firstPosPrevIndex = logManager.getLastLogIndex(); } @@ -197,7 +199,9 @@ public class SlidingWindowLogAppender implements LogAppender { retryTime = System.currentTimeMillis() - start; if (result.status == Response.RESPONSE_OUT_OF_WINDOW && retryTime < maxRetry) { try { - Thread.sleep(10); + synchronized (oowWaitCond) { + oowWaitCond.wait(1); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; @@ -219,6 +223,7 @@ public class SlidingWindowLogAppender implements LogAppender { long appendedPos = 0; AppendEntryResult result = new AppendEntryResult(); + boolean flushed = false; synchronized (logManager) { int windowPos = (int) (log.getCurrLogIndex() - logManager.getLastLogIndex() - 1); if (windowPos < 0) { @@ -238,6 +243,7 @@ public class SlidingWindowLogAppender implements LogAppender { checkLog(windowPos); if (windowPos == 0) { appendedPos = flushWindow(result, leaderCommit); + flushed = true; } else { result.status = Response.RESPONSE_WEAK_ACCEPT; } @@ -250,6 +256,12 @@ public class SlidingWindowLogAppender implements LogAppender { } } + if (flushed) { + synchronized (oowWaitCond) { + oowWaitCond.notifyAll(); + } + } + if (appendedPos == -1) { // the incoming log points to an illegal position, reject it result.status = Response.RESPONSE_LOG_MISMATCH; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java index 7e0e033483..545c39ace7 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java @@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException; import org.apache.iotdb.cluster.log.Log; import org.apache.iotdb.cluster.log.logtypes.AddNodeLog; import org.apache.iotdb.cluster.log.logtypes.CloseFileLog; +import org.apache.iotdb.cluster.log.logtypes.FragmentedLog; import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog; import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog; import org.apache.iotdb.cluster.server.member.DataGroupMember; @@ -91,7 +92,7 @@ public class DataLogApplier extends BaseApplier { closeFileLog.getPartitionId(), closeFileLog.isSeq(), false); - } else { + } else if (!(log instanceof FragmentedLog)) { logger.error("Unsupported log: {}", log); } } catch (Exception e) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java index 5d5793d350..2a2092fbbd 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java @@ -57,9 +57,21 @@ public class PhysicalPlanLog extends Log { return DEFAULT_BUFFER_SIZE; } + private ThreadLocal<PublicBAOS> baosThreadLocal = new ThreadLocal<>(); + + private PublicBAOS getSerializeOutputStream() { + PublicBAOS publicBAOS = baosThreadLocal.get(); + if (publicBAOS == null) { + publicBAOS = new PublicBAOS(getDefaultBufferSize()); + baosThreadLocal.set(publicBAOS); + } + publicBAOS.reset(); + return publicBAOS; + } + @Override public ByteBuffer serialize() { - PublicBAOS byteArrayOutputStream = new PublicBAOS(getDefaultBufferSize()); + PublicBAOS byteArrayOutputStream = getSerializeOutputStream(); try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { dataOutputStream.writeByte((byte) PHYSICAL_PLAN.ordinal()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java index 7f8c7df4f4..cbd0682f5a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java @@ -90,6 +90,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe long resp = response.status; if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) { + long operationStartTime = Statistic.RAFT_SENDER_HANDLE_STRONG_ACCEPT.getOperationStartTime(); member .getVotingLogList() .onStronglyAccept( @@ -97,6 +98,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe log.getLog().getCurrLogTerm(), trueReceiver.nodeIdentifier); member.getPeer(trueReceiver).setMatchIndex(response.lastLogIndex); + Statistic.RAFT_SENDER_HANDLE_STRONG_ACCEPT.calOperationCostTimeFromStart(operationStartTime); } else if (resp > 0) { // a response > 0 is the follower's term // the leader ship is stale, wait for the new leader's heartbeat @@ -120,14 +122,18 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe new Pair<>(log.getLog().getCurrLogIndex(), log.getLog().getCurrLogTerm())); } } else if (resp == RESPONSE_WEAK_ACCEPT) { - synchronized (log) { + long operationStartTime = Statistic.RAFT_SENDER_HANDLE_WEAK_ACCEPT.getOperationStartTime(); + synchronized (log.getWeaklyAcceptedNodeIds()) { log.getWeaklyAcceptedNodeIds().add(trueReceiver.nodeIdentifier); - if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size() - >= quorumSize) { - log.acceptedTime.set(System.nanoTime()); - } + } + if (log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size() + >= quorumSize) { + log.acceptedTime.set(System.nanoTime()); + } + synchronized (log) { log.notifyAll(); } + Statistic.RAFT_SENDER_HANDLE_WEAK_ACCEPT.calOperationCostTimeFromStart(operationStartTime); } else { // e.g., Response.RESPONSE_LOG_MISMATCH if (resp == RESPONSE_LOG_MISMATCH || resp == RESPONSE_OUT_OF_WINDOW) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 0d605d3f46..9401d6c9cb 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -598,6 +598,14 @@ public abstract class RaftMember implements RaftMemberMBean { long operationStartTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime(); Thread.currentThread() .setName(getThreadBaseName() + "-appending-" + (request.prevLogIndex + 1)); + // if (true) { + // AppendEntryResult result = new AppendEntryResult(); + // result.setLastLogTerm(request.prevLogTerm); + // result.setLastLogIndex(request.prevLogIndex + 1); + // result.setStatus(Response.RESPONSE_STRONG_ACCEPT); + // result.setHeader(getHeader()); + // return result; + // } AppendEntryResult result = appendEntryInternal(request); Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(operationStartTime); return result; @@ -1761,14 +1769,7 @@ public abstract class RaftMember implements RaftMemberMBean { logger.warn("Unexpected interruption when sending a log", e); } Thread.currentThread() - .setName( - threadBaseName - + "-waiting-" - + log.getLog().getCurrLogIndex() - + "-" - + log.getStronglyAcceptedNodeIds() - + "-" - + log.getWeaklyAcceptedNodeIds()); + .setName(threadBaseName + "-waiting-" + log.getLog().getCurrLogIndex()); alreadyWait = (System.nanoTime() - waitStart) / 1000000; if (alreadyWait > nextTimeToPrint) { logger.info( diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java index 72892bdb56..4abde4d974 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java @@ -152,6 +152,18 @@ public class Timer { TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS), + RAFT_SENDER_HANDLE_STRONG_ACCEPT( + RAFT_MEMBER_SENDER, + "handle strong accept", + TIME_SCALE, + true, + RAFT_SENDER_SEND_LOG_TO_FOLLOWERS), + RAFT_SENDER_HANDLE_WEAK_ACCEPT( + RAFT_MEMBER_SENDER, + "handle weak accept", + TIME_SCALE, + true, + RAFT_SENDER_SEND_LOG_TO_FOLLOWERS), RAFT_RECEIVER_RELAY_OFFER_LOG( RAFT_MEMBER_RECEIVER, "relay offer log",
