This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new e9d6173688e [to dev/1.3] Fix repair performer: When the device is
missing from the resource, this part of the data may be lost during file repair
(#14422)
e9d6173688e is described below
commit e9d6173688eb45cd66b97f383b6db8a231d5b51e
Author: shuwenwei <[email protected]>
AuthorDate: Tue Dec 17 08:10:44 2024 +0800
[to dev/1.3] Fix repair performer: When the device is missing from the
resource, this part of the data may be lost during file repair (#14422)
* When the device is missing from the resource, this part of the data may
be lost during file repair
* fix compile
* fix compile
---
.../impl/ReadPointCompactionPerformer.java | 6 ++-
.../RepairUnsortedFileCompactionPerformer.java | 13 +++++++
.../rescon/memory/TsFileResourceManager.java | 17 ++++++++
.../repair/RepairUnsortedFileCompactionTest.java | 45 ++++++++++++++++++++++
.../rescon/memory/ResourceManagerTest.java | 40 +++++++++++++++++++
5 files changed, 120 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
index 5efb50d7b53..064cdecf841 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
@@ -103,7 +103,7 @@ public class ReadPointCompactionPerformer
long queryId =
QueryResourceManager.getInstance().assignCompactionQueryId();
FragmentInstanceContext fragmentInstanceContext =
FragmentInstanceContext.createFragmentInstanceContextForCompaction(queryId);
- QueryDataSource queryDataSource = new QueryDataSource(seqFiles,
unseqFiles);
+ QueryDataSource queryDataSource = initQueryDataSource();
QueryResourceManager.getInstance()
.getQueryFileManager()
.addUsedFilesForQuery(queryId, queryDataSource);
@@ -138,6 +138,10 @@ public class ReadPointCompactionPerformer
}
}
+ protected QueryDataSource initQueryDataSource() {
+ return new QueryDataSource(seqFiles, unseqFiles);
+ }
+
@Override
public void setTargetFiles(List<TsFileResource> targetFiles) {
this.targetFiles = targetFiles;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
index 79a3b9fb157..18e9ef24df2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
@@ -22,14 +22,17 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performe
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.RepairUnsortedFileCompactionWriter;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
+import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.Collections;
import java.util.List;
/** Used for fixing files which contains internal unsorted data */
@@ -52,12 +55,22 @@ public class RepairUnsortedFileCompactionPerformer extends
ReadPointCompactionPe
public void perform() throws Exception {
TsFileResource resource = !seqFiles.isEmpty() ? seqFiles.get(0) :
unseqFiles.get(0);
if (resource.getTsFileRepairStatus() ==
TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE) {
+ TsFileResourceManager.getInstance().forceDegradeTsFileResource(resource);
super.perform();
} else {
prepareTargetFile();
}
}
+ @Override
+ protected QueryDataSource initQueryDataSource() {
+ // Only a single file is involved here. Regardless of whether this file is sequence
+ // or not, it is passed in as sequence file, which allows us to bypass some logic in
+ // SeriesScanUtils that might fail.
+ return new QueryDataSource(
+ !seqFiles.isEmpty() ? seqFiles : unseqFiles, Collections.emptyList());
+ }
+
private void prepareTargetFile() throws IOException {
TsFileResource seqSourceFile = seqFiles.get(0);
TsFileResource targetFile = targetFiles.get(0);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
index 9da89447c72..b0d26b14913 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
@@ -82,6 +82,23 @@ public class TsFileResourceManager {
}
}
+ public void forceDegradeTsFileResource(TsFileResource resource) {
+ if (TimeIndexLevel.valueOf(resource.getTimeIndexType()) ==
TimeIndexLevel.FILE_TIME_INDEX) {
+ return;
+ }
+ logger.debug("Force degrade tsfile resource {}", resource.getTsFilePath());
+ synchronized (this) {
+ if (!sealedTsFileResources.remove(resource)) {
+ resource.degradeTimeIndex();
+ return;
+ }
+ long memoryReduce = resource.degradeTimeIndex();
+ degradedTimeIndexNum++;
+ releaseTimeIndexMemCost(memoryReduce);
+ sealedTsFileResources.add(resource);
+ }
+ }
+
/** once degradation is triggered, the total memory for timeIndex should reduce */
private void releaseTimeIndexMemCost(long memCost) {
totalTimeIndexMemCost -= memCost;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
index 3122aaed50a..28f862f9371 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
@@ -38,12 +38,15 @@ import
org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -796,4 +799,46 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
Assert.assertEquals(3, chunkMetadataList.size());
}
}
+
+ @Test
+ public void testResourceFileLostDevices() throws IOException {
+ IDeviceID d1 = new PlainDeviceID("root.testsg.d1");
+ IDeviceID d2 = new PlainDeviceID("root.testsg.d2");
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s0", "s1", "s2"),
+ new TimeRange[] {new TimeRange(10, 40)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false, true));
+ writer.endChunkGroup();
+
+ writer.startChunkGroup("d2");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s0", "s1", "s2"),
+ new TimeRange[] {new TimeRange(10, 40)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false, true));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ // lost d1
+ ITimeIndex timeIndex = new DeviceTimeIndex();
+ timeIndex.updateStartTime(d2, 10);
+ timeIndex.updateEndTime(d2, 40);
+ resource.setTimeIndex(timeIndex);
+ resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK);
+
+ RepairUnsortedFileCompactionTask task =
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
0);
+ Assert.assertTrue(task.start());
+ TsFileResource target = tsFileManager.getTsFileList(false).get(0);
+ try (TsFileSequenceReader reader = new
TsFileSequenceReader(target.getTsFilePath())) {
+ List<IDeviceID> devicesInTargetFile = reader.getAllDevices();
+ Assert.assertEquals(Arrays.asList(d1, d2), devicesInTargetFile);
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/ResourceManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/ResourceManagerTest.java
index efa285f7082..7bb27d1caee 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/ResourceManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/ResourceManagerTest.java
@@ -29,12 +29,14 @@ import
org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.TimeIndexLevel;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -45,6 +47,7 @@ import org.apache.tsfile.write.record.TSRecord;
import org.apache.tsfile.write.record.datapoint.DataPoint;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -368,4 +371,41 @@ public class ResourceManagerTest {
TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
}
}
+
+ @Test
+ public void testForceDegradeTimeIndex() {
+ File file =
+ new File(
+ TestConstant.BASE_OUTPUT_PATH.concat(
+ 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 1
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + IoTDBConstant.FILE_NAME_SEPARATOR
+ + 0
+ + ".tsfile"));
+ TsFileResource tsFileResource = new TsFileResource(file);
+ IDeviceID device1 = new PlainDeviceID("root.test.d1");
+ IDeviceID device2 = new PlainDeviceID("root.test.d2");
+ tsFileResource.updateStartTime(device1, 1);
+ tsFileResource.updateStartTime(device1, 2);
+ tsFileResource.updateStartTime(device2, 1);
+ tsFileResource.updateStartTime(device2, 2);
+
+ assertEquals(
+ TimeIndexLevel.DEVICE_TIME_INDEX,
+ TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+ tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+ long resourceRamSizeBeforeDegrade = tsFileResource.calculateRamSize();
+ Assert.assertEquals(
+ resourceRamSizeBeforeDegrade,
tsFileResourceManager.getTotalTimeIndexMemCost());
+
TsFileResourceManager.getInstance().forceDegradeTsFileResource(tsFileResource);
+ Assert.assertEquals(
+ tsFileResource.calculateRamSize(),
tsFileResourceManager.getTotalTimeIndexMemCost());
+ Assert.assertTrue(
+ resourceRamSizeBeforeDegrade >
tsFileResourceManager.getTotalTimeIndexMemCost());
+ Assert.assertTrue(tsFileResource.getTimeIndexType() ==
ITimeIndex.FILE_TIME_INDEX_TYPE);
+ Assert.assertEquals(1, tsFileResourceManager.getDegradedTimeIndexNum());
+ }
}