This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 77ce37f0940 Fix compaction device info cache (#13453)
77ce37f0940 is described below
commit 77ce37f0940af39dd1411e65cd1c71f9502c19b0
Author: shuwenwei <[email protected]>
AuthorDate: Tue Sep 10 10:18:53 2024 +0800
Fix compaction device info cache (#13453)
* fix device info cache
* simplify test
---
.../selector/utils/TsFileResourceCandidate.java | 3 +-
.../InsertionCrossSpaceCompactionSelectorTest.java | 76 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
index f01ed280ad5..2832109714d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java
@@ -72,6 +72,7 @@ public class TsFileResourceCandidate {
}
private void prepareDeviceInfos() throws IOException {
+ boolean canCacheDeviceInfo = resource.getStatus() !=
TsFileResourceStatus.UNCLOSED;
if (deviceInfoMap == null && compactionScheduleContext != null) {
// get device info from cache
deviceInfoMap =
compactionScheduleContext.getResourceDeviceInfo(this.resource);
@@ -107,7 +108,7 @@ public class TsFileResourceCandidate {
}
}
hasDetailedDeviceInfo = true;
- if (compactionScheduleContext != null) {
+ if (compactionScheduleContext != null && canCacheDeviceInfo) {
compactionScheduleContext.addResourceDeviceTimeIndex(this.resource,
deviceInfoMap);
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
index d8859780c07..e29968cd7bf 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java
@@ -25,12 +25,15 @@ import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.exception.StorageEngineException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InsertionCrossSpaceCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.RewriteCrossSpaceCompactionSelector;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossSpaceCompactionCandidate;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.InsertionCrossCompactionTaskResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
+import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
@@ -98,6 +101,79 @@ public class InsertionCrossSpaceCompactionSelectorTest
extends AbstractCompactio
Assert.assertEquals(seqResource2, result.nextSeqFile);
}
+ @Test
+ public void testInsertionCompactionWithCachedDeviceInfoAndUnclosedResource()
+ throws InterruptedException, IOException {
+ CompactionScheduleContext context = new CompactionScheduleContext();
+
+ IDeviceID d1 = IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d1");
+ IDeviceID d2 = IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d2");
+ IDeviceID d3 = IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d3");
+
+ TsFileResource seqResource1 = createTsFileResource("1-1-0-0.tsfile", true);
+ seqResource1.getTsFile().createNewFile();
+ seqResource1.updateStartTime(d1, 10);
+ seqResource1.updateEndTime(d1, 20);
+ seqResource1.serialize();
+
+ TsFileResource seqResource2 = createTsFileResource("3-3-0-0.tsfile", true);
+ seqResource2.getTsFile().createNewFile();
+ seqResource2.updateStartTime(d1, 40);
+ seqResource2.updateEndTime(d1, 50);
+ seqResource2.serialize();
+
+ // unclosed
+ TsFileResource seqResource3 = createTsFileResource("6-6-0-0.tsfile", true);
+ seqResource3.getTsFile().createNewFile();
+ seqResource3.setStatusForTest(TsFileResourceStatus.UNCLOSED);
+ seqResource3.updateStartTime(d1, 70);
+
+ seqResources.add(seqResource1);
+ seqResources.add(seqResource2);
+ seqResources.add(seqResource3);
+ TsFileResource unseqResource1 = createTsFileResource("5-5-1-0.tsfile",
false);
+ unseqResource1.getTsFile().createNewFile();
+ unseqResource1.updateStartTime(d1, 30);
+ unseqResource1.updateEndTime(d1, 35);
+ unseqResource1.updateStartTime(d3, 10);
+ unseqResource1.updateEndTime(d3, 20);
+ unseqResource1.serialize();
+ unseqResources.add(unseqResource1);
+
+ tsFileManager.addAll(seqResources, true);
+ tsFileManager.addAll(unseqResources, false);
+
+ Phaser phaser = new Phaser(1);
+ int submitTaskNum =
+ CompactionScheduler.scheduleInsertionCompaction(tsFileManager, 0,
phaser, context);
+ Assert.assertEquals(1, submitTaskNum);
+ // perform insertion compaction
+ phaser.awaitAdvanceInterruptibly(phaser.arrive());
+
+ // unclosed file has sealed
+ seqResource3.updateEndTime(d1, 80);
+ seqResource3.updateStartTime(d2, 10);
+ seqResource3.updateEndTime(d2, 20);
+ seqResource3.setStatusForTest(TsFileResourceStatus.NORMAL);
+
+ TsFileResource unseqResource2 = createTsFileResource("7-7-1-0.tsfile",
false);
+ unseqResource2.getTsFile().createNewFile();
+ unseqResource2.updateStartTime(d2, 10);
+ unseqResource2.updateEndTime(d2, 20);
+ unseqResource2.serialize();
+ tsFileManager.keepOrderInsert(unseqResource2, false);
+ // Should not select unseq resource2
+ // The unclosed resource should not be cached. Otherwise, the results here
will be incorrect.
+ // seq resource3: d1[70, 80] d2[10, 20]
+ // unseq resource2 d2[10, 20]
+
+ submitTaskNum =
+ CompactionScheduler.scheduleInsertionCompaction(tsFileManager, 0,
phaser, context);
+ Assert.assertEquals(0, submitTaskNum);
+ Assert.assertTrue(
+
TsFileResourceUtils.validateTsFileResourcesHasNoOverlap(tsFileManager.getTsFileList(true)));
+ }
+
@Test
public void testSimpleInsertionCompactionWithMultiUnseqFiles()
throws IOException, MergeException {