This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-4636 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 36acc7eb15e7361deb569719534d2e40412502e2 Author: Liu Xuxin <[email protected]> AuthorDate: Mon Oct 17 15:43:10 2022 +0800 add test --- .../impl/ReadChunkCompactionPerformer.java | 19 ++++-- .../ReadChunkCompactionPerformerAlignedTest.java | 79 ++++++++++++++++++++++ 2 files changed, 92 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java index a730730964..dbd4b1831a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java @@ -118,12 +118,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { checkThreadInterrupted(); LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList = deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries(); - boolean anyChunkExists = false; - for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair : - readerAndChunkMetadataList) { - anyChunkExists = anyChunkExists || !readerListPair.right.isEmpty(); - } - if (!anyChunkExists) { + if (!checkAlignedSeriesExists(readerAndChunkMetadataList)) { return; } writer.startChunkGroup(device); @@ -142,6 +137,18 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { } } + private boolean checkAlignedSeriesExists( + LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> + readerAndChunkMetadataList) { + for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair : + readerAndChunkMetadataList) { + if (!readerListPair.right.isEmpty()) { + return true; + } + } + return false; + } + private void compactNotAlignedSeries( String device, TsFileResource targetResource, diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java index 8c2fbf2ae4..d39aaf75cf 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerAlignedTest.java @@ -547,4 +547,83 @@ public class ReadChunkCompactionPerformerAlignedTest { new ArrayList<>()); CompactionCheckerUtils.validDataByValueList(originData, compactedData); } + + @Test + public void testAlignedTsFileWithEmptyChunkGroup() throws Exception { + List<String> devices = new ArrayList<>(); + devices.add(storageGroup + ".d" + 0); + for (int i = 1; i < 5; ++i) { + devices.add(devices.get(i - 1) + ".d" + i); + } + boolean[] aligned = new boolean[] {false, true, false, true, false}; + List<IMeasurementSchema> schemas = new ArrayList<>(); + schemas.add(new MeasurementSchema("s0", TSDataType.DOUBLE)); + schemas.add(new MeasurementSchema("s1", TSDataType.FLOAT)); + schemas.add(new MeasurementSchema("s2", TSDataType.INT64)); + schemas.add(new MeasurementSchema("s3", TSDataType.INT32)); + schemas.add(new MeasurementSchema("s4", TSDataType.TEXT)); + schemas.add(new MeasurementSchema("s5", TSDataType.BOOLEAN)); + + TestUtilsForAlignedSeries.registerTimeSeries( + storageGroup, + devices.toArray(new String[] {}), + schemas.toArray(new IMeasurementSchema[] {}), + aligned); + + boolean[] randomNull = new boolean[] {true, false, true, false, true}; + int timeInterval = 500; + Random random = new Random(5); + List<TsFileResource> resources = new ArrayList<>(); + for (int i = 1; i < 30; i++) { + TsFileResource resource = + new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", i, i))); + TestUtilsForAlignedSeries.writeTsFile( + devices.toArray(new String[0]), + schemas.toArray(new IMeasurementSchema[0]), + resource, + aligned, + timeInterval * i, + timeInterval * (i + 1), + randomNull); + resources.add(resource); + } + TsFileResource resource = + new TsFileResource(new File(dataDirectory, String.format("%d-%d-0-0.tsfile", 30, 30))); + // the start time and end time is the same + // it will write tsfile with empty chunk group + TestUtilsForAlignedSeries.writeTsFile( + devices.toArray(new String[0]), + schemas.toArray(new IMeasurementSchema[0]), + resource, + aligned, + timeInterval * (30 + 1), + timeInterval * (30 + 1), + randomNull); + resources.add(resource); + TsFileResource targetResource = new TsFileResource(new File(dataDirectory, "1-1-1-0.tsfile")); + List<PartialPath> fullPaths = new ArrayList<>(); + List<IMeasurementSchema> iMeasurementSchemas = new ArrayList<>(); + List<String> measurementIds = new ArrayList<>(); + schemas.forEach( + (e) -> { + measurementIds.add(e.getMeasurementId()); + }); + for (String device : devices) { + iMeasurementSchemas.addAll(schemas); + fullPaths.add(new AlignedPath(device, measurementIds, schemas)); + } + Map<PartialPath, List<TimeValuePair>> originData = + CompactionCheckerUtils.getDataByQuery( + fullPaths, iMeasurementSchemas, resources, new ArrayList<>()); + ICompactionPerformer performer = new ReadChunkCompactionPerformer(resources, targetResource); + performer.setSummary(new CompactionTaskSummary()); + performer.perform(); + Map<PartialPath, List<TimeValuePair>> compactedData = + CompactionCheckerUtils.getDataByQuery( + fullPaths, + iMeasurementSchemas, + Collections.singletonList(targetResource), + new ArrayList<>()); + CompactionCheckerUtils.validDataByValueList(originData, compactedData); + } }
