This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch fix_iot_memory_2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d1f6557132d9dd80a40c9470357e0e2c9644afb8 Author: Tian Jiang <[email protected]> AuthorDate: Wed Aug 13 12:25:12 2025 +0800 fix IoTConsensus memory management --- .../logdispatcher/IoTConsensusMemoryManager.java | 64 ++++++++++++++++++++-- .../consensus/iot/logdispatcher/LogDispatcher.java | 22 ++------ .../consensus/iot/logdispatcher/SyncStatus.java | 18 ++++-- 3 files changed, 77 insertions(+), 27 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java index f993ea386da..9a9d38946fe 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java @@ -41,16 +41,53 @@ public class IoTConsensusMemoryManager { MetricService.getInstance().addMetricSet(new IoTConsensusMemoryManagerMetrics(this)); } - public boolean reserve(IndexedConsensusRequest request, boolean fromQueue) { + public boolean reserve(IndexedConsensusRequest request) { long prevRef = request.incRef(); if (prevRef == 0) { - return reserve(request.getMemorySize(), fromQueue); + boolean reserved = reserve(request.getMemorySize(), true); + if (reserved) { + logger.info( + "Reserving {} bytes for request {} succeeds, current total usage {}", + request.getMemorySize(), + request.getSearchIndex(), + memoryBlock.getUsedMemoryInBytes()); + } else { + logger.info( + "Reserving {} bytes for request {} fails, current total usage {}", + request.getMemorySize(), + request.getSearchIndex(), + memoryBlock.getUsedMemoryInBytes()); + } + return reserved; } else { + logger.info( + "Skip memory reservation for {} because its ref count is not 0", + request.getSearchIndex()); return true; } } - public boolean reserve(long size, boolean fromQueue) { + public boolean reserve(Batch batch) { + boolean reserved = reserve(batch.getMemorySize(), false); + if (reserved) { + logger.info( + "Reserving {} bytes for batch {}-{} succeeds, current total usage {}", + batch.getMemorySize(), + batch.getStartIndex(), + batch.getEndIndex(), + memoryBlock.getUsedMemoryInBytes()); + } else { + logger.info( + "Reserving {} bytes for batch {}-{} fails, current total usage {}", + batch.getMemorySize(), + batch.getStartIndex(), + batch.getEndIndex(), + memoryBlock.getUsedMemoryInBytes()); + } + return reserved; + } + + private boolean reserve(long size, boolean fromQueue) { boolean result = fromQueue ? memoryBlock.allocateIfSufficient(size, maxMemoryRatioForQueue) @@ -65,14 +102,29 @@ public class IoTConsensusMemoryManager { return result; } - public void free(IndexedConsensusRequest request, boolean fromQueue) { + public void free(IndexedConsensusRequest request) { long prevRef = request.decRef(); if (prevRef == 1) { - free(request.getMemorySize(), fromQueue); + free(request.getMemorySize(), true); + logger.info( + "Freed {} bytes for request {}, current total usage {}", + request.getMemorySize(), + request.getSearchIndex(), + memoryBlock.getUsedMemoryInBytes()); } } - public void free(long size, boolean fromQueue) { + public void free(Batch batch) { + free(batch.getMemorySize(), false); + logger.info( + "Freed {} bytes for batch {}-{}, current total usage {}", + batch.getMemorySize(), + batch.getStartIndex(), + batch.getEndIndex(), + getMemorySizeInByte()); + } + + private void free(long size, boolean fromQueue) { long currentUsedMemory = memoryBlock.release(size); if (fromQueue) { queueMemorySizeInByte.addAndGet(-size); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index 00cd6d1376f..374691bf38b 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -283,7 +283,7 @@ public class LogDispatcher { /** try to offer a request into queue with memory control. */ public boolean offer(IndexedConsensusRequest indexedConsensusRequest) { - if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest, true)) { + if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest)) { return false; } boolean success; @@ -291,19 +291,19 @@ public class LogDispatcher { success = pendingEntries.offer(indexedConsensusRequest); } catch (Throwable t) { // If exception occurs during request offer, the reserved memory should be released - iotConsensusMemoryManager.free(indexedConsensusRequest, true); + iotConsensusMemoryManager.free(indexedConsensusRequest); throw t; } if (!success) { // If offer failed, the reserved memory should be released - iotConsensusMemoryManager.free(indexedConsensusRequest, true); + iotConsensusMemoryManager.free(indexedConsensusRequest); } return success; } /** try to remove a request from queue with memory control. */ private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) { - iotConsensusMemoryManager.free(indexedConsensusRequest, true); + iotConsensusMemoryManager.free(indexedConsensusRequest); } public void stop() { @@ -323,23 +323,13 @@ public class LogDispatcher { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - long requestSize = 0; for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) { - long prevRef = indexedConsensusRequest.decRef(); - if (prevRef == 1) { - requestSize += indexedConsensusRequest.getMemorySize(); - } + iotConsensusMemoryManager.free(indexedConsensusRequest); } pendingEntries.clear(); - iotConsensusMemoryManager.free(requestSize, true); - requestSize = 0; for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) { - long prevRef = indexedConsensusRequest.decRef(); - if (prevRef == 1) { - requestSize += indexedConsensusRequest.getMemorySize(); - } + iotConsensusMemoryManager.free(indexedConsensusRequest); } - iotConsensusMemoryManager.free(requestSize, true); syncStatus.free(); MetricService.getInstance().removeMetricSet(logDispatcherThreadMetrics); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java index c5a426d88b8..3d547c6ad90 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java @@ -21,12 +21,16 @@ package org.apache.iotdb.consensus.iot.logdispatcher; import org.apache.iotdb.consensus.config.IoTConsensusConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Iterator; import java.util.LinkedList; import java.util.List; public class SyncStatus { + private static final Logger LOGGER = LoggerFactory.getLogger(SyncStatus.class); private final IoTConsensusConfig config; private final IndexController controller; private final LinkedList<Batch> pendingBatches = new LinkedList<>(); @@ -45,10 +49,16 @@ public class SyncStatus { */ public synchronized void addNextBatch(Batch batch) throws InterruptedException { while ((pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum() - || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) + || !iotConsensusMemoryManager.reserve(batch)) && !Thread.interrupted()) { wait(); } + LOGGER.info( + "Reserved {} bytes for batch {}-{}, current total usage {}", + batch.getMemorySize(), + batch.getStartIndex(), + batch.getEndIndex(), + iotConsensusMemoryManager.getMemorySizeInByte()); pendingBatches.add(batch); } @@ -65,7 +75,7 @@ public class SyncStatus { while (current.isSynced()) { controller.update(current.getEndIndex(), false); iterator.remove(); - iotConsensusMemoryManager.free(current.getMemorySize(), false); + iotConsensusMemoryManager.free(current); if (iterator.hasNext()) { current = iterator.next(); } else { @@ -78,13 +88,11 @@ public class SyncStatus { } public synchronized void free() { - long size = 0; for (Batch pendingBatch : pendingBatches) { - size += pendingBatch.getMemorySize(); + iotConsensusMemoryManager.free(pendingBatch); } pendingBatches.clear(); controller.update(0L, true); - iotConsensusMemoryManager.free(size, false); } /** Gets the first index that is not currently synchronized. */
