This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch multi_leader_memory_pendingBatch_control in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c7944ce18bfe9ccd2d05746a3482cfcf9942225a Author: OneSizeFitQuorum <[email protected]> AuthorDate: Tue Nov 15 11:08:30 2022 +0800 enhance TLogBatch struct Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../common/request/IndexedConsensusRequest.java | 15 ++++----- .../multileader/logdispatcher/LogDispatcher.java | 16 +++------ .../multileader/logdispatcher/PendingBatch.java | 4 ++- .../service/MultiLeaderRPCServiceProcessor.java | 38 +++++++--------------- .../src/main/thrift/mutlileader.thrift | 2 +- 5 files changed, 25 insertions(+), 50 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java index ca236da1c3..6b0cb2634d 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java @@ -39,6 +39,12 @@ public class IndexedConsensusRequest implements IConsensusRequest { this.searchIndex = searchIndex; this.requests = requests; this.syncIndex = -1L; + this.requests.forEach( + r -> { + ByteBuffer buffer = r.serializeToByteBuffer(); + this.serializedRequests.add(buffer); + this.serializedSize += buffer.capacity(); + }); } public IndexedConsensusRequest( @@ -61,15 +67,6 @@ public class IndexedConsensusRequest implements IConsensusRequest { return serializedRequests; } - public void buildSerializedRequests() { - this.requests.forEach( - r -> { - ByteBuffer buffer = r.serializeToByteBuffer(); - this.serializedRequests.add(buffer); - this.serializedSize += buffer.capacity(); - }); - } - public long getSerializedSize() { return serializedSize; } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index 8c7a443d3c..f0b4791ea8 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -26,7 +26,6 @@ import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.consensus.common.Peer; -import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; import org.apache.iotdb.consensus.config.MultiLeaderConfig; import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl; @@ -44,7 +43,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -156,9 +154,6 @@ public class LogDispatcher { } public void offer(IndexedConsensusRequest request) { - request.buildSerializedRequests(); - // we put the serialization step outside the synchronized block because it is stateless and - // time-consuming synchronized (this) { threads.forEach( thread -> { @@ -500,18 +495,15 @@ public class LogDispatcher { } targetIndex = data.getSearchIndex() + 1; // construct request from wal - for (IConsensusRequest innerRequest : data.getRequests()) { - logBatches.addTLogBatch( - new TLogBatch(innerRequest.serializeToByteBuffer(), data.getSearchIndex(), true)); - } + logBatches.addTLogBatch( + new TLogBatch(data.getSerializedRequests(), data.getSearchIndex(), true)); } } private void constructBatchIndexedFromConsensusRequest( IndexedConsensusRequest request, PendingBatch logBatches) { - for (ByteBuffer innerRequest : request.getSerializedRequests()) { - logBatches.addTLogBatch(new TLogBatch(innerRequest, request.getSearchIndex(), false)); - } + logBatches.addTLogBatch( + new TLogBatch(request.getSerializedRequests(), request.getSearchIndex(), false)); } } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java index 5d4685b19e..dbfb52d515 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java @@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.multileader.logdispatcher; import org.apache.iotdb.consensus.config.MultiLeaderConfig; import org.apache.iotdb.consensus.multileader.thrift.TLogBatch; +import java.nio.Buffer; import java.util.ArrayList; import java.util.List; @@ -55,7 +56,8 @@ public class PendingBatch { public void addTLogBatch(TLogBatch batch) { batches.add(batch); // TODO Maybe we need to add in additional fields for more accurate calculations - serializedSize += batch.getData() == null ? 0 : batch.getData().length; + serializedSize += + batch.getData() == null ? 0 : batch.getData().stream().mapToInt(Buffer::capacity).sum(); } public boolean canAccumulate() { diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java index 5f90cfffb5..67bceadd12 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; -import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest; import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException; import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus; @@ -56,9 +55,8 @@ import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; +import java.util.stream.Collectors; public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface { @@ -104,30 +102,16 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ BatchIndexedConsensusRequest requestsInThisBatch = new BatchIndexedConsensusRequest(req.peerId); // We use synchronized to ensure atomicity of executing multiple logs - if (!req.getBatches().isEmpty()) { - List<IConsensusRequest> consensusRequests = new ArrayList<>(); - long currentSearchIndex = req.getBatches().get(0).getSearchIndex(); - for (TLogBatch batch : req.getBatches()) { - IConsensusRequest request = - batch.isFromWAL() - ? new MultiLeaderConsensusRequest(batch.data) - : new ByteBufferConsensusRequest(batch.data); - // merge TLogBatch with same search index into one request - if (batch.getSearchIndex() != currentSearchIndex) { - requestsInThisBatch.add( - impl.buildIndexedConsensusRequestForRemoteRequest( - currentSearchIndex, consensusRequests)); - consensusRequests = new ArrayList<>(); - currentSearchIndex = batch.getSearchIndex(); - } - consensusRequests.add(request); - } - // write last request - if (!consensusRequests.isEmpty()) { - requestsInThisBatch.add( - impl.buildIndexedConsensusRequestForRemoteRequest( - currentSearchIndex, consensusRequests)); - } + for (TLogBatch batch : req.getBatches()) { + requestsInThisBatch.add( + impl.buildIndexedConsensusRequestForRemoteRequest( + batch.getSearchIndex(), + batch.getData().stream() + .map( + batch.isFromWAL() + ? MultiLeaderConsensusRequest::new + : ByteBufferConsensusRequest::new) + .collect(Collectors.toList()))); } TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch); logger.debug( diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift index 0336c19303..28e9047178 100644 --- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift +++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift @@ -21,7 +21,7 @@ include "common.thrift" namespace java org.apache.iotdb.consensus.multileader.thrift struct TLogBatch { - 1: required binary data + 1: required list<binary> data 2: required i64 searchIndex 3: required bool fromWAL }
