This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3427b320b62 Fix compaction UTs and add concurrency control when
schedule compaction (#11638)
3427b320b62 is described below
commit 3427b320b629bfd39e2e329203b0fd067f3e1aa4
Author: shuwenwei <[email protected]>
AuthorDate: Wed Dec 6 11:47:05 2023 +0800
Fix compaction UTs and add concurrency control when schedule compaction
(#11638)
---
.../db/storageengine/dataregion/DataRegion.java | 6 ++++++
.../compaction/schedule/CompactionScheduler.java | 13 +++++++++++
.../compaction/settle/SettleRequestHandler.java | 24 +++++++++++++--------
.../compaction/CompactionSchedulerTest.java | 25 +++++++++++++++++++---
4 files changed, 56 insertions(+), 12 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index a884ba9fcd3..8b6c1d1289d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2322,6 +2322,7 @@ public class DataRegion implements IDataRegionForQuery {
// the name of this variable is trySubmitCount, because the task submitted to the queue could
// be
// evicted due to the low priority of the task
+ CompactionScheduler.lockCompactionSelection();
try {
for (long timePartition : timePartitions) {
trySubmitCount +=
@@ -2329,6 +2330,8 @@ public class DataRegion implements IDataRegionForQuery {
}
} catch (Throwable e) {
logger.error("Meet error in compaction schedule.", e);
+ } finally {
+ CompactionScheduler.unlockCompactionSelection();
}
}
if (summary.hasSubmitTask()) {
@@ -2349,6 +2352,7 @@ public class DataRegion implements IDataRegionForQuery {
protected int executeInsertionCompaction(List<Long> timePartitions) {
int trySubmitCount = 0;
+ CompactionScheduler.lockCompactionSelection();
try {
while (true) {
int currentSubmitCount = 0;
@@ -2367,6 +2371,8 @@ public class DataRegion implements IDataRegionForQuery {
}
} catch (Throwable e) {
logger.error("Meet error in compaction schedule.", e);
+ } finally {
+ CompactionScheduler.unlockCompactionSelection();
}
return trySubmitCount;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index f36dea68747..d2534a427e7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.Phaser;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
@@ -59,6 +61,17 @@ public class CompactionScheduler {
private CompactionScheduler() {}
+ /** avoid timed compaction schedule conflict with manually triggered schedule */
+ private static final Lock compactionTaskSelectionLock = new ReentrantLock();
+
+ public static void lockCompactionSelection() {
+ compactionTaskSelectionLock.lock();
+ }
+
+ public static void unlockCompactionSelection() {
+ compactionTaskSelectionLock.unlock();
+ }
+
/**
* Select compaction task and submit them to CompactionTaskManager.
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleRequestHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleRequestHandler.java
index 60652442dad..9e1e90441a4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleRequestHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleRequestHandler.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
@@ -73,16 +74,21 @@ public class SettleRequestHandler {
public TSStatus handleSettleRequest(TSettleReq req) {
List<String> paths = req.getPaths();
- SettleRequestContext context = new SettleRequestContext(paths);
- TSStatus validationResult = context.validate();
- if (validationResult.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- return validationResult;
- }
- if (testMode) {
- return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ CompactionScheduler.lockCompactionSelection();
+ try {
+ SettleRequestContext context = new SettleRequestContext(paths);
+ TSStatus validationResult = context.validate();
+ if (validationResult.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return validationResult;
+ }
+ if (testMode) {
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
+ }
+ List<TsFileResource> selectedTsFileResources =
context.getTsFileResourcesByFileNames();
+ return context.submitCompactionTask(selectedTsFileResources);
+ } finally {
+ CompactionScheduler.unlockCompactionSelection();
}
- List<TsFileResource> selectedTsFileResources =
context.getTsFileResourcesByFileNames();
- return context.submitCompactionTask(selectedTsFileResources);
}
private static class SettleRequestContext {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
index 52b6f091576..dd2d9651b61 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionC
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
@@ -1792,18 +1793,36 @@ public class CompactionSchedulerTest {
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
Thread.sleep(100);
long sleepTime = 0;
- while (tsFileManager.getTsFileList(true).size() > 3) {
+ while (tsFileManager.getTsFileList(true).size() >= 2) {
CompactionScheduler.scheduleCompaction(tsFileManager, 0);
+ tsFileManager.readLock();
+ List<TsFileResource> resources = tsFileManager.getTsFileList(true);
+ int previousFileLevel =
+
TsFileNameGenerator.getTsFileName(resources.get(0).getTsFile().getName())
+ .getInnerCompactionCnt();
+ boolean canMerge = false;
+ for (int i = 1; i < resources.size(); i++) {
+ int currentFileLevel =
+
TsFileNameGenerator.getTsFileName(resources.get(i).getTsFile().getName())
+ .getInnerCompactionCnt();
+ if (currentFileLevel == previousFileLevel) {
+ canMerge = true;
+ break;
+ }
+ }
+ tsFileManager.readUnlock();
+ if (!canMerge) {
+ break;
+ }
Thread.sleep(100);
sleepTime += 100;
- if (sleepTime >= 20_000) {
+ if (sleepTime >= 200_000) {
fail();
}
}
stopCompactionTaskManager();
tsFileManager.setAllowCompaction(false);
- assertEquals(3, tsFileManager.getTsFileList(true).size());
} finally {
IoTDBDescriptor.getInstance()
.getConfig()