This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 86d6e5dd40535177512101ab72fdd4f09dc285ce
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Jul 19 19:52:23 2022 +0800

    temp
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/engine/flush/NotifyFlushMemTable.java | 13 +++++
 .../apache/iotdb/db/engine/memtable/IMemTable.java |  6 +++
 .../db/engine/memtable/PrimitiveMemTable.java      | 17 ++++++
 .../db/engine/storagegroup/StorageGroupInfo.java   | 14 ++++-
 .../db/engine/storagegroup/TsFileProcessor.java    | 23 +++++---
 .../iotdb/db/rescon/memory/MemoryController.java   |  4 +-
 .../db/rescon/memory/WriteMemoryController.java    | 63 +++++++++++++++-------
 8 files changed, 109 insertions(+), 33 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e8470e94ef..e0a2906d68 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -138,7 +138,7 @@ public class IoTDBConfig {
   private double timeIndexMemoryProportion = 0.2;
 
   /** Flush proportion for system */
-  private double flushProportion = 0.4;
+  private double flushProportion = 0.3;
 
   /** Reject proportion for system */
   private double rejectProportion = 0.8;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
index 8d80b8affb..c501288c53 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java
@@ -36,4 +36,17 @@ public class NotifyFlushMemTable extends AbstractMemTable {
   public boolean isSignalMemTable() {
     return true;
   }
+
+  @Override
+  public void addAllocatedMemSize(long size) {}
+
+  @Override
+  public long getAllocatedMemSize() {
+    return 0;
+  }
+
+  @Override
+  public boolean needToAllocate(long newSize) {
+    return false;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index adf995e948..bc6f6ad7e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -188,4 +188,10 @@ public interface IMemTable extends WALEntryValue {
   FlushStatus getFlushStatus();
 
   void setFlushStatus(FlushStatus flushStatus);
+
+  void addAllocatedMemSize(long size);
+
+  long getAllocatedMemSize();
+
+  boolean needToAllocate(long size);
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
index 9bcf4f5807..5ce2b5f7e9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class PrimitiveMemTable extends AbstractMemTable {
+  private final AtomicLong allocatedMemSize = new AtomicLong(0L);
 
   public PrimitiveMemTable() {}
 
@@ -52,4 +54,19 @@ public class PrimitiveMemTable extends AbstractMemTable {
   public String toString() {
     return "PrimitiveMemTable{planIndex=[" + getMinPlanIndex() + "," + 
getMaxPlanIndex() + "]}";
   }
+
+  @Override
+  public void addAllocatedMemSize(long size) {
+    allocatedMemSize.addAndGet(size);
+  }
+
+  @Override
+  public long getAllocatedMemSize() {
+    return allocatedMemSize.get();
+  }
+
+  @Override
+  public boolean needToAllocate(long newSize) {
+    return this.allocatedMemSize.get() < this.memSize() + newSize;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index 996bc0c7e4..6e77b3cb7b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.storagegroup;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.rescon.memory.WriteMemoryController;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -80,8 +81,17 @@ public class StorageGroupInfo {
     lastAllocateSize.set(size);
   }
 
-  public boolean needToAllocate(long newSize) {
-    return true;
+  public void releaseAllocateMemorySize(long size) {
+    lastAllocateSize.addAndGet(size);
+    WriteMemoryController.getInstance().releaseFlushingMemory(this, size);
+  }
+
+  public void addAllocateSize(long size) {
+    lastAllocateSize.addAndGet(size);
+  }
+
+  public boolean needToAllocate() {
+    return memoryCost.get() > lastAllocateSize.get();
   }
 
   /**
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a22031d78a..1f041b8285 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -785,11 +785,17 @@ public class TsFileProcessor {
     storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
     tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
     WriteMemoryController controller = WriteMemoryController.getInstance();
-    boolean allocateMemory = false;
+    boolean allocateSuccess = false;
     try {
-      allocateMemory = controller.tryAllocateMemory(memTableIncrement, 
storageGroupInfo, this);
-      if (!allocateMemory) {
-        StorageEngine.blockInsertionIfReject(this);
+      while (workMemTable.needToAllocate(memTableIncrement)) {
+        allocateSuccess =
+            controller.allocateFrame(storageGroupInfo, this, 
workMemTable.getMemTableId());
+        if (!allocateSuccess) {
+          StorageEngine.blockInsertionIfReject(this);
+        } else {
+          storageGroupInfo.addAllocateSize(WriteMemoryController.FRAME_SIZE);
+          workMemTable.addAllocatedMemSize(WriteMemoryController.FRAME_SIZE);
+        }
       }
     } catch (WriteProcessRejectException e) {
       storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
@@ -808,7 +814,6 @@ public class TsFileProcessor {
     memTableIncrement += textDataIncrement;
     storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
     tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
-    WriteMemoryController.getInstance().releaseMemory(memTableIncrement);
     workMemTable.releaseTVListRamCost(memTableIncrement);
     workMemTable.releaseTextDataSize(textDataIncrement);
   }
@@ -1166,9 +1171,7 @@ public class TsFileProcessor {
               flushingMemTables.size());
         }
         // report to System
-        WriteMemoryController.getInstance()
-            .releaseFlushingMemory(
-                memTable.getTVListsRamCost(), storageGroupName, 
memTable.getMemTableId());
+        
storageGroupInfo.releaseAllocateMemorySize(memTable.getAllocatedMemSize());
       }
       if (logger.isDebugEnabled()) {
         logger.debug(
@@ -1628,4 +1631,8 @@ public class TsFileProcessor {
   public IMemTable getWorkMemTable() {
     return workMemTable;
   }
+
+  public long getWorkMemTableAllocateSize() {
+    return workMemTable == null ? 0 : workMemTable.getAllocatedMemSize();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java 
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
index 80fe1063a7..22fad77108 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
@@ -137,8 +137,8 @@ public class MemoryController<T> {
     }
   }
 
-  private void checkTrigger(long usage, T triggerParam) {
-    if (usage >= triggerThreshold && trigger != null) {
+  private void checkTrigger(long newUsage, T triggerParam) {
+    if (newUsage >= triggerThreshold && trigger != null) {
       if (triggerRunning.compareAndSet(false, true)) {
         try {
           trigger.run(triggerParam);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
 
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index 9d1af8425f..60aa0a5ec8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -40,11 +40,13 @@ public class WriteMemoryController extends 
MemoryController<TsFileProcessor> {
   private static long memorySizeForWrite = config.getAllocateMemoryForWrite();
   private static double FLUSH_THRESHOLD = memorySizeForWrite * 
config.getFlushProportion();
   private static double REJECT_THRESHOLD = memorySizeForWrite * 
config.getRejectProportion();
+  private static double END_FLUSH_THRESHOLD = 0.5 * FLUSH_THRESHOLD;
   private volatile boolean rejected = false;
   private AtomicLong flushingMemory = new AtomicLong(0);
   private Set<StorageGroupInfo> infoSet = new CopyOnWriteArraySet<>();
   private ExecutorService flushTaskSubmitThreadPool =
       IoTDBThreadPoolFactory.newFixedThreadPool(1, "FlushTask-Submit-Pool");
+  public static final long FRAME_SIZE = 2L * 1024L * 1024L;
 
   public WriteMemoryController(long limitSize) {
     super(limitSize);
@@ -69,12 +71,33 @@ public class WriteMemoryController extends 
MemoryController<TsFileProcessor> {
     return success;
   }
 
-  public void releaseFlushingMemory(long size, String storageGroup, long 
memTableId) {
+  public boolean allocateFrame(StorageGroupInfo info, TsFileProcessor 
processor, long memTableId) {
+    boolean success = this.tryAllocateMemory(FRAME_SIZE, info, processor);
+    if (success) {
+      logger.error(
+          "Allocate memory frame for {}-{}#{}, current usage is {} MB, 
remaining is {} MB, flushing memory is {} MB",
+          info.getDataRegion().getLogicalStorageGroupName(),
+          info.getDataRegion().getDataRegionId(),
+          memTableId,
+          ((double) memoryUsage.get()) / 1024.0d / 1024.0d,
+          ((double) (memorySizeForWrite - memoryUsage.get())) / 1024.0d / 
1024.0d,
+          ((double) flushingMemory.get()) / 1024.0d / 1024.0d);
+    }
+    return success;
+  }
+
+  public void releaseFlushingMemory(StorageGroupInfo info, long size) {
     this.flushingMemory.addAndGet(-size);
-    this.releaseMemory(size, storageGroup, memTableId);
+    this.releaseMemory(size);
+    logger.error(
+        "Release {} size of {}-{}, remaining size is {}",
+        ((double) size) / 1024.0d / 1024.0d,
+        info.getDataRegion().getLogicalStorageGroupName(),
+        info.getDataRegion().getDataRegionId(),
+        ((double) (memorySizeForWrite - memoryUsage.get())) / 1024.0d / 
1024.0d);
   }
 
-  public void releaseMemory(long size, String storageGroup, long memTableId) {
+  public void releaseMemory(long size) {
     super.releaseMemory(size);
     if (rejected && memoryUsage.get() < REJECT_THRESHOLD) {
       rejected = false;
@@ -97,15 +120,15 @@ public class WriteMemoryController extends 
MemoryController<TsFileProcessor> {
   }
 
   public void applyExternalMemoryForFlushing(long size) {
-    memorySizeForWrite -= size;
-    FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
-    REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+    //    memorySizeForWrite -= size;
+    //    FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+    //    REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
   }
 
   public void releaseExternalMemoryForFlushing(long size) {
-    memorySizeForWrite -= size;
-    FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
-    REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
+    //    memorySizeForWrite += size;
+    //    FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion();
+    //    REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion();
   }
 
   protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) 
{
@@ -114,20 +137,19 @@ public class WriteMemoryController extends 
MemoryController<TsFileProcessor> {
       return;
     }
     long memCost = 0;
-    long activeMemSize = memoryUsage.get() - flushingMemory.get();
-    if (activeMemSize - memCost < FLUSH_THRESHOLD) {
-      return;
-    }
     PriorityQueue<TsFileProcessor> allTsFileProcessors =
         new PriorityQueue<>(
-            (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), 
o1.getWorkMemTableRamCost()));
+            (o1, o2) ->
+                Long.compare(o2.getWorkMemTableAllocateSize(), 
o1.getWorkMemTableAllocateSize()));
     for (StorageGroupInfo storageGroupInfo : infoSet) {
       allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
     }
     long selectedCount = 0;
-    while (activeMemSize - memCost > FLUSH_THRESHOLD) {
+    long activeMemory = memoryUsage.get() - flushingMemory.get();
+    while (activeMemory - memCost > END_FLUSH_THRESHOLD) {
       if (allTsFileProcessors.isEmpty()
           || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
+        logger.error("No memtable to flush");
         return;
       }
       TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek();
@@ -138,17 +160,18 @@ public class WriteMemoryController extends 
MemoryController<TsFileProcessor> {
           || selectedTsFileProcessor.getWorkMemTable().shouldFlush()) {
         continue;
       }
-      memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
+      long memUsageForThisMemTable = 
selectedTsFileProcessor.getWorkMemTableAllocateSize();
+      memCost += memUsageForThisMemTable;
       selectedTsFileProcessor.setWorkMemTableShouldFlush();
-      
flushingMemory.addAndGet(selectedTsFileProcessor.getWorkMemTableRamCost());
+      flushingMemory.addAndGet(memUsageForThisMemTable);
       
flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask);
       selectedCount++;
       allTsFileProcessors.poll();
     }
     logger.info(
-        "Select {} memtable to flush, flushing memory is {}, remaining memory 
is {}",
+        "Select {} memtable to flush, flushing memory is {} MB, remaining 
memory is {} MB",
         selectedCount,
-        flushingMemory.get(),
-        memoryUsage.get() - flushingMemory.get());
+        ((double) flushingMemory.get()) / 1024.0d / 1024.0d,
+        ((double) (memoryUsage.get() - flushingMemory.get())) / 1024.0d / 
1024.0d);
   }
 }

Reply via email to