This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164-Allocate-By-Tablets
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c313800143fd25a41f388e4543115a408d8c19b3
Author: Liu Xuxin <[email protected]>
AuthorDate: Thu Jul 21 19:58:00 2022 +0800

    temp
---
 .../org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java  | 4 ++++
 .../org/apache/iotdb/db/rescon/memory/WriteMemoryController.java  | 8 +++++---
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index adaee20f82..4d1e2891d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -1111,6 +1111,10 @@ public class TsFileProcessor {
       flushListener.onMemTableFlushStarted(tobeFlushed);
     }
 
+    if (enableMemControl) {
+      WriteMemoryController.getInstance().addFlushMemory(tobeFlushed.getTVListsRamCost());
+    }
+
     flushingMemTables.addLast(tobeFlushed);
     if (logger.isDebugEnabled()) {
       logger.debug(
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index 581294e9bd..6fe2feb6e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -96,6 +96,10 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
     return INSTANCE;
   }
 
+  public void addFlushMemory(long size) {
+    flushingMemory.addAndGet(size);
+  }
+
   protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) {
     // If invoke flush by replaying logs, do not flush now!
     if (infoSet.size() == 0) {
@@ -118,7 +122,7 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
           || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
         return;
       }
-      TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek();
+      TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.poll();
       if (selectedTsFileProcessor == null) {
         break;
       }
@@ -128,10 +132,8 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
       }
       memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
       selectedTsFileProcessor.setWorkMemTableShouldFlush();
-      flushingMemory.addAndGet(selectedTsFileProcessor.getWorkMemTableRamCost());
       flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask);
       selectedCount++;
-      allTsFileProcessors.poll();
     }
     logger.info(
         "Select {} memtable to flush, flushing memory is {}, remaining memory is {}",

Reply via email to