This is an automated email from the ASF dual-hosted git repository. yuyuankang pushed a commit to branch cluster_performance in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit e6b288f3a38c4d5e0e3e27d448c7c6e293a13ab8 Author: Ring-k <[email protected]> AuthorDate: Tue Sep 1 14:55:15 2020 +0800 timer --- .../org/apache/iotdb/cluster/server/Timer.java | 21 ++++++++++++ .../cluster/server/member/DataGroupMember.java | 7 ++++ .../cluster/server/member/MetaGroupMember.java | 40 ++++++++++++++++------ .../iotdb/cluster/server/member/RaftMember.java | 14 ++++++-- 4 files changed, 70 insertions(+), 12 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java new file mode 100644 index 0000000..c1c0a5d --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java @@ -0,0 +1,21 @@ +package org.apache.iotdb.cluster.server; + +public class Timer { + + public static long dataGroupMemberProcessPlanLocallyMS = 0L; + public static long dataGroupMemberProcessPlanLocallyCounter = 0L; + public static long dataGroupMemberWaitLeaderMS = 0L; + public static long dataGroupMemberWaitLeaderCounter = 0L; + public static long metaGroupMemberExecuteNonQueryMS = 0L; + public static long metaGroupMemberExecuteNonQueryCounter = 0L; + public static long metaGroupMemberExecuteNonQueryInLocalGroupMS = 0L; + public static long metaGroupMemberExecuteNonQueryInLocalGroupCounter = 0L; + public static long metaGroupMemberExecuteNonQueryInRemoteGroupMS = 0L; + public static long metaGroupMemberExecuteNonQueryInRemoteGroupCounter = 0L; + public static long raftMemberAppendLogMS = 0L; + public static long raftMemberAppendLogCounter = 0L; + public static long raftMemberSendLogToFollowerMS = 0L; + public static long raftMemberSendLogToFollowerCounter = 0L; + public static long raftMemberCommitLogMS = 0L; + public static long raftMemberCommitLogCounter = 0L; +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index b16dd7d..61fe4ba 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -95,6 +95,7 @@ import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport; import org.apache.iotdb.cluster.server.Peer; import org.apache.iotdb.cluster.server.PullSnapshotHintService; import org.apache.iotdb.cluster.server.Response; +import org.apache.iotdb.cluster.server.Timer; import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatThread; import org.apache.iotdb.cluster.utils.ClusterQueryUtils; import org.apache.iotdb.cluster.utils.PartitionUtils; @@ -1036,7 +1037,10 @@ public class DataGroupMember extends RaftMember { */ TSStatus executeNonQuery(PhysicalPlan plan) { if (character == NodeCharacter.LEADER) { + long start = System.currentTimeMillis(); TSStatus status = processPlanLocally(plan); + Timer.dataGroupMemberProcessPlanLocallyMS += (System.currentTimeMillis() - start); + Timer.dataGroupMemberProcessPlanLocallyCounter++; if (status != null) { return status; } @@ -1044,7 +1048,10 @@ public class DataGroupMember extends RaftMember { return forwardPlan(plan, leader, getHeader()); } + long start = System.currentTimeMillis(); waitLeader(); + Timer.dataGroupMemberWaitLeaderMS += (System.currentTimeMillis() - start); + Timer.dataGroupMemberWaitLeaderCounter++; // the leader can be itself after waiting if (character == NodeCharacter.LEADER) { TSStatus status = processPlanLocally(plan); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 170ccaa..d3ad631 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -132,6 +132,7 @@ import org.apache.iotdb.cluster.server.NodeReport; import org.apache.iotdb.cluster.server.NodeReport.MetaMemberReport; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.Response; +import org.apache.iotdb.cluster.server.Timer; import org.apache.iotdb.cluster.server.handlers.caller.AppendGroupEntryHandler; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; import org.apache.iotdb.cluster.server.handlers.caller.NodeStatusHandler; @@ -412,7 +413,8 @@ public class MetaGroupMember extends RaftMember { // initialize allNodes for (String seedUrl : seedUrls) { Node node = generateNode(seedUrl); - if (node != null && (!node.getIp().equals(thisNode.ip) || node.getMetaPort() != thisNode.getMetaPort()) + if (node != null && (!node.getIp().equals(thisNode.ip) || node.getMetaPort() != thisNode + .getMetaPort()) && !allNodes.contains(node)) { // do not add the local node since it is added in `setThisNode()` allNodes.add(node); @@ -1391,21 +1393,26 @@ public class MetaGroupMember extends RaftMember { */ @Override public TSStatus executeNonQuery(PhysicalPlan plan) { + TSStatus result; + long start = System.currentTimeMillis(); if (PartitionUtils.isLocalNonQueryPlan(plan)) { // run locally - return executeNonQueryLocally(plan); + result = executeNonQueryLocally(plan); } else if (PartitionUtils.isGlobalMetaPlan(plan)) { //forward the plan to all meta group nodes - return processNonPartitionedMetaPlan(plan); + result = processNonPartitionedMetaPlan(plan); } else if (PartitionUtils.isGlobalDataPlan(plan)) { //forward the plan to all data group nodes - return processNonPartitionedDataPlan(plan); + result = processNonPartitionedDataPlan(plan); } else { //split the plan and forward them to some PartitionGroups try { - return processPartitionedPlan(plan); + result = processPartitionedPlan(plan); } catch (UnsupportedPlanException e) { TSStatus status = StatusUtils.UNSUPPORTED_OPERATION.deepCopy(); status.setMessage(e.getMessage()); - return status; + result = status; } } + Timer.metaGroupMemberExecuteNonQueryMS += (System.currentTimeMillis() - start); + Timer.metaGroupMemberExecuteNonQueryCounter++; + return result; } private TSStatus executeNonQueryLocally(PhysicalPlan plan) { @@ -1679,18 +1686,27 @@ public class MetaGroupMember extends RaftMember { } private TSStatus forwardToSingleGroup(Map.Entry<PhysicalPlan, PartitionGroup> entry) { + TSStatus result; if (entry.getValue().contains(thisNode)) { // the query should be handled by a group the local node is in, handle it with in the group + long start = System.currentTimeMillis(); logger.debug("Execute {} in a local group of {}", entry.getKey(), entry.getValue().getHeader()); - return getLocalDataMember(entry.getValue().getHeader()) + result = getLocalDataMember(entry.getValue().getHeader()) .executeNonQuery(entry.getKey()); + Timer.metaGroupMemberExecuteNonQueryInLocalGroupMS += (System.currentTimeMillis() - start); + Timer.metaGroupMemberExecuteNonQueryInLocalGroupCounter++; + } else { // forward the query to the group that should handle it + long start = System.currentTimeMillis(); logger.debug("Forward {} to a remote group of {}", entry.getKey(), entry.getValue().getHeader()); - return forwardPlan(entry.getKey(), entry.getValue()); + result = forwardPlan(entry.getKey(), entry.getValue()); + Timer.metaGroupMemberExecuteNonQueryInRemoteGroupMS += (System.currentTimeMillis() - start); + Timer.metaGroupMemberExecuteNonQueryInRemoteGroupCounter++; } + return result; } private TSStatus forwardToMultipleGroup(Map<PhysicalPlan, PartitionGroup> planGroupMap) { @@ -1894,7 +1910,8 @@ public class MetaGroupMember extends RaftMember { * @param header to determine which DataGroupMember of "receiver" will process the request. * @return a TSStatus indicating if the forwarding is successful. */ - private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, Node header) throws IOException { + private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, Node header) + throws IOException { RaftService.AsyncClient client = getAsyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS()); try { @@ -1916,8 +1933,10 @@ public class MetaGroupMember extends RaftMember { return StatusUtils.TIME_OUT; } } + TSIService.Client cli; long sId; + private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header) { Client client = getSyncDataClient(receiver, RaftServer.getWriteOperationTimeoutMS()); try { @@ -2009,7 +2028,8 @@ public class MetaGroupMember extends RaftMember { } if (!partitionGroup.getHeader().equals(ignoredGroup)) { - partitionGroupPathMap.computeIfAbsent(partitionGroup, g -> new ArrayList<>()).add(prefixPath); + partitionGroupPathMap.computeIfAbsent(partitionGroup, g -> new ArrayList<>()) + .add(prefixPath); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 29d830b..74bba96 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -83,6 +83,7 @@ import org.apache.iotdb.cluster.server.NodeCharacter; import org.apache.iotdb.cluster.server.Peer; import org.apache.iotdb.cluster.server.RaftServer; import org.apache.iotdb.cluster.server.Response; +import org.apache.iotdb.cluster.server.Timer; import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler; import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler; import org.apache.iotdb.cluster.utils.PlanSerializer; @@ -220,7 +221,7 @@ public abstract class RaftMember { appendLogThreadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 10, new ThreadFactoryBuilder().setNameFormat(getName() + - "-AppendLog%d").build()); + "-AppendLog%d").build()); asyncThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); @@ -461,7 +462,7 @@ public abstract class RaftMember { long alreadyWait = 0; Object logUpdateCondition = logManager.getLogUpdateCondition(); while (logManager.getLastLogIndex() < prevLogIndex && - alreadyWait <= RaftServer.getWriteOperationTimeoutMS()) { + alreadyWait <= RaftServer.getWriteOperationTimeoutMS()) { synchronized (logUpdateCondition) { try { logUpdateCondition.wait(100); @@ -1232,6 +1233,7 @@ public abstract class RaftMember { if (readOnly) { return StatusUtils.NODE_READ_ONLY; } + long start = System.currentTimeMillis(); PhysicalPlanLog log = new PhysicalPlanLog(); // assign term and index to the new log and append it synchronized (logManager) { @@ -1241,6 +1243,8 @@ public abstract class RaftMember { log.setPlan(plan); logManager.append(log); } + Timer.raftMemberAppendLogMS += (System.currentTimeMillis() - start); + Timer.raftMemberAppendLogCounter++; try { if (appendLogInGroup(log)) { @@ -1287,12 +1291,18 @@ public abstract class RaftMember { throws LogExecutionException { int retryTime = 0; while (true) { + long start = System.currentTimeMillis(); logger.debug("{}: Send log {} to other nodes, retry times: {}", name, log, retryTime); AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2); + Timer.raftMemberSendLogToFollowerMS += (System.currentTimeMillis() - start); + Timer.raftMemberSendLogToFollowerCounter++; switch (result) { case OK: + start = System.currentTimeMillis(); logger.debug("{}: log {} is accepted", name, log); commitLog(log); + Timer.raftMemberCommitLogMS += (System.currentTimeMillis() - start); + Timer.raftMemberCommitLogCounter++; return true; case TIME_OUT: logger.debug("{}: log {} timed out, retrying...", name, log);
