This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch restrict_memtable_number
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f62136f3363db8469fdfce3e7348545186db58a1
Author: HTHou <[email protected]>
AuthorDate: Wed Dec 23 16:00:58 2020 +0800

    restrict flushing memtable number
---
 .../db/engine/storagegroup/TsFileProcessor.java    | 28 ++++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 68e8d86..a3cdcdb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -170,8 +170,9 @@ public class TsFileProcessor {
   public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException 
{
 
     if (workMemTable == null) {
-      workMemTable = new PrimitiveMemTable(enableMemControl);
+      workMemTable = getAvailableMemTable();
     }
+
     if (enableMemControl) {
       checkMemCostAndAddToTspInfo(insertRowPlan);
     }
@@ -212,7 +213,7 @@ public class TsFileProcessor {
       TSStatus[] results) throws WriteProcessException {
 
     if (workMemTable == null) {
-      workMemTable = new PrimitiveMemTable(enableMemControl);
+      workMemTable = getAvailableMemTable();
     }
 
     try {
@@ -1008,4 +1009,27 @@ public class TsFileProcessor {
   public void addCloseFileListeners(Collection<CloseFileListener> listeners) {
     closeFileListeners.addAll(listeners);
   }
+
+  private IMemTable getAvailableMemTable() {
+    synchronized (flushingMemTables) {
+      if (flushingMemTables.isEmpty()) {
+        return new PrimitiveMemTable(enableMemControl);
+      } else {
+        // wait until flushingMemTables is empty
+        int waitCount = 1;
+        while (true) {
+          if (flushingMemTables.isEmpty()) {
+            return new PrimitiveMemTable();
+          }
+          try {
+            flushingMemTables.wait(1000);
+          } catch (InterruptedException e) {
+            logger.error("{} fails to wait for memtables {}, continue to 
wait", tsFileResource.toString(), e);
+            Thread.currentThread().interrupt();
+          }
+          logger.info("{} has waited for a memtable for {}ms", 
tsFileResource.toString(), waitCount++ * 1000);
+        }
+      }
+    }
+  }
 }

Reply via email to