This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch zlz in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4440dc605b4ac6693b34f397087865c51025eddd Author: samperson1997 <[email protected]> AuthorDate: Thu Apr 29 16:10:11 2021 +0800 Add TsFileSequenceReader getChunkMetadataList method return empty if path not exists --- .../db/engine/cache/TimeSeriesMetadataCache.java | 2 +- .../db/engine/merge/manage/MergeResource.java | 2 +- .../java/org/apache/iotdb/db/utils/MergeUtils.java | 4 +- .../iotdb/db/engine/merge/MergeTaskTest.java | 75 ++++++++++++++++++++++ .../iotdb/tsfile/read/TsFileSequenceReader.java | 21 ++++-- .../tsfile/v2/read/TsFileSequenceReaderForV2.java | 6 +- 6 files changed, 99 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java index 88d076d..51a05cb 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java @@ -139,7 +139,7 @@ public class TimeSeriesMetadataCache { && !bloomFilter.contains(key.device + IoTDBConstant.PATH_SEPARATOR + key.measurement)) { return null; } - return reader.readTimeseriesMetadata(new Path(key.device, key.measurement)); + return reader.readTimeseriesMetadata(new Path(key.device, key.measurement), false); } cacheRequestNum.incrementAndGet(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java index 4e6f6fb..d7bc827 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java @@ -136,7 +136,7 @@ public class MergeResource { public List<ChunkMetadata> queryChunkMetadata(PartialPath path, TsFileResource seqFile) throws IOException { TsFileSequenceReader sequenceReader = getFileReader(seqFile); - return sequenceReader.getChunkMetadataList(path); + return sequenceReader.getChunkMetadataList(path, true); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java index 2dbaca3..89d1968 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java @@ -142,7 +142,7 @@ public class MergeUtils { List<Path> paths = collectFileSeries(sequenceReader); for (Path path : paths) { - List<ChunkMetadata> chunkMetadataList = sequenceReader.getChunkMetadataList(path); + List<ChunkMetadata> chunkMetadataList = sequenceReader.getChunkMetadataList(path, true); totalChunkNum += chunkMetadataList.size(); maxChunkNum = chunkMetadataList.size() > maxChunkNum ? chunkMetadataList.size() : maxChunkNum; } @@ -195,7 +195,7 @@ public class MergeUtils { throws IOException { for (int i = 0; i < paths.size(); i++) { PartialPath path = paths.get(i); - List<ChunkMetadata> metaDataList = tsFileReader.getChunkMetadataList(path); + List<ChunkMetadata> metaDataList = tsFileReader.getChunkMetadataList(path, true); if (metaDataList.isEmpty()) { continue; } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java index 25c4bef..092f223 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java @@ -34,7 +34,11 @@ import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.reader.IBatchReader; +import org.apache.iotdb.tsfile.write.TsFileWriter; +import org.apache.iotdb.tsfile.write.record.TSRecord; +import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.commons.io.FileUtils; @@ -500,4 +504,75 @@ public class MergeTaskTest extends MergeTest { } tsFilesReader.close(); } + + @Test + public void testMergeWithFileWithoutSomeSensor() throws Exception { + File file = + new File( + TestConstant.BASE_OUTPUT_PATH.concat( + 10 + + "unseq" + + IoTDBConstant.FILE_NAME_SEPARATOR + + 10 + + IoTDBConstant.FILE_NAME_SEPARATOR + + 10 + + IoTDBConstant.FILE_NAME_SEPARATOR + + 0 + + ".tsfile")); + TsFileResource unseqTsFileResourceWithoutSomeSensor = new TsFileResource(file); + unseqTsFileResourceWithoutSomeSensor.setClosed(true); + unseqTsFileResourceWithoutSomeSensor.setMinPlanIndex(10); + unseqTsFileResourceWithoutSomeSensor.setMaxPlanIndex(10); + unseqTsFileResourceWithoutSomeSensor.setVersion(10); + prepareFileWithLastSensor(unseqTsFileResourceWithoutSomeSensor, 0, 50, 0); + unseqResources.add(unseqTsFileResourceWithoutSomeSensor); + + List<TsFileResource> testSeqResources = seqResources.subList(0, 1); + List<TsFileResource> testUnseqResources = new ArrayList<>(); + testUnseqResources.add(unseqTsFileResourceWithoutSomeSensor); + MergeTask mergeTask = + new MergeTask( + new MergeResource(testSeqResources, testUnseqResources), + tempSGDir.getPath(), + (k, v, l) -> { + assertEquals(99, k.get(0).getEndTime("root.mergeTest.device1")); + }, + "test", + false, + 1, + MERGE_TEST_SG); + mergeTask.call(); + } + + private void prepareFileWithLastSensor( + TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset) + throws IOException, WriteProcessException { + TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getTsFile()); + for (int i = 0; i < deviceIds.length - 1; i++) { + for (int j = 0; j < measurementSchemas.length - 1; j++) { + fileWriter.registerTimeseries( + new Path(deviceIds[i], measurementSchemas[j].getMeasurementId()), + measurementSchemas[j]); + } + } + for (long i = timeOffset; i < timeOffset + ptNum; i++) { + for (int j = 0; j < deviceNum - 1; j++) { + TSRecord record = new TSRecord(i, deviceIds[j]); + for (int k = 0; k < measurementNum - 1; k++) { + record.addTuple( + DataPoint.getDataPoint( + measurementSchemas[k].getType(), + measurementSchemas[k].getMeasurementId(), + String.valueOf(i + valueOffset))); + } + fileWriter.write(record); + tsFileResource.updateStartTime(deviceIds[j], i); + tsFileResource.updateEndTime(deviceIds[j], i); + } + if ((i + 1) % flushInterval == 0) { + fileWriter.flushAllChunkGroups(); + } + } + fileWriter.close(); + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 41d0cee..72b86e7 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -325,12 +325,16 @@ public class TsFileSequenceReader implements AutoCloseable { return deviceMetadata; } - public TimeseriesMetadata readTimeseriesMetadata(Path path) throws IOException { + public TimeseriesMetadata readTimeseriesMetadata(Path path, boolean ignoreNotExists) + throws IOException { readFileMetadata(); MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset(deviceMetadataIndexNode, path.getDevice(), true, true); if (metadataIndexPair == null) { + if (ignoreNotExists) { + return null; + } throw new IOException("Device {" + path.getDevice() + "} is not in tsFileMetaData"); } ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); @@ -366,8 +370,8 @@ public class TsFileSequenceReader implements AutoCloseable { } /** - * Find the leaf node that contains path, return all the sensors in that leaf node which are also - * in allSensors set + * getChunkMetadataList Find the leaf node that contains path, return all the sensors in that leaf + * node which are also in allSensors set */ public List<TimeseriesMetadata> readTimeseriesMetadata(Path path, Set<String> allSensors) throws IOException { @@ -1147,13 +1151,14 @@ public class TsFileSequenceReader implements AutoCloseable { } /** - * get ChunkMetaDatas of given path + * get ChunkMetaDatas of given path, and throw exception if path not exists * * @param path timeseries path * @return List of ChunkMetaData */ - public List<ChunkMetadata> getChunkMetadataList(Path path) throws IOException { - TimeseriesMetadata timeseriesMetaData = readTimeseriesMetadata(path); + public List<ChunkMetadata> getChunkMetadataList(Path path, boolean ignoreNotExists) + throws IOException { + TimeseriesMetadata timeseriesMetaData = readTimeseriesMetadata(path, ignoreNotExists); if (timeseriesMetaData == null) { return Collections.emptyList(); } @@ -1162,6 +1167,10 @@ public class TsFileSequenceReader implements AutoCloseable { return chunkMetadataList; } + public List<ChunkMetadata> getChunkMetadataList(Path path) throws IOException { + return getChunkMetadataList(path, false); + } + /** * get ChunkMetaDatas in given TimeseriesMetaData * diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java index 9247a72..2541e83 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/v2/read/TsFileSequenceReaderForV2.java @@ -155,13 +155,17 @@ public class TsFileSequenceReaderForV2 extends TsFileSequenceReader implements A } @Override - public TimeseriesMetadata readTimeseriesMetadata(Path path) throws IOException { + public TimeseriesMetadata readTimeseriesMetadata(Path path, boolean ignoreNotExists) + throws IOException { readFileMetadata(); MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffsetV2( deviceMetadataIndexNode, path.getDevice(), MetadataIndexNodeType.INTERNAL_DEVICE, true); if (metadataIndexPair == null) { + if (ignoreNotExists) { + return null; + } return null; } ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
