This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new f5c275e fix cannot flush when system reject (#1964)
f5c275e is described below
commit f5c275ef918a78f70f1823089300d51f194b3b96
Author: Haonan <[email protected]>
AuthorDate: Fri Nov 6 13:18:31 2020 +0800
fix cannot flush when system reject (#1964)
---
.../org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java | 8 --------
.../iotdb/db/engine/storagegroup/StorageGroupProcessor.java | 10 ++++++++++
.../apache/iotdb/db/engine/storagegroup/TsFileProcessor.java | 2 ++
3 files changed, 12 insertions(+), 8 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
index f2fb524..d760819 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.engine.flush;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* TsFileFlushPolicy is applied when a TsFileProcessor is full after insertion. For standalone
@@ -35,19 +33,13 @@ public interface TsFileFlushPolicy {
class DirectFlushPolicy implements TsFileFlushPolicy {
- private static final Logger logger =
LoggerFactory.getLogger(DirectFlushPolicy.class);
-
@Override
public void apply(StorageGroupProcessor storageGroupProcessor,
TsFileProcessor tsFileProcessor,
boolean isSeq) {
if (tsFileProcessor.shouldClose()) {
storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq,
tsFileProcessor);
- logger.info("Async close tsfile: {}",
- tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
} else {
tsFileProcessor.asyncFlush();
- logger.info("Async flush a memtable to tsfile: {}",
- tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index e869143..17d09d6 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -914,6 +914,10 @@ public class StorageGroupProcessor {
}
public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor
tsFileProcessor) {
+ if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
+ closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+ return;
+ }
writeLock();
try {
fileFlushPolicy.apply(this, tsFileProcessor,
tsFileProcessor.isSequence());
@@ -1089,6 +1093,12 @@ public class StorageGroupProcessor {
public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor
tsFileProcessor) {
//for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
//for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
+ if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
+ closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+ return;
+ }
+ logger.info("Async close tsfile: {}",
+ tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
if (sequence) {
closingSequenceTsFileProcessor.add(tsFileProcessor);
updateEndTimeMap(tsFileProcessor);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 5e26f0b..5e5f083 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -602,6 +602,8 @@ public class TsFileProcessor {
if (workMemTable == null) {
return;
}
+ logger.info("Async flush a memtable to tsfile: {}",
+ tsFileResource.getTsFile().getAbsolutePath());
addAMemtableIntoFlushingList(workMemTable);
} catch (Exception e) {
logger.error("{}: {} add a memtable into flushing list failed",
storageGroupName,