This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch memdead
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e6c3fd0a1948ed3426108a15c50e2caa654e863a
Author: HTHou <[email protected]>
AuthorDate: Fri Mar 19 19:04:38 2021 +0800

    [IOTDB-1274] fix the insert blocked caused the bugs in mem control module
---
 .../org/apache/iotdb/db/rescon/SystemInfo.java     | 56 +++++++++++-----------
 1 file changed, 29 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java 
b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 8fb84f1..b6d587e 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -88,37 +88,39 @@ public class SystemInfo {
    *
    * @param storageGroupInfo storage group
    */
-  public synchronized void resetStorageGroupStatus(
+  public void resetStorageGroupStatus(
       StorageGroupInfo storageGroupInfo, boolean shouldInvokeFlush) {
-    if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
-      this.totalSgMemCost -=
-          (reportedSgMemCostMap.get(storageGroupInfo) - 
storageGroupInfo.getMemCost());
-      storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
-      reportedSgMemCostMap.put(storageGroupInfo, 
storageGroupInfo.getMemCost());
-      if (shouldInvokeFlush) {
-        checkSystemToInvokeFlush();
+    boolean needForceAsyncFlush = false;
+    synchronized (this) {
+      if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
+        this.totalSgMemCost -=
+            (reportedSgMemCostMap.get(storageGroupInfo) - 
storageGroupInfo.getMemCost());
+        storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+        reportedSgMemCostMap.put(storageGroupInfo, 
storageGroupInfo.getMemCost());
       }
-    }
-  }
 
-  private void checkSystemToInvokeFlush() {
-    if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost < 
REJECT_THERSHOLD) {
-      logger.debug("Some sg memory released but still exceeding flush 
proportion, call flush.");
-      if (rejected) {
-        logger.info("Some sg memory released, set system to normal status.");
+      if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost < 
REJECT_THERSHOLD) {
+        logger.debug("Some sg memory released but still exceeding flush 
proportion, call flush.");
+        if (rejected) {
+          logger.info("Some sg memory released, set system to normal status.");
+        }
+        logCurrentTotalSGMemory();
+        rejected = false;
+        needForceAsyncFlush = true;
+      } else if (totalSgMemCost >= REJECT_THERSHOLD) {
+        logger.warn("Some sg memory released, but system is still in reject 
status.");
+        logCurrentTotalSGMemory();
+        rejected = true;
+        needForceAsyncFlush = true;
+
+      } else {
+        logger.debug("Some sg memory released, system is in normal status.");
+        logCurrentTotalSGMemory();
+        rejected = false;
       }
-      logCurrentTotalSGMemory();
-      rejected = false;
-      forceAsyncFlush();
-    } else if (totalSgMemCost >= REJECT_THERSHOLD) {
-      logger.warn("Some sg memory released, but system is still in reject 
status.");
-      logCurrentTotalSGMemory();
-      rejected = true;
+    }
+    if (shouldInvokeFlush && needForceAsyncFlush) {
       forceAsyncFlush();
-    } else {
-      logger.debug("Some sg memory released, system is in normal status.");
-      logCurrentTotalSGMemory();
-      rejected = false;
     }
   }
 
@@ -150,7 +152,7 @@ public class SystemInfo {
 
   /** Be Careful!! This method can only be called by flush thread! */
   private void forceAsyncFlush() {
-    if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
+    if (FlushManager.getInstance().getNumberOfWorkingTasks() > 1) {
       return;
     }
     List<TsFileProcessor> processors = getTsFileProcessorsToFlush();

Reply via email to