This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch memdead in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e6c3fd0a1948ed3426108a15c50e2caa654e863a Author: HTHou <[email protected]> AuthorDate: Fri Mar 19 19:04:38 2021 +0800 [IOTDB-1274] fix the insert blocked caused the bugs in mem control module --- .../org/apache/iotdb/db/rescon/SystemInfo.java | 56 +++++++++++----------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java index 8fb84f1..b6d587e 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java @@ -88,37 +88,39 @@ public class SystemInfo { * * @param storageGroupInfo storage group */ - public synchronized void resetStorageGroupStatus( + public void resetStorageGroupStatus( StorageGroupInfo storageGroupInfo, boolean shouldInvokeFlush) { - if (reportedSgMemCostMap.containsKey(storageGroupInfo)) { - this.totalSgMemCost -= - (reportedSgMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost()); - storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost()); - reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); - if (shouldInvokeFlush) { - checkSystemToInvokeFlush(); + boolean needForceAsyncFlush = false; + synchronized (this) { + if (reportedSgMemCostMap.containsKey(storageGroupInfo)) { + this.totalSgMemCost -= + (reportedSgMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost()); + storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost()); + reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost()); } - } - } - private void checkSystemToInvokeFlush() { - if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost < REJECT_THERSHOLD) { - logger.debug("Some sg memory released but still exceeding flush proportion, call flush."); - if (rejected) { - logger.info("Some sg memory released, set system to normal status."); + if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost < REJECT_THERSHOLD) { + logger.debug("Some sg memory released but still exceeding flush proportion, call flush."); + if (rejected) { + logger.info("Some sg memory released, set system to normal status."); + } + logCurrentTotalSGMemory(); + rejected = false; + needForceAsyncFlush = true; + } else if (totalSgMemCost >= REJECT_THERSHOLD) { + logger.warn("Some sg memory released, but system is still in reject status."); + logCurrentTotalSGMemory(); + rejected = true; + needForceAsyncFlush = true; + + } else { + logger.debug("Some sg memory released, system is in normal status."); + logCurrentTotalSGMemory(); + rejected = false; } - logCurrentTotalSGMemory(); - rejected = false; - forceAsyncFlush(); - } else if (totalSgMemCost >= REJECT_THERSHOLD) { - logger.warn("Some sg memory released, but system is still in reject status."); - logCurrentTotalSGMemory(); - rejected = true; + } + if (shouldInvokeFlush && needForceAsyncFlush) { forceAsyncFlush(); - } else { - logger.debug("Some sg memory released, system is in normal status."); - logCurrentTotalSGMemory(); - rejected = false; } } @@ -150,7 +152,7 @@ public class SystemInfo { /** Be Careful!! This method can only be called by flush thread! */ private void forceAsyncFlush() { - if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) { + if (FlushManager.getInstance().getNumberOfWorkingTasks() > 1) { return; } List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
