This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cp_iot_1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e161b3489b888b9f9768195ad4b05d9b9fdf9433
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Aug 6 09:27:28 2025 +0800

    Add a multiplier to avoid receiver OOM in IoTConsensus (#16102)
---
 .../request/DeserializedBatchIndexedConsensusRequest.java     |  7 +++++++
 .../apache/iotdb/consensus/iot/client/DispatchLogHandler.java |  5 +++++
 .../org/apache/iotdb/consensus/iot/logdispatcher/Batch.java   | 10 +++++++++-
 .../iotdb/consensus/iot/logdispatcher/LogDispatcher.java      | 11 +++++++++++
 .../iot/service/IoTConsensusRPCServiceProcessor.java          |  3 ++-
 .../thrift-consensus/src/main/thrift/iotconsensus.thrift      |  1 +
 6 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
index 3cb81fa95c7..9cdeaf60c30 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
@@ -29,6 +29,7 @@ public class DeserializedBatchIndexedConsensusRequest
   private final long startSyncIndex;
   private final long endSyncIndex;
   private final List<IConsensusRequest> insertNodes;
+  private long memorySize;
 
   public DeserializedBatchIndexedConsensusRequest(
       long startSyncIndex, long endSyncIndex, int size) {
@@ -52,6 +53,7 @@ public class DeserializedBatchIndexedConsensusRequest
 
   public void add(IConsensusRequest insertNode) {
     this.insertNodes.add(insertNode);
+    this.memorySize += insertNode.getMemorySize();
   }
 
   @Override
@@ -82,4 +84,9 @@ public class DeserializedBatchIndexedConsensusRequest
   public ByteBuffer serializeToByteBuffer() {
     return null;
   }
+
+  @Override
+  public long getMemorySize() {
+    return memorySize;
+  }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 9b0979bf6be..3b812f6c297 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.consensus.iot.client;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
+import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
 import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
 import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
 import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
@@ -89,6 +90,10 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe
       }
       completeBatch(batch);
     }
+    if (response.isSetReceiverMemSize()) {
+      LogDispatcher.getReceiverMemSizeSum().addAndGet(response.getReceiverMemSize());
+      LogDispatcher.getSenderMemSizeSum().addAndGet(batch.getMemorySize());
+    }
     logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() - createTime);
   }
 
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
index 4e89226c393..72b68ab96ac 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
@@ -63,8 +63,16 @@ public class Batch {
   }
 
   public boolean canAccumulate() {
+    // When reading entries from the WAL, the memory size is calculated based on the serialized
+    // size, which can be significantly smaller than the actual size.
+    // Thus, we add a multiplier to sender's memory size to estimate the receiver's memory cost.
+    // The multiplier is calculated based on the receiver's feedback.
+    long receiverMemSize = LogDispatcher.getReceiverMemSizeSum().get();
+    long senderMemSize = LogDispatcher.getSenderMemSizeSum().get();
+    double multiplier = senderMemSize > 0 ? (double) receiverMemSize / senderMemSize : 1.0;
+    multiplier = Math.max(multiplier, 1.0);
     return logEntries.size() < config.getReplication().getMaxLogEntriesNumPerBatch()
-        && memorySize < config.getReplication().getMaxSizePerBatch();
+        && ((long) (memorySize * multiplier)) < config.getReplication().getMaxSizePerBatch();
   }
 
   public long getStartIndex() {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index e196df43209..a363e7dd9aa 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -69,6 +69,9 @@ public class LogDispatcher {
   private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
   private final AtomicLong logEntriesFromQueue = new AtomicLong(0);
 
+  private static final AtomicLong senderMemSizeSum = new AtomicLong(0);
+  private static final AtomicLong receiverMemSizeSum = new AtomicLong(0);
+
   public LogDispatcher(
       IoTConsensusServerImpl impl,
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) {
@@ -585,4 +588,12 @@ public class LogDispatcher {
               request.getMemorySize()));
     }
   }
+
+  public static AtomicLong getReceiverMemSizeSum() {
+    return receiverMemSizeSum;
+  }
+
+  public static AtomicLong getSenderMemSizeSum() {
+    return senderMemSizeSum;
+  }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 2bac66738fd..71c14aebaa1 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -129,7 +129,8 @@ public class IoTConsensusRPCServiceProcessor implements IoTConsensusIService.Ifa
         "execute TSyncLogEntriesReq for {} with result {}",
         req.consensusGroupId,
         writeStatus.subStatus);
-    return new TSyncLogEntriesRes(writeStatus.subStatus);
+    return new TSyncLogEntriesRes(writeStatus.subStatus)
+        .setReceiverMemSize(deserializedRequest.getMemorySize());
   }
 
   @Override
diff --git a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
index dbcffe02ba6..829443f9552 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
@@ -38,6 +38,7 @@ struct TSyncLogEntriesReq {
 
 struct TSyncLogEntriesRes {
   1: required list<common.TSStatus> statuses
+  2: optional i64 receiverMemSize
 }
 
 struct TInactivatePeerReq {

Reply via email to