This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch restrict_memtable_number in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f62136f3363db8469fdfce3e7348545186db58a1 Author: HTHou <[email protected]> AuthorDate: Wed Dec 23 16:00:58 2020 +0800 restrict flushing memtable number --- .../db/engine/storagegroup/TsFileProcessor.java | 28 ++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 68e8d86..a3cdcdb 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -170,8 +170,9 @@ public class TsFileProcessor { public void insert(InsertRowPlan insertRowPlan) throws WriteProcessException { if (workMemTable == null) { - workMemTable = new PrimitiveMemTable(enableMemControl); + workMemTable = getAvailableMemTable(); } + if (enableMemControl) { checkMemCostAndAddToTspInfo(insertRowPlan); } @@ -212,7 +213,7 @@ public class TsFileProcessor { TSStatus[] results) throws WriteProcessException { if (workMemTable == null) { - workMemTable = new PrimitiveMemTable(enableMemControl); + workMemTable = getAvailableMemTable(); } try { @@ -1008,4 +1009,27 @@ public class TsFileProcessor { public void addCloseFileListeners(Collection<CloseFileListener> listeners) { closeFileListeners.addAll(listeners); } + + private IMemTable getAvailableMemTable() { + synchronized (flushingMemTables) { + if (flushingMemTables.isEmpty()) { + return new PrimitiveMemTable(enableMemControl); + } else { + // wait until flushingMemTables is empty + int waitCount = 1; + while (true) { + if (flushingMemTables.isEmpty()) { + return new PrimitiveMemTable(); + } + try { + flushingMemTables.wait(1000); + } catch (InterruptedException e) { + logger.error("{} fails to wait for memtables {}, continue to wait", tsFileResource.toString(), e); + Thread.currentThread().interrupt(); + } + logger.info("{} has waited for a memtable for {}ms", tsFileResource.toString(), waitCount++ * 1000); + } + } + } + } }
