This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch mcbf in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4e63e435da80bda56bb61ab66e709b71cf9a06e2 Author: HTHou <[email protected]> AuthorDate: Fri Nov 6 00:36:08 2020 +0800 fix cannot flush when system reject --- .../org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java | 8 -------- .../iotdb/db/engine/storagegroup/StorageGroupProcessor.java | 10 ++++++++++ .../apache/iotdb/db/engine/storagegroup/TsFileProcessor.java | 2 ++ 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java index f2fb524..d760819 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java @@ -21,8 +21,6 @@ package org.apache.iotdb.db.engine.flush; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * TsFileFlushPolicy is applied when a TsFileProcessor is full after insertion. For standalone @@ -35,19 +33,13 @@ public interface TsFileFlushPolicy { class DirectFlushPolicy implements TsFileFlushPolicy { - private static final Logger logger = LoggerFactory.getLogger(DirectFlushPolicy.class); - @Override public void apply(StorageGroupProcessor storageGroupProcessor, TsFileProcessor tsFileProcessor, boolean isSeq) { if (tsFileProcessor.shouldClose()) { storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq, tsFileProcessor); - logger.info("Async close tsfile: {}", - tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath()); } else { tsFileProcessor.asyncFlush(); - logger.info("Async flush a memtable to tsfile: {}", - tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath()); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index e869143..17d09d6 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -914,6 +914,10 @@ public class StorageGroupProcessor { } public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) { + if (closingSequenceTsFileProcessor.contains(tsFileProcessor) || + closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) { + return; + } writeLock(); try { fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); @@ -1089,6 +1093,12 @@ public class StorageGroupProcessor { public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) { //for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed. //for unsequence tsfile, we have maintained the endTimeMap when an insertion comes. + if (closingSequenceTsFileProcessor.contains(tsFileProcessor) || + closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) { + return; + } + logger.info("Async close tsfile: {}", + tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath()); if (sequence) { closingSequenceTsFileProcessor.add(tsFileProcessor); updateEndTimeMap(tsFileProcessor); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 5e26f0b..5e5f083 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -602,6 +602,8 @@ public class TsFileProcessor { if (workMemTable == null) { return; } + logger.info("Async flush a memtable to tsfile: {}", + tsFileResource.getTsFile().getAbsolutePath()); addAMemtableIntoFlushingList(workMemTable); } catch (Exception e) { logger.error("{}: {} add a memtable into flushing list failed", storageGroupName,
