This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new e12b7350151 Load: Fix Memory Allocation and Release Mismatch in LoadTsFileDataCacheMemoryBlock (#14375) (#14466)
e12b7350151 is described below

commit e12b7350151eb7e6fb9f0c94a886eb59c42221e5
Author: Itami Sho <[email protected]>
AuthorDate: Wed Dec 18 10:51:12 2024 +0800

    Load: Fix Memory Allocation and Release Mismatch in LoadTsFileDataCacheMemoryBlock (#14375) (#14466)
---
 .../plan/planner/LocalExecutionPlanner.java        |  9 +++++
 .../memory/LoadTsFileAnalyzeSchemaMemoryBlock.java | 12 ++++---
 .../memory/LoadTsFileDataCacheMemoryBlock.java     | 38 +++++-----------------
 .../load/memory/LoadTsFileMemoryManager.java       | 10 ++++--
 4 files changed, 33 insertions(+), 36 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 1f02e3bb81f..b643047dcfe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -262,6 +262,15 @@ public class LocalExecutionPlanner {
 
   public synchronized void releaseToFreeMemoryForOperators(final long memoryInBytes) {
     freeMemoryForOperators += memoryInBytes;
+
+    if (freeMemoryForOperators > ALLOCATE_MEMORY_FOR_OPERATORS) {
+      LOGGER.error(
+          "The free memory {} is more than allocated memory {}, last released memory: {}",
+          freeMemoryForOperators,
+          ALLOCATE_MEMORY_FOR_OPERATORS,
+          memoryInBytes);
+      freeMemoryForOperators = ALLOCATE_MEMORY_FOR_OPERATORS;
+    }
   }
 
   public long getAllocateMemoryForOperators() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
index c7add4b446f..0b2555f67d3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
@@ -45,13 +45,15 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemory
   }
 
   @Override
-  public boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
+  public synchronized boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
     return memoryUsageInBytes.get() + memoryTobeAddedInBytes <= totalMemorySizeInBytes;
   }
 
   @Override
-  public void addMemoryUsage(long memoryInBytes) {
-    memoryUsageInBytes.addAndGet(memoryInBytes);
+  public synchronized void addMemoryUsage(long memoryInBytes) {
+    if (memoryUsageInBytes.addAndGet(memoryInBytes) > totalMemorySizeInBytes) {
+      LOGGER.warn("{} has exceed total memory size", this);
+    }
 
     MetricService.getInstance()
         .getOrCreateGauge(
@@ -63,7 +65,7 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemory
   }
 
   @Override
-  public void reduceMemoryUsage(long memoryInBytes) {
+  public synchronized void reduceMemoryUsage(long memoryInBytes) {
     if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) {
       LOGGER.warn("{} has reduce memory usage to negative", this);
     }
@@ -78,7 +80,7 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemory
   }
 
   @Override
-  protected void releaseAllMemory() {
+  protected synchronized void releaseAllMemory() {
     if (memoryUsageInBytes.get() != 0) {
       LOGGER.warn(
           "Try to release memory from a memory block {} which has not released all memory", this);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
index e0709cece9e..e620984038d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
@@ -31,15 +31,9 @@ public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBloc
   private static final Logger LOGGER =
       LoggerFactory.getLogger(LoadTsFileDataCacheMemoryBlock.class);
   private static final long MINIMUM_MEMORY_SIZE_IN_BYTES = 1024 * 1024L; // 1 MB
-  private static final int MAX_ASK_FOR_MEMORY_COUNT = 256; // must be a power of 2
-  private static final long EACH_ASK_MEMORY_SIZE_IN_BYTES =
-      Math.max(
-          MINIMUM_MEMORY_SIZE_IN_BYTES,
-          LoadTsFileMemoryManager.MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 4);
 
   private final AtomicLong limitedMemorySizeInBytes;
   private final AtomicLong memoryUsageInBytes;
-  private final AtomicInteger askForMemoryCount;
   private final AtomicInteger referenceCount;
 
   LoadTsFileDataCacheMemoryBlock(long initialLimitedMemorySizeInBytes) {
@@ -54,7 +48,6 @@ public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBloc
 
     this.limitedMemorySizeInBytes = new AtomicLong(initialLimitedMemorySizeInBytes);
     this.memoryUsageInBytes = new AtomicLong(0L);
-    this.askForMemoryCount = new AtomicInteger(1);
     this.referenceCount = new AtomicInteger(0);
   }
 
@@ -64,29 +57,17 @@ public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBloc
   }
 
   @Override
-  public void addMemoryUsage(long memoryInBytes) {
-    memoryUsageInBytes.addAndGet(memoryInBytes);
-
-    askForMemoryCount.getAndUpdate(
-        count -> {
-          if ((count & (count - 1)) == 0) {
-            // count is a power of 2
-            long actuallyAllocateMemorySizeInBytes =
-                MEMORY_MANAGER.tryAllocateFromQuery(EACH_ASK_MEMORY_SIZE_IN_BYTES);
-            limitedMemorySizeInBytes.addAndGet(actuallyAllocateMemorySizeInBytes);
-            if (actuallyAllocateMemorySizeInBytes < EACH_ASK_MEMORY_SIZE_IN_BYTES) {
-              return (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1;
-            } else {
-              return 1;
-            }
-          }
-          return (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1;
-        });
+  public synchronized void addMemoryUsage(long memoryInBytes) {
+    if (memoryUsageInBytes.addAndGet(memoryInBytes) > limitedMemorySizeInBytes.get()) {
+      LOGGER.warn("{} has exceed total memory size", this);
+    }
   }
 
   @Override
-  public void reduceMemoryUsage(long memoryInBytes) {
-    memoryUsageInBytes.addAndGet(-memoryInBytes);
+  public synchronized void reduceMemoryUsage(long memoryInBytes) {
+    if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) {
+      LOGGER.warn("{} has reduce memory usage to negative", this);
+    }
   }
 
   @Override
@@ -113,6 +94,7 @@ public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBloc
       return false;
     }
 
+    MEMORY_MANAGER.releaseToQuery(shrinkMemoryInBytes);
     limitedMemorySizeInBytes.addAndGet(-shrinkMemoryInBytes);
     return true;
   }
@@ -140,8 +122,6 @@ public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBloc
         + limitedMemorySizeInBytes.get()
         + ", memoryUsageInBytes="
         + memoryUsageInBytes.get()
-        + ", askForMemoryCount="
-        + askForMemoryCount.get()
         + '}';
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index be6e8dcef97..97b8d7f68b0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -44,7 +44,7 @@ public class LoadTsFileMemoryManager {
   private final AtomicLong usedMemorySizeInBytes = new AtomicLong(0);
   private LoadTsFileDataCacheMemoryBlock dataCacheMemoryBlock;
 
-  private synchronized void forceAllocatedFromQuery(long sizeInBytes)
+  private synchronized void forceAllocateFromQuery(long sizeInBytes)
       throws LoadRuntimeOutOfMemoryException {
     for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) {
       // allocate memory from queryEngine
@@ -82,6 +82,12 @@ public class LoadTsFileMemoryManager {
   }
 
   public synchronized void releaseToQuery(long sizeInBytes) {
+    if (usedMemorySizeInBytes.get() < sizeInBytes) {
+      LOGGER.error(
+          "Load: Attempting to release more memory ({}) than allocated ({})",
+          sizeInBytes,
+          usedMemorySizeInBytes.get());
+    }
     usedMemorySizeInBytes.addAndGet(-sizeInBytes);
     QUERY_ENGINE_MEMORY_MANAGER.releaseToFreeMemoryForOperators(sizeInBytes);
     this.notifyAll();
@@ -90,7 +96,7 @@ public class LoadTsFileMemoryManager {
   public synchronized LoadTsFileAnalyzeSchemaMemoryBlock allocateAnalyzeSchemaMemoryBlock(
       long sizeInBytes) throws LoadRuntimeOutOfMemoryException {
     try {
-      forceAllocatedFromQuery(sizeInBytes);
+      forceAllocateFromQuery(sizeInBytes);
     } catch (LoadRuntimeOutOfMemoryException e) {
       if (dataCacheMemoryBlock != null && dataCacheMemoryBlock.doShrink(sizeInBytes)) {
         return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);

Reply via email to