This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f22e3791694 Fix overlap between tsfile is not correctly marked (#12748)
f22e3791694 is described below
commit f22e379169425ec8c022e82220084bb02475ea40
Author: shuwenwei <[email protected]>
AuthorDate: Wed Jun 19 19:44:08 2024 +0800
Fix overlap between tsfile is not correctly marked (#12748)
* fix fileOverlap not being correctly marked
* fix repair compaction possibly generating an overly large chunk
---
.../CompactionValidationFailedException.java | 14 ++++++------
.../execute/task/AbstractCompactionTask.java | 17 ++++++++-------
.../writer/RepairUnsortedFileCompactionWriter.java | 18 +++++++---------
.../repair/RepairUnsortedFileCompactionTest.java | 25 ++++++++++++++++++++++
4 files changed, 50 insertions(+), 24 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionValidationFailedException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionValidationFailedException.java
index 86b9eb4219c..b60dd6845fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionValidationFailedException.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionValidationFailedException.java
@@ -21,22 +21,24 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exceptio
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import java.util.List;
+
public class CompactionValidationFailedException extends RuntimeException {
- private TsFileResource overlappedTsFileResource = null;
+ private List<TsFileResource> overlappedTsFileResources = null;
public CompactionValidationFailedException(String msg) {
super(msg);
}
- public CompactionValidationFailedException(TsFileResource
overlappedTsFileResource) {
+ public CompactionValidationFailedException(List<TsFileResource>
overlappedTsFileResources) {
super(
"Failed to pass compaction validation, sequence files has overlap,
file is "
- + overlappedTsFileResource);
- this.overlappedTsFileResource = overlappedTsFileResource;
+ + overlappedTsFileResources);
+ this.overlappedTsFileResources = overlappedTsFileResources;
}
- public TsFileResource getOverlappedTsFileResource() {
- return overlappedTsFileResource;
+ public List<TsFileResource> getOverlappedTsFileResources() {
+ return overlappedTsFileResources;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index 0a1152044aa..75556001fd6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionTaskStage;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.TsFileIdentifier;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairDataFileScanUtil;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
@@ -149,10 +150,10 @@ public abstract class AbstractCompactionTask {
} else {
CompactionValidationFailedException validationException =
(CompactionValidationFailedException) e;
- TsFileResource overlappedTsFileResource =
validationException.getOverlappedTsFileResource();
- if (overlappedTsFileResource != null) {
- unsortedTsFileResources.add(overlappedTsFileResource);
- }
+ List<TsFileResource> overlappedTsFileResource =
+ validationException.getOverlappedTsFileResources();
+ unsortedTsFileResources =
+ overlappedTsFileResource == null ? unsortedTsFileResources :
overlappedTsFileResource;
}
// these exceptions generally caused by unsorted data, mark all source
files as NEED_TO_REPAIR
for (TsFileResource resource : unsortedTsFileResources) {
@@ -472,15 +473,15 @@ public abstract class AbstractCompactionTask {
Long.parseLong(f2.getTsFile().getName().split("-")[1]))
: timeDiff;
});
- if (!validator.validateTsFilesIsHasNoOverlap(timePartitionSeqFiles)) {
+ List<TsFileResource> overlapFilesInTimePartition =
+
RepairDataFileScanUtil.checkTimePartitionHasOverlap(timePartitionSeqFiles);
+ if (!overlapFilesInTimePartition.isEmpty()) {
LOGGER.error(
"Failed to pass compaction validation, source seq files: {},
source unseq files: {}, target files: {}",
sourceSeqFiles,
sourceUnseqFiles,
targetFiles);
- throw new CompactionValidationFailedException(
- "Failed to pass compaction validation, sequence files has overlap,
time partition id is "
- + timePartition);
+ throw new
CompactionValidationFailedException(overlapFilesInTimePartition);
}
}
if (needToValidateTsFileCorrectness &&
!validator.validateTsFiles(validTargetFiles)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
index 5dce41978c6..3c9c25de626 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
@@ -63,24 +63,22 @@ public class RepairUnsortedFileCompactionWriter extends
ReadPointInnerCompaction
mergeTimeValuePair(timeValuePair, previousTimeValuePair);
continue;
}
- writeDataPoint(
- previousTimeValuePair.getTimestamp(),
- previousTimeValuePair.getValue(),
- chunkWriters[subTaskId]);
- chunkPointNumArray[subTaskId]++;
+ writeToChunkWriter(previousTimeValuePair, subTaskId);
previousTimeValuePair = timeValuePair;
}
// write last time value pair
- writeDataPoint(
- previousTimeValuePair.getTimestamp(),
- previousTimeValuePair.getValue(),
- chunkWriters[subTaskId]);
- chunkPointNumArray[subTaskId]++;
+ writeToChunkWriter(previousTimeValuePair, subTaskId);
dataOfCurrentSeriesArr[subTaskId] = null;
super.endMeasurement(subTaskId);
}
+ private void writeToChunkWriter(TimeValuePair timeValuePair, int subTaskId)
throws IOException {
+ writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(),
chunkWriters[subTaskId]);
+ chunkPointNumArray[subTaskId]++;
+ checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId],
subTaskId);
+ }
+
private void mergeTimeValuePair(TimeValuePair from, TimeValuePair to) {
if (!isAlign) {
// ignore not aligned TimeValuePair with same timestamp
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
index 0b2ce0652ac..08add998ccf 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
@@ -995,4 +995,29 @@ public class RepairUnsortedFileCompactionTest extends
AbstractRepairDataTest {
}
}
}
+
+ @Test
+ public void testSplitChunk() throws IOException {
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s1", "s2", "s3"),
+ new TimeRange[][] {new TimeRange[] {new TimeRange(100000, 300000)}},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(true, false, false));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ RepairUnsortedFileCompactionTask task =
+ new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true,
true, 0);
+ Assert.assertTrue(task.start());
+ TsFileResource target = tsFileManager.getTsFileList(false).get(0);
+ try (TsFileSequenceReader reader = new
TsFileSequenceReader(target.getTsFilePath())) {
+ List<AlignedChunkMetadata> chunkMetadataList =
+ reader.getAlignedChunkMetadata(new PlainDeviceID("root.testsg.d1"));
+ Assert.assertEquals(3, chunkMetadataList.size());
+ }
+ }
}