This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a4ce6f7a1a3 Modify the condition to skip compaction schedule after
insertion compaction task selection (#14644)
a4ce6f7a1a3 is described below
commit a4ce6f7a1a309dbfd5dfec8d8b75849b620820ff
Author: shuwenwei <[email protected]>
AuthorDate: Thu Jan 9 14:27:45 2025 +0800
Modify the condition to skip compaction schedule after insertion compaction
task selection (#14644)
* Modify the condition to skip compaction schedule after insertion
compaction task selection
* add ut
* delay insertion compaction
* fix ut
---
.../db/storageengine/dataregion/DataRegion.java | 66 ++++++++++------------
.../schedule/CompactionScheduleContext.java | 21 +++++++
.../compaction/schedule/CompactionScheduler.java | 60 +++++++++++---------
.../impl/RewriteCrossSpaceCompactionSelector.java | 11 +++-
.../InsertionCrossSpaceCompactionSelectorTest.java | 4 +-
.../cross/InsertionCrossSpaceCompactionTest.java | 64 +++++++++++++++++++--
6 files changed, 157 insertions(+), 69 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 54b469de3fe..84ff12e3dc7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -176,7 +176,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -2719,31 +2718,32 @@ public class DataRegion implements IDataRegionForQuery {
if (!isCompactionSelecting.compareAndSet(false, true)) {
return 0;
}
- int trySubmitCount = 0;
+ CompactionScheduleContext context = new CompactionScheduleContext();
try {
List<Long> timePartitions = new
ArrayList<>(tsFileManager.getTimePartitions());
// Sort the time partition from largest to smallest
timePartitions.sort(Comparator.reverseOrder());
- CompactionScheduleContext context = new CompactionScheduleContext();
-
// schedule insert compaction
- trySubmitCount += executeInsertionCompaction(timePartitions, context);
- context.incrementSubmitTaskNum(CompactionTaskType.INSERTION,
trySubmitCount);
+ int[] submitCountOfTimePartitions =
executeInsertionCompaction(timePartitions, context);
// schedule the other compactions
- if (trySubmitCount == 0) {
- // the name of this variable is trySubmitCount, because the task
submitted to the queue
- // could be evicted due to the low priority of the task
- for (long timePartition : timePartitions) {
- CompactionScheduler.sharedLockCompactionSelection();
- try {
- trySubmitCount +=
- CompactionScheduler.scheduleCompaction(tsFileManager,
timePartition, context);
- } finally {
- context.clearTimePartitionDeviceInfoCache();
- CompactionScheduler.sharedUnlockCompactionSelection();
- }
+ for (int i = 0; i < timePartitions.size(); i++) {
+ boolean skipOtherCompactionSchedule =
+ submitCountOfTimePartitions[i] > 0
+ && !config
+ .getDataRegionConsensusProtocolClass()
+ .equals(ConsensusFactory.IOT_CONSENSUS_V2);
+ if (skipOtherCompactionSchedule) {
+ continue;
+ }
+ long timePartition = timePartitions.get(i);
+ CompactionScheduler.sharedLockCompactionSelection();
+ try {
+ CompactionScheduler.scheduleCompaction(tsFileManager, timePartition,
context);
+ } finally {
+ context.clearTimePartitionDeviceInfoCache();
+ CompactionScheduler.sharedUnlockCompactionSelection();
}
}
if (context.hasSubmitTask()) {
@@ -2756,7 +2756,7 @@ public class DataRegion implements IDataRegionForQuery {
} finally {
isCompactionSelecting.set(false);
}
- return trySubmitCount;
+ return context.getSubmitCompactionTaskNum();
}
/** Schedule settle compaction for ttl check. */
@@ -2803,40 +2803,36 @@ public class DataRegion implements IDataRegionForQuery {
return trySubmitCount;
}
- protected int executeInsertionCompaction(
+ protected int[] executeInsertionCompaction(
List<Long> timePartitions, CompactionScheduleContext context) throws
InterruptedException {
- int trySubmitCount = 0;
+ int[] trySubmitCountOfTimePartitions = new int[timePartitions.size()];
CompactionScheduler.sharedLockCompactionSelection();
try {
while (true) {
int currentSubmitCount = 0;
- for (long timePartition : timePartitions) {
- while (true) {
- Phaser insertionTaskPhaser = new Phaser(1);
- int selectedTaskNum =
- CompactionScheduler.scheduleInsertionCompaction(
- tsFileManager, timePartition, insertionTaskPhaser,
context);
-
insertionTaskPhaser.awaitAdvanceInterruptibly(insertionTaskPhaser.arrive());
- currentSubmitCount += selectedTaskNum;
- if (selectedTaskNum <= 0) {
- break;
- }
- }
+ for (int i = 0; i < timePartitions.size(); i++) {
+ long timePartition = timePartitions.get(i);
+ int selectedTaskNum =
+ CompactionScheduler.scheduleInsertionCompaction(
+ tsFileManager, timePartition, context);
+ currentSubmitCount += selectedTaskNum;
+ trySubmitCountOfTimePartitions[i] += selectedTaskNum;
context.clearTimePartitionDeviceInfoCache();
}
if (currentSubmitCount <= 0) {
break;
}
- trySubmitCount += currentSubmitCount;
+ context.incrementSubmitTaskNum(CompactionTaskType.INSERTION,
currentSubmitCount);
}
} catch (InterruptedException e) {
throw e;
} catch (Throwable e) {
logger.error("Meet error in insertion compaction schedule.", e);
} finally {
+ context.clearTimePartitionDeviceInfoCache();
CompactionScheduler.sharedUnlockCompactionSelection();
}
- return trySubmitCount;
+ return trySubmitCountOfTimePartitions;
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
index 4d51ba5010f..1f01cac0b25 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java
@@ -31,7 +31,9 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
public class CompactionScheduleContext {
private int submitSeqInnerSpaceCompactionTaskNum = 0;
@@ -49,8 +51,19 @@ public class CompactionScheduleContext {
private final Map<TsFileResource, ArrayDeviceTimeIndex>
partitionFileDeviceInfoCache;
private long cachedDeviceInfoSize = 0;
+ private final Set<Long> timePartitionsDelayInsertionSelection;
+
public CompactionScheduleContext() {
this.partitionFileDeviceInfoCache = new HashMap<>();
+ this.timePartitionsDelayInsertionSelection = new HashSet<>();
+ }
+
+ public void delayInsertionSelection(long timePartitionId) {
+ timePartitionsDelayInsertionSelection.add(timePartitionId);
+ }
+
+ public boolean isInsertionSelectionDelayed(long timePartitionId) {
+ return timePartitionsDelayInsertionSelection.remove(timePartitionId);
}
public void addResourceDeviceTimeIndex(
@@ -135,6 +148,14 @@ public class CompactionScheduleContext {
return submitSettleCompactionTaskNum;
}
+ public int getSubmitCompactionTaskNum() {
+ return submitSeqInnerSpaceCompactionTaskNum
+ + submitUnseqInnerSpaceCompactionTaskNum
+ + submitCrossSpaceCompactionTaskNum
+ + submitInsertionCrossSpaceCompactionTaskNum
+ + submitSettleCompactionTaskNum;
+ }
+
public boolean hasSubmitTask() {
return submitCrossSpaceCompactionTaskNum
+ submitInsertionCrossSpaceCompactionTaskNum
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index c465e6fa863..a8877f9ae8f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -93,29 +93,32 @@ public class CompactionScheduler {
* @param context the context of compaction schedule
* @return the count of submitted task
*/
- public static int scheduleCompaction(
+ public static void scheduleCompaction(
TsFileManager tsFileManager, long timePartition,
CompactionScheduleContext context)
throws InterruptedException {
if (!tsFileManager.isAllowCompaction()) {
- return 0;
+ return;
}
// the name of this variable is trySubmitCount, because the task submitted
to the queue could be
// evicted due to the low priority of the task
- int trySubmitCount = 0;
try {
- trySubmitCount +=
+ int submitInnerTaskNum = 0;
+ submitInnerTaskNum +=
tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition,
true, context);
- trySubmitCount +=
+ submitInnerTaskNum +=
tryToSubmitInnerSpaceCompactionTask(tsFileManager, timePartition,
false, context);
- trySubmitCount += tryToSubmitCrossSpaceCompactionTask(tsFileManager,
timePartition, context);
- trySubmitCount +=
- tryToSubmitSettleCompactionTask(tsFileManager, timePartition,
context, false);
+ boolean executeDelayedInsertionSelection =
+ submitInnerTaskNum == 0 &&
context.isInsertionSelectionDelayed(timePartition);
+ if (executeDelayedInsertionSelection) {
+ scheduleInsertionCompaction(tsFileManager, timePartition, context);
+ }
+ tryToSubmitCrossSpaceCompactionTask(tsFileManager, timePartition,
context);
+ tryToSubmitSettleCompactionTask(tsFileManager, timePartition, context,
false);
} catch (InterruptedException e) {
throw e;
} catch (Throwable e) {
LOGGER.error("Meet error in compaction schedule.", e);
}
- return trySubmitCount;
}
@TestOnly
@@ -124,22 +127,6 @@ public class CompactionScheduler {
scheduleCompaction(tsFileManager, timePartition, new
CompactionScheduleContext());
}
- public static int scheduleInsertionCompaction(
- TsFileManager tsFileManager,
- long timePartition,
- Phaser insertionTaskPhaser,
- CompactionScheduleContext context)
- throws InterruptedException {
- if (!tsFileManager.isAllowCompaction()) {
- return 0;
- }
- int trySubmitCount = 0;
- trySubmitCount +=
- tryToSubmitInsertionCompactionTask(
- tsFileManager, timePartition, insertionTaskPhaser, context);
- return trySubmitCount;
- }
-
public static int tryToSubmitInnerSpaceCompactionTask(
TsFileManager tsFileManager,
long timePartition,
@@ -223,13 +210,31 @@ public class CompactionScheduler {
return true;
}
- private static int tryToSubmitInsertionCompactionTask(
+ public static int scheduleInsertionCompaction(
+ TsFileManager tsFileManager, long timePartition,
CompactionScheduleContext context)
+ throws InterruptedException {
+ int count = 0;
+ while (true) {
+ Phaser insertionTaskPhaser = new Phaser(1);
+ int selectedTaskNum =
+ tryToSubmitInsertionCompactionTask(
+ tsFileManager, timePartition, insertionTaskPhaser, context);
+
insertionTaskPhaser.awaitAdvanceInterruptibly(insertionTaskPhaser.arrive());
+ if (selectedTaskNum <= 0) {
+ break;
+ }
+ count += selectedTaskNum;
+ }
+ return count;
+ }
+
+ public static int tryToSubmitInsertionCompactionTask(
TsFileManager tsFileManager,
long timePartition,
Phaser insertionTaskPhaser,
CompactionScheduleContext context)
throws InterruptedException {
- if (!config.isEnableCrossSpaceCompaction()) {
+ if (!tsFileManager.isAllowCompaction() ||
!config.isEnableCrossSpaceCompaction()) {
return 0;
}
String logicalStorageGroupName = tsFileManager.getStorageGroupName();
@@ -351,6 +356,7 @@ public class CompactionScheduler {
trySubmitCount++;
}
}
+ context.incrementSubmitTaskNum(CompactionTaskType.SETTLE, trySubmitCount);
return trySubmitCount;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
index ef1fe4e0208..804d441d2f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java
@@ -69,7 +69,8 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
protected TsFileManager tsFileManager;
private static boolean hasPrintedLog = false;
- private static int maxDeserializedFileNumToCheckInsertionCandidateValid =
500;
+ private static int maxDeserializedFileNumToCheckInsertionCandidateValid =
100;
+ private static int maxFileNumToSelectInsertionTaskInOnePartition = 200;
private final long memoryBudget;
private final int maxCrossCompactionFileNum;
@@ -166,6 +167,14 @@ public class RewriteCrossSpaceCompactionSelector
implements ICrossSpaceSelector
"Selecting insertion cross compaction task resources from {}
seqFile, {} unseqFiles",
candidate.getSeqFiles().size(),
candidate.getUnseqFiles().size());
+ boolean delaySelection =
+ candidate.getSeqFiles().size() + candidate.getUnseqFiles().size()
+ > maxFileNumToSelectInsertionTaskInOnePartition;
+ if (delaySelection) {
+ context.delayInsertionSelection(timePartition);
+ return new InsertionCrossCompactionTaskResource();
+ }
+
InsertionCrossCompactionTaskResource result =
insertionCrossSpaceCompactionSelector.executeInsertionCrossSpaceCompactionTaskSelection();
if (result.isValid()) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
index 6a86b3955f9..2cb60ed2256 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
@@ -146,7 +146,7 @@ public class InsertionCrossSpaceCompactionSelectorTest
extends AbstractCompactio
Phaser phaser = new Phaser(1);
int submitTaskNum =
- CompactionScheduler.scheduleInsertionCompaction(tsFileManager, 0,
phaser, context);
+ CompactionScheduler.tryToSubmitInsertionCompactionTask(tsFileManager,
0, phaser, context);
Assert.assertEquals(1, submitTaskNum);
// perform insertion compaction
phaser.awaitAdvanceInterruptibly(phaser.arrive());
@@ -169,7 +169,7 @@ public class InsertionCrossSpaceCompactionSelectorTest
extends AbstractCompactio
// unseq resource2 d2[10, 20]
submitTaskNum =
- CompactionScheduler.scheduleInsertionCompaction(tsFileManager, 0,
phaser, context);
+ CompactionScheduler.tryToSubmitInsertionCompactionTask(tsFileManager,
0, phaser, context);
Assert.assertEquals(0, submitTaskNum);
Assert.assertTrue(
TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(tsFileManager.getTsFileList(true)));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java
index ddcb90f3ac8..0e0516a46c0 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.cross;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.service.metrics.FileMetrics;
@@ -452,7 +453,7 @@ public class InsertionCrossSpaceCompactionTest extends
AbstractCompactionTest {
}
@Test
- public void testInsertionCompactionScheduleWithMultiTimePartitions()
+ public void testInsertionCompactionScheduleWithMultiTimePartitions1()
throws IOException, InterruptedException {
TsFileResource unseqResource1 =
generateSingleNonAlignedSeriesFileWithDevices(
@@ -504,6 +505,60 @@ public class InsertionCrossSpaceCompactionTest extends
AbstractCompactionTest {
TsFileResourceManager.getInstance().getPriorityQueueSize());
}
+ @Test
+ public void testInsertionCompactionScheduleWithMultiTimePartitions2()
+ throws IOException, InterruptedException {
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ int innerCompactionCandidateFileNum =
config.getInnerCompactionCandidateFileNum();
+ config.setInnerCompactionCandidateFileNum(2);
+ try {
+ TsFileResource unseqResource1 =
+ generateSingleNonAlignedSeriesFileWithDevices(
+ "2-2-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new
TimeRange(1, 4)}, false);
+ unseqResource1.setStatusForTest(TsFileResourceStatus.NORMAL);
+
+ TsFileResource unseqResource2 =
+ generateSingleNonAlignedSeriesFileWithDevices(
+ "3-3-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new
TimeRange(6, 9)}, false);
+ unseqResource2.setStatusForTest(TsFileResourceStatus.NORMAL);
+ createTimePartitionDirIfNotExist(2808L);
+ TsFileResource unseqResource3 =
+ generateSingleNonAlignedSeriesFileWithDevicesWithTimePartition(
+ "4-4-0-0.tsfile",
+ new String[] {"d1"},
+ new TimeRange[] {new TimeRange(1698301490305L, 1698301490405L)},
+ 2808L,
+ true);
+ TsFileResource unseqResource4 =
+ generateSingleNonAlignedSeriesFileWithDevicesWithTimePartition(
+ "5-5-0-0.tsfile",
+ new String[] {"d1"},
+ new TimeRange[] {new TimeRange(1698301490306L, 1698301490406L)},
+ 2808L,
+ true);
+ unseqResource3.setStatusForTest(TsFileResourceStatus.NORMAL);
+ unseqResources.add(unseqResource1);
+ unseqResources.add(unseqResource2);
+ seqResources.add(unseqResource3);
+ seqResources.add(unseqResource4);
+
+ DataRegionForCompactionTest dataRegion = createDataRegion();
+ TsFileManager tsFileManager = dataRegion.getTsFileManager();
+
TsFileResourceManager.getInstance().registerSealedTsFileResource(unseqResource1);
+
TsFileResourceManager.getInstance().registerSealedTsFileResource(unseqResource2);
+
TsFileResourceManager.getInstance().registerSealedTsFileResource(unseqResource3);
+
TsFileResourceManager.getInstance().registerSealedTsFileResource(unseqResource4);
+
tsFileManager.getOrCreateUnsequenceListByTimePartition(0).keepOrderInsert(unseqResource1);
+
tsFileManager.getOrCreateUnsequenceListByTimePartition(0).keepOrderInsert(unseqResource2);
+
tsFileManager.getOrCreateSequenceListByTimePartition(2808).keepOrderInsert(unseqResource3);
+
tsFileManager.getOrCreateSequenceListByTimePartition(2808).keepOrderInsert(unseqResource4);
+ // 2 insertion task + 1 inner task
+ Assert.assertEquals(3, dataRegion.executeCompaction());
+ } finally {
+
config.setInnerCompactionCandidateFileNum(innerCompactionCandidateFileNum);
+ }
+ }
+
@Test
public void testInsertionCompactionUpdateFileMetrics() throws IOException {
TsFileResource unseqResource1 =
@@ -606,9 +661,10 @@ public class InsertionCrossSpaceCompactionTest extends
AbstractCompactionTest {
}
public int executeInsertionCompaction() throws InterruptedException {
- return super.executeInsertionCompaction(
- new ArrayList<>(this.getTsFileManager().getTimePartitions()),
- new CompactionScheduleContext());
+ CompactionScheduleContext context = new CompactionScheduleContext();
+ super.executeInsertionCompaction(
+ new ArrayList<>(this.getTsFileManager().getTimePartitions()),
context);
+ return context.getSubmitCompactionTaskNum();
}
}
}