This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch cp_iot_1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2a7a8e65a421f01083a105a1c18df3eeedabf164 Author: Xiangpeng Hu <[email protected]> AuthorDate: Mon Jun 16 09:40:00 2025 +0800 more accurate mermory size (#15713) --- .../apache/iotdb/consensus/iot/logdispatcher/Batch.java | 17 +++++++---------- .../consensus/iot/logdispatcher/LogDispatcher.java | 9 +++++++-- .../iotdb/consensus/iot/logdispatcher/SyncStatus.java | 6 +++--- .../src/main/thrift/iotconsensus.thrift | 1 + 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java index 7a556c85a04..4e89226c393 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java @@ -22,7 +22,6 @@ package org.apache.iotdb.consensus.iot.logdispatcher; import org.apache.iotdb.consensus.config.IoTConsensusConfig; import org.apache.iotdb.consensus.iot.thrift.TLogEntry; -import java.nio.Buffer; import java.util.ArrayList; import java.util.List; @@ -37,7 +36,7 @@ public class Batch { private long logEntriesNumFromWAL = 0L; - private long serializedSize; + private long memorySize; // indicates whether this batch has been successfully synchronized to another node private boolean synced; @@ -60,14 +59,12 @@ public class Batch { if (entry.fromWAL) { logEntriesNumFromWAL++; } - // TODO Maybe we need to add in additional fields for more accurate calculations - serializedSize += - entry.getData() == null ? 0 : entry.getData().stream().mapToInt(Buffer::capacity).sum(); + memorySize += entry.getMemorySize(); } public boolean canAccumulate() { return logEntries.size() < config.getReplication().getMaxLogEntriesNumPerBatch() - && serializedSize < config.getReplication().getMaxSizePerBatch(); + && memorySize < config.getReplication().getMaxSizePerBatch(); } public long getStartIndex() { @@ -94,8 +91,8 @@ public class Batch { return logEntries.isEmpty(); } - public long getSerializedSize() { - return serializedSize; + public long getMemorySize() { + return memorySize; } public long getLogEntriesNumFromWAL() { @@ -111,8 +108,8 @@ public class Batch { + endIndex + ", size=" + logEntries.size() - + ", serializedSize=" - + serializedSize + + ", memorySize=" + + memorySize + '}'; } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index 2d4e7cd5e01..c3a0665be6a 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -567,7 +567,8 @@ public class LogDispatcher { data.buildSerializedRequests(); // construct request from wal logBatches.addTLogEntry( - new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(), true)); + new TLogEntry( + data.getSerializedRequests(), data.getSearchIndex(), true, data.getMemorySize())); } // In the case of corrupt Data, we return true so that we can send a batch as soon as // possible, avoiding potential duplication @@ -577,7 +578,11 @@ public class LogDispatcher { private void constructBatchIndexedFromConsensusRequest( IndexedConsensusRequest request, Batch logBatches) { logBatches.addTLogEntry( - new TLogEntry(request.getSerializedRequests(), request.getSearchIndex(), false)); + new TLogEntry( + request.getSerializedRequests(), + request.getSearchIndex(), + false, + request.getMemorySize())); } } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java index e11b6302114..fe00939050e 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java @@ -45,7 +45,7 @@ public class SyncStatus { */ public synchronized void addNextBatch(Batch batch) throws InterruptedException { while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum() - || !iotConsensusMemoryManager.reserve(batch.getSerializedSize(), false)) { + || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) { wait(); } pendingBatches.add(batch); @@ -64,7 +64,7 @@ public class SyncStatus { while (current.isSynced()) { controller.update(current.getEndIndex(), false); iterator.remove(); - iotConsensusMemoryManager.free(current.getSerializedSize(), false); + iotConsensusMemoryManager.free(current.getMemorySize(), false); if (iterator.hasNext()) { current = iterator.next(); } else { @@ -79,7 +79,7 @@ public class SyncStatus { public synchronized void free() { long size = 0; for (Batch pendingBatch : pendingBatches) { - size += pendingBatch.getSerializedSize(); + size += pendingBatch.getMemorySize(); } pendingBatches.clear(); controller.update(0L, true); diff --git a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift index d0b4808977e..dbcffe02ba6 100644 --- a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift +++ b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift @@ -26,6 +26,7 @@ struct TLogEntry { 1: required list<binary> data 2: required i64 searchIndex 3: required bool fromWAL + 4: required i64 memorySize } struct TSyncLogEntriesReq {
