This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-3164 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 43b8012725c4b75891c7a452a9f198350f236c12 Author: Liu Xuxin <[email protected]> AuthorDate: Mon Jul 18 11:22:43 2022 +0800 temp --- .../iotdb/db/engine/flush/MemTableFlushTask.java | 7 ++++--- .../db/engine/storagegroup/StorageGroupInfo.java | 10 +++++++--- .../java/org/apache/iotdb/db/rescon/SystemInfo.java | 4 ++-- .../db/rescon/memory/WriteMemoryController.java | 21 ++++++++++++++++----- 4 files changed, 29 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java index eeea38fcd0..f91c346fb6 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.engine.memtable.IWritableMemChunkGroup; import org.apache.iotdb.db.exception.runtime.FlushRunTimeException; import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID; import org.apache.iotdb.db.rescon.SystemInfo; +import org.apache.iotdb.db.rescon.memory.WriteMemoryController; import org.apache.iotdb.db.service.metrics.MetricsService; import org.apache.iotdb.db.service.metrics.enums.Metric; import org.apache.iotdb.db.service.metrics.enums.Tag; @@ -103,8 +104,7 @@ public class MemTableFlushTask { if (config.isEnableMemControl() && SystemInfo.getInstance().isEncodingFasterThanIo()) { estimatedTemporaryMemSize = memTable.memSize() / memTable.getSeriesNumber() * config.getIoTaskQueueSizeForFlushing(); - SystemInfo.getInstance().applyTemporaryMemoryForFlushing(estimatedTemporaryMemSize); - // TODO: ALLOCATE IN WRITE MEMORY CONTROLLER + WriteMemoryController.getInstance().applyExternalMemoryForFlushing(estimatedTemporaryMemSize); } long start = System.currentTimeMillis(); long sortTime = 0; @@ -152,7 +152,8 @@ public class MemTableFlushTask { if (config.isEnableMemControl()) { if (estimatedTemporaryMemSize != 0) { - SystemInfo.getInstance().releaseTemporaryMemoryForFlushing(estimatedTemporaryMemSize); + WriteMemoryController.getInstance() + .releaseExternalMemoryForFlushing(estimatedTemporaryMemSize); } SystemInfo.getInstance().setEncodingFasterThanIo(ioTime >= memSerializeTime); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java index e3f3769dfa..996bc0c7e4 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java @@ -39,7 +39,7 @@ public class StorageGroupInfo { private long storageGroupSizeReportThreshold = IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold(); - private AtomicLong lastReportedSize = new AtomicLong(); + private AtomicLong lastAllocateSize = new AtomicLong(); /** A set of all unclosed TsFileProcessors in this SG */ private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>(); @@ -76,8 +76,12 @@ public class StorageGroupInfo { return reportedTsps; } - public void setLastReportedSize(long size) { - lastReportedSize.set(size); + public void setLastAllocateSize(long size) { + lastAllocateSize.set(size); + } + + public boolean needToAllocate(long newSize) { + return true; } /** diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java index cb65841996..7e9961141f 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java @@ -77,7 +77,7 @@ public class SystemInfo { totalStorageGroupMemCost); } reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); - storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost()); + storageGroupInfo.setLastAllocateSize(storageGroupInfo.getMemCost()); if (totalStorageGroupMemCost < FLUSH_THERSHOLD) { return true; } else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD @@ -123,7 +123,7 @@ public class SystemInfo { if (reportedStorageGroupMemCostMap.containsKey(storageGroupInfo)) { delta = reportedStorageGroupMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost(); this.totalStorageGroupMemCost -= delta; - storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost()); + storageGroupInfo.setLastAllocateSize(storageGroupInfo.getMemCost()); reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); } diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java index a758cb6c17..9d1af8425f 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java @@ -37,15 +37,14 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { private static final Logger logger = LoggerFactory.getLogger(WriteMemoryController.class); private static volatile WriteMemoryController INSTANCE; private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - private static final long memorySizeForWrite = config.getAllocateMemoryForWrite(); - private static final double FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion(); - private static final double END_FLUSH_THRESHOLD = 0.7 * FLUSH_THRESHOLD; - private static final double REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion(); + private static long memorySizeForWrite = config.getAllocateMemoryForWrite(); + private static double FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion(); + private static double REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion(); private volatile boolean rejected = false; private AtomicLong flushingMemory = new AtomicLong(0); private Set<StorageGroupInfo> infoSet = new CopyOnWriteArraySet<>(); private ExecutorService flushTaskSubmitThreadPool = - IoTDBThreadPoolFactory.newFixedThreadPool(2, "FlushTask-Submit-Pool"); + IoTDBThreadPoolFactory.newFixedThreadPool(1, "FlushTask-Submit-Pool"); public WriteMemoryController(long limitSize) { super(limitSize); @@ -97,6 +96,18 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { return INSTANCE; } + public void applyExternalMemoryForFlushing(long size) { + memorySizeForWrite -= size; + FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion(); + REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion(); + } + + public void releaseExternalMemoryForFlushing(long size) { + memorySizeForWrite -= size; + FLUSH_THRESHOLD = memorySizeForWrite * config.getFlushProportion(); + REJECT_THRESHOLD = memorySizeForWrite * config.getRejectProportion(); + } + protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) { // If invoke flush by replaying logs, do not flush now! if (infoSet.size() == 0) {
