This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new dbdd736ab34 When the device is missing from the resource, this part of
the data may be lost during file repair (#14411)
dbdd736ab34 is described below
commit dbdd736ab34965fecf41fe45ad3313d92b865442
Author: shuwenwei <[email protected]>
AuthorDate: Sat Dec 14 14:00:36 2024 +0800
When the device is missing from the resource, this part of the data may be
lost during file repair (#14411)
---
.../impl/ReadPointCompactionPerformer.java | 6 ++-
.../RepairUnsortedFileCompactionPerformer.java | 13 +++++++
.../rescon/memory/TsFileResourceManager.java | 17 +++++++++
.../repair/RepairUnsortedFileCompactionTest.java | 44 ++++++++++++++++++++++
.../rescon/memory/ResourceManagerTest.java | 39 +++++++++++++++++++
5 files changed, 118 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
index 98a6d0439cd..3d85c2152ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
@@ -103,7 +103,7 @@ public class ReadPointCompactionPerformer
long queryId =
QueryResourceManager.getInstance().assignCompactionQueryId();
FragmentInstanceContext fragmentInstanceContext =
FragmentInstanceContext.createFragmentInstanceContextForCompaction(queryId);
- QueryDataSource queryDataSource = new QueryDataSource(seqFiles,
unseqFiles);
+ QueryDataSource queryDataSource = initQueryDataSource();
QueryResourceManager.getInstance()
.getQueryFileManager()
.addUsedFilesForQuery(queryId, queryDataSource);
@@ -142,6 +142,10 @@ public class ReadPointCompactionPerformer
}
}
+ protected QueryDataSource initQueryDataSource() {
+ return new QueryDataSource(seqFiles, unseqFiles);
+ }
+
@Override
public void setTargetFiles(List<TsFileResource> targetFiles) {
this.targetFiles = targetFiles;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
index e986cb49c56..3bdfefbdac2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
@@ -22,13 +22,16 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performe
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.RepairUnsortedFileCompactionWriter;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
+import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.Collections;
import java.util.List;
/** Used for fixing files which contains internal unsorted data */
@@ -51,12 +54,22 @@ public class RepairUnsortedFileCompactionPerformer extends
ReadPointCompactionPe
public void perform() throws Exception {
TsFileResource resource = !seqFiles.isEmpty() ? seqFiles.get(0) :
unseqFiles.get(0);
if (resource.getTsFileRepairStatus() ==
TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE) {
+ TsFileResourceManager.getInstance().forceDegradeTsFileResource(resource);
super.perform();
} else {
prepareTargetFile();
}
}
+ @Override
+ protected QueryDataSource initQueryDataSource() {
+ // Only a single file is involved here. Regardless of whether this file is
sequence
+ // or not, it is passed in as sequence file, which allows us to bypass
some logic in
+ // SeriesScanUtils that might fail.
+ return new QueryDataSource(
+ !seqFiles.isEmpty() ? seqFiles : unseqFiles, Collections.emptyList());
+ }
+
private void prepareTargetFile() throws IOException {
TsFileResource seqSourceFile = seqFiles.get(0);
TsFileResource targetFile = targetFiles.get(0);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
index 7dae538f088..6fe0461404a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
@@ -83,6 +83,23 @@ public class TsFileResourceManager {
}
}
+ public void forceDegradeTsFileResource(TsFileResource resource) {
+ if (TimeIndexLevel.valueOf(resource.getTimeIndexType()) ==
TimeIndexLevel.FILE_TIME_INDEX) {
+ return;
+ }
+ logger.debug("Force degrade tsfile resource {}", resource.getTsFilePath());
+ synchronized (this) {
+ if (!sealedTsFileResources.remove(resource)) {
+ resource.degradeTimeIndex();
+ return;
+ }
+ long memoryReduce = resource.degradeTimeIndex();
+ degradedTimeIndexNum++;
+ releaseTimeIndexMemCost(memoryReduce);
+ sealedTsFileResources.add(resource);
+ }
+ }
+
/** once degradation is triggered, the total memory for timeIndex should
reduce */
private void releaseTimeIndexMemCost(long memCost) {
totalTimeIndexMemCost -= memCost;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
index 55ccea4d4e0..e088dc260b1 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
@@ -39,6 +39,8 @@ import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFil
import
org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.tsfile.exception.write.WriteProcessException;
@@ -797,4 +799,46 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
Assert.assertEquals(3, chunkMetadataList.size());
}
}
+
+ @Test
+ public void testResourceFileLostDevices() throws IOException {
+ IDeviceID d1 = IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d1");
+ IDeviceID d2 = IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d2");
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s0", "s1", "s2"),
+ new TimeRange[] {new TimeRange(10, 40)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false, true));
+ writer.endChunkGroup();
+
+ writer.startChunkGroup("d2");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s0", "s1", "s2"),
+ new TimeRange[] {new TimeRange(10, 40)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false, true));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ // lost d1
+ ITimeIndex timeIndex = new ArrayDeviceTimeIndex();
+ timeIndex.updateStartTime(d2, 10);
+ timeIndex.updateEndTime(d2, 40);
+ resource.setTimeIndex(timeIndex);
+ resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK);
+
+ RepairUnsortedFileCompactionTask task =
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0);
+ Assert.assertTrue(task.start());
+ TsFileResource target = tsFileManager.getTsFileList(false).get(0);
+ try (TsFileSequenceReader reader = new
TsFileSequenceReader(target.getTsFilePath())) {
+ List<IDeviceID> devicesInTargetFile = reader.getAllDevices();
+ Assert.assertEquals(Arrays.asList(d1, d2), devicesInTargetFile);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/ResourceManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/ResourceManagerTest.java
index e891cbf97f9..53f399f5aa5 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/ResourceManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/ResourceManagerTest.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
@@ -47,6 +48,7 @@ import org.apache.tsfile.write.record.datapoint.DataPoint;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -370,4 +372,41 @@ public class ResourceManagerTest {
TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
}
}
+
+ @Test
+ public void testForceDegradeTimeIndex() {
+ File file =
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource tsFileResource = new TsFileResource(file);
+ IDeviceID device1 =
IDeviceID.Factory.DEFAULT_FACTORY.create("root.test.d1");
+ IDeviceID device2 =
IDeviceID.Factory.DEFAULT_FACTORY.create("root.test.d2");
+ tsFileResource.updateStartTime(device1, 1);
+ tsFileResource.updateStartTime(device1, 2);
+ tsFileResource.updateStartTime(device2, 1);
+ tsFileResource.updateStartTime(device2, 2);
+
+ assertEquals(
+ TimeIndexLevel.ARRAY_DEVICE_TIME_INDEX,
+ TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+ tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+ long resourceRamSizeBeforeDegrade = tsFileResource.calculateRamSize();
+ Assert.assertEquals(
+ resourceRamSizeBeforeDegrade,
tsFileResourceManager.getTotalTimeIndexMemCost());
+
TsFileResourceManager.getInstance().forceDegradeTsFileResource(tsFileResource);
+ Assert.assertEquals(
+ tsFileResource.calculateRamSize(),
tsFileResourceManager.getTotalTimeIndexMemCost());
+ Assert.assertTrue(
+ resourceRamSizeBeforeDegrade >
tsFileResourceManager.getTotalTimeIndexMemCost());
+ Assert.assertTrue(tsFileResource.getTimeIndexType() ==
ITimeIndex.FILE_TIME_INDEX_TYPE);
+ Assert.assertEquals(1, tsFileResourceManager.getDegradedTimeIndexNum());
+ }
}