This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e737aeb6693 Fix tsfile flush error when drop database or remove peer (#12493)
e737aeb6693 is described below
commit e737aeb6693629dd56abcfad9d4ecad094178901
Author: Haonan <[email protected]>
AuthorDate: Thu May 9 11:19:54 2024 +0800
Fix tsfile flush error when drop database or remove peer (#12493)
---
.../db/storageengine/dataregion/DataRegion.java | 45 ++++++++++++++--------
1 file changed, 28 insertions(+), 17 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 1ba6c4788af..3e97a3b2c72 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1583,6 +1583,7 @@ public class DataRegion implements IDataRegionForQuery {
writeLock("syncDeleteDataFiles");
try {
forceCloseAllWorkingTsFileProcessors();
+ waitClosingTsFileProcessorFinished();
// normally, mergingModification is just need to be closed by after a merge task is finished.
// we close it here just for IT test.
closeAllResources();
@@ -1604,6 +1605,12 @@ public class DataRegion implements IDataRegionForQuery {
lastFlushTimeMap.clearGlobalFlushedTime();
TimePartitionManager.getInstance()
.removeTimePartitionInfo(new DataRegionId(Integer.parseInt(dataRegionId)));
+ } catch (InterruptedException e) {
+ logger.error(
+ "CloseFileNodeCondition error occurs while waiting for closing the storage " + "group {}",
+ databaseName + "-" + dataRegionId,
+ e);
+ Thread.currentThread().interrupt();
} finally {
writeUnlock();
}
@@ -1733,23 +1740,7 @@ public class DataRegion implements IDataRegionForQuery {
public void syncCloseAllWorkingTsFileProcessors() {
try {
List<Future<?>> tsFileProcessorsClosingFutures =
asyncCloseAllWorkingTsFileProcessors();
- long startTime = System.currentTimeMillis();
- while (!closingSequenceTsFileProcessor.isEmpty()
- || !closingUnSequenceTsFileProcessor.isEmpty()) {
- synchronized (closeStorageGroupCondition) {
- // double check to avoid unnecessary waiting
- if (!closingSequenceTsFileProcessor.isEmpty()
- || !closingUnSequenceTsFileProcessor.isEmpty()) {
- closeStorageGroupCondition.wait(60_000);
- }
- }
- if (System.currentTimeMillis() - startTime > 60_000) {
- logger.warn(
- "{} has spent {}s to wait for closing all TsFiles.",
- databaseName + "-" + this.dataRegionId,
- (System.currentTimeMillis() - startTime) / 1000);
- }
- }
+ waitClosingTsFileProcessorFinished();
for (Future<?> f : tsFileProcessorsClosingFutures) {
if (f != null) {
f.get();
@@ -1764,6 +1755,26 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ private void waitClosingTsFileProcessorFinished() throws InterruptedException {
+ long startTime = System.currentTimeMillis();
+ while (!closingSequenceTsFileProcessor.isEmpty()
+ || !closingUnSequenceTsFileProcessor.isEmpty()) {
+ synchronized (closeStorageGroupCondition) {
+ // double check to avoid unnecessary waiting
+ if (!closingSequenceTsFileProcessor.isEmpty()
+ || !closingUnSequenceTsFileProcessor.isEmpty()) {
+ closeStorageGroupCondition.wait(60_000);
+ }
+ }
+ if (System.currentTimeMillis() - startTime > 60_000) {
+ logger.warn(
+ "{} has spent {}s to wait for closing all TsFiles.",
+ databaseName + "-" + this.dataRegionId,
+ (System.currentTimeMillis() - startTime) / 1000);
+ }
+ }
+ }
+
/** close all working tsfile processors */
List<Future<?>> asyncCloseAllWorkingTsFileProcessors() {
writeLock("asyncCloseAllWorkingTsFileProcessors");