This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new be883fd  [IOTDB-2818] Fix exception occurs when submitting compaction task to global queue (#5384)
be883fd is described below

commit be883fd8ccca035e627317ba75839041109fcf47
Author: Liu Xuxin <[email protected]>
AuthorDate: Fri Apr 1 22:56:46 2022 +0800

    [IOTDB-2818] Fix exception occurs when submitting compaction task to global queue (#5384)
---
 .../manage/CrossSpaceCompactionResource.java       |  3 ++-
 .../sizetiered/SizeTieredCompactionSelector.java   |  4 +--
 .../db/engine/storagegroup/TsFileResource.java     |  4 +++
 .../inner/InnerCompactionSchedulerTest.java        | 31 ++++++++++++++++++++++
 4 files changed, 39 insertions(+), 3 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
index 7ced2fb..fecdd78 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.engine.compaction.cross.rewrite.manage;
 
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -80,7 +81,7 @@ public class CrossSpaceCompactionResource {
    */
   private void filterUnseqResource(List<TsFileResource> unseqResources) {
     for (TsFileResource resource : unseqResources) {
-      if (resource.isCompacting() || resource.isCompactionCandidate() || 
!resource.isClosed()) {
+      if (resource.getStatus() != TsFileResourceStatus.CLOSED) {
         return;
       } else if (!resource.isDeleted() && resource.stillLives(ttlLowerBound)) {
         this.unseqFiles.add(resource);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index 183e8da..8783aec 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.engine.compaction.task.AbstractCompactionTask;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
@@ -124,8 +125,7 @@ public class SizeTieredCompactionSelector extends 
AbstractInnerSpaceCompactionSe
       TsFileNameGenerator.TsFileName currentName =
           TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
       if (currentName.getInnerCompactionCnt() != level
-          || currentFile.isCompactionCandidate()
-          || currentFile.isCompacting()) {
+          || currentFile.getStatus() != TsFileResourceStatus.CLOSED) {
         selectedFileList.clear();
         selectedFileSize = 0L;
         continue;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index a8dc5e5..6c9e83c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -626,6 +626,10 @@ public class TsFileResource {
     }
   }
 
+  public TsFileResourceStatus getStatus() {
+    return this.status;
+  }
+
   /**
    * check if any of the device lives over the given time bound. If the file is not closed, then
    * return true.
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
index 2b05370..0208d32 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerCompactionSchedulerTest.java
@@ -134,4 +134,35 @@ public class InnerCompactionSchedulerTest extends 
AbstractCompactionTest {
     }
     Assert.assertEquals(4, tsFileManager.getTsFileList(true).size());
   }
+
+  @Test
+  public void testFileSelectorWithUnclosedFile()
+      throws IOException, MetadataException, WriteProcessException {
+    
IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(50);
+    
IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(50);
+    TsFileResourceList tsFileResources = new TsFileResourceList();
+    createFiles(2, 2, 3, 100, 0, 0, 50, 50, false, true);
+    createFiles(2, 3, 5, 50, 250, 250, 50, 50, false, true);
+    seqResources.get(3).setStatus(TsFileResourceStatus.UNCLOSED);
+    TsFileManager tsFileManager = new TsFileManager("testSG", "0", "tmp");
+    tsFileManager.addAll(seqResources, true);
+    CompactionScheduler.tryToSubmitInnerSpaceCompactionTask(
+        "testSG", "0", 0L, tsFileManager, true, new 
FakedInnerSpaceCompactionTaskFactory());
+    CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
+
+    long waitingTime = 0;
+    while (CompactionTaskManager.getInstance().getExecutingTaskCount() != 0) {
+      try {
+        Thread.sleep(100);
+        waitingTime += 100;
+        if (waitingTime > MAX_WAITING_TIME) {
+          Assert.fail();
+          break;
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+    Assert.assertEquals(4, tsFileManager.getTsFileList(true).size());
+  }
 }

Reply via email to