This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 9774abe366 [IOTDB-3775][IOTDB-3776]Avoid serializing resource file and
adding mods to file that don't contain the device when deleting data (#6719)
9774abe366 is described below
commit 9774abe3666d486c09c4595788ef8e6790437584
Author: 周沛辰 <[email protected]>
AuthorDate: Wed Jul 20 09:58:06 2022 +0800
[IOTDB-3775][IOTDB-3776]Avoid serializing resource file and adding mods to
file that don't contain the device when deleting data (#6719)
Co-authored-by: 沛辰周 <[email protected]>
---
.../db/engine/storagegroup/TsFileProcessor.java | 5 ++
.../db/engine/storagegroup/TsFileResource.java | 24 +++----
.../storagegroup/VirtualStorageGroupProcessor.java | 19 +++---
.../cross/RewriteCrossSpaceCompactionTest.java | 76 ++++++++++++++--------
.../storagegroup/StorageGroupProcessorTest.java | 76 ++++++++++++++++++++++
5 files changed, 153 insertions(+), 47 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 4a9f9043a0..4bc9ab6ce4 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -1448,4 +1448,9 @@ public class TsFileProcessor {
public IMemTable getWorkMemTable() {
return workMemTable;
}
+
+ @TestOnly
+ public ConcurrentLinkedDeque<IMemTable> getFlushingMemTable() {
+ return flushingMemTables;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 14f21df950..e640c8df98 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -870,17 +870,19 @@ public class TsFileResource {
if (planIndex == Long.MIN_VALUE || planIndex == Long.MAX_VALUE) {
return;
}
- maxPlanIndex = Math.max(maxPlanIndex, planIndex);
- minPlanIndex = Math.min(minPlanIndex, planIndex);
- if (isClosed()) {
- try {
- serialize();
- } catch (IOException e) {
- LOGGER.error(
- "Cannot serialize TsFileResource {} when updating plan index
{}-{}",
- this,
- maxPlanIndex,
- planIndex);
+ if (planIndex < minPlanIndex || planIndex > maxPlanIndex) {
+ maxPlanIndex = Math.max(maxPlanIndex, planIndex);
+ minPlanIndex = Math.min(minPlanIndex, planIndex);
+ if (isClosed()) {
+ try {
+ serialize();
+ } catch (IOException e) {
+ LOGGER.error(
+ "Cannot serialize TsFileResource {} when updating plan index
{}-{}",
+ this,
+ maxPlanIndex,
+ planIndex);
+ }
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index 38974db73e..db39f15ce0 100755
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -1932,15 +1932,20 @@ public class VirtualStorageGroupProcessor {
logicalStorageGroupName, tsFileResource.getTimePartition())) {
return true;
}
+ if (!tsFileResource.isClosed()) {
+ // tsfile is not closed
+ return false;
+ }
for (PartialPath device : devicePaths) {
String deviceId = device.getFullPath();
- long endTime = tsFileResource.getEndTime(deviceId);
- if (endTime == Long.MIN_VALUE) {
- return false;
+ if (!tsFileResource.mayContainsDevice(deviceId)) {
+ // resource does not contain this device
+ continue;
}
- if (tsFileResource.isDeviceIdExist(deviceId)
- && (deleteEnd >= tsFileResource.getStartTime(deviceId) &&
deleteStart <= endTime)) {
+ if (deleteEnd >= tsFileResource.getStartTime(deviceId)
+ && deleteStart <= tsFileResource.getEndTime(deviceId)) {
+ // time range of device has overlap with the deletion
return false;
}
}
@@ -1976,7 +1981,7 @@ public class VirtualStorageGroupProcessor {
// remember to close mod file
tsFileResource.getCompactionModFile().close();
tsFileResource.getModFile().close();
- } else {
+ } else if (tsFileResource.isClosed()) {
deletion.setFileOffset(tsFileResource.getTsFileSize());
// write deletion into modification file
tsFileResource.getModFile().write(deletion);
@@ -1990,8 +1995,6 @@ public class VirtualStorageGroupProcessor {
deletion.getEndTime(),
tsFileResource.getModFile().getFilePath());
- tsFileResource.updatePlanIndexes(planIndex);
-
// delete data in memory of unsealed file
if (!tsFileResource.isClosed()) {
TsFileProcessor tsfileProcessor = tsFileResource.getProcessor();
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
index ecb07b00a8..040134b4d4 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
@@ -630,21 +630,25 @@ public class RewriteCrossSpaceCompactionTest extends
AbstractCompactionTest {
for (int i = 0; i < seqResources.size(); i++) {
TsFileResource resource = seqResources.get(i);
resource.resetModFile();
- Assert.assertTrue(resource.getCompactionModFile().exists());
- Assert.assertEquals(1,
resource.getCompactionModFile().getModifications().size());
- Assert.assertTrue(resource.getModFile().exists());
- if (i == 3) {
- Assert.assertEquals(1,
resource.getModFile().getModifications().size());
- } else {
+ if (i < 2) {
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ } else if (i == 2) {
+ Assert.assertTrue(resource.getCompactionModFile().exists());
+ Assert.assertTrue(resource.getModFile().exists());
Assert.assertEquals(2,
resource.getModFile().getModifications().size());
+ Assert.assertEquals(1,
resource.getCompactionModFile().getModifications().size());
+ } else {
+ Assert.assertTrue(resource.getCompactionModFile().exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertEquals(1,
resource.getModFile().getModifications().size());
+ Assert.assertEquals(1,
resource.getCompactionModFile().getModifications().size());
}
}
for (TsFileResource resource : unseqResources) {
resource.resetModFile();
- Assert.assertTrue(resource.getCompactionModFile().exists());
- Assert.assertEquals(1,
resource.getCompactionModFile().getModifications().size());
- Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(2, resource.getModFile().getModifications().size());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ Assert.assertFalse(resource.getModFile().exists());
}
rewriteCrossSpaceCompactionTask.call();
for (TsFileResource resource : seqResources) {
@@ -657,13 +661,19 @@ public class RewriteCrossSpaceCompactionTest extends
AbstractCompactionTest {
Assert.assertFalse(resource.getModFile().exists());
Assert.assertFalse(resource.getCompactionModFile().exists());
}
- for (TsFileResource seqResource : seqResources) {
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource seqResource = seqResources.get(i);
TsFileResource resource =
new TsFileResource(
TsFileNameGenerator.increaseCrossCompactionCnt(seqResource.getTsFile()));
- Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(6, resource.getModFile().getModifications().size());
- Assert.assertFalse(resource.getCompactionModFile().exists());
+ if (i < 2) {
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ } else {
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertEquals(1,
resource.getModFile().getModifications().size());
+ }
}
}
@@ -752,21 +762,25 @@ public class RewriteCrossSpaceCompactionTest extends
AbstractCompactionTest {
for (int i = 0; i < seqResources.size(); i++) {
TsFileResource resource = seqResources.get(i);
resource.resetModFile();
- Assert.assertTrue(resource.getCompactionModFile().exists());
- Assert.assertEquals(2,
resource.getCompactionModFile().getModifications().size());
- Assert.assertTrue(resource.getModFile().exists());
- if (i == 3) {
- Assert.assertEquals(2,
resource.getModFile().getModifications().size());
- } else {
+ if (i < 2) {
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ } else if (i == 2) {
+ Assert.assertTrue(resource.getCompactionModFile().exists());
+ Assert.assertTrue(resource.getModFile().exists());
Assert.assertEquals(3,
resource.getModFile().getModifications().size());
+ Assert.assertEquals(2,
resource.getCompactionModFile().getModifications().size());
+ } else {
+ Assert.assertTrue(resource.getCompactionModFile().exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertEquals(2,
resource.getModFile().getModifications().size());
+ Assert.assertEquals(2,
resource.getCompactionModFile().getModifications().size());
}
}
for (TsFileResource resource : unseqResources) {
resource.resetModFile();
- Assert.assertTrue(resource.getCompactionModFile().exists());
- Assert.assertEquals(2,
resource.getCompactionModFile().getModifications().size());
- Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(3, resource.getModFile().getModifications().size());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ Assert.assertFalse(resource.getModFile().exists());
}
rewriteCrossSpaceCompactionTask.call();
for (TsFileResource resource : seqResources) {
@@ -779,13 +793,19 @@ public class RewriteCrossSpaceCompactionTest extends
AbstractCompactionTest {
Assert.assertFalse(resource.getModFile().exists());
Assert.assertFalse(resource.getCompactionModFile().exists());
}
- for (TsFileResource seqResource : seqResources) {
+ for (int i = 0; i < seqResources.size(); i++) {
+ TsFileResource seqResource = seqResources.get(i);
TsFileResource resource =
new TsFileResource(
TsFileNameGenerator.increaseCrossCompactionCnt(seqResource.getTsFile()));
- Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(12, resource.getModFile().getModifications().size());
- Assert.assertFalse(resource.getCompactionModFile().exists());
+ if (i < 2) {
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ } else {
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertEquals(2,
resource.getModFile().getModifications().size());
+ }
}
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 7249cf2cb3..75af3139a8 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -871,6 +871,82 @@ public class StorageGroupProcessorTest {
config.setCloseTsFileIntervalAfterFlushing(prevCloseTsFileInterval);
}
+ /**
+ * Totally 5 tsfiles<br>
+ * file 0, file 2 and file 4 has d0 ~ d1, time range is 0 ~ 99, 200 ~ 299,
400 ~ 499<br>
+ * file 1, file 3 has d0 ~ d2, time range is 100 ~ 199, 300 ~ 399<br>
+ * delete d2 in time range 50 ~ 150 and 150 ~ 450. Therefore, only file 1
and file 3 has mods.
+ */
+ @Test
+ public void testDeleteDataNotInFile()
+ throws IllegalPathException, WriteProcessException,
TriggerExecutionException,
+ InterruptedException, IOException {
+ for (int i = 0; i < 5; i++) {
+ if (i % 2 == 0) {
+ for (int d = 0; d < 2; d++) {
+ for (int count = i * 100; count < i * 100 + 100; count++) {
+ TSRecord record = new TSRecord(count, "root.vehicle.d" + d);
+ record.addTuple(
+ DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(count)));
+ processor.insert(new InsertRowPlan(record));
+ }
+ }
+ } else {
+ for (int d = 0; d < 3; d++) {
+ for (int count = i * 100; count < i * 100 + 100; count++) {
+ TSRecord record = new TSRecord(count, "root.vehicle.d" + d);
+ record.addTuple(
+ DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(count)));
+ processor.insert(new InsertRowPlan(record));
+ }
+ }
+ }
+ processor.syncCloseAllWorkingTsFileProcessors();
+ }
+
+ // delete root.vehicle.d2.s0 data in the second file
+ processor.delete(new PartialPath("root.vehicle.d2.s0"), 50, 150, 0, null);
+
+ // delete root.vehicle.d2.s0 data in the third file
+ processor.delete(new PartialPath("root.vehicle.d2.s0"), 150, 450, 0, null);
+
+ for (int i = 0; i <
processor.getTsFileResourceManager().getTsFileList(true).size(); i++) {
+ TsFileResource resource =
processor.getTsFileResourceManager().getTsFileList(true).get(i);
+ if (i == 1) {
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertEquals(2,
resource.getModFile().getModifications().size());
+ } else if (i == 3) {
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertEquals(1,
resource.getModFile().getModifications().size());
+ } else {
+ Assert.assertFalse(resource.getModFile().exists());
+ }
+ }
+ }
+
+ @Test
+ public void testDeleteDataInFlushingMemtable()
+ throws IllegalPathException, WriteProcessException,
TriggerExecutionException, IOException {
+ for (int j = 0; j < 100; j++) {
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId,
String.valueOf(j)));
+ processor.insert(new InsertRowPlan(record));
+ }
+ TsFileResource tsFileResource =
processor.getTsFileResourceManager().getTsFileList(true).get(0);
+ TsFileProcessor tsFileProcessor = tsFileResource.getProcessor();
+
tsFileProcessor.getFlushingMemTable().addLast(tsFileProcessor.getWorkMemTable());
+
+ // delete data which is in memtable
+ processor.delete(new PartialPath("root.vehicle.d2.s0"), 50, 70, 0, null);
+
+ // delete data which is not in memtable
+ processor.delete(new PartialPath("root.vehicle.d200.s0"), 50, 70, 0, null);
+
+ processor.syncCloseAllWorkingTsFileProcessors();
+ Assert.assertTrue(tsFileResource.getModFile().exists());
+ Assert.assertEquals(2,
tsFileResource.getModFile().getModifications().size());
+ }
+
class DummySGP extends VirtualStorageGroupProcessor {
DummySGP(String systemInfoDir, String storageGroupName) throws
StorageGroupProcessorException {