This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new e12b7350151 Load: Fix Memory Allocation and Release Mismatch in LoadTsFileDataCacheMemoryBlock (#14375) (#14466)
e12b7350151 is described below
commit e12b7350151eb7e6fb9f0c94a886eb59c42221e5
Author: Itami Sho <[email protected]>
AuthorDate: Wed Dec 18 10:51:12 2024 +0800
Load: Fix Memory Allocation and Release Mismatch in LoadTsFileDataCacheMemoryBlock (#14375) (#14466)
---
.../plan/planner/LocalExecutionPlanner.java | 9 +++++
.../memory/LoadTsFileAnalyzeSchemaMemoryBlock.java | 12 ++++---
.../memory/LoadTsFileDataCacheMemoryBlock.java | 38 +++++-----------------
.../load/memory/LoadTsFileMemoryManager.java | 10 ++++--
4 files changed, 33 insertions(+), 36 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 1f02e3bb81f..b643047dcfe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -262,6 +262,15 @@ public class LocalExecutionPlanner {
public synchronized void releaseToFreeMemoryForOperators(final long
memoryInBytes) {
freeMemoryForOperators += memoryInBytes;
+
+ if (freeMemoryForOperators > ALLOCATE_MEMORY_FOR_OPERATORS) {
+ LOGGER.error(
+ "The free memory {} is more than allocated memory {}, last released
memory: {}",
+ freeMemoryForOperators,
+ ALLOCATE_MEMORY_FOR_OPERATORS,
+ memoryInBytes);
+ freeMemoryForOperators = ALLOCATE_MEMORY_FOR_OPERATORS;
+ }
}
public long getAllocateMemoryForOperators() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
index c7add4b446f..0b2555f67d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
@@ -45,13 +45,15 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends
LoadTsFileAbstractMemory
}
@Override
- public boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
+ public synchronized boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
return memoryUsageInBytes.get() + memoryTobeAddedInBytes <=
totalMemorySizeInBytes;
}
@Override
- public void addMemoryUsage(long memoryInBytes) {
- memoryUsageInBytes.addAndGet(memoryInBytes);
+ public synchronized void addMemoryUsage(long memoryInBytes) {
+ if (memoryUsageInBytes.addAndGet(memoryInBytes) > totalMemorySizeInBytes) {
+ LOGGER.warn("{} has exceed total memory size", this);
+ }
MetricService.getInstance()
.getOrCreateGauge(
@@ -63,7 +65,7 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends
LoadTsFileAbstractMemory
}
@Override
- public void reduceMemoryUsage(long memoryInBytes) {
+ public synchronized void reduceMemoryUsage(long memoryInBytes) {
if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) {
LOGGER.warn("{} has reduce memory usage to negative", this);
}
@@ -78,7 +80,7 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends
LoadTsFileAbstractMemory
}
@Override
- protected void releaseAllMemory() {
+ protected synchronized void releaseAllMemory() {
if (memoryUsageInBytes.get() != 0) {
LOGGER.warn(
"Try to release memory from a memory block {} which has not released
all memory", this);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
index e0709cece9e..e620984038d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
@@ -31,15 +31,9 @@ public class LoadTsFileDataCacheMemoryBlock extends
LoadTsFileAbstractMemoryBloc
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileDataCacheMemoryBlock.class);
private static final long MINIMUM_MEMORY_SIZE_IN_BYTES = 1024 * 1024L; // 1
MB
- private static final int MAX_ASK_FOR_MEMORY_COUNT = 256; // must be a power
of 2
- private static final long EACH_ASK_MEMORY_SIZE_IN_BYTES =
- Math.max(
- MINIMUM_MEMORY_SIZE_IN_BYTES,
- LoadTsFileMemoryManager.MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 4);
private final AtomicLong limitedMemorySizeInBytes;
private final AtomicLong memoryUsageInBytes;
- private final AtomicInteger askForMemoryCount;
private final AtomicInteger referenceCount;
LoadTsFileDataCacheMemoryBlock(long initialLimitedMemorySizeInBytes) {
@@ -54,7 +48,6 @@ public class LoadTsFileDataCacheMemoryBlock extends
LoadTsFileAbstractMemoryBloc
this.limitedMemorySizeInBytes = new
AtomicLong(initialLimitedMemorySizeInBytes);
this.memoryUsageInBytes = new AtomicLong(0L);
- this.askForMemoryCount = new AtomicInteger(1);
this.referenceCount = new AtomicInteger(0);
}
@@ -64,29 +57,17 @@ public class LoadTsFileDataCacheMemoryBlock extends
LoadTsFileAbstractMemoryBloc
}
@Override
- public void addMemoryUsage(long memoryInBytes) {
- memoryUsageInBytes.addAndGet(memoryInBytes);
-
- askForMemoryCount.getAndUpdate(
- count -> {
- if ((count & (count - 1)) == 0) {
- // count is a power of 2
- long actuallyAllocateMemorySizeInBytes =
-
MEMORY_MANAGER.tryAllocateFromQuery(EACH_ASK_MEMORY_SIZE_IN_BYTES);
-
limitedMemorySizeInBytes.addAndGet(actuallyAllocateMemorySizeInBytes);
- if (actuallyAllocateMemorySizeInBytes <
EACH_ASK_MEMORY_SIZE_IN_BYTES) {
- return (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1;
- } else {
- return 1;
- }
- }
- return (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1;
- });
+ public synchronized void addMemoryUsage(long memoryInBytes) {
+ if (memoryUsageInBytes.addAndGet(memoryInBytes) >
limitedMemorySizeInBytes.get()) {
+ LOGGER.warn("{} has exceed total memory size", this);
+ }
}
@Override
- public void reduceMemoryUsage(long memoryInBytes) {
- memoryUsageInBytes.addAndGet(-memoryInBytes);
+ public synchronized void reduceMemoryUsage(long memoryInBytes) {
+ if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) {
+ LOGGER.warn("{} has reduce memory usage to negative", this);
+ }
}
@Override
@@ -113,6 +94,7 @@ public class LoadTsFileDataCacheMemoryBlock extends
LoadTsFileAbstractMemoryBloc
return false;
}
+ MEMORY_MANAGER.releaseToQuery(shrinkMemoryInBytes);
limitedMemorySizeInBytes.addAndGet(-shrinkMemoryInBytes);
return true;
}
@@ -140,8 +122,6 @@ public class LoadTsFileDataCacheMemoryBlock extends
LoadTsFileAbstractMemoryBloc
+ limitedMemorySizeInBytes.get()
+ ", memoryUsageInBytes="
+ memoryUsageInBytes.get()
- + ", askForMemoryCount="
- + askForMemoryCount.get()
+ '}';
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index be6e8dcef97..97b8d7f68b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -44,7 +44,7 @@ public class LoadTsFileMemoryManager {
private final AtomicLong usedMemorySizeInBytes = new AtomicLong(0);
private LoadTsFileDataCacheMemoryBlock dataCacheMemoryBlock;
- private synchronized void forceAllocatedFromQuery(long sizeInBytes)
+ private synchronized void forceAllocateFromQuery(long sizeInBytes)
throws LoadRuntimeOutOfMemoryException {
for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) {
// allocate memory from queryEngine
@@ -82,6 +82,12 @@ public class LoadTsFileMemoryManager {
}
public synchronized void releaseToQuery(long sizeInBytes) {
+ if (usedMemorySizeInBytes.get() < sizeInBytes) {
+ LOGGER.error(
+ "Load: Attempting to release more memory ({}) than allocated ({})",
+ sizeInBytes,
+ usedMemorySizeInBytes.get());
+ }
usedMemorySizeInBytes.addAndGet(-sizeInBytes);
QUERY_ENGINE_MEMORY_MANAGER.releaseToFreeMemoryForOperators(sizeInBytes);
this.notifyAll();
@@ -90,7 +96,7 @@ public class LoadTsFileMemoryManager {
public synchronized LoadTsFileAnalyzeSchemaMemoryBlock
allocateAnalyzeSchemaMemoryBlock(
long sizeInBytes) throws LoadRuntimeOutOfMemoryException {
try {
- forceAllocatedFromQuery(sizeInBytes);
+ forceAllocateFromQuery(sizeInBytes);
} catch (LoadRuntimeOutOfMemoryException e) {
if (dataCacheMemoryBlock != null &&
dataCacheMemoryBlock.doShrink(sizeInBytes)) {
return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);