This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch experimental/index in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0969f20c84d353b40b46fa8b3510b301391b26f1 Author: samperson1997 <[email protected]> AuthorDate: Fri Nov 12 15:49:57 2021 +0800 separate BloomFilter --- .../iotdb/tsfile/test1835/TsFileAggregation.java | 77 ++++++++ .../iotdb/tsfile/test1835/TsFileAggregationV2.java | 77 ++++++++ .../iotdb/tsfile/test1835/TsFileRawRead.java | 88 +++++++++ .../iotdb/tsfile/test1835/TsFileRawReadV2.java | 88 +++++++++ .../iotdb/tsfile/test1835/TsFileSketchToolV2.java | 96 ++-------- .../tsfile/file/metadata/TsFileMetadataV2.java | 32 ++-- .../iotdb/tsfile/read/TsFileSequenceReader.java | 211 ++++++++++++++++++++- .../tsfile/read/controller/IMetadataQuerier.java | 2 + .../read/controller/MetadataQuerierByFileImpl.java | 68 ++++++- .../tsfile/read/query/executor/TsFileExecutor.java | 6 +- .../iotdb/tsfile/write/writer/TsFileIOWriter.java | 30 +-- 11 files changed, 664 insertions(+), 111 deletions(-) diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileAggregation.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileAggregation.java new file mode 100644 index 0000000..b4de97d --- /dev/null +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileAggregation.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.test1835; + +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Path; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; + +import java.io.IOException; + +public class TsFileAggregation { + + private static final String DEVICE1 = "device_"; + public static int chunkNum; + public static int deviceNum = 1; + public static int sensorNum = 1; + public static int fileNum = 1; + + public static void main(String[] args) throws IOException { + long costTime = 0L; + Options opts = new Options(); + // Option chunkNumOption = + // OptionBuilder.withArgName("args").withLongOpt("chunkNum").hasArg().create("c"); + // opts.addOption(chunkNumOption); + + BasicParser parser = new BasicParser(); + CommandLine cl; + try { + cl = parser.parse(opts, args); + // chunkNum = Integer.parseInt(cl.getOptionValue("c")); + } catch (Exception e) { + e.printStackTrace(); + } + + long totalStartTime = System.nanoTime(); + for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) { + // file path + String path = + "/Users/samperson1997/git/iotdb/data/data/sequence/root.sg/1/" + + deviceNum + + "/test1.tsfile"; + + // aggregation query + try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) { + Path seriesPath = new Path(DEVICE1, "sensor_1"); + long startTime = System.nanoTime(); + TimeseriesMetadata timeseriesMetadata = reader.readTimeseriesMetadata(seriesPath, false); + long count = timeseriesMetadata.getStatistics().getCount(); + costTime += (System.nanoTime() - startTime); + System.out.println(count); + } + } + System.out.println( + "Total raw read cost time: " + (System.nanoTime() - totalStartTime) / 1000_000 + "ms"); + System.out.println("Index area cost time: " + costTime / 1000_000 + "ms"); + } +} diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileAggregationV2.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileAggregationV2.java new file mode 100644 index 0000000..8681849 --- /dev/null +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileAggregationV2.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.test1835; + +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Path; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; + +import java.io.IOException; + +public class TsFileAggregationV2 { + + private static final String DEVICE1 = "device_"; + public static int chunkNum; + public static int deviceNum = 1; + public static int sensorNum = 1; + public static int fileNum = 1; + + public static void main(String[] args) throws IOException { + long costTime = 0L; + Options opts = new Options(); + // Option chunkNumOption = + // OptionBuilder.withArgName("args").withLongOpt("chunkNum").hasArg().create("c"); + // opts.addOption(chunkNumOption); + + BasicParser parser = new BasicParser(); + CommandLine cl; + try { + cl = parser.parse(opts, args); + // chunkNum = Integer.parseInt(cl.getOptionValue("c")); + } catch (Exception e) { + e.printStackTrace(); + } + + long totalStartTime = System.nanoTime(); + for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) { + // file path + String path = + "/Users/samperson1997/git/iotdb/data/data/sequence/root.sg/1/" + + deviceNum + + "/test0.tsfile"; + + // aggregation query + try (TsFileSequenceReader reader = new TsFileSequenceReader(path)) { + Path seriesPath = new Path(DEVICE1, "sensor_1"); + long startTime = System.nanoTime(); + TimeseriesMetadata timeseriesMetadata = reader.readTimeseriesMetadataV4(seriesPath, false); + long count = timeseriesMetadata.getStatistics().getCount(); + costTime += (System.nanoTime() - startTime); + System.out.println(count); + } + } + System.out.println( + "Total raw read cost time: " + (System.nanoTime() - totalStartTime) / 1000_000 + "ms"); + System.out.println("Index area cost time: " + costTime / 1000_000 + "ms"); + } +} diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawRead.java new file mode 100644 index 0000000..11d54b2 --- /dev/null +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawRead.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.test1835; + +import org.apache.iotdb.tsfile.read.ReadOnlyTsFile; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.expression.QueryExpression; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; + +import java.io.IOException; +import java.util.ArrayList; + +public class TsFileRawRead { + + private static final String DEVICE1 = "device_"; + public static int chunkNum; + public static int deviceNum = 1; + public static int sensorNum = 1; + public static int fileNum = 1; + + public static void main(String[] args) throws IOException { + long costTime = 0L; + Options opts = new Options(); + // Option chunkNumOption = + // OptionBuilder.withArgName("args").withLongOpt("chunkNum").hasArg().create("c"); + // opts.addOption(chunkNumOption); + + BasicParser parser = new BasicParser(); + CommandLine cl; + try { + cl = parser.parse(opts, args); + // chunkNum = Integer.parseInt(cl.getOptionValue("c")); + } catch (Exception e) { + e.printStackTrace(); + } + + long totalStartTime = System.nanoTime(); + for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) { + // file path + String path = + "/Users/samperson1997/git/iotdb/data/data/sequence/root.sg/1/" + + deviceNum + + "/test1.tsfile"; + + // raw data query + try (TsFileSequenceReader reader = new TsFileSequenceReader(path); + ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) { + + ArrayList<Path> paths = new ArrayList<>(); + paths.add(new Path(DEVICE1, "sensor_1")); + + QueryExpression queryExpression = QueryExpression.create(paths, null); + + long startTime = System.nanoTime(); + QueryDataSet queryDataSet = readTsFile.query(queryExpression); + while (queryDataSet.hasNext()) { + System.out.println(queryDataSet.next()); + } + + costTime += (System.nanoTime() - startTime); + } + } + System.out.println( + "Total raw read cost time: " + (System.nanoTime() - totalStartTime) / 1000_000 + "ms"); + System.out.println("Index area cost time: " + costTime / 1000_000 + "ms"); + } +} diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java new file mode 100644 index 0000000..2f898dc --- /dev/null +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileRawReadV2.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.tsfile.test1835; + +import org.apache.iotdb.tsfile.read.ReadOnlyTsFile; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.expression.QueryExpression; +import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; + +import java.io.IOException; +import java.util.ArrayList; + +public class TsFileRawReadV2 { + + private static final String DEVICE1 = "device_"; + public static int chunkNum; + public static int deviceNum = 1; + public static int sensorNum = 1; + public static int fileNum = 1; + + public static void main(String[] args) throws IOException { + long costTime = 0L; + long totalStartTime = System.nanoTime(); + Options opts = new Options(); + // Option chunkNumOption = + // OptionBuilder.withArgName("args").withLongOpt("chunkNum").hasArg().create("c"); + // opts.addOption(chunkNumOption); + + BasicParser parser = new BasicParser(); + CommandLine cl; + try { + cl = parser.parse(opts, args); + // chunkNum = Integer.parseInt(cl.getOptionValue("c")); + } catch (Exception e) { + e.printStackTrace(); + } + + for (int fileIndex = 0; fileIndex < fileNum; fileIndex++) { + // file path + String path = + "/Users/samperson1997/git/iotdb/data/data/sequence/root.sg/1/" + + deviceNum + + "/test0.tsfile"; + + // raw data query + try (TsFileSequenceReader reader = new TsFileSequenceReader(path); + ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) { + + ArrayList<Path> paths = new ArrayList<>(); + paths.add(new Path(DEVICE1, "sensor_1")); + + QueryExpression queryExpression = QueryExpression.create(paths, null); + + long startTime = System.nanoTime(); + QueryDataSet queryDataSet = readTsFile.query(queryExpression); + while (queryDataSet.hasNext()) { + System.out.println(queryDataSet.next()); + } + + costTime += (System.nanoTime() - startTime); + } + } + System.out.println( + "Total raw read cost time: " + (System.nanoTime() - totalStartTime) / 1000_000 + "ms"); + System.out.println("Index area cost time: " + costTime / 1000_000 + "ms"); + } +} diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileSketchToolV2.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileSketchToolV2.java index 869435f..c36a085 100644 --- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileSketchToolV2.java +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/test1835/TsFileSketchToolV2.java @@ -29,16 +29,14 @@ import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry; import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; -import org.apache.iotdb.tsfile.file.metadata.TsFileMetadataV2; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.read.reader.TsFileInput; import org.apache.iotdb.tsfile.utils.BloomFilter; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.io.FileWriter; import java.io.IOException; @@ -100,7 +98,7 @@ public class TsFileSketchToolV2 { printlnBoth(pw, "file length: " + length); // get metadata information - TsFileMetadataV2 tsFileMetaData = reader.readFileMetadataV2(); + TsFileMetadata tsFileMetaData = reader.readFileMetadataV2(); List<ChunkGroupMetadata> allChunkGroupMetadata = new ArrayList<>(); reader.selfCheck(null, allChunkGroupMetadata, false); @@ -111,13 +109,15 @@ public class TsFileSketchToolV2 { printChunk(allChunkGroupMetadata); // metadata begins - printlnBoth(pw, String.format("%20s", tsFileMetaData.getMetaOffset()) + "|\t[marker] 2"); - - // System.out.println(reader.getFileMetadataPos()); - + if (tsFileMetaData.getMetadataIndex().getChildren().isEmpty()) { + printlnBoth(pw, String.format("%20s", reader.getFileMetadataPos() - 1) + "|\t[marker] 2"); + } else { + printlnBoth( + pw, String.format("%20s", reader.readFileMetadata().getMetaOffset()) + "|\t[marker] 2"); + } // get all timeseries index Map<Long, Pair<Path, TimeseriesMetadata>> timeseriesMetadataMap = - reader.getAllTimeseriesMetadataWithOffset(reader.position(), reader.getFileMetadataPos()); + reader.getAllTimeseriesMetadataWithOffset(); // print timeseries index printTimeseriesIndex(timeseriesMetadataMap); @@ -130,7 +130,7 @@ public class TsFileSketchToolV2 { pw, "---------------------------- IndexOfTimerseriesIndex Tree -----------------------------"); // print index tree - MetadataIndexNode metadataIndexNode = readMetadataIndex(); + MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); TreeMap<Long, MetadataIndexNode> metadataIndexNodeMap = new TreeMap<>(); List<String> treeOutputStringBuffer = new ArrayList<>(); loadIndexTree(metadataIndexNode, metadataIndexNodeMap, treeOutputStringBuffer, 0); @@ -147,7 +147,7 @@ public class TsFileSketchToolV2 { pw.close(); } - private void printTsFileMetadata(TsFileMetadataV2 tsFileMetaData) { + private void printTsFileMetadata(TsFileMetadata tsFileMetaData) { try { printlnBoth(pw, String.format("%20s", reader.getFileMetadataPos()) + "|\t[TsFileMetadata]"); printlnBoth( @@ -401,83 +401,17 @@ public class TsFileSketchToolV2 { pw.println(str); } - private MetadataIndexNode readMetadataIndex() throws IOException { - TsFileInput tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(indexFileName); - long totalSize = tsFileInput.size(); - ByteBuffer lastNodeSizeBuffer = ByteBuffer.allocate(Integer.BYTES); - tsFileInput.read(lastNodeSizeBuffer, totalSize - Integer.BYTES); - lastNodeSizeBuffer.flip(); - - int lastNodeSize = ReadWriteIOUtils.readInt(lastNodeSizeBuffer); - ByteBuffer lastNode = ByteBuffer.allocate(lastNodeSize); - tsFileInput.read(lastNode, totalSize - lastNodeSize - Integer.BYTES); - lastNode.flip(); - return MetadataIndexNode.deserializeFrom(lastNode); - } - private class TsFileSketchToolReader extends TsFileSequenceReader { public TsFileSketchToolReader(String file) throws IOException { super(file); } - /** - * Traverse the metadata index from MetadataIndexEntry to get TimeseriesMetadatas - * - * @param metadataIndex MetadataIndexEntry - * @param buffer byte buffer - * @param deviceId String - * @param timeseriesMetadataMap map: deviceId -> timeseriesMetadata list - * @param needChunkMetadata deserialize chunk metadata list or not - */ - private void generateMetadataIndexWithOffset( - long startOffset, - MetadataIndexEntry metadataIndex, - ByteBuffer buffer, - String deviceId, - MetadataIndexNodeType type, - Map<Long, Pair<Path, TimeseriesMetadata>> timeseriesMetadataMap, - boolean needChunkMetadata) - throws IOException { - if (type.equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { - while (buffer.hasRemaining()) { - long pos = startOffset + buffer.position(); - TimeseriesMetadata timeseriesMetadata = - TimeseriesMetadata.deserializeFrom(buffer, needChunkMetadata); - timeseriesMetadataMap.put( - pos, - new Pair<>( - new Path(deviceId, timeseriesMetadata.getMeasurementId()), timeseriesMetadata)); - } - } else { - // deviceId should be determined by LEAF_DEVICE node - if (type.equals(MetadataIndexNodeType.LEAF_DEVICE)) { - deviceId = metadataIndex.getName(); - } - MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); - int metadataIndexListSize = metadataIndexNode.getChildren().size(); - for (int i = 0; i < metadataIndexListSize; i++) { - long endOffset = metadataIndexNode.getEndOffset(); - if (i != metadataIndexListSize - 1) { - endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset(); - } - ByteBuffer nextBuffer = - readData(metadataIndexNode.getChildren().get(i).getOffset(), endOffset); - generateMetadataIndexWithOffset( - metadataIndexNode.getChildren().get(i).getOffset(), - metadataIndexNode.getChildren().get(i), - nextBuffer, - deviceId, - metadataIndexNode.getNodeType(), - timeseriesMetadataMap, - needChunkMetadata); - } - } - } - public Map<Long, Pair<Path, TimeseriesMetadata>> getAllTimeseriesMetadataWithOffset( - long startOffset, long endOffset) throws IOException { + public Map<Long, Pair<Path, TimeseriesMetadata>> getAllTimeseriesMetadataWithOffset() + throws IOException { Map<Long, Pair<Path, TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>(); - ByteBuffer buffer = readData(startOffset, endOffset); + // FIXME + ByteBuffer buffer = readData(0, 0); while (buffer.hasRemaining()) { int bufferPos = buffer.position(); TimeseriesMetadata timeseriesMetaData = TimeseriesMetadata.deserializeFrom(buffer, false); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadataV2.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadataV2.java index 64246b7..f3f7129 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadataV2.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TsFileMetadataV2.java @@ -31,13 +31,13 @@ import java.nio.ByteBuffer; import java.util.Set; /** TSFileMetaData collects all metadata info and saves in its data structure. */ -public class TsFileMetadataV2 { +public class TsFileMetadataV2 extends TsFileMetadata { // bloom filter private BloomFilter bloomFilter; - // offset of MetaMarker.SEPARATOR - private long metaOffset; + // List of <name, offset, childMetadataIndexType> + private MetadataIndexNode metadataIndex; /** * deserialize data from the buffer. @@ -45,12 +45,11 @@ public class TsFileMetadataV2 { * @param buffer -buffer use to deserialize * @return -a instance of TsFileMetaData */ - public static TsFileMetadataV2 deserializeFrom(ByteBuffer buffer) { + public static TsFileMetadata deserializeFrom(ByteBuffer buffer) { TsFileMetadataV2 fileMetaData = new TsFileMetadataV2(); - // metaOffset - long metaOffset = ReadWriteIOUtils.readLong(buffer); - fileMetaData.setMetaOffset(metaOffset); + // metadataIndex + fileMetaData.metadataIndex = MetadataIndexNode.deserializeFrom(buffer); // read bloom filter if (buffer.hasRemaining()) { @@ -78,7 +77,16 @@ public class TsFileMetadataV2 { * @return -byte length */ public int serializeTo(OutputStream outputStream) throws IOException { - return ReadWriteIOUtils.write(metaOffset, outputStream); + int byteLen = 0; + + // metadataIndex + if (metadataIndex != null) { + byteLen += metadataIndex.serializeTo(outputStream); + } else { + byteLen += ReadWriteIOUtils.write(0, outputStream); + } + + return byteLen; } /** @@ -116,11 +124,11 @@ public class TsFileMetadataV2 { return filter; } - public long getMetaOffset() { - return metaOffset; + public MetadataIndexNode getMetadataIndex() { + return metadataIndex; } - public void setMetaOffset(long metaOffset) { - this.metaOffset = metaOffset; + public void setMetadataIndex(MetadataIndexNode metadataIndex) { + this.metadataIndex = metadataIndex; } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 565d677..bf5856d 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -90,6 +90,7 @@ public class TsFileSequenceReader implements AutoCloseable { "Something error happened while deserializing MetadataIndexNode of file {}"; protected String file; protected TsFileInput tsFileInput; + protected TsFileInput metadataIndexInput; protected long fileMetadataPos; protected int fileMetadataSize; private ByteBuffer markerBuffer = ByteBuffer.allocate(Byte.BYTES); @@ -127,6 +128,9 @@ public class TsFileSequenceReader implements AutoCloseable { } this.file = file; tsFileInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file); + if (FSFactoryProducer.getFSFactory().getFile(file + ".index").exists()) { + metadataIndexInput = FSFactoryProducer.getFileInputFactory().getTsFileInput(file + ".index"); + } try { if (loadMetadataSize) { loadMetadataSize(); @@ -271,13 +275,27 @@ public class TsFileSequenceReader implements AutoCloseable { return tsFileMetaData; } - public TsFileMetadataV2 readFileMetadataV2() throws IOException { + public TsFileMetadata readFileMetadataV2() throws IOException { try { - return TsFileMetadataV2.deserializeFrom(readData(fileMetadataPos, fileMetadataSize)); + if (tsFileMetaData == null) { + long totalSize = metadataIndexInput.size(); + ByteBuffer rootNodeOffsetBuffer = ByteBuffer.allocate(Long.BYTES); + metadataIndexInput.read(rootNodeOffsetBuffer, totalSize - Long.BYTES); + rootNodeOffsetBuffer.flip(); + + long rootNodeOffset = ReadWriteIOUtils.readLong(rootNodeOffsetBuffer); + tsFileMetaData = + TsFileMetadataV2.deserializeFrom( + readData( + rootNodeOffset, + FSFactoryProducer.getFSFactory().getFile(this.file + ".index").length(), + metadataIndexInput)); + } } catch (BufferOverflowException e) { logger.error("Something error happened while reading file metadata of file {}", file); throw e; } + return tsFileMetaData; } /** @@ -446,6 +464,51 @@ public class TsFileSequenceReader implements AutoCloseable { return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null; } + public TimeseriesMetadata readTimeseriesMetadataV4(Path path, boolean ignoreNotExists) + throws IOException { + readFileMetadataV2(); + MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); + Pair<MetadataIndexEntry, Long> metadataIndexPair = + getMetadataAndEndOffset(deviceMetadataIndexNode, path.getDevice(), true, true); + if (metadataIndexPair == null) { + if (ignoreNotExists) { + return null; + } + throw new IOException("Device {" + path.getDevice() + "} is not in tsFileMetaData"); + } + ByteBuffer buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, metadataIndexInput); + MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; + if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { + try { + metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); + } catch (BufferOverflowException e) { + logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file); + throw e; + } + metadataIndexPair = + getMetadataAndEndOffset(metadataIndexNode, path.getMeasurement(), false, false); + } + if (metadataIndexPair == null) { + return null; + } + List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); + buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + while (buffer.hasRemaining()) { + try { + timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true)); + } catch (BufferOverflowException e) { + logger.error( + "Something error happened while deserializing TimeseriesMetadata of file {}", file); + throw e; + } + } + // return null if path does not exist in the TsFile + int searchResult = + binarySearchInTimeseriesMetadataList(timeseriesMetadataList, path.getMeasurement()); + return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null; + } + /** * Find the leaf node that contains this vector, return all the needed subSensor and time column * @@ -656,6 +719,82 @@ public class TsFileSequenceReader implements AutoCloseable { return resultTimeseriesMetadataList; } + public List<TimeseriesMetadata> readTimeseriesMetadataV3(String device, Set<String> measurements) + throws IOException { + readFileMetadataV2(); + MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex(); + Pair<MetadataIndexEntry, Long> metadataIndexPair = + getMetadataAndEndOffset(deviceMetadataIndexNode, device, true, false); + if (metadataIndexPair == null) { + return Collections.emptyList(); + } + List<TimeseriesMetadata> resultTimeseriesMetadataList = new ArrayList<>(); + List<String> measurementList = new ArrayList<>(measurements); + Set<String> measurementsHadFound = new HashSet<>(); + for (int i = 0; i < measurementList.size(); i++) { + if (measurementsHadFound.contains(measurementList.get(i))) { + continue; + } + ByteBuffer buffer = + readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right, metadataIndexInput); + Pair<MetadataIndexEntry, Long> measurementMetadataIndexPair = metadataIndexPair; + List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); + MetadataIndexNode metadataIndexNode = deviceMetadataIndexNode; + if (!metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) { + try { + metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer); + } catch (BufferOverflowException e) { + logger.error(METADATA_INDEX_NODE_DESERIALIZE_ERROR, file); + throw e; + } + measurementMetadataIndexPair = + getMetadataAndEndOffset(metadataIndexNode, measurementList.get(i), false, false); + } + if (measurementMetadataIndexPair == null) { + return Collections.emptyList(); + } + buffer = + readData( + measurementMetadataIndexPair.left.getOffset(), measurementMetadataIndexPair.right); + while (buffer.hasRemaining()) { + try { + timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer, true)); + } catch (BufferOverflowException e) { + logger.error( + "Something error happened while deserializing TimeseriesMetadata of file {}", file); + throw e; + } + } + for (int j = i; j < measurementList.size(); j++) { + String current = measurementList.get(j); + if (!measurementsHadFound.contains(current)) { + int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList, current); + if (searchResult >= 0) { + resultTimeseriesMetadataList.add(timeseriesMetadataList.get(searchResult)); + measurementsHadFound.add(current); + } + } + if (measurementsHadFound.size() == measurements.size()) { + return resultTimeseriesMetadataList; + } + } + } + return resultTimeseriesMetadataList; + } + + public MetadataIndexNode readMetadataIndex() throws IOException { + long totalSize = metadataIndexInput.size(); + ByteBuffer lastNodeSizeBuffer = ByteBuffer.allocate(Integer.BYTES); + metadataIndexInput.read(lastNodeSizeBuffer, totalSize - Integer.BYTES); + lastNodeSizeBuffer.flip(); + + int lastNodeSize = ReadWriteIOUtils.readInt(lastNodeSizeBuffer); + ByteBuffer lastNode = ByteBuffer.allocate(lastNodeSize); + metadataIndexInput.read(lastNode, totalSize - lastNodeSize - Integer.BYTES); + lastNode.flip(); + return MetadataIndexNode.deserializeFrom(lastNode); + } + protected int binarySearchInTimeseriesMetadataList( List<TimeseriesMetadata> timeseriesMetadataList, String key) { int low = 0; @@ -705,6 +844,13 @@ public class TsFileSequenceReader implements AutoCloseable { return getAllDevices(tsFileMetaData.getMetadataIndex()); } + public List<String> getAllDevicesV2() throws IOException { + if (tsFileMetaData == null) { + readFileMetadataV2(); + } + return getAllDevicesV2(tsFileMetaData.getMetadataIndex()); + } + private List<String> getAllDevices(MetadataIndexNode metadataIndexNode) throws IOException { List<String> deviceList = new ArrayList<>(); int metadataIndexListSize = metadataIndexNode.getChildren().size(); @@ -739,6 +885,42 @@ public class TsFileSequenceReader implements AutoCloseable { return deviceList; } + private List<String> getAllDevicesV2(MetadataIndexNode metadataIndexNode) throws IOException { + List<String> deviceList = new ArrayList<>(); + int metadataIndexListSize = metadataIndexNode.getChildren().size(); + + // if metadataIndexNode is LEAF_DEVICE, put all devices in node entry into the list + if (metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) { + deviceList.addAll( + metadataIndexNode.getChildren().stream() + .map(MetadataIndexEntry::getName) + .collect(Collectors.toList())); + return deviceList; + } + + for (int i = 0; i < metadataIndexListSize; i++) { + long endOffset = metadataIndexNode.getEndOffset(); + if (i != metadataIndexListSize - 1) { + endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset(); + } + ByteBuffer buffer = + readData( + metadataIndexNode.getChildren().get(i).getOffset(), endOffset, metadataIndexInput); + MetadataIndexNode node = MetadataIndexNode.deserializeFrom(buffer); + if (node.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) { + // if node in next level is LEAF_DEVICE, put all devices in node entry into the list + deviceList.addAll( + node.getChildren().stream() + .map(MetadataIndexEntry::getName) + .collect(Collectors.toList())); + } else { + // keep traversing + deviceList.addAll(getAllDevices(node)); + } + } + return deviceList; + } + /** * read all ChunkMetaDatas of given device * @@ -1128,6 +1310,27 @@ public class TsFileSequenceReader implements AutoCloseable { return buffer; } + protected ByteBuffer readData(long position, int size, TsFileInput tsFileInput) + throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(size); + if (position < 0) { + if (ReadWriteIOUtils.readAsPossible(tsFileInput, buffer) != size) { + throw new IOException("reach the end of the data"); + } + } else { + long actualReadSize = ReadWriteIOUtils.readAsPossible(tsFileInput, buffer, position, size); + if (actualReadSize != size) { + throw new IOException( + String.format( + "reach the end of the data. Size of data that want to read: %s," + + "actual read size: %s, position: %s", + size, actualReadSize, position)); + } + } + buffer.flip(); + return buffer; + } + /** * read data from tsFileInput, from the current position (if position = -1), or the given * position. @@ -1141,6 +1344,10 @@ public class TsFileSequenceReader implements AutoCloseable { return readData(start, (int) (end - start)); } + protected ByteBuffer readData(long start, long end, TsFileInput tsFileInput) throws IOException { + return readData(start, (int) (end - start), tsFileInput); + } + /** notice, the target bytebuffer are not flipped. */ public int readRaw(long position, int length, ByteBuffer target) throws IOException { return ReadWriteIOUtils.readAsPossible(tsFileInput, target, position, length); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IMetadataQuerier.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IMetadataQuerier.java index 90a1b2a..25efbad 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IMetadataQuerier.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IMetadataQuerier.java @@ -47,6 +47,8 @@ public interface IMetadataQuerier { void loadChunkMetaDatasV2(List<Path> paths) throws IOException; + void loadChunkMetaDatasV3(List<Path> paths) throws IOException; + /** * @return the corresponding data type. * @throws NoMeasurementException if the measurement not exists. diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java index 1b234f9..6f96550 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java @@ -56,7 +56,8 @@ public class MetadataQuerierByFileImpl implements IMetadataQuerier { /** Constructor of MetadataQuerierByFileImpl. */ public MetadataQuerierByFileImpl(TsFileSequenceReader tsFileReader) throws IOException { this.tsFileReader = tsFileReader; - this.fileMetaData = tsFileReader.readFileMetadata(); + // FIXME + this.fileMetaData = tsFileReader.readFileMetadataV2(); chunkMetaDataCache = new LRUCache<Path, List<ChunkMetadata>>(CACHED_ENTRY_NUMBER) { @Override @@ -220,6 +221,71 @@ public class MetadataQuerierByFileImpl implements IMetadataQuerier { } } + public void loadChunkMetaDatasV3(List<Path> paths) throws IOException { + // group measurements by device + TreeMap<String, Set<String>> deviceMeasurementsMap = new TreeMap<>(); + for (Path path : paths) { + if (!deviceMeasurementsMap.containsKey(path.getDevice())) { + deviceMeasurementsMap.put(path.getDevice(), new HashSet<>()); + } + deviceMeasurementsMap.get(path.getDevice()).add(path.getMeasurement()); + } + + Map<Path, List<ChunkMetadata>> tempChunkMetaDatas = new HashMap<>(); + + int count = 0; + boolean enough = false; + + for (Map.Entry<String, Set<String>> deviceMeasurements : deviceMeasurementsMap.entrySet()) { + if (enough) { + break; + } + String selectedDevice = deviceMeasurements.getKey(); + // s1, s2, s3 + Set<String> selectedMeasurements = deviceMeasurements.getValue(); + List<String> devices = this.tsFileReader.getAllDevicesV2(); + String[] deviceNames = devices.toArray(new String[0]); + if (Arrays.binarySearch(deviceNames, selectedDevice) < 0) { + continue; + } + + List<TimeseriesMetadata> timeseriesMetaDataList = + tsFileReader.readTimeseriesMetadataV3(selectedDevice, selectedMeasurements); + List<ChunkMetadata> chunkMetadataList = new ArrayList<>(); + for (TimeseriesMetadata timeseriesMetadata : timeseriesMetaDataList) { + chunkMetadataList.addAll(tsFileReader.readChunkMetaDataList(timeseriesMetadata)); + } + // d1 + for (ChunkMetadata chunkMetaData : chunkMetadataList) { + String currentMeasurement = chunkMetaData.getMeasurementUid(); + + // s1 + if (selectedMeasurements.contains(currentMeasurement)) { + + // d1.s1 + Path path = new Path(selectedDevice, currentMeasurement); + + // add into tempChunkMetaDatas + if (!tempChunkMetaDatas.containsKey(path)) { + tempChunkMetaDatas.put(path, new ArrayList<>()); + } + tempChunkMetaDatas.get(path).add(chunkMetaData); + + // check cache size, stop when reading enough + count++; + if (count == CACHED_ENTRY_NUMBER) { + enough = true; + break; + } + } + } + } + + for (Map.Entry<Path, List<ChunkMetadata>> entry : tempChunkMetaDatas.entrySet()) { + chunkMetaDataCache.put(entry.getKey(), entry.getValue()); + } + } + @Override public TSDataType getDataType(Path path) throws IOException { if (tsFileReader.getChunkMetadataList(path) == null diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java index 0cac002..b3b43a2 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/query/executor/TsFileExecutor.java @@ -67,8 +67,10 @@ public class TsFileExecutor implements QueryExecutor { queryExpression.setSelectSeries(filteredSeriesPath); } - // metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries()); - metadataQuerier.loadChunkMetaDatasV2(queryExpression.getSelectedSeries()); + // metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries()); + // metadataQuerier.loadChunkMetaDatasV2(queryExpression.getSelectedSeries()); + metadataQuerier.loadChunkMetaDatasV3(queryExpression.getSelectedSeries()); + if (queryExpression.hasQueryFilter()) { try { IExpression expression = queryExpression.getExpression(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java index 3c0185c..bba12dc 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java @@ -459,34 +459,38 @@ public class TsFileIOWriter { // NOTICE: update here, TsFileMetadataV2 does not have MetadataIndexTree // ====================== // TsFileMetadataV2 tsFileMetaData = new TsFileMetadataV2(); - tsFileMetaData.setMetaOffset(metaOffset); - TsFileOutput metadataIndexOutput = new LocalTsFileOutput(new FileOutputStream(new File(file.getAbsolutePath() + ".index"))); MetadataIndexNode metadataIndex = flushMetadataIndex(chunkMetadataListMap, vectorToPathsMap, metadataIndexOutput); - int lastNodeSize = metadataIndex.serializeTo(metadataIndexOutput.wrapAsStream()); - - // write the size of last MetadataIndexNode - ReadWriteIOUtils.write(lastNodeSize, metadataIndexOutput.wrapAsStream()); - metadataIndexOutput.close(); - // ====================== // + tsFileMetaData.setMetadataIndex(metadataIndex); + long rootNodeOffset = metadataIndexOutput.getPosition(); // write TsFileMetaData - int size = tsFileMetaData.serializeTo(out.wrapAsStream()); + int size = tsFileMetaData.serializeTo(metadataIndexOutput.wrapAsStream()); if (logger.isDebugEnabled()) { - logger.debug("finish flushing the footer {}, file pos:{}", tsFileMetaData, out.getPosition()); + logger.debug( + "finish flushing the footer {}, file pos:{}", + tsFileMetaData, + metadataIndexOutput.getPosition()); } // write bloom filter - size += tsFileMetaData.serializeBloomFilter(out.wrapAsStream(), chunkMetadataListMap.keySet()); + size += + tsFileMetaData.serializeBloomFilter( + metadataIndexOutput.wrapAsStream(), chunkMetadataListMap.keySet()); if (logger.isDebugEnabled()) { - logger.debug("finish flushing the bloom filter file pos:{}", out.getPosition()); + logger.debug( + "finish flushing the bloom filter file pos:{}", metadataIndexOutput.getPosition()); } // write TsFileMetaData size - ReadWriteIOUtils.write(size, out.wrapAsStream()); // write the size of the file metadata. + ReadWriteIOUtils.write( + size, metadataIndexOutput.wrapAsStream()); // write the size of the file metadata. + ReadWriteIOUtils.write(rootNodeOffset, metadataIndexOutput.wrapAsStream()); + metadataIndexOutput.close(); + ReadWriteIOUtils.write(metaOffset, out.wrapAsStream()); // write magic string out.write(MAGIC_STRING_BYTES);
