This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch fix_mem_control_211 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 09eb962c047e9afcf481999d1b5ca3732934b239 Author: qiaojialin <[email protected]> AuthorDate: Fri Nov 6 15:16:17 2020 +0800 add doc --- .../db/engine/storagegroup/StorageGroupInfo.java | 5 ++-- .../engine/storagegroup/TsFileProcessorInfo.java | 4 +++- .../org/apache/iotdb/db/rescon/SystemInfo.java | 28 ++++++++++++---------- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java index 57de6eb..e696a60 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java @@ -33,7 +33,8 @@ public class StorageGroupInfo { private StorageGroupProcessor storageGroupProcessor; /** - * The total Storage group memory cost + * The total Storage group memory cost, + * including unsealed TsFileResource, ChunkMetadata, WAL, primitive arrays and TEXT values */ private AtomicLong memoryCost; @@ -76,7 +77,7 @@ public class StorageGroupInfo { memoryCost.getAndAdd(-cost); } - public long getSgMemCost() { + public long getMemCost() { return memoryCost.get(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java index 52bc863..417168e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorInfo.java @@ -30,7 +30,9 @@ public class TsFileProcessorInfo { */ private StorageGroupInfo storageGroupInfo; - // unsealed TsFileResource, ChunkMetadata, WAL + /** + * memory occupation of unsealed TsFileResource, ChunkMetadata, WAL + */ private long memCost; diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java index fa1715e..aa32467 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java @@ -54,17 +54,19 @@ public class SystemInfo { * @param storageGroupInfo storage group */ public void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) { - long delta = storageGroupInfo.getSgMemCost() - + long delta = storageGroupInfo.getMemCost() - reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L); totalSgMemCost.addAndGet(delta); - logger.debug("Report Storage Group Status to the system. " + if (logger.isDebugEnabled()) { + logger.debug("Report Storage Group Status to the system. " + "After adding {}, current sg mem cost is {}.", delta, totalSgMemCost); - reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost()); - storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost()); + } + reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); + storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost()); if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FLUSH_PROPORTION) { logger.debug("The total storage group mem costs are too large, call for flushing. " + "Current sg cost is {}", totalSgMemCost); - flush(); + chooseTSPToMarkFlush(); } if (totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * REJECT_PROPORTION) { logger.info("Change system to reject status..."); @@ -81,10 +83,10 @@ public class SystemInfo { public void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo, boolean shouldInvokeFlush) { if (reportedSgMemCostMap.containsKey(storageGroupInfo)) { - this.totalSgMemCost.addAndGet(storageGroupInfo.getSgMemCost() - + this.totalSgMemCost.addAndGet(storageGroupInfo.getMemCost() - reportedSgMemCostMap.get(storageGroupInfo)); - storageGroupInfo.setLastReportedSize(storageGroupInfo.getSgMemCost()); - reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getSgMemCost()); + storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost()); + reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); if (shouldInvokeFlush) { checkSystemToInvokeFlush(); } @@ -120,11 +122,11 @@ public class SystemInfo { } /** - * Flush the tsfileProcessor in SG with the max mem cost. If the queue size of flushing > - * threshold, it's identified as flushing is in progress. + * Order all tsfileProcessors in system by memory cost of actual data points in memtable. + * Mark the top K TSPs as to be flushed, + * so that after flushing the K TSPs, the memory cost should be less than FLUSH_THRESHOLD */ - public void flush() { - + private void chooseTSPToMarkFlush() { if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) { return; } @@ -144,7 +146,7 @@ public class SystemInfo { /** * Be Careful!! This method can only be called by flush thread! */ - public void forceAsyncFlush() { + private void forceAsyncFlush() { List<TsFileProcessor> processors = getTsFileProcessorsToFlush(); for (TsFileProcessor processor : processors) { if (processor != null) {
