This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-3164-Allocate-By-Tablets in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c313800143fd25a41f388e4543115a408d8c19b3 Author: Liu Xuxin <[email protected]> AuthorDate: Thu Jul 21 19:58:00 2022 +0800 temp --- .../org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java | 4 ++++ .../org/apache/iotdb/db/rescon/memory/WriteMemoryController.java | 8 +++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index adaee20f82..4d1e2891d1 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -1111,6 +1111,10 @@ public class TsFileProcessor { flushListener.onMemTableFlushStarted(tobeFlushed); } + if (enableMemControl) { + WriteMemoryController.getInstance().addFlushMemory(tobeFlushed.getTVListsRamCost()); + } + flushingMemTables.addLast(tobeFlushed); if (logger.isDebugEnabled()) { logger.debug( diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java index 581294e9bd..6fe2feb6e2 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java @@ -96,6 +96,10 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { return INSTANCE; } + public void addFlushMemory(long size) { + flushingMemory.addAndGet(size); + } + protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) { // If invoke flush by replaying logs, do not flush now! if (infoSet.size() == 0) { @@ -118,7 +122,7 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) { return; } - TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek(); + TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.poll(); if (selectedTsFileProcessor == null) { break; } @@ -128,10 +132,8 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { } memCost += selectedTsFileProcessor.getWorkMemTableRamCost(); selectedTsFileProcessor.setWorkMemTableShouldFlush(); - flushingMemory.addAndGet(selectedTsFileProcessor.getWorkMemTableRamCost()); flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask); selectedCount++; - allTsFileProcessors.poll(); } logger.info( "Select {} memtable to flush, flushing memory is {}, remaining memory is {}",
