This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch cluster_performance
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 6b788324019b81435ca315352e1178ff17799a32
Merge: 0477577 e5b3a98
Author: Ring-k <[email protected]>
AuthorDate: Thu Sep 3 16:29:07 2020 +0800

    merge log order

 .../resources/conf/iotdb-engine.properties         |   2 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    | 145 ++++++++++++++++++
 .../org/apache/iotdb/cluster/server/Timer.java     |  23 +++
 .../iotdb/cluster/server/member/RaftMember.java    | 162 ++++++++++++++++-----
 4 files changed, 296 insertions(+), 36 deletions(-)

diff --cc cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
index 8174342,0000000..fda0b15
mode 100644,000000..100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
@@@ -1,137 -1,0 +1,160 @@@
 +package org.apache.iotdb.cluster.server;
 +
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +public class Timer {
 +
 +  public static AtomicLong dataGroupMemberProcessPlanLocallyMS = new 
AtomicLong(0);
 +  public static AtomicLong dataGroupMemberProcessPlanLocallyCounter = new 
AtomicLong(0);
 +  public static AtomicLong dataGroupMemberWaitLeaderMS = new AtomicLong(0);
 +  public static AtomicLong dataGroupMemberWaitLeaderCounter = new 
AtomicLong(0);
 +  public static AtomicLong metaGroupMemberExecuteNonQueryMS = new 
AtomicLong(0);
 +  public static AtomicLong metaGroupMemberExecuteNonQueryCounter = new 
AtomicLong(0);
 +  public static AtomicLong metaGroupMemberExecuteNonQueryInLocalGroupMS = new 
AtomicLong(0);
 +  public static AtomicLong metaGroupMemberExecuteNonQueryInLocalGroupCounter 
= new AtomicLong(0);
 +  public static AtomicLong metaGroupMemberExecuteNonQueryInRemoteGroupMS = 
new AtomicLong(0);
 +  public static AtomicLong metaGroupMemberExecuteNonQueryInRemoteGroupCounter 
= new AtomicLong(0);
 +  public static AtomicLong raftMemberAppendLogMS = new AtomicLong(0);
 +  public static AtomicLong raftMemberAppendLogCounter = new AtomicLong(0);
 +  public static AtomicLong raftMemberSendLogToFollowerMS = new AtomicLong(0);
 +  public static AtomicLong raftMemberSendLogToFollowerCounter = new 
AtomicLong(0);
 +  public static AtomicLong raftMemberCommitLogMS = new AtomicLong(0);
 +  public static AtomicLong raftMemberCommitLogCounter = new AtomicLong(0);
 +  public static AtomicLong raftFollowerAppendEntryMS = new AtomicLong(0);
 +  public static AtomicLong raftFollowerAppendEntryCounter = new AtomicLong(0);
 +  public static AtomicLong dataGroupMemberForwardPlanMS = new AtomicLong(0);
 +  public static AtomicLong dataGroupMemberForwardPlanCounter = new 
AtomicLong(0);
 +  public static AtomicLong raftMemberWaitForPrevLogMS = new AtomicLong(0);
 +  public static AtomicLong raftMemberWaitForPrevLogCounter = new 
AtomicLong(0);
 +  public static AtomicLong raftMemberSendLogAyncMS = new AtomicLong(0);
 +  public static AtomicLong raftMemberSendLogAyncCounter = new AtomicLong(0);
 +  public static AtomicLong raftMemberVoteCounterMS = new AtomicLong(0);
 +  public static AtomicLong raftMemberVoteCounterCounter = new AtomicLong(0);
 +  public static AtomicLong raftMemberLogParseMS = new AtomicLong(0);
 +  public static AtomicLong raftMemberLogParseCounter = new AtomicLong(0);
 +  public static AtomicLong rafTMemberReceiverWaitForPrevLogMS = new 
AtomicLong(0);
 +  public static AtomicLong rafTMemberReceiverWaitForPrevLogCounter = new 
AtomicLong(0);
 +  public static AtomicLong rafTMemberMayBeAppendMS = new AtomicLong(0);
 +  public static AtomicLong rafTMemberMayBeAppendCounter = new AtomicLong(0);
++  public static AtomicLong raftMemberOfferLogMS = new AtomicLong(0);
++  public static AtomicLong raftMemberOfferLogCounter = new AtomicLong(0);
++  public static AtomicLong raftMemberCommitLogResultMS = new AtomicLong(0);
++  public static AtomicLong raftMemberCommitLogResultCounter = new 
AtomicLong(0);
++  public static AtomicLong raftMemberAppendLogResultMS = new AtomicLong(0);
++  public static AtomicLong raftMemberAppendLogResultCounter = new 
AtomicLong(0);
 +
 +
 +  private static final String dataGroupMemberProcessPlanLocallyMSString = 
"Data group member - process plan locally : ";
 +  private static final String dataGroupMemberWaitLeaderMSString = "Data group 
member - wait leader: ";
 +  private static final String metaGroupMemberExecuteNonQueryMSString = "Meta 
group member - execute non query: ";
 +  private static final String 
metaGroupMemberExecuteNonQueryInLocalGroupMSString = "Meta group member - 
execute in local group: ";
 +  private static final String 
metaGroupMemberExecuteNonQueryInRemoteGroupMSString = "Meta group member - 
execute in remote group: ";
 +  private static final String raftMemberAppendLogMSString = "Raft member - 
append log: ";
 +  private static final String raftMemberSendLogToFollowerMSString = "Raft 
member - send log to follower: ";
 +  private static final String raftMemberCommitLogMSString = "Raft member - 
commit log: ";
 +  private static final String raftFollowerAppendEntryString = "Raft member - 
follower append entry: ";
 +  private static final String dataGroupMemberForwardPlanString = "Data group 
member - forward plan: ";
 +  private static final String raftMemberWaitForPrevLogString = "Raft member - 
wait for prev log: ";
 +  private static final String raftMemberSendLogAyncString = "Raft member - 
send log aync: ";
 +  private static final String raftMemberVoteCounterString = "Raft member - 
vote counter: ";
 +  private static final String raftMemberLogParseString = "Raft member - log 
parse: ";
 +  private static final String rafTMemberReceiverWaitForPrevLogString = "Raft 
member - receiver wait for prev log: ";
 +  private static final String rafTMemberMayBeAppendString = "Raft member - 
maybe append: ";
++  private static final String rafTMemberOfferLogString = "Raft member - offer 
log: ";
++  private static final String raftMemberCommitLogResultString = "Raft member 
- commit log result: ";
++  private static final String raftMemberAppendLogResultString = "Raft member 
- append log result: ";
 +
 +
 +
 +  public static String getReport() {
 +    String result = "\n";
 +    result += dataGroupMemberProcessPlanLocallyMSString
 +        + dataGroupMemberProcessPlanLocallyMS.get()/1000000L + ", "
 +        + dataGroupMemberProcessPlanLocallyCounter + ", "
 +        + (double) dataGroupMemberProcessPlanLocallyMS.get()/1000000L
 +        / dataGroupMemberProcessPlanLocallyCounter.get()
 +        + "\n";
 +    result += dataGroupMemberWaitLeaderMSString
 +        + dataGroupMemberWaitLeaderMS.get()/1000000L + ", "
 +        + dataGroupMemberWaitLeaderCounter + ", "
 +        + (double) dataGroupMemberWaitLeaderMS.get()/1000000L / 
dataGroupMemberWaitLeaderCounter.get()
 +        + "\n";
 +    result += metaGroupMemberExecuteNonQueryMSString
 +        + metaGroupMemberExecuteNonQueryMS.get()/1000000L + ", "
 +        + metaGroupMemberExecuteNonQueryCounter + ", "
 +        + (double) metaGroupMemberExecuteNonQueryMS.get()/1000000L / 
metaGroupMemberExecuteNonQueryCounter
 +        .get() + "\n";
 +    result += metaGroupMemberExecuteNonQueryInLocalGroupMSString
 +        + metaGroupMemberExecuteNonQueryInLocalGroupMS.get()/1000000L + ", "
 +        + metaGroupMemberExecuteNonQueryInLocalGroupCounter + ", "
 +        + (double) metaGroupMemberExecuteNonQueryInLocalGroupMS.get()/1000000L
 +        / metaGroupMemberExecuteNonQueryInLocalGroupCounter.get() + "\n";
 +    result += metaGroupMemberExecuteNonQueryInRemoteGroupMSString
 +        + metaGroupMemberExecuteNonQueryInRemoteGroupMS.get()/1000000L + ", "
 +        + metaGroupMemberExecuteNonQueryInRemoteGroupCounter + ", "
 +        + (double) 
metaGroupMemberExecuteNonQueryInRemoteGroupMS.get()/1000000L
 +        / metaGroupMemberExecuteNonQueryInRemoteGroupCounter.get() + "\n";
 +    result += raftMemberAppendLogMSString
 +        + raftMemberAppendLogMS.get()/1000000L + ", "
 +        + raftMemberAppendLogCounter + ", "
 +        + (double) raftMemberAppendLogMS.get()/1000000L / 
raftMemberAppendLogCounter.get() + "\n";
 +    result += raftMemberSendLogToFollowerMSString
 +        + raftMemberSendLogToFollowerMS.get()/1000000L + ", "
 +        + raftMemberSendLogToFollowerCounter + ", "
 +        + (double) raftMemberSendLogToFollowerMS.get()/1000000L / 
raftMemberSendLogToFollowerCounter.get()
 +        + "\n";
 +    result += raftMemberCommitLogMSString
 +        + raftMemberCommitLogMS.get()/1000000L + ", "
 +        + raftMemberCommitLogCounter + ", "
 +        + (double) raftMemberCommitLogMS.get()/1000000L / 
raftMemberCommitLogCounter.get() + "\n";
 +    result += raftFollowerAppendEntryString
 +        + raftFollowerAppendEntryMS.get()/1000000L + ", "
 +        + raftFollowerAppendEntryCounter + ", "
 +        + (double) raftFollowerAppendEntryMS.get()/1000000L / 
raftFollowerAppendEntryCounter.get() + "\n";
 +    result += dataGroupMemberForwardPlanString
 +        + dataGroupMemberForwardPlanMS.get()/1000000L + ", "
 +        + dataGroupMemberForwardPlanCounter + ", "
 +        + (double) dataGroupMemberForwardPlanMS.get()/1000000L / 
dataGroupMemberForwardPlanCounter.get() + "\n";
 +    result += raftMemberWaitForPrevLogString
 +        + raftMemberWaitForPrevLogMS.get()/1000000L + ", "
 +        + raftMemberWaitForPrevLogCounter + ", "
 +        + (double) raftMemberWaitForPrevLogMS.get()/1000000L / 
raftMemberWaitForPrevLogCounter.get() + "\n";
 +    result += raftMemberSendLogAyncString
 +        + raftMemberSendLogAyncMS.get()/1000000L + ", "
 +        + raftMemberSendLogAyncCounter + ", "
 +        + (double) raftMemberSendLogAyncMS.get()/1000000L / 
raftMemberSendLogAyncCounter.get() + "\n";
 +    result += raftMemberVoteCounterString
 +        + raftMemberVoteCounterMS.get()/1000000L + ", "
 +        + raftMemberVoteCounterCounter + ", "
 +        + (double) raftMemberVoteCounterMS.get()/1000000L / 
raftMemberVoteCounterCounter.get() + "\n";
 +    result += raftMemberLogParseString
 +        + raftMemberLogParseMS.get()/1000000L + ", "
 +        + raftMemberLogParseCounter + ", "
 +        + (double) raftMemberLogParseMS.get()/1000000L / 
raftMemberLogParseCounter.get() + "\n";
 +    result += rafTMemberReceiverWaitForPrevLogString
 +        + rafTMemberReceiverWaitForPrevLogMS.get()/1000000L + ", "
 +        + rafTMemberReceiverWaitForPrevLogCounter + ", "
 +        + (double) rafTMemberReceiverWaitForPrevLogMS.get()/1000000L / 
rafTMemberReceiverWaitForPrevLogCounter.get() + "\n";
 +    result += rafTMemberMayBeAppendString
 +        + rafTMemberMayBeAppendMS.get()/1000000L + ", "
 +        + rafTMemberMayBeAppendCounter + ", "
 +        + (double) rafTMemberMayBeAppendMS.get()/1000000L / 
rafTMemberMayBeAppendCounter.get() + "\n";
++    result += rafTMemberOfferLogString
++        + raftMemberOfferLogMS.get()/1000000L + ", "
++        + raftMemberOfferLogCounter + ", "
++        + (double) raftMemberOfferLogMS.get()/1000000L / 
raftMemberOfferLogCounter.get() + "\n";
++    result += raftMemberAppendLogResultString
++        + raftMemberAppendLogResultMS.get()/1000000L + ", "
++        + raftMemberAppendLogResultCounter + ", "
++        + (double) raftMemberAppendLogResultMS.get()/1000000L / 
raftMemberAppendLogResultCounter.get() + "\n";
++    result += raftMemberCommitLogResultString
++        + raftMemberCommitLogResultMS.get()/1000000L + ", "
++        + raftMemberCommitLogResultCounter + ", "
++        + (double) raftMemberCommitLogResultMS.get()/1000000L / 
raftMemberCommitLogResultCounter.get() + "\n";
++
++
 +
 +    return result;
 +  }
 +}
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 5e91492,f749af1..4a5305d
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@@ -66,9 -66,10 +66,11 @@@ import org.apache.iotdb.cluster.excepti
  import org.apache.iotdb.cluster.log.CommitLogTask;
  import org.apache.iotdb.cluster.log.HardState;
  import org.apache.iotdb.cluster.log.Log;
+ import org.apache.iotdb.cluster.log.LogDispatcher;
+ import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
  import org.apache.iotdb.cluster.log.LogParser;
  import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
 +import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
  import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
  import org.apache.iotdb.cluster.log.manage.RaftLogManager;
  import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
@@@ -688,8 -650,12 +676,12 @@@ public abstract class RaftMember 
        // retry if allNodes has changed
        return AppendLogResult.TIME_OUT;
      }
 -
+     return waitAppendResult(voteCounter, leaderShipStale, newLeaderTerm);
+   }
  
+   public AppendLogResult waitAppendResult(AtomicInteger voteCounter,
+       AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm) {
 +    long start = System.nanoTime();
      synchronized (voteCounter) {
        if (voteCounter.get() > 0) {
          try {
@@@ -744,11 -705,15 +737,18 @@@
      } else {
        sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, 
request, peer);
      }
 +    Timer.raftMemberSendLogAyncMS.addAndGet(System.nanoTime() - start);
 +    Timer.raftMemberSendLogAyncCounter.incrementAndGet();
 +
    }
  
+   public synchronized LogDispatcher getLogDispatcher() {
+     if (logDispatcher == null) {
+       logDispatcher = new LogDispatcher(this);
+     }
+     return logDispatcher;
+   }
+ 
    private boolean waitForPrevLog(Peer peer, Log log) {
      long waitStart = System.currentTimeMillis();
      long alreadyWait = 0;
@@@ -1288,23 -1254,50 +1292,60 @@@
          return StatusUtils.OK;
        }
      } catch (LogExecutionException e) {
-       Throwable cause = getRootCause(e);
-       if (cause instanceof BatchInsertionException) {
-         return RpcUtils
-             .getStatus(Arrays.asList(((BatchInsertionException) 
cause).getFailingStatus()));
-       }
-       TSStatus tsStatus = StatusUtils.EXECUTE_STATEMENT_ERROR.deepCopy();
-       if (cause instanceof IoTDBException) {
-         tsStatus.setCode(((IoTDBException) cause).getErrorCode());
-       }
-       if (!(cause instanceof PathNotExistException) &&
-           !(cause instanceof StorageGroupNotSetException) &&
-           !(cause instanceof PathAlreadyExistException) &&
-           !(cause instanceof StorageGroupAlreadySetException)) {
-         logger.debug("{} cannot be executed because {}", plan, cause);
+       return handleLogExecutionException(log, e);
+     }
+     return StatusUtils.TIME_OUT;
+   }
+ 
+ 
+   TSStatus processPlanLocallyV2(PhysicalPlan plan) {
+     logger.debug("{}: Processing plan {}", name, plan);
+     if (readOnly) {
+       return StatusUtils.NODE_READ_ONLY;
+     }
+     PhysicalPlanLog log = new PhysicalPlanLog();
+     // assign term and index to the new log and append it
+     SendLogRequest sendLogRequest;
++    long start = System.nanoTime();
+     synchronized (logManager) {
+       log.setCurrLogTerm(getTerm().get());
+       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+ 
+       log.setPlan(plan);
+       logManager.append(log);
+ 
+       sendLogRequest = buildSendLogRequest(log);
+       getLogDispatcher().offer(sendLogRequest);
+     }
++    Timer.raftMemberOfferLogMS.addAndGet(System.nanoTime() - start);
++    Timer.raftMemberOfferLogCounter.incrementAndGet();
++
+ 
+     try {
++      start = System.nanoTime();
+       AppendLogResult appendLogResult = 
waitAppendResult(sendLogRequest.voteCounter,
+           sendLogRequest.leaderShipStale,
+           sendLogRequest.newLeaderTerm);
++      Timer.raftMemberAppendLogResultMS.addAndGet(System.nanoTime() - start);
++      Timer.raftMemberAppendLogResultCounter.incrementAndGet();
+ 
+       switch (appendLogResult) {
+         case OK:
+           logger.debug("{}: log {} is accepted", name, log);
++          start = System.nanoTime();
+           commitLog(log);
++          Timer.raftMemberCommitLogResultMS.addAndGet(System.nanoTime() - 
start);
++          Timer.raftMemberCommitLogResultCounter.incrementAndGet();
+           return StatusUtils.OK;
+         case TIME_OUT:
+           logger.debug("{}: log {} timed out, retrying...", name, log);
+         case LEADERSHIP_STALE:
+           // abort the appending, the new leader will fix the local logs by 
catch-up
+         default:
+           break;
        }
-       tsStatus.setMessage(cause.getClass().getName() + ":" + 
cause.getMessage());
-       return tsStatus;
+     } catch (LogExecutionException e) {
+       return handleLogExecutionException(log, e);
      }
      return StatusUtils.TIME_OUT;
    }

Reply via email to