This is an automated email from the ASF dual-hosted git repository. ejttianyu pushed a commit to branch fix_compaction_loss_data in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 31e4257c93040856fc0fb034381db8e1320f594a Author: EJTTianyu <[email protected]> AuthorDate: Fri Jun 11 01:19:49 2021 +0800 fix bug and add UT --- .../engine/compaction/utils/CompactionUtils.java | 5 ++- .../compaction/LevelCompactionMergeTest.java | 50 ++++++++++++++++++++++ .../db/engine/compaction/LevelCompactionTest.java | 43 +++++++++++++++++++ 3 files changed, 97 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java index 822b149..e5ea202 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java @@ -318,7 +318,10 @@ public class CompactionUtils { // get all sensor used later allSensors.addAll(sensorChunkMetadataListMap.keySet()); } - + // if there is no more chunkMetaData, merge all the sensors + if (!hasNextChunkMetadataList(chunkMetadataListIteratorCache.values())) { + lastSensor = Collections.max(allSensors); + } for (String sensor : allSensors) { if (sensor.compareTo(lastSensor) <= 0) { Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataListMap = diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java index f687957..6656993 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java @@ -66,6 +66,56 @@ public class LevelCompactionMergeTest extends LevelCompactionTest { FileUtils.deleteDirectory(tempSGDir); } + @Test + public void testCompactionDiffTimeSeries() + throws IOException, WriteProcessException, IllegalPathException { + int prevSeqLevelFileNum = IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(); + int prevSeqLevelNum = IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum(); + IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(2); + IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(2); + List<TsFileResource> compactionFiles = prepareTsFileResources(); + LevelCompactionTsFileManagement levelCompactionTsFileManagement = new LevelCompactionTsFileManagement( + COMPACTION_TEST_SG, tempSGDir.getPath()); + levelCompactionTsFileManagement.addAll(compactionFiles, true); + QueryContext context = new QueryContext(); + PartialPath path = new PartialPath( + deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[1].getMeasurementId()); + IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[1].getType(), + context, + levelCompactionTsFileManagement.getTsFileList(true), new ArrayList<>(), null, null, true); + int count = 0; + while (tsFilesReader.hasNextBatch()) { + BatchData batchData = tsFilesReader.nextBatch(); + for (int i = 0; i < batchData.length(); i++) { + count++; + } + } + assertEquals(count, 1); + + levelCompactionTsFileManagement.forkCurrentFileList(0); + CompactionOnePartitionUtil compactionOnePartitionUtil = levelCompactionTsFileManagement.new CompactionOnePartitionUtil( + this::closeCompactionMergeCallBack, 0); + compactionMergeWorking = true; + compactionOnePartitionUtil.run(); + while (compactionMergeWorking) { + //wait + } + context = new QueryContext(); + tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[1].getType(), + context, + levelCompactionTsFileManagement.getTsFileList(true), new ArrayList<>(), null, null, true); + count = 0; + while (tsFilesReader.hasNextBatch()) { + BatchData batchData = tsFilesReader.nextBatch(); + for (int i = 0; i < batchData.length(); i++) { + count++; + } + } + assertEquals(count, 1); + IoTDBDescriptor.getInstance().getConfig().setSeqFileNumInEachLevel(prevSeqLevelFileNum); + IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum); + } + /** * just compaction once */ diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java index dc71ea4..dfa69d9 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTest.java @@ -204,4 +204,47 @@ abstract class LevelCompactionTest { fileWriter.close(); } + List<TsFileResource> prepareTsFileResources() throws IOException, WriteProcessException { + List<TsFileResource> ret = new ArrayList<>(); + // prepare file 1 + File file1 = new File( + TestConstant.BASE_OUTPUT_PATH.concat( + System.nanoTime() + IoTDBConstant.FILE_NAME_SEPARATOR + 0 + + IoTDBConstant.FILE_NAME_SEPARATOR + 0 + + ".tsfile")); + TsFileResource tsFileResource1 = new TsFileResource(file1); + tsFileResource1.setClosed(true); + tsFileResource1.updatePlanIndexes((long) 0); + TsFileWriter fileWriter1 = new TsFileWriter(tsFileResource1.getTsFile()); + fileWriter1.registerTimeseries( + new Path(deviceIds[0], measurementSchemas[0].getMeasurementId()), measurementSchemas[0]); + TSRecord record1 = new TSRecord(0, deviceIds[0]); + record1.addTuple(DataPoint + .getDataPoint(measurementSchemas[0].getType(), measurementSchemas[0].getMeasurementId(), + String.valueOf(0))); + fileWriter1.write(record1); + fileWriter1.close(); + // prepare file 2 + File file2 = new File( + TestConstant.BASE_OUTPUT_PATH.concat( + System.nanoTime() + IoTDBConstant.FILE_NAME_SEPARATOR + 1 + + IoTDBConstant.FILE_NAME_SEPARATOR + 0 + + ".tsfile")); + TsFileResource tsFileResource2 = new TsFileResource(file2); + tsFileResource2.setClosed(true); + tsFileResource2.updatePlanIndexes((long) 1); + TsFileWriter fileWriter2 = new TsFileWriter(tsFileResource2.getTsFile()); + fileWriter2.registerTimeseries( + new Path(deviceIds[0], measurementSchemas[1].getMeasurementId()), measurementSchemas[1]); + TSRecord record2 = new TSRecord(0, deviceIds[0]); + record2.addTuple(DataPoint + .getDataPoint(measurementSchemas[1].getType(), measurementSchemas[1].getMeasurementId(), + String.valueOf(0))); + fileWriter2.write(record2); + fileWriter2.close(); + ret.add(tsFileResource1); + ret.add(tsFileResource2); + return ret; + } + } \ No newline at end of file
