This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cp_iot_1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e161b3489b888b9f9768195ad4b05d9b9fdf9433
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Aug 6 09:27:28 2025 +0800

    Add a multiplier to avoid receiver OOM in IoTConsensus (#16102)
---
 .../request/DeserializedBatchIndexedConsensusRequest.java | 7 +++++++
 .../apache/iotdb/consensus/iot/client/DispatchLogHandler.java | 5 +++++
 .../org/apache/iotdb/consensus/iot/logdispatcher/Batch.java | 10 +++++++++-
 .../iotdb/consensus/iot/logdispatcher/LogDispatcher.java | 11 +++++++++++
 .../iot/service/IoTConsensusRPCServiceProcessor.java | 3 ++-
 .../thrift-consensus/src/main/thrift/iotconsensus.thrift | 1 +
 6 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
index 3cb81fa95c7..9cdeaf60c30 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
@@ -29,6 +29,7 @@ public class DeserializedBatchIndexedConsensusRequest
   private final long startSyncIndex;
   private final long endSyncIndex;
   private final List<IConsensusRequest> insertNodes;
+  private long memorySize;
 
   public DeserializedBatchIndexedConsensusRequest(
       long startSyncIndex, long endSyncIndex, int size) {
@@ -52,6 +53,7 @@ public class DeserializedBatchIndexedConsensusRequest
 
   public void add(IConsensusRequest insertNode) {
     this.insertNodes.add(insertNode);
+    this.memorySize += insertNode.getMemorySize();
   }
 
   @Override
@@ -82,4 +84,9 @@ public class DeserializedBatchIndexedConsensusRequest
   public ByteBuffer serializeToByteBuffer() {
     return null;
   }
+
+  @Override
+  public long getMemorySize() {
+    return memorySize;
+  }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 9b0979bf6be..3b812f6c297 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.consensus.iot.client;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
+import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
 import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
 import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
 import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
@@ -89,6 +90,10 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe
       }
       completeBatch(batch);
     }
+    if (response.isSetReceiverMemSize()) {
+      LogDispatcher.getReceiverMemSizeSum().addAndGet(response.getReceiverMemSize());
+      LogDispatcher.getSenderMemSizeSum().addAndGet(batch.getMemorySize());
+    }
     logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - createTime);
   }
 
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
index 4e89226c393..72b68ab96ac 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
@@ -63,8 +63,16 @@ public class Batch {
   }
 
   public boolean canAccumulate() {
+    // When reading entries from the WAL, the memory size is calculated based on the serialized
+    // size, which can be significantly smaller than the actual size.
+    // Thus, we add a multiplier to sender's memory size to estimate the receiver's memory cost.
+    // The multiplier is calculated based on the receiver's feedback.
+    long receiverMemSize = LogDispatcher.getReceiverMemSizeSum().get();
+    long senderMemSize = LogDispatcher.getSenderMemSizeSum().get();
+    double multiplier = senderMemSize > 0 ? (double) receiverMemSize / senderMemSize : 1.0;
+    multiplier = Math.max(multiplier, 1.0);
     return logEntries.size() < config.getReplication().getMaxLogEntriesNumPerBatch()
-        && memorySize < config.getReplication().getMaxSizePerBatch();
+        && ((long) (memorySize * multiplier)) < config.getReplication().getMaxSizePerBatch();
   }
 
   public long getStartIndex() {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index e196df43209..a363e7dd9aa 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -69,6 +69,9 @@ public class LogDispatcher {
   private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
   private final AtomicLong logEntriesFromQueue = new AtomicLong(0);
 
+  private static final AtomicLong senderMemSizeSum = new AtomicLong(0);
+  private static final AtomicLong receiverMemSizeSum = new AtomicLong(0);
+
   public LogDispatcher(
       IoTConsensusServerImpl impl,
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) {
@@ -585,4 +588,12 @@ public class LogDispatcher {
               request.getMemorySize()));
     }
   }
+
+  public static AtomicLong getReceiverMemSizeSum() {
+    return receiverMemSizeSum;
+  }
+
+  public static AtomicLong getSenderMemSizeSum() {
+    return senderMemSizeSum;
+  }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 2bac66738fd..71c14aebaa1 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -129,7 +129,8 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Ifa
         "execute TSyncLogEntriesReq for {} with result {}",
         req.consensusGroupId,
         writeStatus.subStatus);
-    return new TSyncLogEntriesRes(writeStatus.subStatus);
+    return new TSyncLogEntriesRes(writeStatus.subStatus)
+        .setReceiverMemSize(deserializedRequest.getMemorySize());
   }
 
   @Override
diff --git a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
index dbcffe02ba6..829443f9552 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
@@ -38,6 +38,7 @@ struct TSyncLogEntriesReq {
 
 struct TSyncLogEntriesRes {
   1: required list<common.TSStatus> statuses
+  2: optional i64 receiverMemSize
 }
 
 struct TInactivatePeerReq {
