This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 256194a9e28 [IOTDB-6240] Pipe: Fix bug with PipeMemoryManager 
allocating memory for the DisruptorQueue (#11485)
256194a9e28 is described below

commit 256194a9e284ff3959ac4ec166866df64e1442bc
Author: Itami Sho <[email protected]>
AuthorDate: Tue Nov 7 14:05:46 2023 +0800

    [IOTDB-6240] Pipe: Fix bug with PipeMemoryManager allocating memory for the 
DisruptorQueue (#11485)
    
    Allocating memory for the DisruptorQueue requires ensuring that the memory 
size is a power of 2, so the memory framework now accepts a `Function` 
parameter that lets callers customize their own reallocation strategy.
---
 .../db/pipe/extractor/realtime/assigner/DisruptorQueue.java    |  4 +++-
 .../iotdb/db/pipe/resource/memory/PipeMemoryManager.java       | 10 +++++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
index 67acb16540f..39a23e0a9d5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
@@ -54,7 +54,9 @@ public class DisruptorQueue {
         config.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes();
 
     allocatedMemoryBlock =
-        PipeResourceManager.memory().tryAllocate(ringBufferSize * 
ringBufferEntrySizeInBytes);
+        PipeResourceManager.memory()
+            .tryAllocate(
+                ringBufferSize * ringBufferEntrySizeInBytes, (currentSize) -> 
currentSize / 2);
 
     disruptor =
         new Disruptor<>(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 97688e7b87d..f0c8b89c735 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.function.Function;
 
 public class PipeMemoryManager {
 
@@ -145,6 +146,11 @@ public class PipeMemoryManager {
   }
 
   public synchronized PipeMemoryBlock tryAllocate(long sizeInBytes) {
+    return tryAllocate(sizeInBytes, (currentSize) -> currentSize * 2 / 3);
+  }
+
+  public synchronized PipeMemoryBlock tryAllocate(
+      long sizeInBytes, Function<Long, Long> customAllocateStrategy) {
     if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
       return new PipeMemoryBlock(sizeInBytes);
     }
@@ -171,7 +177,9 @@ public class PipeMemoryManager {
       }
 
       sizeToAllocateInBytes =
-          Math.max(sizeToAllocateInBytes * 2 / 3, 
MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES);
+          Math.max(
+              customAllocateStrategy.apply(sizeToAllocateInBytes),
+              MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES);
     }
 
     LOGGER.warn(

Reply via email to