This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch cluster_performance
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit e6b288f3a38c4d5e0e3e27d448c7c6e293a13ab8
Author: Ring-k <[email protected]>
AuthorDate: Tue Sep 1 14:55:15 2020 +0800

    timer
---
 .../org/apache/iotdb/cluster/server/Timer.java     | 21 ++++++++++++
 .../cluster/server/member/DataGroupMember.java     |  7 ++++
 .../cluster/server/member/MetaGroupMember.java     | 40 ++++++++++++++++------
 .../iotdb/cluster/server/member/RaftMember.java    | 14 ++++++--
 4 files changed, 70 insertions(+), 12 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
new file mode 100644
index 0000000..c1c0a5d
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
@@ -0,0 +1,21 @@
+package org.apache.iotdb.cluster.server;
+
+public class Timer { // Static accumulators for ad-hoc timing: each *MS field sums System.currentTimeMillis() deltas, each *Counter counts the samples added to it.
+  // NOTE(review): plain non-volatile statics updated with += / ++ (not atomic in Java) — confirm whether concurrent writers exist and whether skewed totals are acceptable.
+  public static long dataGroupMemberProcessPlanLocallyMS = 0L; // ms spent in processPlanLocally() on the initial leader check of DataGroupMember.executeNonQuery
+  public static long dataGroupMemberProcessPlanLocallyCounter = 0L; // number of samples added to dataGroupMemberProcessPlanLocallyMS
+  public static long dataGroupMemberWaitLeaderMS = 0L; // ms spent in waitLeader() inside DataGroupMember.executeNonQuery
+  public static long dataGroupMemberWaitLeaderCounter = 0L; // number of samples added to dataGroupMemberWaitLeaderMS
+  public static long metaGroupMemberExecuteNonQueryMS = 0L; // ms spent in MetaGroupMember.executeNonQuery, measured from entry to just before return
+  public static long metaGroupMemberExecuteNonQueryCounter = 0L; // number of samples added to metaGroupMemberExecuteNonQueryMS
+  public static long metaGroupMemberExecuteNonQueryInLocalGroupMS = 0L; // ms spent in forwardToSingleGroup when the target group contains this node
+  public static long metaGroupMemberExecuteNonQueryInLocalGroupCounter = 0L; // number of samples added to metaGroupMemberExecuteNonQueryInLocalGroupMS
+  public static long metaGroupMemberExecuteNonQueryInRemoteGroupMS = 0L; // ms spent in forwardToSingleGroup when the plan is forwarded via forwardPlan()
+  public static long metaGroupMemberExecuteNonQueryInRemoteGroupCounter = 0L; // number of samples added to metaGroupMemberExecuteNonQueryInRemoteGroupMS
+  public static long raftMemberAppendLogMS = 0L; // ms spent in RaftMember creating a PhysicalPlanLog and appending it inside synchronized (logManager)
+  public static long raftMemberAppendLogCounter = 0L; // number of samples added to raftMemberAppendLogMS
+  public static long raftMemberSendLogToFollowerMS = 0L; // ms spent per sendLogToFollowers() call in RaftMember; sampled once per retry-loop iteration
+  public static long raftMemberSendLogToFollowerCounter = 0L; // number of samples added to raftMemberSendLogToFollowerMS
+  public static long raftMemberCommitLogMS = 0L; // ms spent in commitLog() after sendLogToFollowers() returned OK
+  public static long raftMemberCommitLogCounter = 0L; // number of samples added to raftMemberCommitLogMS
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index b16dd7d..61fe4ba 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -95,6 +95,7 @@ import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
 import org.apache.iotdb.cluster.server.Peer;
 import org.apache.iotdb.cluster.server.PullSnapshotHintService;
 import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.Timer;
 import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatThread;
 import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
@@ -1036,7 +1037,10 @@ public class DataGroupMember extends RaftMember {
    */
   TSStatus executeNonQuery(PhysicalPlan plan) {
     if (character == NodeCharacter.LEADER) {
+      long start = System.currentTimeMillis();
       TSStatus status = processPlanLocally(plan);
+      Timer.dataGroupMemberProcessPlanLocallyMS += (System.currentTimeMillis() 
- start);
+      Timer.dataGroupMemberProcessPlanLocallyCounter++;
       if (status != null) {
         return status;
       }
@@ -1044,7 +1048,10 @@ public class DataGroupMember extends RaftMember {
       return forwardPlan(plan, leader, getHeader());
     }
 
+    long start = System.currentTimeMillis();
     waitLeader();
+    Timer.dataGroupMemberWaitLeaderMS += (System.currentTimeMillis() - start);
+    Timer.dataGroupMemberWaitLeaderCounter++;
     // the leader can be itself after waiting
     if (character == NodeCharacter.LEADER) {
       TSStatus status = processPlanLocally(plan);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 170ccaa..d3ad631 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -132,6 +132,7 @@ import org.apache.iotdb.cluster.server.NodeReport;
 import org.apache.iotdb.cluster.server.NodeReport.MetaMemberReport;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.Timer;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendGroupEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.NodeStatusHandler;
@@ -412,7 +413,8 @@ public class MetaGroupMember extends RaftMember {
     // initialize allNodes
     for (String seedUrl : seedUrls) {
       Node node = generateNode(seedUrl);
-      if (node != null && (!node.getIp().equals(thisNode.ip) || 
node.getMetaPort() != thisNode.getMetaPort())
+      if (node != null && (!node.getIp().equals(thisNode.ip) || 
node.getMetaPort() != thisNode
+          .getMetaPort())
           && !allNodes.contains(node)) {
         // do not add the local node since it is added in `setThisNode()`
         allNodes.add(node);
@@ -1391,21 +1393,26 @@ public class MetaGroupMember extends RaftMember {
    */
   @Override
   public TSStatus executeNonQuery(PhysicalPlan plan) {
+    TSStatus result; // assigned by exactly one branch below so the timing can be recorded before the single return
+    long start = System.currentTimeMillis(); // wall-clock start covering the whole dispatch
     if (PartitionUtils.isLocalNonQueryPlan(plan)) { // run locally
-      return executeNonQueryLocally(plan);
+      result = executeNonQueryLocally(plan);
     } else if (PartitionUtils.isGlobalMetaPlan(plan)) { //forward the plan to 
all meta group nodes
-      return processNonPartitionedMetaPlan(plan);
+      result = processNonPartitionedMetaPlan(plan);
     } else if (PartitionUtils.isGlobalDataPlan(plan)) { //forward the plan to 
all data group nodes
-      return processNonPartitionedDataPlan(plan);
+      result = processNonPartitionedDataPlan(plan);
     } else { //split the plan and forward them to some PartitionGroups
       try {
-        return processPartitionedPlan(plan);
+        result = processPartitionedPlan(plan);
       } catch (UnsupportedPlanException e) {
         TSStatus status = StatusUtils.UNSUPPORTED_OPERATION.deepCopy();
         status.setMessage(e.getMessage());
-        return status;
+        result = status; // the unsupported-plan status also falls through to the timing code below
       }
     }
+    Timer.metaGroupMemberExecuteNonQueryMS += (System.currentTimeMillis() - 
start); // accumulate elapsed ms; skipped if a branch throws anything other than UnsupportedPlanException
+    Timer.metaGroupMemberExecuteNonQueryCounter++; // NOTE(review): unsynchronized update of a shared static — confirm this is acceptable if the method runs concurrently
+    return result;
   }
 
   private TSStatus executeNonQueryLocally(PhysicalPlan plan) {
@@ -1679,18 +1686,27 @@ public class MetaGroupMember extends RaftMember {
   }
 
   private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, 
PartitionGroup> entry) {
+    TSStatus result; // assigned in whichever branch runs, returned once at the end
     if (entry.getValue().contains(thisNode)) {
       // the query should be handled by a group the local node is in, handle 
it with in the group
+      long start = System.currentTimeMillis(); // started before the debug log call, so that call is inside the measured span
       logger.debug("Execute {} in a local group of {}", entry.getKey(),
           entry.getValue().getHeader());
-      return getLocalDataMember(entry.getValue().getHeader())
+      result = getLocalDataMember(entry.getValue().getHeader())
           .executeNonQuery(entry.getKey());
+      Timer.metaGroupMemberExecuteNonQueryInLocalGroupMS += 
(System.currentTimeMillis() - start); // accumulate elapsed ms for the local-group path
+      Timer.metaGroupMemberExecuteNonQueryInLocalGroupCounter++; // one sample per local-group execution
+
     } else {
       // forward the query to the group that should handle it
+      long start = System.currentTimeMillis(); // started before the debug log call, so that call is inside the measured span
       logger.debug("Forward {} to a remote group of {}", entry.getKey(),
           entry.getValue().getHeader());
-      return forwardPlan(entry.getKey(), entry.getValue());
+      result = forwardPlan(entry.getKey(), entry.getValue());
+      Timer.metaGroupMemberExecuteNonQueryInRemoteGroupMS += 
(System.currentTimeMillis() - start); // accumulate elapsed ms for the remote-group path
+      Timer.metaGroupMemberExecuteNonQueryInRemoteGroupCounter++; // one sample per forwarded plan
     }
+    return result;
   }
 
   private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup> 
planGroupMap) {
@@ -1894,7 +1910,8 @@ public class MetaGroupMember extends RaftMember {
    * @param header   to determine which DataGroupMember of "receiver" will 
process the request.
    * @return a TSStatus indicating if the forwarding is successful.
    */
-  private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, Node 
header) throws IOException {
+  private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, Node 
header)
+      throws IOException {
     RaftService.AsyncClient client = getAsyncDataClient(receiver,
         RaftServer.getWriteOperationTimeoutMS());
     try {
@@ -1916,8 +1933,10 @@ public class MetaGroupMember extends RaftMember {
       return StatusUtils.TIME_OUT;
     }
   }
+
   TSIService.Client cli;
   long sId;
+
   private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node 
header) {
     Client client = getSyncDataClient(receiver, 
RaftServer.getWriteOperationTimeoutMS());
     try {
@@ -2009,7 +2028,8 @@ public class MetaGroupMember extends RaftMember {
 
       }
       if (!partitionGroup.getHeader().equals(ignoredGroup)) {
-        partitionGroupPathMap.computeIfAbsent(partitionGroup, g -> new 
ArrayList<>()).add(prefixPath);
+        partitionGroupPathMap.computeIfAbsent(partitionGroup, g -> new 
ArrayList<>())
+            .add(prefixPath);
       }
     }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 29d830b..74bba96 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -83,6 +83,7 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.Peer;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.Timer;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.utils.PlanSerializer;
@@ -220,7 +221,7 @@ public abstract class RaftMember {
     appendLogThreadPool =
         
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 10,
             new ThreadFactoryBuilder().setNameFormat(getName() +
-        "-AppendLog%d").build());
+                "-AppendLog%d").build());
     asyncThreadPool = new 
ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 100,
         0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<>());
@@ -461,7 +462,7 @@ public abstract class RaftMember {
     long alreadyWait = 0;
     Object logUpdateCondition = logManager.getLogUpdateCondition();
     while (logManager.getLastLogIndex() < prevLogIndex &&
-    alreadyWait <= RaftServer.getWriteOperationTimeoutMS()) {
+        alreadyWait <= RaftServer.getWriteOperationTimeoutMS()) {
       synchronized (logUpdateCondition) {
         try {
           logUpdateCondition.wait(100);
@@ -1232,6 +1233,7 @@ public abstract class RaftMember {
     if (readOnly) {
       return StatusUtils.NODE_READ_ONLY;
     }
+    long start = System.currentTimeMillis();
     PhysicalPlanLog log = new PhysicalPlanLog();
     // assign term and index to the new log and append it
     synchronized (logManager) {
@@ -1241,6 +1243,8 @@ public abstract class RaftMember {
       log.setPlan(plan);
       logManager.append(log);
     }
+    Timer.raftMemberAppendLogMS += (System.currentTimeMillis() - start);
+    Timer.raftMemberAppendLogCounter++;
 
     try {
       if (appendLogInGroup(log)) {
@@ -1287,12 +1291,18 @@ public abstract class RaftMember {
       throws LogExecutionException {
     int retryTime = 0;
     while (true) {
+      long start = System.currentTimeMillis();
       logger.debug("{}: Send log {} to other nodes, retry times: {}", name, 
log, retryTime);
       AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2);
+      Timer.raftMemberSendLogToFollowerMS += (System.currentTimeMillis() - 
start);
+      Timer.raftMemberSendLogToFollowerCounter++;
       switch (result) {
         case OK:
+          start = System.currentTimeMillis();
           logger.debug("{}: log {} is accepted", name, log);
           commitLog(log);
+          Timer.raftMemberCommitLogMS += (System.currentTimeMillis() - start);
+          Timer.raftMemberCommitLogCounter++;
           return true;
         case TIME_OUT:
           logger.debug("{}: log {} timed out, retrying...", name, log);

Reply via email to