This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3a743f41ae5b4f522626b85cdd4bf923f978e1d0 Author: Jinrui.Zhang <[email protected]> AuthorDate: Tue Aug 9 15:49:49 2022 +0800 clear log --- .../common/request/IndexedConsensusRequest.java | 13 +- .../multileader/MultiLeaderServerImpl.java | 60 +++--- .../multileader/client/DispatchLogHandler.java | 3 - .../multileader/logdispatcher/LogDispatcher.java | 206 ++++++++------------- .../service/MultiLeaderRPCServiceProcessor.java | 4 - .../statemachine/DataRegionStateMachine.java | 13 +- .../iotdb/db/engine/storagegroup/DataRegion.java | 3 - 7 files changed, 109 insertions(+), 193 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java index 9ce706d09a..1a61cf3a09 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java @@ -19,8 +19,6 @@ package org.apache.iotdb.consensus.common.request; -import org.apache.iotdb.commons.StepTracker; - import java.nio.ByteBuffer; import java.util.LinkedList; import java.util.List; @@ -69,14 +67,9 @@ public class IndexedConsensusRequest implements IConsensusRequest { } public List<ByteBuffer> buildSerializedRequests() { - long startTime = System.nanoTime(); - try { - List<ByteBuffer> result = new LinkedList<>(); - this.requests.forEach(r -> result.add(r.serializeToByteBuffer())); - return result; - } finally { - StepTracker.trace("buildSerializedRequests", startTime, System.nanoTime()); - } + List<ByteBuffer> result = new LinkedList<>(); + this.requests.forEach(r -> result.add(r.serializeToByteBuffer())); + return result; } public long getSearchIndex() { diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java index a2fd7c0e47..096cc160a0 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java @@ -21,7 +21,6 @@ package org.apache.iotdb.consensus.multileader; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.StepTracker; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.DataSet; @@ -110,44 +109,33 @@ public class MultiLeaderServerImpl { * records the index of the log and writes locally, and then asynchronous replication is performed */ public TSStatus write(IConsensusRequest request) { - long leaderWriteStartTime = System.nanoTime(); synchronized (stateMachine) { - StepTracker.trace("LeaderWriteWaitLock", leaderWriteStartTime, System.nanoTime()); - long startTimeAfterLock = System.nanoTime(); - try { - IndexedConsensusRequest indexedConsensusRequest = - buildIndexedConsensusRequestForLocalRequest(request); - if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) { - logger.info( - "DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}", - thisNode.getGroupId(), - getCurrentSafelyDeletedSearchIndex(), - indexedConsensusRequest.getSearchIndex()); - } - // TODO wal and memtable - TSStatus result = stateMachine.write(indexedConsensusRequest); - // TSStatus result = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - StepTracker.trace("stateMachineWrite", startTimeAfterLock, System.nanoTime()); - long offerStartTime = System.nanoTime(); - if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - synchronized (index) { - logDispatcher.offer(indexedConsensusRequest); - index.incrementAndGet(); - } - } else { - logger.debug( - "{}: write operation failed. searchIndex: {}. Code: {}", - thisNode.getGroupId(), - indexedConsensusRequest.getSearchIndex(), - result.getCode()); - index.decrementAndGet(); + IndexedConsensusRequest indexedConsensusRequest = + buildIndexedConsensusRequestForLocalRequest(request); + if (indexedConsensusRequest.getSearchIndex() % 1000 == 0) { + logger.info( + "DataRegion[{}]: index after build: safeIndex:{}, searchIndex: {}", + thisNode.getGroupId(), + getCurrentSafelyDeletedSearchIndex(), + indexedConsensusRequest.getSearchIndex()); + } + // TODO wal and memtable + TSStatus result = stateMachine.write(indexedConsensusRequest); + long offerStartTime = System.nanoTime(); + if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + synchronized (index) { + logDispatcher.offer(indexedConsensusRequest); + index.incrementAndGet(); } - StepTracker.trace("serializeAndOfferToQueue", offerStartTime, System.nanoTime()); - return result; - } finally { - StepTracker.trace("MultiLeaderWriteAfterLock", startTimeAfterLock, System.nanoTime()); - StepTracker.trace("MultiLeaderWriteWhole", leaderWriteStartTime, System.nanoTime()); + } else { + logger.debug( + "{}: write operation failed. searchIndex: {}. Code: {}", + thisNode.getGroupId(), + indexedConsensusRequest.getSearchIndex(), + result.getCode()); + index.decrementAndGet(); } + return result; } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java index ab6cfe655f..b4cefed078 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java @@ -19,7 +19,6 @@ package org.apache.iotdb.consensus.multileader.client; -import org.apache.iotdb.commons.StepTracker; import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher.LogDispatcherThread; import org.apache.iotdb.consensus.multileader.logdispatcher.PendingBatch; import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes; @@ -38,7 +37,6 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> { private final LogDispatcherThread thread; private final PendingBatch batch; private int retryCount; - private final long startTime = System.nanoTime(); public DispatchLogHandler(LogDispatcherThread thread, PendingBatch batch) { this.thread = thread; @@ -47,7 +45,6 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> { @Override public void onComplete(TSyncLogRes response) { - StepTracker.trace("leaderSendUtilResponse", 25, startTime, System.nanoTime()); if (response.getStatus().size() == 1 && response.getStatus().get(0).getCode() == TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) { diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index daa34122a5..5f29665346 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -20,7 +20,6 @@ package org.apache.iotdb.consensus.multileader.logdispatcher; import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.commons.StepTracker; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.consensus.common.Peer; @@ -116,28 +115,9 @@ public class LogDispatcher { impl.getThisNode().getGroupId(), thread.getPeer().getEndpoint().getIp(), thread.getPendingRequest().size()); - // long putToQueueStartTime = System.nanoTime(); - // try { - // while (!thread - // .getPendingRequest() - // .offer(new IndexedConsensusRequest(serializedRequests, - // request.getSearchIndex()))) { - // impl.getIndexObject().wait(); - // } - // ; - // } catch (InterruptedException e) { - // e.printStackTrace(); - // } finally { - // logger.info("{}: Push a log to the queue, done", - // impl.getThisNode().getGroupId()); - // StepTracker.trace("putToQueueWaitingTime", putToQueueStartTime, - // System.nanoTime()); - // } - if (thread + if (!thread .getPendingRequest() .offer(new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) { - thread.countQueue(request.getSearchIndex()); - } else { logger.info( "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}", impl.getThisNode().getGroupId(), @@ -147,14 +127,6 @@ public class LogDispatcher { }); } - private boolean needPutIntoQueue() { - return threads.stream() - .anyMatch( - t -> - t.getPendingRequest().size() - < t.config.getReplication().getMaxPendingRequestNumPerNode()); - } - public class LogDispatcherThread implements Runnable { private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10; private final MultiLeaderConfig config; @@ -187,16 +159,6 @@ public class LogDispatcher { this.walEntryiterator = reader.getReqIterator(iteratorIndex); } - public void countQueue(long searchIndex) { - this.queueCount++; - if (queueCount % 100 == 0) { - logger.info( - String.format( - "DataRegion[%s]->%s: total request from queue: [%d], requestIndex: [%d]", - peer.getGroupId().getId(), peer.getEndpoint().ip, queueCount, searchIndex)); - } - } - public IndexController getController() { return controller; } @@ -244,10 +206,7 @@ public class LogDispatcher { } } // we may block here if the synchronization pipeline is full - long getBatchSlotStartTime = System.nanoTime(); syncStatus.addNextBatch(batch); - StepTracker.trace("batchSize", 25, 0, batch.getBatches().size() * 1000_000L); - StepTracker.trace("getBatchSlot", 25, getBatchSlotStartTime, System.nanoTime()); // sends batch asynchronously and migrates the retry logic into the callback handler sendBatchAsync(batch, new DispatchLogHandler(this, batch)); } @@ -267,99 +226,92 @@ public class LogDispatcher { } public PendingBatch getBatch() { - long getBatchStartTime = System.nanoTime(); - try { - PendingBatch batch; - List<TLogBatch> logBatches = new ArrayList<>(); - long startIndex = syncStatus.getNextSendingIndex(); - long maxIndexWhenBufferedRequestEmpty = startIndex; - logger.debug("[GetBatch] startIndex: {}", startIndex); - long endIndex; - if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) { - // Use drainTo instead of poll to reduce lock overhead - logger.debug( - "{} : pendingRequest Size: {}, bufferedRequest size: {}", - impl.getThisNode().getGroupId(), - pendingRequest.size(), - bufferedRequest.size()); - synchronized (impl.getIndexObject()) { - pendingRequest.drainTo( - bufferedRequest, - config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size()); - maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1; - // impl.getIndexObject().notifyAll(); - } - // remove all request that searchIndex < startIndex - Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator(); - while (iterator.hasNext()) { - IndexedConsensusRequest request = iterator.next(); - if (request.getSearchIndex() < startIndex) { - iterator.remove(); - } else { - break; - } + PendingBatch batch; + List<TLogBatch> logBatches = new ArrayList<>(); + long startIndex = syncStatus.getNextSendingIndex(); + long maxIndexWhenBufferedRequestEmpty = startIndex; + logger.debug("[GetBatch] startIndex: {}", startIndex); + long endIndex; + if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) { + // Use drainTo instead of poll to reduce lock overhead + logger.debug( + "{} : pendingRequest Size: {}, bufferedRequest size: {}", + impl.getThisNode().getGroupId(), + pendingRequest.size(), + bufferedRequest.size()); + synchronized (impl.getIndexObject()) { + pendingRequest.drainTo( + bufferedRequest, + config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size()); + maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1; + // impl.getIndexObject().notifyAll(); + } + // remove all request that searchIndex < startIndex + Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator(); + while (iterator.hasNext()) { + IndexedConsensusRequest request = iterator.next(); + if (request.getSearchIndex() < startIndex) { + iterator.remove(); + } else { + break; } } - // This condition will be executed in several scenarios: - // 1. restart - // 2. The getBatch() is invoked immediately at the moment the PendingRequests are consumed - // up. - if (bufferedRequest.isEmpty()) { - endIndex = - constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, logBatches); + } + // This condition will be executed in several scenarios: + // 1. restart + // 2. The getBatch() is invoked immediately at the moment the PendingRequests are consumed + // up. + if (bufferedRequest.isEmpty()) { + endIndex = constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, logBatches); + batch = new PendingBatch(startIndex, endIndex, logBatches); + logger.debug( + "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch); + } else { + // Notice that prev searchIndex >= startIndex + Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator(); + IndexedConsensusRequest prev = iterator.next(); + // Prevents gap between logs. For example, some requests are not written into the queue + // when + // the queue is full. In this case, requests need to be loaded from the WAL + endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches); + if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) { batch = new PendingBatch(startIndex, endIndex, logBatches); - logger.debug( - "{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batch); - } else { - // Notice that prev searchIndex >= startIndex - Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator(); - IndexedConsensusRequest prev = iterator.next(); - // Prevents gap between logs. For example, some requests are not written into the queue - // when + logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch); + return batch; + } + constructBatchIndexedFromConsensusRequest(prev, logBatches); + endIndex = prev.getSearchIndex(); + iterator.remove(); + while (iterator.hasNext() + && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) { + IndexedConsensusRequest current = iterator.next(); + // Prevents gap between logs. For example, some logs are not written into the queue when // the queue is full. In this case, requests need to be loaded from the WAL - endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches); - if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) { - batch = new PendingBatch(startIndex, endIndex, logBatches); - logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch); - return batch; - } - constructBatchIndexedFromConsensusRequest(prev, logBatches); - endIndex = prev.getSearchIndex(); - iterator.remove(); - while (iterator.hasNext() - && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) { - IndexedConsensusRequest current = iterator.next(); - // Prevents gap between logs. For example, some logs are not written into the queue when - // the queue is full. In this case, requests need to be loaded from the WAL - if (current.getSearchIndex() != prev.getSearchIndex() + 1) { - endIndex = - constructBatchFromWAL( - prev.getSearchIndex(), current.getSearchIndex(), logBatches); - if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) { - batch = new PendingBatch(startIndex, endIndex, logBatches); - logger.debug( - "gap {} : accumulated a {} from queue and wal when gap", - impl.getThisNode().getGroupId(), - batch); - return batch; - } + if (current.getSearchIndex() != prev.getSearchIndex() + 1) { + endIndex = + constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), logBatches); + if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) { + batch = new PendingBatch(startIndex, endIndex, logBatches); + logger.debug( + "gap {} : accumulated a {} from queue and wal when gap", + impl.getThisNode().getGroupId(), + batch); + return batch; } - constructBatchIndexedFromConsensusRequest(current, logBatches); - endIndex = current.getSearchIndex(); - prev = current; - // We might not be able to remove all the elements in the bufferedRequest in the - // current function, but that's fine, we'll continue processing these elements in the - // bufferedRequest the next time we go into the function, they're never lost - iterator.remove(); } - batch = new PendingBatch(startIndex, endIndex, logBatches); - logger.debug( - "{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batch); + constructBatchIndexedFromConsensusRequest(current, logBatches); + endIndex = current.getSearchIndex(); + prev = current; + // We might not be able to remove all the elements in the bufferedRequest in the + // current function, but that's fine, we'll continue processing these elements in the + // bufferedRequest the next time we go into the function, they're never lost + iterator.remove(); } - return batch; - } finally { - StepTracker.trace("getBatch()", 25, getBatchStartTime, System.nanoTime()); + batch = new PendingBatch(startIndex, endIndex, logBatches); + logger.debug( + "{} : accumulated a {} from queue and wal", impl.getThisNode().getGroupId(), batch); } + return batch; } public void sendBatchAsync(PendingBatch batch, DispatchLogHandler handler) { diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java index 1b0328975b..70a0387c40 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java @@ -20,7 +20,6 @@ package org.apache.iotdb.consensus.multileader.service; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.StepTracker; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; @@ -54,7 +53,6 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ @Override public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler) { - long syncLogStartTime = System.nanoTime(); try { ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); @@ -103,8 +101,6 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ resultHandler.onComplete(new TSyncLogRes(writeStatus.subStatus)); } catch (Exception e) { resultHandler.onError(e); - } finally { - StepTracker.trace("syncLog", 25, syncLogStartTime, System.nanoTime()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java index 259d88b9fb..4b7a416740 100644 --- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java +++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java @@ -123,13 +123,6 @@ public class DataRegionStateMachine extends BaseStateMachine { } private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) { - long cacheRequestStartTime = System.nanoTime(); - logger.info( - "region = {}, queue size = {}, syncIndex = {}, nextSyncIndex = {}", - region.getDataRegionId(), - requestCache.size(), - insertNodeWrapper.startSyncIndex, - nextSyncIndex); queueLock.lock(); try { requestCache.add(insertNodeWrapper); @@ -177,9 +170,9 @@ public class DataRegionStateMachine extends BaseStateMachine { Thread.currentThread().interrupt(); } } - StepTracker.trace("cacheAndQueueRequest", cacheRequestStartTime, System.nanoTime()); - logger.info( - "queue size {}, startSyncIndex = {}, endSyncIndex = {}", + logger.debug( + "region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}", + region.getDataRegionId(), requestCache.size(), insertNodeWrapper.getStartSyncIndex(), insertNodeWrapper.getEndSyncIndex()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java index ff1737e994..3c08cb50cb 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.engine.storagegroup; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.StepTracker; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; @@ -1062,7 +1061,6 @@ public class DataRegion { StorageEngineV2.blockInsertionIfReject(null); } writeLock("insertTablet"); - long startTime = System.nanoTime(); try { TSStatus[] results = new TSStatus[insertTabletNode.getRowCount()]; Arrays.fill(results, RpcUtils.SUCCESS_STATUS); @@ -1146,7 +1144,6 @@ public class DataRegion { // TODO: trigger // fire trigger after insertion // TriggerEngine.fire(TriggerEvent.AFTER_INSERT, insertTabletPlan, firePosition); } finally { - StepTracker.trace("insertTablet", startTime, System.nanoTime()); writeUnlock(); } }
