This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-3164 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 86d6e5dd40535177512101ab72fdd4f09dc285ce Author: Liu Xuxin <[email protected]> AuthorDate: Tue Jul 19 19:52:23 2022 +0800 temp --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../iotdb/db/engine/flush/NotifyFlushMemTable.java | 13 +++++ .../apache/iotdb/db/engine/memtable/IMemTable.java | 6 +++ .../db/engine/memtable/PrimitiveMemTable.java | 17 ++++++ .../db/engine/storagegroup/StorageGroupInfo.java | 14 ++++- .../db/engine/storagegroup/TsFileProcessor.java | 23 +++++--- .../iotdb/db/rescon/memory/MemoryController.java | 4 +- .../db/rescon/memory/WriteMemoryController.java | 63 +++++++++++++++------- 8 files changed, 109 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index e8470e94ef..e0a2906d68 100644 --- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -138,7 +138,7 @@ public class IoTDBConfig { private double timeIndexMemoryProportion = 0.2; /** Flush proportion for system */ - private double flushProportion = 0.4; + private double flushProportion = 0.3; /** Reject proportion for system */ private double rejectProportion = 0.8; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java index 8d80b8affb..c501288c53 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/NotifyFlushMemTable.java @@ -36,4 +36,17 @@ public class NotifyFlushMemTable extends AbstractMemTable { public boolean isSignalMemTable() { return true; } + + @Override + public void addAllocatedMemSize(long size) {} + + @Override + public long getAllocatedMemSize() { + return 0; + } + + @Override + public boolean needToAllocate(long newSize) { + return false; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index adf995e948..bc6f6ad7e0 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -188,4 +188,10 @@ public interface IMemTable extends WALEntryValue { FlushStatus getFlushStatus(); void setFlushStatus(FlushStatus flushStatus); + + void addAllocatedMemSize(long size); + + long getAllocatedMemSize(); + + boolean needToAllocate(long size); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java index 9bcf4f5807..5ce2b5f7e9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTable.java @@ -23,8 +23,10 @@ import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; public class PrimitiveMemTable extends AbstractMemTable { + private final AtomicLong allocatedMemSize = new AtomicLong(0L); public PrimitiveMemTable() {} @@ -52,4 +54,19 @@ public class PrimitiveMemTable extends AbstractMemTable { public String toString() { return "PrimitiveMemTable{planIndex=[" + getMinPlanIndex() + "," + getMaxPlanIndex() + "]}"; } + + @Override + public void addAllocatedMemSize(long size) { + allocatedMemSize.addAndGet(size); + } + + @Override + public long getAllocatedMemSize() { + return allocatedMemSize.get(); + } + + @Override + public boolean needToAllocate(long newSize) { + return this.allocatedMemSize.get() < this.memSize() + newSize; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java index 996bc0c7e4..6e77b3cb7b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.engine.storagegroup; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.rescon.memory.WriteMemoryController; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -80,8 +81,17 @@ public class StorageGroupInfo { lastAllocateSize.set(size); } - public boolean needToAllocate(long newSize) { - return true; + public void releaseAllocateMemorySize(long size) { + lastAllocateSize.addAndGet(size); + WriteMemoryController.getInstance().releaseFlushingMemory(this, size); + } + + public void addAllocateSize(long size) { + lastAllocateSize.addAndGet(size); + } + + public boolean needToAllocate() { + return memoryCost.get() > lastAllocateSize.get(); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index a22031d78a..1f041b8285 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -785,11 +785,17 @@ public class TsFileProcessor { storageGroupInfo.addStorageGroupMemCost(memTableIncrement); tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement); WriteMemoryController controller = WriteMemoryController.getInstance(); - boolean allocateMemory = false; + boolean allocateSuccess = false; try { - allocateMemory = controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, this); - if (!allocateMemory) { - StorageEngine.blockInsertionIfReject(this); + while (workMemTable.needToAllocate(memTableIncrement)) { + allocateSuccess = + controller.allocateFrame(storageGroupInfo, this, workMemTable.getMemTableId()); + if (!allocateSuccess) { + StorageEngine.blockInsertionIfReject(this); + } else { + storageGroupInfo.addAllocateSize(WriteMemoryController.FRAME_SIZE); + workMemTable.addAllocatedMemSize(WriteMemoryController.FRAME_SIZE); + } } } catch (WriteProcessRejectException e) { storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); @@ -808,7 +814,6 @@ public class TsFileProcessor { memTableIncrement += textDataIncrement; storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement); - WriteMemoryController.getInstance().releaseMemory(memTableIncrement); workMemTable.releaseTVListRamCost(memTableIncrement); workMemTable.releaseTextDataSize(textDataIncrement); } @@ -1166,9 +1171,7 @@ public class TsFileProcessor { flushingMemTables.size()); } // report to System - WriteMemoryController.getInstance() - .releaseFlushingMemory( - memTable.getTVListsRamCost(), storageGroupName, memTable.getMemTableId()); + storageGroupInfo.releaseAllocateMemorySize(memTable.getAllocatedMemSize()); } if (logger.isDebugEnabled()) { logger.debug( @@ -1628,4 +1631,8 @@ public class TsFileProcessor { public IMemTable getWorkMemTable() { return workMemTable; } + + public long getWorkMemTableAllocateSize() { + return workMemTable == null ? 0 : workMemTable.getAllocatedMemSize(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java index 80fe1063a7..22fad77108 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java @@ -137,8 +137,8 @@ public class MemoryController<T> { } } - private void checkTrigger(long usage, T triggerParam) { - if (usage >= triggerThreshold && trigger != null) { + private void checkTrigger(long newUsage, T triggerParam) { + if (newUsage >= triggerThreshold && trigger != null) { if (triggerRunning.compareAndSet(false, true)) { try { trigger.run(triggerParam); diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java index 9d1af8425f..60aa0a5ec8 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java @@ -40,11 +40,13 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { private static long memorySizeForWrite = config.getAllocateMemoryForWrite(); private static double FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion(); private static double REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion(); + private static double END_FLUSH_THRESHOLD = 0.5 * FLUSH_THRESHOLD; private volatile boolean rejected = false; private AtomicLong flushingMemory = new AtomicLong(0); private Set<StorageGroupInfo> infoSet = new CopyOnWriteArraySet<>(); private ExecutorService flushTaskSubmitThreadPool = IoTDBThreadPoolFactory.newFixedThreadPool(1, "FlushTask-Submit-Pool"); + public static final long FRAME_SIZE = 2L * 1024L * 1024L; public WriteMemoryController(long limitSize) { super(limitSize); @@ -69,12 +71,33 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { return success; } - public void releaseFlushingMemory(long size, String storageGroup, long memTableId) { + public boolean allocateFrame(StorageGroupInfo info, TsFileProcessor processor, long memTableId) { + boolean success = this.tryAllocateMemory(FRAME_SIZE, info, processor); + if (success) { + logger.error( + "Allocate memory frame for {}-{}#{}, current usage is {} MB, remaining is {} MB, flushing memory is {} MB", + info.getDataRegion().getLogicalStorageGroupName(), + info.getDataRegion().getDataRegionId(), + memTableId, + ((double) memoryUsage.get()) / 1024.0d / 1024.0d, + ((double) (memorySizeForWrite - memoryUsage.get())) / 1024.0d / 1024.0d, + ((double) flushingMemory.get()) / 1024.0d / 1024.0d); + } + return success; + } + + public void releaseFlushingMemory(StorageGroupInfo info, long size) { this.flushingMemory.addAndGet(-size); - this.releaseMemory(size, storageGroup, memTableId); + this.releaseMemory(size); + logger.error( + "Release {} size of {}-{}, remaining size is {}", + ((double) size) / 1024.0d / 1024.0d, + info.getDataRegion().getLogicalStorageGroupName(), + info.getDataRegion().getDataRegionId(), + ((double) (memorySizeForWrite - memoryUsage.get())) / 1024.0d / 1024.0d); } - public void releaseMemory(long size, String storageGroup, long memTableId) { + public void releaseMemory(long size) { super.releaseMemory(size); if (rejected && memoryUsage.get() < REJECT_THRESHOLD) { rejected = false; @@ -97,15 +120,15 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { } public void applyExternalMemoryForFlushing(long size) { - memorySizeForWrite -= size; - FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion(); - REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion(); + // memorySizeForWrite -= size; + // FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion(); + // REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion(); } public void releaseExternalMemoryForFlushing(long size) { - memorySizeForWrite -= size; - FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion(); - REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion(); + // memorySizeForWrite += size; + // FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion(); + // REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion(); } protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) { @@ -114,20 +137,19 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { return; } long memCost = 0; - long activeMemSize = memoryUsage.get() - flushingMemory.get(); - if (activeMemSize - memCost < FLUSH_THRESHOLD) { - return; - } PriorityQueue<TsFileProcessor> allTsFileProcessors = new PriorityQueue<>( - (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), o1.getWorkMemTableRamCost())); + (o1, o2) -> + Long.compare(o2.getWorkMemTableAllocateSize(), o1.getWorkMemTableAllocateSize())); for (StorageGroupInfo storageGroupInfo : infoSet) { allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp()); } long selectedCount = 0; - while (activeMemSize - memCost > FLUSH_THRESHOLD) { + long activeMemory = memoryUsage.get() - flushingMemory.get(); + while (activeMemory - memCost > END_FLUSH_THRESHOLD) { if (allTsFileProcessors.isEmpty() || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) { + logger.error("No memtable to flush"); return; } TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek(); @@ -138,17 +160,18 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { || selectedTsFileProcessor.getWorkMemTable().shouldFlush()) { continue; } - memCost += selectedTsFileProcessor.getWorkMemTableRamCost(); + long memUsageForThisMemTable = selectedTsFileProcessor.getWorkMemTableAllocateSize(); + memCost += memUsageForThisMemTable; selectedTsFileProcessor.setWorkMemTableShouldFlush(); - flushingMemory.addAndGet(selectedTsFileProcessor.getWorkMemTableRamCost()); + flushingMemory.addAndGet(memUsageForThisMemTable); flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask); selectedCount++; allTsFileProcessors.poll(); } logger.info( - "Select {} memtable to flush, flushing memory is {}, remaining memory is {}", + "Select {} memtable to flush, flushing memory is {} MB, remaining memory is {} MB", selectedCount, - flushingMemory.get(), - memoryUsage.get() - flushingMemory.get()); + ((double) flushingMemory.get()) / 1024.0d / 1024.0d, + ((double) (memoryUsage.get() - flushingMemory.get())) / 1024.0d / 1024.0d); } }
