This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch tsfile_split in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cc91eeda55ffe0f249b793b3a9ac281b6d1f1651 Author: Zesong Sun <[email protected]> AuthorDate: Mon Jan 10 10:10:04 2022 +0800 TsFileSketchTool --- .../java/org/apache/iotdb/tsfile/Constant.java | 1 + .../apache/iotdb/db/tools/TsFileRewriteTool.java | 2 + .../apache/iotdb/db/tools/TsFileSketchTool.java | 12 +- .../org/apache/iotdb/db/tools/TsFileSplitTool.java | 180 +++++++++++++++++++++ 4 files changed, 192 insertions(+), 3 deletions(-) diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java index 09f4213..62dba14 100644 --- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/Constant.java @@ -28,4 +28,5 @@ public class Constant { static final String SENSOR_3 = "sensor_3"; static final String DEVICE_PREFIX = "device_"; + static final String SENSOR_ = "sensor_"; } diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java index 3f04c28..1f8dfeb 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java @@ -126,6 +126,8 @@ public class TsFileRewriteTool implements AutoCloseable { } } + public TsFileRewriteTool() {} + /** * Rewrite an old file to the latest version * diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java index 947c1b7..55a1398 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java @@ -23,7 +23,13 @@ import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.file.MetaMarker; import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader; import 
org.apache.iotdb.tsfile.file.header.PageHeader; -import org.apache.iotdb.tsfile.file.metadata.*; +import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetadata; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.MetadataIndexEntry; +import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.TsFileMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.MetadataIndexNodeType; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; @@ -51,8 +57,8 @@ public class TsFileSketchTool { public static void main(String[] args) throws IOException { Pair<String, String> fileNames = checkArgs(args); - String filename = fileNames.left; - String outFile = fileNames.right; + String filename = "C:\\Users\\v-zesongsun\\Desktop\\test0-0.tsfile"; // fileNames.left; + String outFile = "C:\\Users\\v-zesongsun\\Desktop\\test0-0.txt"; // fileNames.right; System.out.println("TsFile path:" + filename); System.out.println("Sketch save path:" + outFile); new TsFileSketchTool(filename, outFile).run(); diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitTool.java new file mode 100644 index 0000000..84d114e --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSplitTool.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.tools;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.PageException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Splits one TsFile into several new TsFiles. While iterating over the paths of the input file, a
+ * new output file named {@code <input name without suffix>-<index>.tsfile} is opened each time a
+ * device id is seen for the first time.
+ */
+public class TsFileSplitTool extends TsFileRewriteTool {
+
+  private static final Logger logger = LoggerFactory.getLogger(TsFileSplitTool.class);
+
+  /** Path of the TsFile to split. */
+  private String filename;
+
+  /** Reader over {@link #filename}; stays {@code null} when the file could not be opened. */
+  private TsFileSequenceReader reader;
+
+  /**
+   * If the chunk point num is lower than this threshold, it will be deserialized into points,
+   * default is 100. Not referenced anywhere in this class yet.
+   */
+  private final long chunkPointNumLowerBoundInCompaction = 100;
+
+  /**
+   * Command-line entry point.
+   *
+   * @param args {@code args[0]} is the path of the TsFile to split
+   * @throws IOException if the input file cannot be read or an output file cannot be written
+   */
+  public static void main(String[] args) throws IOException {
+    if (args == null || args.length < 1) {
+      logger.error("Usage: TsFileSplitTool <path of the TsFile to split>");
+      return;
+    }
+    String fileName = args[0];
+    logger.info("Splitting TsFile {} ...", fileName);
+    try {
+      new TsFileSplitTool(fileName).run();
+    } catch (IllegalPathException | PageException e) {
+      logger.error("Failed to split TsFile {}", fileName, e);
+    }
+  }
+
+  /**
+   * construct TsFileSplitTool
+   *
+   * <p>A failure to open the file is logged here and reported by {@link #run()}, so that the
+   * constructor keeps its non-throwing signature.
+   *
+   * @param filename input file path
+   */
+  public TsFileSplitTool(String filename) {
+    super();
+    this.filename = filename;
+    try {
+      this.reader = new TsFileSequenceReader(filename);
+    } catch (IOException e) {
+      logger.error("Cannot open TsFile {}", filename, e);
+    }
+  }
+
+  /**
+   * entry of tool
+   *
+   * <p>Walks every path of the input file, switches to a new output TsFile whenever a new device
+   * id shows up, and hands the pages of every chunk of the path to {@link #reWriteChunk}. The
+   * input reader is closed when this method returns or throws.
+   *
+   * @throws IOException if the input file was not opened, or on any read/write failure
+   * @throws IllegalPathException declared for callers; not thrown directly by the code shown here
+   * @throws PageException if rewriting a page fails
+   */
+  public void run() throws IOException, IllegalPathException, PageException {
+    if (reader == null) {
+      throw new IOException("TsFile " + filename + " could not be opened");
+    }
+    try {
+      Iterator<List<Path>> pathIterator = reader.getPathsIterator();
+      Set<String> devices = new HashSet<>();
+      // strip the TsFile suffix (if present) so that the index can be put in front of it
+      String baseName =
+          filename.endsWith(TsFileConstant.TSFILE_SUFFIX)
+              ? filename.substring(0, filename.length() - TsFileConstant.TSFILE_SUFFIX.length())
+              : filename;
+      String fileNamePrefix = baseName.concat("-");
+      int fileIndex = 0;
+      TsFileIOWriter tsFileIOWriter = null;
+      while (pathIterator.hasNext()) {
+        for (Path path : pathIterator.next()) {
+          String deviceId = path.getDevice();
+          if (devices.add(deviceId)) {
+            if (tsFileIOWriter != null) {
+              // seal last TsFile
+              tsFileIOWriter.endFile();
+            }
+
+            // open a new TsFile
+            tsFileIOWriter =
+                new TsFileIOWriter(
+                    FSFactoryProducer.getFSFactory()
+                        .getFile(fileNamePrefix + fileIndex + TsFileConstant.TSFILE_SUFFIX));
+            fileIndex++;
+          }
+
+          // the very first path always adds a new device, so a writer exists from here on
+          List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList(path);
+          tsFileIOWriter.startChunkGroup(deviceId);
+          boolean firstChunkInChunkGroup = true;
+
+          for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+            Chunk chunk = reader.readMemChunk(chunkMetadata);
+
+            ChunkHeader chunkHeader = chunk.getHeader();
+            MeasurementSchema measurementSchema =
+                new MeasurementSchema(
+                    chunkHeader.getMeasurementID(),
+                    chunkHeader.getDataType(),
+                    chunkHeader.getEncodingType(),
+                    chunkHeader.getCompressionType());
+            TSDataType dataType = chunkHeader.getDataType();
+            List<PageHeader> pageHeadersInChunk = new ArrayList<>();
+            List<ByteBuffer> dataInChunk = new ArrayList<>();
+            int dataSize = chunkHeader.getDataSize();
+            // NOTE(review): the page headers below are read through `reader`, not from `chunk`;
+            // this presumably requires the reader to be positioned at the first page of this
+            // chunk after readMemChunk - confirm against TsFileSequenceReader.
+            while (dataSize > 0) {
+              // a new Page
+              PageHeader pageHeader =
+                  reader.readPageHeader(
+                      dataType, chunkHeader.getChunkType() == MetaMarker.CHUNK_HEADER);
+              boolean needToDecode = pageHeader.getStatistics() == null; // ignore modification
+              ByteBuffer pageData =
+                  !needToDecode
+                      ? reader.readCompressedPage(pageHeader)
+                      : reader.readPage(pageHeader, chunkHeader.getCompressionType());
+              pageHeadersInChunk.add(pageHeader);
+              dataInChunk.add(pageData);
+              dataSize -= pageHeader.getSerializedPageSize();
+            }
+            reWriteChunk(
+                deviceId,
+                firstChunkInChunkGroup,
+                measurementSchema,
+                pageHeadersInChunk,
+                dataInChunk);
+            // only the first chunk of a path may force a new chunk group in reWriteChunk
+            firstChunkInChunkGroup = false;
+          }
+          tsFileIOWriter.endChunkGroup();
+        }
+      }
+      if (tsFileIOWriter != null) {
+        // seal the TsFile of the last device; the loop only seals a file when the next one opens
+        tsFileIOWriter.endFile();
+      }
+      logger.info("TsFile {} is split into {} new files.", filename, devices.size());
+    } finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Feeds all pages of one chunk to {@code writePage} and flushes the resulting chunk writers.
+   *
+   * <p>{@code valueDecoder}, {@code writePage} and {@code partitionWriterMap} are not declared in
+   * this class.
+   *
+   * <p>NOTE(review): the chunks are flushed to the writers found in {@code partitionWriterMap},
+   * not to the per-device writer opened in {@link #run()} - confirm that this map holds the
+   * intended output files for every partition id produced by {@code writePage}.
+   *
+   * @param deviceId device the chunk belongs to
+   * @param firstChunkInChunkGroup whether a chunk group must be started on the target writer
+   * @param schema measurement schema rebuilt from the chunk header
+   * @param pageHeadersInChunk headers of the pages of the chunk, in read order
+   * @param pageDataInChunk page payloads, index-aligned with {@code pageHeadersInChunk}
+   * @throws IOException if writing to a target file fails
+   * @throws PageException if a page cannot be rewritten
+   */
+  protected void reWriteChunk(
+      String deviceId,
+      boolean firstChunkInChunkGroup,
+      MeasurementSchema schema,
+      List<PageHeader> pageHeadersInChunk,
+      List<ByteBuffer> pageDataInChunk)
+      throws IOException, PageException {
+    valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
+    Map<Long, ChunkWriterImpl> partitionChunkWriterMap = new HashMap<>();
+    for (int i = 0; i < pageDataInChunk.size(); i++) {
+      writePage(schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i), partitionChunkWriterMap);
+    }
+    for (Map.Entry<Long, ChunkWriterImpl> entry : partitionChunkWriterMap.entrySet()) {
+      long partitionId = entry.getKey();
+      TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId);
+      if (firstChunkInChunkGroup || !tsFileIOWriter.isWritingChunkGroup()) {
+        tsFileIOWriter.startChunkGroup(deviceId);
+      }
+      // write chunks to their own upgraded tsFiles
+      IChunkWriter chunkWriter = entry.getValue();
+      chunkWriter.writeToFileWriter(tsFileIOWriter);
+    }
+  }
+}
