This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test_exp1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_0729_test_exp1 by this push:
     new 7ac0ef1290 add more metrics for each step in leader and follower
7ac0ef1290 is described below

commit 7ac0ef129071e413e069236112671c225944e08d
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Thu Aug 4 16:57:59 2022 +0800

    add more metrics for each step in leader and follower
---
 .../multileader/MultiLeaderServerImpl.java         |  55 ++++----
 .../multileader/client/DispatchLogHandler.java     |   3 +
 .../multileader/logdispatcher/LogDispatcher.java   | 146 +++++++++++----------
 .../service/MultiLeaderRPCServiceProcessor.java    |   4 +
 4 files changed, 118 insertions(+), 90 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 8b21fa920e..e610ff78f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.consensus.multileader;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
@@ -109,30 +110,40 @@ public class MultiLeaderServerImpl {
    * records the index of the log and writes locally, and then asynchronous replication is performed
    */
   public TSStatus write(IConsensusRequest request) {
+    long leaderWriteStartTime = System.nanoTime();
     synchronized (stateMachine) {
-      IndexedConsensusRequest indexedConsensusRequest =
-          buildIndexedConsensusRequestForLocalRequest(request);
-      if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) {
-        logger.info(
-            "DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}",
-            thisNode.getGroupId(),
-            getCurrentSafelyDeletedSearchIndex(),
-            indexedConsensusRequest.getSearchIndex());
+      StepTracker.trace("LeaderWriteWaitLock", leaderWriteStartTime, System.nanoTime());
+      long startTimeAfterLock = System.nanoTime();
+      try {
+        IndexedConsensusRequest indexedConsensusRequest =
+            buildIndexedConsensusRequestForLocalRequest(request);
+        if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) {
+          logger.info(
+              "DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}",
+              thisNode.getGroupId(),
+              getCurrentSafelyDeletedSearchIndex(),
+              indexedConsensusRequest.getSearchIndex());
+        }
+        // TODO wal and memtable
+        TSStatus result = stateMachine.write(indexedConsensusRequest);
+        StepTracker.trace("stateMachineWrite", startTimeAfterLock, System.nanoTime());
+        long offerStartTime = System.nanoTime();
+        if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          logDispatcher.offer(indexedConsensusRequest);
+        } else {
+          logger.debug(
+              "{}: write operation failed. searchIndex: {}. Code: {}",
+              thisNode.getGroupId(),
+              indexedConsensusRequest.getSearchIndex(),
+              result.getCode());
+          index.decrementAndGet();
+        }
+        StepTracker.trace("serializeAndOfferToQueue", offerStartTime, System.nanoTime());
+        return result;
+      } finally {
+        StepTracker.trace("MultiLeaderWriteAfterLock", startTimeAfterLock, System.nanoTime());
+        StepTracker.trace("MultiLeaderWriteWhole", leaderWriteStartTime, System.nanoTime());
       }
-      // TODO wal and memtable
-      TSStatus result = stateMachine.write(indexedConsensusRequest);
-      if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        logDispatcher.offer(indexedConsensusRequest);
-      } else {
-        logger.debug(
-            "{}: write operation failed. searchIndex: {}. Code: {}",
-            thisNode.getGroupId(),
-            indexedConsensusRequest.getSearchIndex(),
-            result.getCode());
-        index.decrementAndGet();
-      }
-
-      return result;
     }
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index b4cefed078..ab6cfe655f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.multileader.client;
 
+import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher.LogDispatcherThread;
 import org.apache.iotdb.consensus.multileader.logdispatcher.PendingBatch;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
@@ -37,6 +38,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
   private final LogDispatcherThread thread;
   private final PendingBatch batch;
   private int retryCount;
+  private final long startTime = System.nanoTime();
 
   public DispatchLogHandler(LogDispatcherThread thread, PendingBatch batch) {
     this.thread = thread;
@@ -45,6 +47,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
 
   @Override
   public void onComplete(TSyncLogRes response) {
+    StepTracker.trace("leaderSendUtilResponse", 25, startTime, System.nanoTime());
     if (response.getStatus().size() == 1
         && response.getStatus().get(0).getCode()
             == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 4d79dabcff..b08069aa9e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.multileader.logdispatcher;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.consensus.common.Peer;
@@ -224,7 +225,9 @@ public class LogDispatcher {
             }
           }
           // we may block here if the synchronization pipeline is full
+          long getBatchSlotStartTime = System.nanoTime();
           syncStatus.addNextBatch(batch);
+          StepTracker.trace("getBatchSlot", getBatchSlotStartTime, System.nanoTime());
           // sends batch asynchronously and migrates the retry logic into the callback handler
           sendBatchAsync(batch, new DispatchLogHandler(this, batch));
         }
@@ -244,82 +247,89 @@ public class LogDispatcher {
     }
 
     public PendingBatch getBatch() {
-      PendingBatch batch;
-      List<TLogBatch> logBatches = new ArrayList<>();
-      long startIndex = syncStatus.getNextSendingIndex();
-      logger.debug("[GetBatch] startIndex: {}", startIndex);
-      long endIndex;
-      if (bufferedRequest.size() <= 
config.getReplication().getMaxRequestPerBatch()) {
-        // Use drainTo instead of poll to reduce lock overhead
-        logger.debug(
-            "{} : pendingRequest Size: {}, bufferedRequest size: {}",
-            impl.getThisNode().getGroupId(),
-            pendingRequest.size(),
-            bufferedRequest.size());
-        pendingRequest.drainTo(
-            bufferedRequest,
-            config.getReplication().getMaxRequestPerBatch() - 
bufferedRequest.size());
-        // remove all request that searchIndex < startIndex
-        Iterator<IndexedConsensusRequest> iterator = 
bufferedRequest.iterator();
-        while (iterator.hasNext()) {
-          IndexedConsensusRequest request = iterator.next();
-          if (request.getSearchIndex() < startIndex) {
-            iterator.remove();
-          } else {
-            break;
+      long getBatchStartTime = System.nanoTime();
+      try {
+        PendingBatch batch;
+        List<TLogBatch> logBatches = new ArrayList<>();
+        long startIndex = syncStatus.getNextSendingIndex();
+        logger.debug("[GetBatch] startIndex: {}", startIndex);
+        long endIndex;
+        if (bufferedRequest.size() <= 
config.getReplication().getMaxRequestPerBatch()) {
+          // Use drainTo instead of poll to reduce lock overhead
+          logger.debug(
+              "{} : pendingRequest Size: {}, bufferedRequest size: {}",
+              impl.getThisNode().getGroupId(),
+              pendingRequest.size(),
+              bufferedRequest.size());
+          pendingRequest.drainTo(
+              bufferedRequest,
+              config.getReplication().getMaxRequestPerBatch() - 
bufferedRequest.size());
+          // remove all request that searchIndex < startIndex
+          Iterator<IndexedConsensusRequest> iterator = 
bufferedRequest.iterator();
+          while (iterator.hasNext()) {
+            IndexedConsensusRequest request = iterator.next();
+            if (request.getSearchIndex() < startIndex) {
+              iterator.remove();
+            } else {
+              break;
+            }
           }
         }
-      }
-      if (bufferedRequest.isEmpty()) { // only execute this after a restart
-        endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1, 
logBatches);
-        batch = new PendingBatch(startIndex, endIndex, logBatches);
-        logger.debug(
-            "{} : accumulated a {} from wal when empty", 
impl.getThisNode().getGroupId(), batch);
-      } else {
-        // Notice that prev searchIndex >= startIndex
-        Iterator<IndexedConsensusRequest> iterator = 
bufferedRequest.iterator();
-        IndexedConsensusRequest prev = iterator.next();
-        // Prevents gap between logs. For example, some requests are not 
written into the queue when
-        // the queue is full. In this case, requests need to be loaded from 
the WAL
-        endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), 
logBatches);
-        if (logBatches.size() == 
config.getReplication().getMaxRequestPerBatch()) {
+        if (bufferedRequest.isEmpty()) { // only execute this after a restart
+          endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1, 
logBatches);
           batch = new PendingBatch(startIndex, endIndex, logBatches);
-          logger.debug("{} : accumulated a {} from wal", 
impl.getThisNode().getGroupId(), batch);
-          return batch;
-        }
-        constructBatchIndexedFromConsensusRequest(prev, logBatches);
-        endIndex = prev.getSearchIndex();
-        iterator.remove();
-        while (iterator.hasNext()
-            && logBatches.size() <= 
config.getReplication().getMaxRequestPerBatch()) {
-          IndexedConsensusRequest current = iterator.next();
-          // Prevents gap between logs. For example, some logs are not written 
into the queue when
+          logger.debug(
+              "{} : accumulated a {} from wal when empty", 
impl.getThisNode().getGroupId(), batch);
+        } else {
+          // Notice that prev searchIndex >= startIndex
+          Iterator<IndexedConsensusRequest> iterator = 
bufferedRequest.iterator();
+          IndexedConsensusRequest prev = iterator.next();
+          // Prevents gap between logs. For example, some requests are not 
written into the queue
+          // when
           // the queue is full. In this case, requests need to be loaded from 
the WAL
-          if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
-            endIndex =
-                constructBatchFromWAL(prev.getSearchIndex(), 
current.getSearchIndex(), logBatches);
-            if (logBatches.size() == 
config.getReplication().getMaxRequestPerBatch()) {
-              batch = new PendingBatch(startIndex, endIndex, logBatches);
-              logger.debug(
-                  "gap {} : accumulated a {} from queue and wal when gap",
-                  impl.getThisNode().getGroupId(),
-                  batch);
-              return batch;
-            }
+          endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), 
logBatches);
+          if (logBatches.size() == 
config.getReplication().getMaxRequestPerBatch()) {
+            batch = new PendingBatch(startIndex, endIndex, logBatches);
+            logger.debug("{} : accumulated a {} from wal", 
impl.getThisNode().getGroupId(), batch);
+            return batch;
           }
-          constructBatchIndexedFromConsensusRequest(current, logBatches);
-          endIndex = current.getSearchIndex();
-          prev = current;
-          // We might not be able to remove all the elements in the 
bufferedRequest in the
-          // current function, but that's fine, we'll continue processing 
these elements in the
-          // bufferedRequest the next time we go into the function, they're 
never lost
+          constructBatchIndexedFromConsensusRequest(prev, logBatches);
+          endIndex = prev.getSearchIndex();
           iterator.remove();
+          while (iterator.hasNext()
+              && logBatches.size() <= 
config.getReplication().getMaxRequestPerBatch()) {
+            IndexedConsensusRequest current = iterator.next();
+            // Prevents gap between logs. For example, some logs are not 
written into the queue when
+            // the queue is full. In this case, requests need to be loaded 
from the WAL
+            if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
+              endIndex =
+                  constructBatchFromWAL(
+                      prev.getSearchIndex(), current.getSearchIndex(), 
logBatches);
+              if (logBatches.size() == 
config.getReplication().getMaxRequestPerBatch()) {
+                batch = new PendingBatch(startIndex, endIndex, logBatches);
+                logger.debug(
+                    "gap {} : accumulated a {} from queue and wal when gap",
+                    impl.getThisNode().getGroupId(),
+                    batch);
+                return batch;
+              }
+            }
+            constructBatchIndexedFromConsensusRequest(current, logBatches);
+            endIndex = current.getSearchIndex();
+            prev = current;
+            // We might not be able to remove all the elements in the 
bufferedRequest in the
+            // current function, but that's fine, we'll continue processing 
these elements in the
+            // bufferedRequest the next time we go into the function, they're 
never lost
+            iterator.remove();
+          }
+          batch = new PendingBatch(startIndex, endIndex, logBatches);
+          logger.debug(
+              "{} : accumulated a {} from queue and wal", 
impl.getThisNode().getGroupId(), batch);
         }
-        batch = new PendingBatch(startIndex, endIndex, logBatches);
-        logger.debug(
-            "{} : accumulated a {} from queue and wal", 
impl.getThisNode().getGroupId(), batch);
+        return batch;
+      } finally {
+        StepTracker.trace("getBatch()", 25, getBatchStartTime, System.nanoTime());
       }
-      return batch;
     }
 
     public void sendBatchAsync(PendingBatch batch, DispatchLogHandler handler) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 157b8b0207..8f2fbc7080 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.multileader.service;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -54,6 +55,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
 
   @Override
   public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler) {
+    long syncLogStartTime = System.nanoTime();
     try {
       ConsensusGroupId groupId =
           ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -99,6 +101,8 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
       }
     } catch (Exception e) {
       resultHandler.onError(e);
+    } finally {
+      StepTracker.trace("syncLog", 25, syncLogStartTime, System.nanoTime());
     }
   }
 

Reply via email to