This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch NewMemControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 036a637e6232c014439033c99b67f2ce9ea36493
Author: HTHou <[email protected]>
AuthorDate: Sun Apr 25 17:03:35 2021 +0800

    New memory control strategy
---
 .../db/engine/storagegroup/StorageGroupProcessor.java | 19 +++++++++++++++++++
 .../iotdb/db/engine/storagegroup/TsFileProcessor.java | 16 +++++++++++++++-
 .../java/org/apache/iotdb/db/rescon/SystemInfo.java   | 12 +++++++++++-
 .../db/engine/storagegroup/TsFileProcessorTest.java   |  8 ++++----
 4 files changed, 49 insertions(+), 6 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 69277b1..f829517 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -110,6 +110,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -164,6 +165,8 @@ public class StorageGroupProcessor {
    * partitionLatestFlushedTimeForEachDevice)
    */
   private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
+
+  private final Condition writeLockConditionForReject = 
insertLock.writeLock().newCondition();
   /** closeStorageGroupCondition is used to wait for all currently closing 
TsFiles to be done. */
   private final Object closeStorageGroupCondition = new Object();
   /**
@@ -1110,6 +1113,18 @@ public class StorageGroupProcessor {
     }
   }
 
+  public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) 
{
+    writeLock();
+    try {
+      // check memtable size and may asyncTryToFlush the work memtable
+      if (tsFileProcessor.shouldFlush()) {
+        fileFlushPolicy.apply(this, tsFileProcessor, 
tsFileProcessor.isSequence());
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
   private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean 
sequence) {
     TsFileProcessor tsFileProcessor = null;
     try {
@@ -1596,6 +1611,10 @@ public class StorageGroupProcessor {
     insertLock.writeLock().unlock();
   }
 
+  public void writeLockConditionAwait() throws InterruptedException {
+    
writeLockConditionForReject.await(config.getCheckPeriodWhenInsertBlocked(), 
TimeUnit.MILLISECONDS);
+  }
+
   /**
    * @param tsFileResources includes sealed and unsealed tsfile resources
    * @return fill unsealed tsfile resources with memory data and 
ChunkMetadataList of data in disk
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 8cc6b50..d967843 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -459,13 +459,23 @@ public class TsFileProcessor {
     if (storageGroupInfo.needToReportToSystem()) {
       try {
         if 
(!SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo, this)) {
-          StorageEngine.blockInsertionIfReject();
+          long startTime = System.currentTimeMillis();
+          while (SystemInfo.getInstance().isRejected()) {
+            
storageGroupInfo.getStorageGroupProcessor().writeLockConditionAwait();
+            if (System.currentTimeMillis() - startTime
+                > config.getMaxWaitingTimeWhenInsertBlocked()) {
+              throw new WriteProcessRejectException(
+                  "System rejected over " + 
config.getMaxWaitingTimeWhenInsertBlocked() + "ms");
+            }
+          }
         }
       } catch (WriteProcessRejectException e) {
         storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
         tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
         SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
         throw e;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
       }
     }
     workMemTable.addTVListRamCost(memTableIncrement);
@@ -1284,4 +1294,8 @@ public class TsFileProcessor {
   public void addCloseFileListeners(Collection<CloseFileListener> listeners) {
     closeFileListeners.addAll(listeners);
   }
+
+  public void submitAFlushTask() {
+    
this.storageGroupInfo.getStorageGroupProcessor().submitAFlushTaskWhenShouldFlush(this);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java 
b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index e9b1312..3ad7fba 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -31,6 +31,9 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class SystemInfo {
 
@@ -44,6 +47,10 @@ public class SystemInfo {
   private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new 
HashMap<>();
 
   private long flushingMemTablesCost = 0L;
+  private AtomicInteger threadCnt = new AtomicInteger();
+  private ExecutorService flushTaskSubmitThreadPool =
+      Executors.newFixedThreadPool(
+          config.getConcurrentFlushThread(), r -> new Thread(r, 
"FlushTaskSubmitThread-" + threadCnt.getAndIncrement()));
   private static double FLUSH_THERSHOLD = memorySizeForWrite * 
config.getFlushProportion();
   private static double REJECT_THERSHOLD = memorySizeForWrite * 
config.getRejectProportion();
 
@@ -89,7 +96,7 @@ public class SystemInfo {
         if (totalStorageGroupMemCost < memorySizeForWrite) {
           return true;
         } else {
-          throw new WriteProcessRejectException("Total Storage Group MemCost 
"+ totalStorageGroupMemCost +" is over than memorySizeForWrite");
+          throw new WriteProcessRejectException("Total Storage Group MemCost 
"+ totalStorageGroupMemCost +" is over than memorySizeForWriting " + 
memorySizeForWrite);
         }
       } else {
         return false;
@@ -188,6 +195,9 @@ public class SystemInfo {
       if (selectedTsFileProcessor == currentTsFileProcessor) {
         isCurrentTsFileProcessorSelected = true;
       }
+      flushTaskSubmitThreadPool.submit(() -> {
+        selectedTsFileProcessor.submitAFlushTask();
+      });
       allTsFileProcessors.poll();
     }
     return isCurrentTsFileProcessorSelected;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index b3d20c6..ad96383 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -102,7 +102,7 @@ public class TsFileProcessorTest {
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
-    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
     processor.query(
         deviceId,
@@ -178,7 +178,7 @@ public class TsFileProcessorTest {
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
-    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
     processor.query(
         deviceId,
@@ -280,7 +280,7 @@ public class TsFileProcessorTest {
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
-    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
     processor.query(
         deviceId,
@@ -336,7 +336,7 @@ public class TsFileProcessorTest {
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
-    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
 
     processor.query(

Reply via email to