This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch ml_test_1_async in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2126e2febc731d657ca6a112b87bc9cbd13a9eb6 Author: Jinrui.Zhang <[email protected]> AuthorDate: Wed Aug 3 16:56:58 2022 +0800 leverage client insert thread to do serialization of request for dispatching --- .../common/request/IndexedConsensusRequest.java | 19 +++++++++++++- .../multileader/logdispatcher/LogDispatcher.java | 29 +++++++++++++++++++--- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java index de3aca433b..1c004264dd 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.consensus.common.request; import java.nio.ByteBuffer; +import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -29,13 +30,19 @@ public class IndexedConsensusRequest implements IConsensusRequest { /** we do not need to serialize these two fields as they are useless in other nodes. */ private final long searchIndex; - private final List<IConsensusRequest> requests; + private List<IConsensusRequest> requests; + private List<ByteBuffer> serializedRequests; public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) { this.searchIndex = searchIndex; this.requests = requests; } + public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long searchIndex) { + this.searchIndex = searchIndex; + this.serializedRequests = serializedRequests; + } + @Override public ByteBuffer serializeToByteBuffer() { throw new UnsupportedOperationException(); @@ -49,6 +56,16 @@ public class IndexedConsensusRequest implements IConsensusRequest { return searchIndex; } + public List<ByteBuffer> getSerializedRequests() { + return serializedRequests; + } + + public List<ByteBuffer> buildSerializedRequests() { + List<ByteBuffer> result = new LinkedList<>(); + this.requests.forEach(r -> result.add(r.serializeToByteBuffer())); + return result; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index ed41ec5e16..da5ad41aaa 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -40,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; @@ -106,17 +107,27 @@ public class LogDispatcher { } public void offer(IndexedConsensusRequest request) { + List<ByteBuffer> serializedRequest = request.buildSerializedRequests(); threads.forEach( thread -> { logger.debug( "{}: Push a log to the queue, where the queue length is {}", impl.getThisNode().getGroupId(), thread.getPendingRequest().size()); - if (!thread.getPendingRequest().offer(request)) { + if (!thread + .getPendingRequest() + .offer(new IndexedConsensusRequest(serializedRequest, request.getSearchIndex()))) { + logger.info( + "{}: Log queue to {} is full. skip current request: {}", + impl.getThisNode().getGroupId(), + thread.getPeer().getEndpoint().getIp(), + request.getSearchIndex()); logger.debug( "{}: Log queue of {} is full, ignore the log to this node", impl.getThisNode().getGroupId(), thread.getPeer()); + } else { + thread.countQueueUsage(request.getSearchIndex()); } }); } @@ -139,6 +150,7 @@ public class LogDispatcher { private ConsensusReqReader.ReqIterator walEntryiterator; private long iteratorIndex = 1; + private long queueProcessCount = 0; public LogDispatcherThread(Peer peer, MultiLeaderConfig config) { this.peer = peer; @@ -156,6 +168,16 @@ public class LogDispatcher { return controller; } + public void countQueueUsage(long searchIndex) { + this.queueProcessCount++; + logger.info( + "{}: queue to {}: put request to queue. count: {}, searchIndex {}", + impl.getThisNode().getGroupId(), + getPeer().getEndpoint().getIp(), + this.queueProcessCount, + searchIndex); + } + public long getCurrentSyncIndex() { return controller.getCurrentIndex(); } @@ -355,9 +377,8 @@ public class LogDispatcher { private void constructBatchIndexedFromConsensusRequest( IndexedConsensusRequest request, List<TLogBatch> logBatches) { - for (IConsensusRequest innerRequest : request.getRequests()) { - logBatches.add( - new TLogBatch(innerRequest.serializeToByteBuffer(), request.getSearchIndex(), false)); + for (ByteBuffer innerRequest : request.getSerializedRequests()) { + logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), false)); } } }
