This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0729_test_exp1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0729_test_exp1 by this push:
new 7ac0ef1290 add more metrics for each step in leader and follower
7ac0ef1290 is described below
commit 7ac0ef129071e413e069236112671c225944e08d
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Thu Aug 4 16:57:59 2022 +0800
add more metrics for each step in leader and follower
---
.../multileader/MultiLeaderServerImpl.java | 55 ++++----
.../multileader/client/DispatchLogHandler.java | 3 +
.../multileader/logdispatcher/LogDispatcher.java | 146 +++++++++++----------
.../service/MultiLeaderRPCServiceProcessor.java | 4 +
4 files changed, 118 insertions(+), 90 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 8b21fa920e..e610ff78f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.consensus.multileader;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
@@ -109,30 +110,40 @@ public class MultiLeaderServerImpl {
* records the index of the log and writes locally, and then asynchronous
replication is performed
*/
public TSStatus write(IConsensusRequest request) {
+ long leaderWriteStartTime = System.nanoTime();
synchronized (stateMachine) {
- IndexedConsensusRequest indexedConsensusRequest =
- buildIndexedConsensusRequestForLocalRequest(request);
- if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) {
- logger.info(
- "DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}",
- thisNode.getGroupId(),
- getCurrentSafelyDeletedSearchIndex(),
- indexedConsensusRequest.getSearchIndex());
+ StepTracker.trace("LeaderWriteWaitLock", leaderWriteStartTime,
System.nanoTime());
+ long startTimeAfterLock = System.nanoTime();
+ try {
+ IndexedConsensusRequest indexedConsensusRequest =
+ buildIndexedConsensusRequestForLocalRequest(request);
+ if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) {
+ logger.info(
+ "DataRegion[{}]: index after build: safeIndex:{}, searchIndex:
{}",
+ thisNode.getGroupId(),
+ getCurrentSafelyDeletedSearchIndex(),
+ indexedConsensusRequest.getSearchIndex());
+ }
+ // TODO wal and memtable
+ TSStatus result = stateMachine.write(indexedConsensusRequest);
+ StepTracker.trace("stateMachineWrite", startTimeAfterLock,
System.nanoTime());
+ long offerStartTime = System.nanoTime();
+ if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ logDispatcher.offer(indexedConsensusRequest);
+ } else {
+ logger.debug(
+ "{}: write operation failed. searchIndex: {}. Code: {}",
+ thisNode.getGroupId(),
+ indexedConsensusRequest.getSearchIndex(),
+ result.getCode());
+ index.decrementAndGet();
+ }
+ StepTracker.trace("serializeAndOfferToQueue", offerStartTime,
System.nanoTime());
+ return result;
+ } finally {
+ StepTracker.trace("MultiLeaderWriteAfterLock", startTimeAfterLock,
System.nanoTime());
+ StepTracker.trace("MultiLeaderWriteWhole", leaderWriteStartTime,
System.nanoTime());
}
- // TODO wal and memtable
- TSStatus result = stateMachine.write(indexedConsensusRequest);
- if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- logDispatcher.offer(indexedConsensusRequest);
- } else {
- logger.debug(
- "{}: write operation failed. searchIndex: {}. Code: {}",
- thisNode.getGroupId(),
- indexedConsensusRequest.getSearchIndex(),
- result.getCode());
- index.decrementAndGet();
- }
-
- return result;
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index b4cefed078..ab6cfe655f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.multileader.client;
+import org.apache.iotdb.commons.StepTracker;
import
org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher.LogDispatcherThread;
import org.apache.iotdb.consensus.multileader.logdispatcher.PendingBatch;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
@@ -37,6 +38,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
private final LogDispatcherThread thread;
private final PendingBatch batch;
private int retryCount;
+ private final long startTime = System.nanoTime();
public DispatchLogHandler(LogDispatcherThread thread, PendingBatch batch) {
this.thread = thread;
@@ -45,6 +47,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> {
@Override
public void onComplete(TSyncLogRes response) {
+ StepTracker.trace("leaderSendUtilResponse", 25, startTime,
System.nanoTime());
if (response.getStatus().size() == 1
&& response.getStatus().get(0).getCode()
== TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 4d79dabcff..b08069aa9e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.consensus.multileader.logdispatcher;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.consensus.common.Peer;
@@ -224,7 +225,9 @@ public class LogDispatcher {
}
}
// we may block here if the synchronization pipeline is full
+ long getBatchSlotStartTime = System.nanoTime();
syncStatus.addNextBatch(batch);
+ StepTracker.trace("getBatchSlot", getBatchSlotStartTime,
System.nanoTime());
// sends batch asynchronously and migrates the retry logic into the
callback handler
sendBatchAsync(batch, new DispatchLogHandler(this, batch));
}
@@ -244,82 +247,89 @@ public class LogDispatcher {
}
public PendingBatch getBatch() {
- PendingBatch batch;
- List<TLogBatch> logBatches = new ArrayList<>();
- long startIndex = syncStatus.getNextSendingIndex();
- logger.debug("[GetBatch] startIndex: {}", startIndex);
- long endIndex;
- if (bufferedRequest.size() <=
config.getReplication().getMaxRequestPerBatch()) {
- // Use drainTo instead of poll to reduce lock overhead
- logger.debug(
- "{} : pendingRequest Size: {}, bufferedRequest size: {}",
- impl.getThisNode().getGroupId(),
- pendingRequest.size(),
- bufferedRequest.size());
- pendingRequest.drainTo(
- bufferedRequest,
- config.getReplication().getMaxRequestPerBatch() -
bufferedRequest.size());
- // remove all request that searchIndex < startIndex
- Iterator<IndexedConsensusRequest> iterator =
bufferedRequest.iterator();
- while (iterator.hasNext()) {
- IndexedConsensusRequest request = iterator.next();
- if (request.getSearchIndex() < startIndex) {
- iterator.remove();
- } else {
- break;
+ long getBatchStartTime = System.nanoTime();
+ try {
+ PendingBatch batch;
+ List<TLogBatch> logBatches = new ArrayList<>();
+ long startIndex = syncStatus.getNextSendingIndex();
+ logger.debug("[GetBatch] startIndex: {}", startIndex);
+ long endIndex;
+ if (bufferedRequest.size() <=
config.getReplication().getMaxRequestPerBatch()) {
+ // Use drainTo instead of poll to reduce lock overhead
+ logger.debug(
+ "{} : pendingRequest Size: {}, bufferedRequest size: {}",
+ impl.getThisNode().getGroupId(),
+ pendingRequest.size(),
+ bufferedRequest.size());
+ pendingRequest.drainTo(
+ bufferedRequest,
+ config.getReplication().getMaxRequestPerBatch() -
bufferedRequest.size());
+ // remove all request that searchIndex < startIndex
+ Iterator<IndexedConsensusRequest> iterator =
bufferedRequest.iterator();
+ while (iterator.hasNext()) {
+ IndexedConsensusRequest request = iterator.next();
+ if (request.getSearchIndex() < startIndex) {
+ iterator.remove();
+ } else {
+ break;
+ }
}
}
- }
- if (bufferedRequest.isEmpty()) { // only execute this after a restart
- endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1,
logBatches);
- batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug(
- "{} : accumulated a {} from wal when empty",
impl.getThisNode().getGroupId(), batch);
- } else {
- // Notice that prev searchIndex >= startIndex
- Iterator<IndexedConsensusRequest> iterator =
bufferedRequest.iterator();
- IndexedConsensusRequest prev = iterator.next();
- // Prevents gap between logs. For example, some requests are not
written into the queue when
- // the queue is full. In this case, requests need to be loaded from
the WAL
- endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(),
logBatches);
- if (logBatches.size() ==
config.getReplication().getMaxRequestPerBatch()) {
+ if (bufferedRequest.isEmpty()) { // only execute this after a restart
+ endIndex = constructBatchFromWAL(startIndex, impl.getIndex() + 1,
logBatches);
batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug("{} : accumulated a {} from wal",
impl.getThisNode().getGroupId(), batch);
- return batch;
- }
- constructBatchIndexedFromConsensusRequest(prev, logBatches);
- endIndex = prev.getSearchIndex();
- iterator.remove();
- while (iterator.hasNext()
- && logBatches.size() <=
config.getReplication().getMaxRequestPerBatch()) {
- IndexedConsensusRequest current = iterator.next();
- // Prevents gap between logs. For example, some logs are not written
into the queue when
+ logger.debug(
+ "{} : accumulated a {} from wal when empty",
impl.getThisNode().getGroupId(), batch);
+ } else {
+ // Notice that prev searchIndex >= startIndex
+ Iterator<IndexedConsensusRequest> iterator =
bufferedRequest.iterator();
+ IndexedConsensusRequest prev = iterator.next();
+ // Prevents gap between logs. For example, some requests are not
written into the queue
+ // when
// the queue is full. In this case, requests need to be loaded from
the WAL
- if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
- endIndex =
- constructBatchFromWAL(prev.getSearchIndex(),
current.getSearchIndex(), logBatches);
- if (logBatches.size() ==
config.getReplication().getMaxRequestPerBatch()) {
- batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug(
- "gap {} : accumulated a {} from queue and wal when gap",
- impl.getThisNode().getGroupId(),
- batch);
- return batch;
- }
+ endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(),
logBatches);
+ if (logBatches.size() ==
config.getReplication().getMaxRequestPerBatch()) {
+ batch = new PendingBatch(startIndex, endIndex, logBatches);
+ logger.debug("{} : accumulated a {} from wal",
impl.getThisNode().getGroupId(), batch);
+ return batch;
}
- constructBatchIndexedFromConsensusRequest(current, logBatches);
- endIndex = current.getSearchIndex();
- prev = current;
- // We might not be able to remove all the elements in the
bufferedRequest in the
- // current function, but that's fine, we'll continue processing
these elements in the
- // bufferedRequest the next time we go into the function, they're
never lost
+ constructBatchIndexedFromConsensusRequest(prev, logBatches);
+ endIndex = prev.getSearchIndex();
iterator.remove();
+ while (iterator.hasNext()
+ && logBatches.size() <=
config.getReplication().getMaxRequestPerBatch()) {
+ IndexedConsensusRequest current = iterator.next();
+ // Prevents gap between logs. For example, some logs are not
written into the queue when
+ // the queue is full. In this case, requests need to be loaded
from the WAL
+ if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
+ endIndex =
+ constructBatchFromWAL(
+ prev.getSearchIndex(), current.getSearchIndex(),
logBatches);
+ if (logBatches.size() ==
config.getReplication().getMaxRequestPerBatch()) {
+ batch = new PendingBatch(startIndex, endIndex, logBatches);
+ logger.debug(
+ "gap {} : accumulated a {} from queue and wal when gap",
+ impl.getThisNode().getGroupId(),
+ batch);
+ return batch;
+ }
+ }
+ constructBatchIndexedFromConsensusRequest(current, logBatches);
+ endIndex = current.getSearchIndex();
+ prev = current;
+ // We might not be able to remove all the elements in the
bufferedRequest in the
+ // current function, but that's fine, we'll continue processing
these elements in the
+ // bufferedRequest the next time we go into the function, they're
never lost
+ iterator.remove();
+ }
+ batch = new PendingBatch(startIndex, endIndex, logBatches);
+ logger.debug(
+ "{} : accumulated a {} from queue and wal",
impl.getThisNode().getGroupId(), batch);
}
- batch = new PendingBatch(startIndex, endIndex, logBatches);
- logger.debug(
- "{} : accumulated a {} from queue and wal",
impl.getThisNode().getGroupId(), batch);
+ return batch;
+ } finally {
+ StepTracker.trace("getBatch()", 25, getBatchStartTime,
System.nanoTime());
}
- return batch;
}
public void sendBatchAsync(PendingBatch batch, DispatchLogHandler handler)
{
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 157b8b0207..8f2fbc7080 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.consensus.multileader.service;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -54,6 +55,7 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
@Override
public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes>
resultHandler) {
+ long syncLogStartTime = System.nanoTime();
try {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
@@ -99,6 +101,8 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
}
} catch (Exception e) {
resultHandler.onError(e);
+ } finally {
+ StepTracker.trace("syncLog", 25, syncLogStartTime, System.nanoTime());
}
}