This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch multileader_restart_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/multileader_restart_test by
this push:
new 2b1aa06769 add more metrics
2b1aa06769 is described below
commit 2b1aa06769266812b3f7041dab752c6c233a56f9
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Fri Jul 8 15:50:43 2022 +0800
add more metrics
---
.../multileader/logdispatcher/LogDispatcher.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index fbd1945536..414710aca1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -192,6 +192,7 @@ public class LogDispatcher {
long startTime = System.nanoTime();
IndexedConsensusRequest request =
pendingRequest.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC,
TimeUnit.SECONDS);
+ logger.info("{} - {}: revoke from pendingRequest poll",
impl.getThisNode(), peer);
StepTracker.trace("MultiLeaderPoll", 50, startTime,
System.nanoTime());
if (request != null) {
bufferedRequest.add(request);
@@ -202,7 +203,13 @@ public class LogDispatcher {
}
}
// we may block here if the synchronization pipeline is full
+ long getSendTokenStartTime = System.nanoTime();
syncStatus.addNextBatch(batch);
+ StepTracker.trace(
+ "getSendBatchToken-" + this.getPeer().getEndpoint().getIp(),
+ 10,
+ getSendTokenStartTime,
+ System.nanoTime());
// sends batch asynchronously and migrates the retry logic into the callback handler
StepTracker.trace(
"prepareBatch-" + this.getPeer().getEndpoint().getIp(),
@@ -329,17 +336,25 @@ public class LogDispatcher {
}
while (currentIndex < maxIndex
&& logBatches.size() <
config.getReplication().getMaxRequestPerBatch()) {
+ long getOneEntryStartTime = System.nanoTime();
logger.debug("construct from WAL for one Entry, index : {}",
currentIndex);
try {
+ long waitForNextStartTime = System.nanoTime();
walEntryiterator.waitForNextReady();
+ StepTracker.trace("waitForNextReady()", 400, waitForNextStartTime,
System.nanoTime());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("wait for next WAL entry is interrupted");
}
+ long walNextStartTime = System.nanoTime();
IndexedConsensusRequest data = walEntryiterator.next();
+ StepTracker.trace("walNext()", 400, walNextStartTime,
System.nanoTime());
currentIndex = data.getSearchIndex();
iteratorIndex = currentIndex;
+ long serializeStartTime = System.nanoTime();
logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
+ StepTracker.trace("TLogBatchSerialize", 400, serializeStartTime,
System.nanoTime());
+ StepTracker.trace("constructOneEntry", 400, getOneEntryStartTime,
System.nanoTime());
if (currentIndex == maxIndex - 1) {
break;
}