This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c04d3fa9cc [IOTDB-4835] Fix InnerSpaceCompaction may be blocked (#7901)
c04d3fa9cc is described below
commit c04d3fa9cc2e1f150e8442e1bb27cb06dff57668
Author: Liu Xuxin <[email protected]>
AuthorDate: Wed Nov 9 20:51:11 2022 +0800
[IOTDB-4835] Fix InnerSpaceCompaction may be blocked (#7901)
---
.../engine/compaction/CompactionTaskManager.java | 1 +
.../sizetiered/SizeTieredCompactionSelector.java | 78 +++++++++-------
.../engine/compaction/CompactionSchedulerTest.java | 103 ++++++++++++++++++++-
3 files changed, 146 insertions(+), 36 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 649a277716..1720ad6c71 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -385,6 +385,7 @@ public class CompactionTaskManager implements IService {
init = true;
}
currentTaskNum = new AtomicInteger(0);
+ init = true;
logger.info("Compaction task manager started.");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index 6261787398..d8430029c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -78,38 +78,6 @@ public class SizeTieredCompactionSelector
hasNextTimePartition = tsFileManager.hasNextTimePartition(timePartition, sequence);
}
- /**
- * This method searches for a batch of files to be compacted from layer 0 to the highest layer. If
- * there are more than a batch of files to be merged on a certain layer, it does not search to
- * higher layers. It creates a compaction thread for each batch of files and put it into the
- * candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
- *
- * @return Returns whether the file was found and submits the merge task
- */
- @Override
- public List<List<TsFileResource>> selectInnerSpaceTask(List<TsFileResource> tsFileResources) {
- this.tsFileResources = tsFileResources;
- PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
- new PriorityQueue<>(new SizeTieredCompactionTaskComparator());
- try {
- int maxLevel = searchMaxFileLevel();
- for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) {
- if (!selectLevelTask(currentLevel, taskPriorityQueue)) {
- break;
- }
- }
- List<List<TsFileResource>> taskList = new LinkedList<>();
- while (taskPriorityQueue.size() > 0) {
- List<TsFileResource> resources = taskPriorityQueue.poll().left;
- taskList.add(resources);
- }
- return taskList;
- } catch (Exception e) {
- LOGGER.error("Exception occurs while selecting files", e);
- }
- return Collections.emptyList();
- }
-
/**
* This method searches for all files on the given level. If there are consecutive files on the
* level that meet the system preset conditions (the number exceeds 10 or the total file size
@@ -135,8 +103,16 @@ public class SizeTieredCompactionSelector
for (TsFileResource currentFile : tsFileResources) {
TsFileNameGenerator.TsFileName currentName =
TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());
- if (currentName.getInnerCompactionCnt() != level
- || currentFile.getStatus() != TsFileResourceStatus.CLOSED) {
+ if (currentName.getInnerCompactionCnt() != level) {
+ if (selectedFileList.size() > 1) {
+ taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+ shouldContinueToSearch = false;
+ }
+ selectedFileList = new ArrayList<>();
+ selectedFileSize = 0L;
+ continue;
+ }
+ if (currentFile.getStatus() != TsFileResourceStatus.CLOSED) {
selectedFileList.clear();
selectedFileSize = 0L;
continue;
@@ -155,10 +131,10 @@ public class SizeTieredCompactionSelector
// submit the task
if (selectedFileList.size() > 1) {
taskPriorityQueue.add(new Pair<>(new ArrayList<>(selectedFileList), selectedFileSize));
+ shouldContinueToSearch = false;
}
selectedFileList = new ArrayList<>();
selectedFileSize = 0L;
- shouldContinueToSearch = false;
}
}
@@ -171,6 +147,38 @@ public class SizeTieredCompactionSelector
return shouldContinueToSearch;
}
+ /**
+ * This method searches for a batch of files to be compacted from layer 0 to the highest layer. If
+ * there are more than a batch of files to be merged on a certain layer, it does not search to
+ * higher layers. It creates a compaction thread for each batch of files and put it into the
+ * candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
+ *
+ * @return Returns whether the file was found and submits the merge task
+ */
+ @Override
+ public List<List<TsFileResource>> selectInnerSpaceTask(List<TsFileResource> tsFileResources) {
+ this.tsFileResources = tsFileResources;
+ PriorityQueue<Pair<List<TsFileResource>, Long>> taskPriorityQueue =
+ new PriorityQueue<>(new SizeTieredCompactionTaskComparator());
+ try {
+ int maxLevel = searchMaxFileLevel();
+ for (int currentLevel = 0; currentLevel <= maxLevel; currentLevel++) {
+ if (!selectLevelTask(currentLevel, taskPriorityQueue)) {
+ break;
+ }
+ }
+ List<List<TsFileResource>> taskList = new LinkedList<>();
+ while (taskPriorityQueue.size() > 0) {
+ List<TsFileResource> resources = taskPriorityQueue.poll().left;
+ taskList.add(resources);
+ }
+ return taskList;
+ } catch (Exception e) {
+ LOGGER.error("Exception occurs while selecting files", e);
+ }
+ return Collections.emptyList();
+ }
+
private int searchMaxFileLevel() throws IOException {
int maxLevel = -1;
for (TsFileResource currentFile : tsFileResources) {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
index b9440b03e5..e498fcf1b2 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionSchedulerTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.compaction;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
@@ -1627,7 +1628,6 @@ public class CompactionSchedulerTest {
fullPath.add(sgName + device);
}
for (int i = 0; i < 100; i++) {
-
List<List<Long>> chunkPagePointsNum = new ArrayList<>();
List<Long> pagePointsNum = new ArrayList<>();
pagePointsNum.add(100L);
@@ -1705,6 +1705,107 @@ public class CompactionSchedulerTest {
}
}
+ @Test
+ public void testLargeFileInLowerLevel() throws Exception {
+ logger.warn("Running test16");
+ int prevMaxCompactionCandidateFileNum =
+ IoTDBDescriptor.getInstance().getConfig().getMaxInnerCompactionCandidateFileNum();
+ IoTDBDescriptor.getInstance().getConfig().setMaxInnerCompactionCandidateFileNum(2);
+ long originTargetSize = IoTDBDescriptor.getInstance().getConfig().getTargetCompactionFileSize();
+ IoTDBDescriptor.getInstance().getConfig().setTargetCompactionFileSize(1024 * 1024);
+ String sgName = COMPACTION_TEST_SG + "test17";
+ try {
+ CompactionTaskManager.getInstance().restart();
+ TsFileManager tsFileManager = new TsFileManager(sgName, "0", "target");
+ Set<String> fullPath = new HashSet<>();
+ for (String device : fullPaths) {
+ fullPath.add(sgName + device);
+ }
+ for (int i = 0; i < 10; i++) {
+ List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+ List<Long> pagePointsNum = new ArrayList<>();
+ pagePointsNum.add(100L);
+ chunkPagePointsNum.add(pagePointsNum);
+ TsFileResource tsFileResource =
+ new TsFileResource(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH
+ .concat(File.separator)
+ .concat("sequence")
+ .concat(File.separator)
+ .concat(sgName)
+ .concat(File.separator)
+ .concat("0")
+ .concat(File.separator)
+ .concat("0")
+ .concat(File.separator)
+ .concat(
+ (i + 1)
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + (i + 1)
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
+ CompactionFileGeneratorUtils.writeTsFile(
+ fullPath, chunkPagePointsNum, 100 * i + 100, tsFileResource);
+ tsFileManager.add(tsFileResource, true);
+ }
+
+ List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+ List<Long> pagePointsNum = new ArrayList<>();
+ pagePointsNum.add(100000L);
+ chunkPagePointsNum.add(pagePointsNum);
+ TsFileResource tsFileResource =
+ new TsFileResource(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH
+ .concat(File.separator)
+ .concat("sequence")
+ .concat(File.separator)
+ .concat(sgName)
+ .concat(File.separator)
+ .concat("0")
+ .concat(File.separator)
+ .concat("0")
+ .concat(File.separator)
+ .concat(
+ 11
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 11
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile")));
+ CompactionFileGeneratorUtils.writeTsFile(
+ fullPath, chunkPagePointsNum, 100 * 10 + 100, tsFileResource);
+ tsFileManager.add(tsFileResource, true);
+
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ Thread.sleep(100);
+ long sleepTime = 0;
+ while (tsFileManager.getTsFileList(true).size() > 3) {
+ CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ Thread.sleep(100);
+ sleepTime += 100;
+ if (sleepTime >= 20_000) {
+ fail();
+ }
+ }
+
+ stopCompactionTaskManager();
+ tsFileManager.setAllowCompaction(false);
+ assertEquals(3, tsFileManager.getTsFileList(true).size());
+ } finally {
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setMaxInnerCompactionCandidateFileNum(prevMaxCompactionCandidateFileNum);
+ IoTDBDescriptor.getInstance().getConfig().setTargetCompactionFileSize(originTargetSize);
+ }
+ }
+
public void stopCompactionTaskManager() {
CompactionTaskManager.getInstance().clearCandidateQueue();
while (CompactionTaskManager.getInstance().getRunningCompactionTaskList().size() > 0) {