This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-5010 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ca26589529cc0cc1c5b3ec2612d8a71e4ab51d7d Author: LiuXuxin <[email protected]> AuthorDate: Tue May 31 20:47:41 2022 +0800 add TsFileChecker --- example/tsfile/pom.xml | 28 +++ .../org/apache/iotdb/tsfile/TsFileChecker.java | 225 +++++++++++++++++++++ 2 files changed, 253 insertions(+) diff --git a/example/tsfile/pom.xml b/example/tsfile/pom.xml index c9ccceae03..a84f95381c 100644 --- a/example/tsfile/pom.xml +++ b/example/tsfile/pom.xml @@ -36,4 +36,32 @@ <version>${project.version}</version> </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.5.5</version> + <executions> + <execution> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.iotdb.tsfile.TsFileChecker</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileChecker.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileChecker.java new file mode 100644 index 0000000000..18ad91fe03 --- /dev/null +++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileChecker.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+
+import ch.qos.logback.classic.Level;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Collects every {@code .tsfile} under a directory tree, tries to read each file completely, and
+ * reports the files that could not be read, both on stdout and in {@code broken_files.txt}.
+ *
+ * <p>Usage: {@code TsFileChecker [dirPath [checkBySequenceReading [concurrentlyCheck]]]}. Every
+ * argument is optional; omitted ones keep their defaults ({@code data/}, {@code true}, {@code
+ * true}).
+ */
+public class TsFileChecker {
+  static String dirPath = "data/";
+  static boolean checkBySequenceReading = true;
+  static boolean concurrentlyCheck = true;
+  /** Index of the next file to check; shared by all checker threads. */
+  static AtomicInteger currCheckIdx = new AtomicInteger(0);
+  static String outputFileName = "broken_files.txt";
+  /** Wall-clock start of the check, used to estimate the remaining time. */
+  static long startTime = 0;
+
+  public static void main(String[] args) throws InterruptedException, IOException {
+    // Every positional argument is optional, so only read the ones that were actually supplied.
+    if (args.length > 0) {
+      dirPath = args[0];
+    }
+    if (args.length > 1) {
+      checkBySequenceReading = Boolean.parseBoolean(args[1]);
+    }
+    if (args.length > 2) {
+      concurrentlyCheck = Boolean.parseBoolean(args[2]);
+    }
+    ch.qos.logback.classic.Logger rootLogger =
+        (ch.qos.logback.classic.Logger)
+            LoggerFactory.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
+    rootLogger.setLevel(Level.toLevel("info"));
+
+    File dirFile = new File(dirPath);
+    // isDirectory() is also false for a path that does not exist.
+    if (!dirFile.isDirectory()) {
+      System.err.printf("%s is not a directory, nothing to check\n", dirPath);
+      return;
+    }
+    List<File> collectedFiles = new ArrayList<>();
+    collectFileRecursively(dirFile, collectedFiles);
+    File[] files = collectedFiles.toArray(new File[0]);
+    // In concurrent mode several checker threads append to this list, so it must be thread-safe.
+    List<File> brokenFiles = concurrentlyCheck ? new CopyOnWriteArrayList<>() : new ArrayList<>();
+
+    System.out.printf("There are %d files to be checked\n", files.length);
+
+    if (checkBySequenceReading) {
+      checkBySeqRead(files, brokenFiles);
+    } else {
+      checkByReadFromTail(files, brokenFiles);
+    }
+
+    writeReport(brokenFiles);
+  }
+
+  /**
+   * Prints the broken files to stdout and writes the same report to {@link #outputFileName}.
+   *
+   * @throws IOException if the report file cannot be written
+   */
+  private static void writeReport(List<File> brokenFiles) throws IOException {
+    try (OutputStream os =
+        new BufferedOutputStream(Files.newOutputStream(new File(outputFileName).toPath()))) {
+      if (brokenFiles.isEmpty()) {
+        System.out.println("No broken files");
+        os.write("No broken files\n".getBytes(StandardCharsets.UTF_8));
+        return;
+      }
+      String title =
+          "--------------------------- Here are the broken files ---------------------------";
+      System.out.println();
+      System.out.println();
+      System.out.println(title);
+      os.write((title + "\n").getBytes(StandardCharsets.UTF_8));
+      for (File f : brokenFiles) {
+        System.out.println(f);
+        os.write((f + "\n").getBytes(StandardCharsets.UTF_8));
+      }
+    }
+  }
+
+  /**
+   * Adds {@code currFile} to {@code collectedFiles} if it is a {@code .tsfile}, or walks it
+   * recursively if it is a directory. Directories that cannot be listed are reported and skipped.
+   */
+  static void collectFileRecursively(File currFile, List<File> collectedFiles) {
+    if (currFile.isDirectory()) {
+      // listFiles() returns null when the directory cannot be read (e.g. missing permissions).
+      File[] children = currFile.listFiles();
+      if (children == null) {
+        System.err.println("Cannot list files under " + currFile + ", skipping it");
+        return;
+      }
+      for (File child : children) {
+        collectFileRecursively(child, collectedFiles);
+      }
+    } else if (currFile.getName().endsWith(".tsfile")) {
+      collectedFiles.add(currFile);
+    }
+  }
+
+  /**
+   * Checks all files by sequential reading, either on the calling thread or on one thread per
+   * available processor, depending on {@link #concurrentlyCheck}.
+   */
+  static void checkBySeqRead(File[] files, List<File> brokenFiles) throws InterruptedException {
+    startTime = System.currentTimeMillis();
+    if (!concurrentlyCheck) {
+      concurrentSeqCheck(files, brokenFiles);
+      return;
+    }
+    int threadNum = Runtime.getRuntime().availableProcessors();
+    Thread[] threads = new Thread[threadNum];
+    for (int i = 0; i < threadNum; ++i) {
+      threads[i] = new Thread(() -> concurrentSeqCheck(files, brokenFiles));
+      threads[i].start();
+    }
+    for (Thread thread : threads) {
+      thread.join();
+    }
+  }
+
+  /**
+   * Repeatedly claims the next unchecked file through the shared counter {@link #currCheckIdx} and
+   * reads it completely, adding it to {@code brokenFiles} if reading fails. Several threads may run
+   * this concurrently; each file is claimed by exactly one of them.
+   */
+  static void concurrentSeqCheck(File[] files, List<File> brokenFiles) {
+    int totalFileCnt = files.length;
+    int printInterval = Math.max(1, totalFileCnt / 500);
+    int currIdx;
+    while ((currIdx = currCheckIdx.getAndIncrement()) < totalFileCnt) {
+      if (currIdx > 1 && currIdx % printInterval == 0) {
+        printProgress(currIdx, totalFileCnt);
+      }
+      File file = files[currIdx];
+      try {
+        checkFileSequentially(file);
+      } catch (Throwable t) {
+        // Throwable is caught on purpose: any failure, not only IOException, marks the file as
+        // broken instead of killing the checker thread.
+        synchronized (TsFileChecker.class) {
+          System.out.println("Exception occurs while reading " + file);
+          t.printStackTrace();
+        }
+        brokenFiles.add(file);
+      }
+    }
+  }
+
+  /** Prints how many files are done, the percentage, and a linear estimate of the time left. */
+  private static void printProgress(int currIdx, int totalFileCnt) {
+    int done = currIdx + 1;
+    synchronized (TsFileChecker.class) {
+      System.out.printf(
+          "%d / %d\t%.2f%%\t%.1f seconds left\n",
+          done,
+          totalFileCnt,
+          ((double) done) / totalFileCnt * 100.0d,
+          (System.currentTimeMillis() - startTime)
+              * ((double) totalFileCnt - done)
+              / done
+              / 1000.0d);
+    }
+  }
+
+  /**
+   * Reads one file completely: verifies both magic strings, decodes every page of every chunk in
+   * the data section, and deserializes the chunk metadata of every device.
+   *
+   * @throws IOException if the file cannot be read or a magic string is not the expected one
+   */
+  private static void checkFileSequentially(File file) throws IOException {
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath())) {
+      String headMagic = reader.readHeadMagic();
+      String tailMagic = reader.readTailMagic();
+      if (!TSFileConfig.MAGIC_STRING.equals(headMagic)
+          || !TSFileConfig.MAGIC_STRING.equals(tailMagic)) {
+        throw new IOException(
+            String.format("Unexpected magic string, head: %s, tail: %s", headMagic, tailMagic));
+      }
+      // Sequential reading of one ChunkGroup follows this order: first the CHUNK_GROUP_HEADER,
+      // then the SeriesChunks (headers and data) of that ChunkGroup. Because we do not know how
+      // many chunks a ChunkGroup has, we read one byte (the marker) ahead and judge accordingly.
+      // Skip the head magic string plus one more byte (presumably the file version number).
+      reader.position((long) TSFileConfig.MAGIC_STRING.getBytes(StandardCharsets.UTF_8).length + 1);
+      byte marker;
+      while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+            decodeChunk(reader, reader.readChunkHeader(marker));
+            break;
+          case MetaMarker.CHUNK_GROUP_HEADER:
+            reader.readChunkGroupHeader();
+            break;
+          case MetaMarker.OPERATION_INDEX_RANGE:
+            reader.readPlanIndex();
+            break;
+          default:
+            MetaMarker.handleUnexpectedMarker(marker);
+        }
+      }
+      // The returned metadata is not inspected. Only whether it can be read at all matters here.
+      for (String device : reader.getAllDevices()) {
+        reader.readChunkMetadataInDevice(device);
+      }
+    }
+  }
+
+  /** Decodes every page of the chunk whose header was just read, iterating over all its points. */
+  private static void decodeChunk(TsFileSequenceReader reader, ChunkHeader header)
+      throws IOException {
+    Decoder defaultTimeDecoder =
+        Decoder.getDecoderByType(
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+            TSDataType.INT64);
+    Decoder valueDecoder = Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+    int dataSize = header.getDataSize();
+    while (dataSize > 0) {
+      valueDecoder.reset();
+      PageHeader pageHeader =
+          reader.readPageHeader(
+              header.getDataType(), header.getChunkType() == MetaMarker.CHUNK_HEADER);
+      ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+      PageReader pageReader =
+          new PageReader(pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+      BatchData batchData = pageReader.getAllSatisfiedPageData();
+      while (batchData.hasCurrent()) {
+        batchData.next();
+      }
+      dataSize -= pageHeader.getSerializedPageSize();
+    }
+  }
+
+  /**
+   * Intended to check files by reading the metadata from the tail and then the chunks it points
+   * to. Not implemented yet.
+   *
+   * @throws UnsupportedOperationException always, so that the tool never reports "No broken files"
+   *     without having checked anything
+   */
+  static void checkByReadFromTail(File[] files, List<File> brokenFiles) {
+    // TODO: check by reading the metadata, and then read the chunks
+    throw new UnsupportedOperationException(
+        "Checking by reading from the tail is not implemented yet, "
+            + "please use sequence reading (checkBySequenceReading=true)");
+  }
+}
