This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch jira1306_012
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit daa44f7d4f45a25b0bbcfbc6cde33fcc61295370
Author: HTHou <[email protected]>
AuthorDate: Thu Apr 15 17:08:30 2021 +0800

    [To rel/0.12] [IOTDB-1306]Fix DeadLock in MemControl module
---
 .../engine/storagegroup/StorageGroupProcessor.java | 12 ++++++++++
 .../db/engine/storagegroup/TsFileProcessor.java    | 27 ++++++++++++++--------
 2 files changed, 30 insertions(+), 9 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 5ddf1e3..5a7c2f6 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -106,6 +106,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -160,6 +161,8 @@ public class StorageGroupProcessor {
    * partitionLatestFlushedTimeForEachDevice)
    */
   private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
+
+  private final Condition rejectCondition = 
insertLock.writeLock().newCondition();
   /** closeStorageGroupCondition is used to wait for all currently closing 
TsFiles to be done. */
   private final Object closeStorageGroupCondition = new Object();
   /**
@@ -1100,6 +1103,7 @@ public class StorageGroupProcessor {
       }
     } finally {
       writeUnlock();
+      rejectConditionSignal();
     }
   }
 
@@ -1585,6 +1589,14 @@ public class StorageGroupProcessor {
     insertLock.writeLock().unlock();
   }
 
+  public void rejectConditionAwait() throws InterruptedException {
+    rejectCondition.await(config.getCheckPeriodWhenInsertBlocked(), 
TimeUnit.MILLISECONDS);
+  }
+
+  public void rejectConditionSignal() {
+    rejectCondition.signal();
+  }
+
   /**
    * @param tsFileResources includes sealed and unsealed tsfile resources
    * @return fill unsealed tsfile resources with memory data and 
ChunkMetadataList of data in disk
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index c20ec7e..ff795f2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.adapter.CompressionRatio;
-import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.flush.CloseFileListener;
 import org.apache.iotdb.db.engine.flush.FlushListener;
 import org.apache.iotdb.db.engine.flush.FlushManager;
@@ -386,15 +385,25 @@ public class TsFileProcessor {
     storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
     tsFileProcessorInfo.addTSPMemCost(unsealedResourceIncrement + 
chunkMetadataIncrement);
     if (storageGroupInfo.needToReportToSystem()) {
-      SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
-      try {
-        StorageEngine.blockInsertionIfReject();
-      } catch (WriteProcessRejectException e) {
-        storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
-        tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + 
chunkMetadataIncrement);
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, 
false);
-        throw e;
+      long startTime = System.currentTimeMillis();
+      while (SystemInfo.getInstance().isRejected()) {
+        try {
+          storageGroupInfo.getStorageGroupProcessor().rejectConditionAwait();
+          if (System.currentTimeMillis() - startTime
+              > config.getMaxWaitingTimeWhenInsertBlocked()) {
+            throw new WriteProcessRejectException(
+                "System rejected over " + 
config.getMaxWaitingTimeWhenInsertBlocked() + "ms");
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } catch (WriteProcessRejectException e) {
+          storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
+          tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + 
chunkMetadataIncrement);
+          SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, 
false);
+          throw e;
+        }
       }
+      logger.debug("Time for Waiting memory release is {}", 
System.currentTimeMillis() - startTime);
     }
     workMemTable.addTVListRamCost(memTableIncrement);
     workMemTable.addTextDataSize(textDataIncrement);

Reply via email to