This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-5010
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ca26589529cc0cc1c5b3ec2612d8a71e4ab51d7d
Author: LiuXuxin <[email protected]>
AuthorDate: Tue May 31 20:47:41 2022 +0800

    add TsFileChecker
---
 example/tsfile/pom.xml                             |  28 +++
 .../org/apache/iotdb/tsfile/TsFileChecker.java     | 225 +++++++++++++++++++++
 2 files changed, 253 insertions(+)

diff --git a/example/tsfile/pom.xml b/example/tsfile/pom.xml
index c9ccceae03..a84f95381c 100644
--- a/example/tsfile/pom.xml
+++ b/example/tsfile/pom.xml
@@ -36,4 +36,32 @@
             <version>${project.version}</version>
         </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.5.5</version>
+                <executions>
+                    <execution>
+                        <configuration>
+                            <archive>
+                                <manifest>
+                                    
<mainClass>org.apache.iotdb.tsfile.TsFileChecker</mainClass>
+                                </manifest>
+                            </archive>
+                            <descriptorRefs>
+                                
<descriptorRef>jar-with-dependencies</descriptorRef>
+                            </descriptorRefs>
+                        </configuration>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git 
a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileChecker.java 
b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileChecker.java
new file mode 100644
index 0000000000..18ad91fe03
--- /dev/null
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileChecker.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile;
+
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;

import ch.qos.logback.classic.Level;
import org.slf4j.LoggerFactory;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
+
+public class TsFileChecker {
+  static String dirPath = "data/";
+  static boolean checkBySequenceReading = true;
+  static boolean concurrentlyCheck = true;
+  static AtomicInteger currCheckIdx = new AtomicInteger(0);
+  static String outputFileName = "broken_files.txt";
+  static long startTime = 0;
+
+  public static void main(String[] args) throws InterruptedException, 
IOException {
+    if (args.length > 0) {
+      dirPath = args[0];
+      checkBySequenceReading = Boolean.parseBoolean(args[1]);
+      concurrentlyCheck = Boolean.parseBoolean(args[2]);
+    }
+    ch.qos.logback.classic.Logger rootLogger =
+        (ch.qos.logback.classic.Logger)
+            
LoggerFactory.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
+    rootLogger.setLevel(Level.toLevel("info"));
+    File dirFile = new File(dirPath);
+    if (!dirFile.exists() || !dirFile.isDirectory()) {
+      return;
+    }
+    List<File> collectedFiles = new ArrayList<>();
+    collectFileRecursively(dirFile, collectedFiles);
+    File[] files = collectedFiles.toArray(new File[0]);
+    List<File> brokenFiles = concurrentlyCheck ? new CopyOnWriteArrayList<>() 
: new ArrayList<>();
+
+    System.out.printf("There are %d files to be checked\n", files.length);
+
+    if (checkBySequenceReading) {
+      checkBySeqRead(files, brokenFiles);
+    } else {
+      checkByReadFromTail(files, brokenFiles);
+    }
+
+    BufferedOutputStream os =
+        new BufferedOutputStream(Files.newOutputStream(new 
File(outputFileName).toPath()));
+    if (brokenFiles.size() > 0) {
+      System.out.println();
+      System.out.println();
+      System.out.println(
+          "--------------------------- Here are the broken files 
---------------------------");
+      os.write(
+          "--------------------------- Here are the broken files 
---------------------------\n"
+              .getBytes());
+      for (File f : brokenFiles) {
+        System.out.println(f);
+        os.write((f.toString() + "\n").getBytes());
+      }
+    } else {
+      System.out.println("No broken files");
+      os.write("No broken files".getBytes());
+    }
+    os.close();
+  }
+
+  static void collectFileRecursively(File currFile, List<File> collectedFiles) 
{
+    if (currFile.isDirectory()) {
+      File[] files = currFile.listFiles();
+      for (File file : files) {
+        if (file.isDirectory()) {
+          collectFileRecursively(file, collectedFiles);
+        } else if (file.getName().endsWith(".tsfile")) {
+          collectedFiles.add(file);
+        }
+      }
+    } else if (currFile.getName().endsWith(".tsfile")) {
+      collectedFiles.add(currFile);
+    }
+  }
+
+  static void checkBySeqRead(File[] files, List<File> brokenFiles) throws 
InterruptedException {
+    startTime = System.currentTimeMillis();
+    if (concurrentlyCheck) {
+      int threadNum = Runtime.getRuntime().availableProcessors();
+      Thread[] threads = new Thread[threadNum];
+      for (int i = 0; i < threadNum; ++i) {
+        threads[i] = new Thread(() -> concurrentSeqCheck(files, brokenFiles));
+        threads[i].start();
+      }
+      for (Thread thread : threads) {
+        thread.join();
+      }
+    } else {
+      concurrentSeqCheck(files, brokenFiles);
+    }
+  }
+
+  static void concurrentSeqCheck(File[] files, List<File> brokenFiles) {
+    int currIdx;
+    int totalFileCnt = files.length;
+    int printInterval = Math.max(1, totalFileCnt / 500);
+    while ((currIdx = currCheckIdx.getAndIncrement()) < totalFileCnt) {
+      if (currIdx > 1 && currIdx % printInterval == 0) {
+        synchronized (TsFileChecker.class) {
+          System.out.printf(
+              "%d / %d\t%.2f%%\t%.1f seconds left\n",
+              currIdx + 1,
+              totalFileCnt,
+              ((double) (currIdx + 1)) / totalFileCnt * 100.0d,
+              (System.currentTimeMillis() - startTime)
+                  * ((double) totalFileCnt - currIdx - 1)
+                  / (currIdx + 1)
+                  / 1000.0d);
+        }
+      }
+      File file = files[currIdx];
+      try (TsFileSequenceReader reader = new 
TsFileSequenceReader(file.getAbsolutePath())) {
+        reader.readHeadMagic();
+        reader.readTailMagic();
+        // Sequential reading of one ChunkGroup now follows this order:
+        // first the CHUNK_GROUP_HEADER, then SeriesChunks (headers and data) 
in one ChunkGroup
+        // Because we do not know how many chunks a ChunkGroup may have, we 
should read one byte
+        // (the
+        // marker) ahead and judge accordingly.
+        reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 
1);
+        byte marker;
+        while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+          switch (marker) {
+            case MetaMarker.CHUNK_HEADER:
+            case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+              ChunkHeader header = reader.readChunkHeader(marker);
+              Decoder defaultTimeDecoder =
+                  Decoder.getDecoderByType(
+                      TSEncoding.valueOf(
+                          
TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+                      TSDataType.INT64);
+              Decoder valueDecoder =
+                  Decoder.getDecoderByType(header.getEncodingType(), 
header.getDataType());
+              int dataSize = header.getDataSize();
+              while (dataSize > 0) {
+                valueDecoder.reset();
+                PageHeader pageHeader =
+                    reader.readPageHeader(
+                        header.getDataType(), header.getChunkType() == 
MetaMarker.CHUNK_HEADER);
+                ByteBuffer pageData = reader.readPage(pageHeader, 
header.getCompressionType());
+                PageReader reader1 =
+                    new PageReader(
+                        pageData, header.getDataType(), valueDecoder, 
defaultTimeDecoder, null);
+                BatchData batchData = reader1.getAllSatisfiedPageData();
+                while (batchData.hasCurrent()) {
+                  batchData.next();
+                }
+                dataSize -= pageHeader.getSerializedPageSize();
+              }
+              break;
+            case MetaMarker.CHUNK_GROUP_HEADER:
+              ChunkGroupHeader chunkGroupHeader = 
reader.readChunkGroupHeader();
+              break;
+            case MetaMarker.OPERATION_INDEX_RANGE:
+              reader.readPlanIndex();
+              break;
+            default:
+              MetaMarker.handleUnexpectedMarker(marker);
+          }
+        }
+        for (String device : reader.getAllDevices()) {
+          Map<String, List<ChunkMetadata>> seriesMetaData =
+              reader.readChunkMetadataInDevice(device);
+          for (Map.Entry<String, List<ChunkMetadata>> serie : 
seriesMetaData.entrySet()) {
+            for (ChunkMetadata chunkMetadata : serie.getValue()) {}
+          }
+        }
+      } catch (Throwable t) {
+        synchronized (TsFileChecker.class) {
+          System.out.println("Exception occurs while reading " + file);
+          t.printStackTrace();
+        }
+        brokenFiles.add(file);
+      }
+    }
+  }
+
+  static void checkByReadFromTail(File[] files, List<File> brokenFiles) {
+    // TODO: check by read the metadatas, and read the chunks
+  }
+}

Reply via email to