This is an automated email from the ASF dual-hosted git repository. yuyuankang pushed a commit to branch cluster_performance in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
/**
 * Process-wide accumulators used to profile the cluster write path.
 *
 * <p>Every metric is a pair of public counters: a {@code ...MS} field holding the accumulated
 * elapsed time and a {@code ...Counter} field holding the number of samples. Despite the
 * {@code MS} suffix, callers add raw {@link System#nanoTime()} deltas to the time field, so the
 * stored unit is nanoseconds; {@link #getReport()} converts to milliseconds when printing.
 *
 * <p>Field names (including their historical spellings) are part of the public surface and are
 * kept unchanged so that existing call sites continue to compile.
 */
public class Timer {

  public static AtomicLong dataGroupMemberProcessPlanLocallyMS = new AtomicLong(0);
  public static AtomicLong dataGroupMemberProcessPlanLocallyCounter = new AtomicLong(0);
  public static AtomicLong dataGroupMemberWaitLeaderMS = new AtomicLong(0);
  public static AtomicLong dataGroupMemberWaitLeaderCounter = new AtomicLong(0);
  public static AtomicLong metaGroupMemberExecuteNonQueryMS = new AtomicLong(0);
  public static AtomicLong metaGroupMemberExecuteNonQueryCounter = new AtomicLong(0);
  public static AtomicLong metaGroupMemberExecuteNonQueryInLocalGroupMS = new AtomicLong(0);
  public static AtomicLong metaGroupMemberExecuteNonQueryInLocalGroupCounter = new AtomicLong(0);
  public static AtomicLong metaGroupMemberExecuteNonQueryInRemoteGroupMS = new AtomicLong(0);
  public static AtomicLong metaGroupMemberExecuteNonQueryInRemoteGroupCounter = new AtomicLong(0);
  public static AtomicLong raftMemberAppendLogMS = new AtomicLong(0);
  public static AtomicLong raftMemberAppendLogCounter = new AtomicLong(0);
  public static AtomicLong raftMemberSendLogToFollowerMS = new AtomicLong(0);
  public static AtomicLong raftMemberSendLogToFollowerCounter = new AtomicLong(0);
  public static AtomicLong raftMemberCommitLogMS = new AtomicLong(0);
  public static AtomicLong raftMemberCommitLogCounter = new AtomicLong(0);
  public static AtomicLong raftFollowerAppendEntryMS = new AtomicLong(0);
  public static AtomicLong raftFollowerAppendEntryCounter = new AtomicLong(0);
  public static AtomicLong dataGroupMemberForwardPlanMS = new AtomicLong(0);
  public static AtomicLong dataGroupMemberForwardPlanCounter = new AtomicLong(0);
  public static AtomicLong raftMemberWaitForPrevLogMS = new AtomicLong(0);
  public static AtomicLong raftMemberWaitForPrevLogCounter = new AtomicLong(0);
  public static AtomicLong raftMemberSendLogAyncMS = new AtomicLong(0);
  public static AtomicLong raftMemberSendLogAyncCounter = new AtomicLong(0);
  public static AtomicLong raftMemberVoteCounterMS = new AtomicLong(0);
  public static AtomicLong raftMemberVoteCounterCounter = new AtomicLong(0);
  public static AtomicLong raftMemberLogParseMS = new AtomicLong(0);
  public static AtomicLong raftMemberLogParseCounter = new AtomicLong(0);
  public static AtomicLong rafTMemberReceiverWaitForPrevLogMS = new AtomicLong(0);
  public static AtomicLong rafTMemberReceiverWaitForPrevLogCounter = new AtomicLong(0);
  public static AtomicLong rafTMemberMayBeAppendMS = new AtomicLong(0);
  public static AtomicLong rafTMemberMayBeAppendCounter = new AtomicLong(0);
  public static AtomicLong raftMemberOfferLogMS = new AtomicLong(0);
  public static AtomicLong raftMemberOfferLogCounter = new AtomicLong(0);
  public static AtomicLong raftMemberCommitLogResultMS = new AtomicLong(0);
  public static AtomicLong raftMemberCommitLogResultCounter = new AtomicLong(0);
  public static AtomicLong raftMemberAppendLogResultMS = new AtomicLong(0);
  public static AtomicLong raftMemberAppendLogResultCounter = new AtomicLong(0);

  /** Divisor that converts the accumulated nanoseconds into milliseconds for display. */
  private static final long NANOS_PER_MILLI = 1_000_000L;

  /**
   * Builds a human-readable summary of all metrics.
   *
   * <p>The result starts with an empty line and then contains one line per metric in the form
   * {@code <label><total ms>, <sample count>, <average ms per sample>}. The total is truncated
   * to whole milliseconds; the average is computed in floating point. A metric that has no
   * samples reports an average of {@code 0.0} instead of {@code NaN}/{@code Infinity}.
   *
   * @return the multi-line report, terminated by a newline
   */
  public static String getReport() {
    StringBuilder report = new StringBuilder("\n");
    appendEntry(report, "Data group member - process plan locally : ",
        dataGroupMemberProcessPlanLocallyMS, dataGroupMemberProcessPlanLocallyCounter);
    appendEntry(report, "Data group member - wait leader: ",
        dataGroupMemberWaitLeaderMS, dataGroupMemberWaitLeaderCounter);
    appendEntry(report, "Meta group member - execute non query: ",
        metaGroupMemberExecuteNonQueryMS, metaGroupMemberExecuteNonQueryCounter);
    appendEntry(report, "Meta group member - execute in local group: ",
        metaGroupMemberExecuteNonQueryInLocalGroupMS,
        metaGroupMemberExecuteNonQueryInLocalGroupCounter);
    appendEntry(report, "Meta group member - execute in remote group: ",
        metaGroupMemberExecuteNonQueryInRemoteGroupMS,
        metaGroupMemberExecuteNonQueryInRemoteGroupCounter);
    appendEntry(report, "Raft member - append log: ",
        raftMemberAppendLogMS, raftMemberAppendLogCounter);
    appendEntry(report, "Raft member - send log to follower: ",
        raftMemberSendLogToFollowerMS, raftMemberSendLogToFollowerCounter);
    appendEntry(report, "Raft member - commit log: ",
        raftMemberCommitLogMS, raftMemberCommitLogCounter);
    appendEntry(report, "Raft member - follower append entry: ",
        raftFollowerAppendEntryMS, raftFollowerAppendEntryCounter);
    appendEntry(report, "Data group member - forward plan: ",
        dataGroupMemberForwardPlanMS, dataGroupMemberForwardPlanCounter);
    appendEntry(report, "Raft member - wait for prev log: ",
        raftMemberWaitForPrevLogMS, raftMemberWaitForPrevLogCounter);
    appendEntry(report, "Raft member - send log aync: ",
        raftMemberSendLogAyncMS, raftMemberSendLogAyncCounter);
    appendEntry(report, "Raft member - vote counter: ",
        raftMemberVoteCounterMS, raftMemberVoteCounterCounter);
    appendEntry(report, "Raft member - log parse: ",
        raftMemberLogParseMS, raftMemberLogParseCounter);
    appendEntry(report, "Raft member - receiver wait for prev log: ",
        rafTMemberReceiverWaitForPrevLogMS, rafTMemberReceiverWaitForPrevLogCounter);
    appendEntry(report, "Raft member - maybe append: ",
        rafTMemberMayBeAppendMS, rafTMemberMayBeAppendCounter);
    appendEntry(report, "Raft member - offer log: ",
        raftMemberOfferLogMS, raftMemberOfferLogCounter);
    appendEntry(report, "Raft member - append log result: ",
        raftMemberAppendLogResultMS, raftMemberAppendLogResultCounter);
    appendEntry(report, "Raft member - commit log result: ",
        raftMemberCommitLogResultMS, raftMemberCommitLogResultCounter);
    return report.toString();
  }

  /**
   * Appends one report line for a single metric.
   *
   * @param report the buffer the line is appended to
   * @param label the text printed in front of the numbers
   * @param totalNanos accumulated elapsed time, in nanoseconds
   * @param counter number of samples that contributed to {@code totalNanos}
   */
  private static void appendEntry(StringBuilder report, String label, AtomicLong totalNanos,
      AtomicLong counter) {
    // Read each value exactly once so the total and the average on a line agree with each other.
    long nanos = totalNanos.get();
    long samples = counter.get();
    // Guard the division: with no samples a floating-point divide would print NaN or Infinity.
    double averageMillis = samples == 0 ? 0.0 : (double) nanos / NANOS_PER_MILLI / samples;
    report.append(label)
        .append(nanos / NANOS_PER_MILLI).append(", ")
        .append(samples).append(", ")
        .append(averageMillis).append("\n");
  }
}
waitAppendResult(voteCounter, leaderShipStale, newLeaderTerm); + } + public AppendLogResult waitAppendResult(AtomicInteger voteCounter, + AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm) { + long start = System.nanoTime(); synchronized (voteCounter) { if (voteCounter.get() > 0) { try { @@@ -744,11 -705,15 +737,18 @@@ } else { sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer); } + Timer.raftMemberSendLogAyncMS.addAndGet(System.nanoTime() - start); + Timer.raftMemberSendLogAyncCounter.incrementAndGet(); + } + public synchronized LogDispatcher getLogDispatcher() { + if (logDispatcher == null) { + logDispatcher = new LogDispatcher(this); + } + return logDispatcher; + } + private boolean waitForPrevLog(Peer peer, Log log) { long waitStart = System.currentTimeMillis(); long alreadyWait = 0; @@@ -1288,23 -1254,50 +1292,60 @@@ return StatusUtils.OK; } } catch (LogExecutionException e) { - Throwable cause = getRootCause(e); - if (cause instanceof BatchInsertionException) { - return RpcUtils - .getStatus(Arrays.asList(((BatchInsertionException) cause).getFailingStatus())); - } - TSStatus tsStatus = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy(); - if (cause instanceof IoTDBException) { - tsStatus.setCode(((IoTDBException) cause).getErrorCode()); - } - if (!(cause instanceof PathNotExistException) && - !(cause instanceof StorageGroupNotSetException) && - !(cause instanceof PathAlreadyExistException) && - !(cause instanceof StorageGroupAlreadySetException)) { - logger.debug("{} cannot be executed because {}", plan, cause); + return handleLogExecutionException(log, e); + } + return StatusUtils.TIME_OUT; + } + + + TSStatus processPlanLocallyV2(PhysicalPlan plan) { + logger.debug("{}: Processing plan {}", name, plan); + if (readOnly) { + return StatusUtils.NODE_READ_ONLY; + } + PhysicalPlanLog log = new PhysicalPlanLog(); + // assign term and index to the new log and append it + SendLogRequest sendLogRequest; ++ long start = 
System.nanoTime(); + synchronized (logManager) { + log.setCurrLogTerm(getTerm().get()); + log.setCurrLogIndex(logManager.getLastLogIndex() + 1); + + log.setPlan(plan); + logManager.append(log); + + sendLogRequest = buildSendLogRequest(log); + getLogDispatcher().offer(sendLogRequest); + } ++ Timer.raftMemberOfferLogMS.addAndGet(System.nanoTime() - start); ++ Timer.raftMemberOfferLogCounter.incrementAndGet(); ++ + + try { ++ start = System.nanoTime(); + AppendLogResult appendLogResult = waitAppendResult(sendLogRequest.voteCounter, + sendLogRequest.leaderShipStale, + sendLogRequest.newLeaderTerm); ++ Timer.raftMemberAppendLogResultMS.addAndGet(System.nanoTime() - start); ++ Timer.raftMemberAppendLogResultCounter.incrementAndGet(); + + switch (appendLogResult) { + case OK: + logger.debug("{}: log {} is accepted", name, log); ++ start = System.nanoTime(); + commitLog(log); ++ Timer.raftMemberCommitLogResultMS.addAndGet(System.nanoTime() - start); ++ Timer.raftMemberCommitLogResultCounter.incrementAndGet(); + return StatusUtils.OK; + case TIME_OUT: + logger.debug("{}: log {} timed out, retrying...", name, log); + case LEADERSHIP_STALE: + // abort the appending, the new leader will fix the local logs by catch-up + default: + break; } - tsStatus.setMessage(cause.getClass().getName() + ":" + cause.getMessage()); - return tsStatus; + } catch (LogExecutionException e) { + return handleLogExecutionException(log, e); } return StatusUtils.TIME_OUT; }
