This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new cd657efec3 [To rel/0.13] Validation and Rewrite file tool (#6438)
cd657efec3 is described below
commit cd657efec3d69fea141aec8c78e322ba931ea26a
Author: Xiangwei Wei <[email protected]>
AuthorDate: Mon Jun 27 22:24:02 2022 +0800
[To rel/0.13] Validation and Rewrite file tool (#6438)
---
pom.xml | 1 +
rewriteFileTool/pom.xml | 82 +++++
.../assembly/resources/sbin/rewrite-file-tool.sh | 48 +++
.../src/assembly/resources/sbin/validate-tsfile.sh | 48 +++
rewriteFileTool/src/assembly/rewriteFileTool.xml | 40 ++
.../java/org/apache/iotdb/RewriteFileTool.java | 367 +++++++++++++++++++
.../org/apache/iotdb/TsFileValidationTool.java | 403 +++++++++++++++++++++
7 files changed, 989 insertions(+)
diff --git a/pom.xml b/pom.xml
index ab27b70744..f3f1dc4479 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,7 @@
<module>client-cpp</module>
<module>metrics</module>
<module>integration</module>
+ <module>rewriteFileTool</module>
<!-- <module>library-udf</module>-->
</modules>
<!-- Properties Management -->
diff --git a/rewriteFileTool/pom.xml b/rewriteFileTool/pom.xml
new file mode 100644
index 0000000000..c833868fab
--- /dev/null
+++ b/rewriteFileTool/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Module holding the offline TsFile tools (TsFileValidationTool and RewriteFileTool). -->
+ <parent>
+ <artifactId>iotdb-parent</artifactId>
+ <groupId>org.apache.iotdb</groupId>
+ <version>0.13.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rewriteFileTool</artifactId>
+ <!-- Libraries the tools compile against; all are copied into lib/ by the assembly. -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>tsfile</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-session</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>${maven.assembly.version}</version>
+ <executions>
+ <!-- Package binaries-->
+ <!-- Layout is defined by src/assembly/rewriteFileTool.xml (jars in lib/, scripts from src/assembly/resources). -->
+ <execution>
+ <id>client-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+
<descriptor>src/assembly/rewriteFileTool.xml</descriptor>
+ </descriptors>
+ <appendAssemblyId>false</appendAssemblyId>
+ <archive>
+ <manifest>
+
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+ </manifest>
+ </archive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <!-- Compile the tools for Java 8. -->
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+</project>
diff --git a/rewriteFileTool/src/assembly/resources/sbin/rewrite-file-tool.sh
b/rewriteFileTool/src/assembly/resources/sbin/rewrite-file-tool.sh
new file mode 100644
index 0000000000..988ce60e69
--- /dev/null
+++ b/rewriteFileTool/src/assembly/resources/sbin/rewrite-file-tool.sh
@@ -0,0 +1,48 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+echo ---------------------
+echo Starting Rewriting the bad TsFiles
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+ export IOTDB_HOME="$(cd "`dirname "$0"`"/..; pwd)"
+fi
+
+if [ -n "$JAVA_HOME" ]; then
+ for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+ if [ -x "$java" ]; then
+ JAVA="$java"
+ break
+ fi
+ done
+else
+ JAVA=java
+fi
+
+CLASSPATH=""
+for f in ${IOTDB_HOME}/lib/*.jar; do
+ CLASSPATH=${CLASSPATH}":"$f
+done
+
+MAIN_CLASS=org.apache.iotdb.RewriteFileTool
+
+"$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
+exit $?
diff --git a/rewriteFileTool/src/assembly/resources/sbin/validate-tsfile.sh
b/rewriteFileTool/src/assembly/resources/sbin/validate-tsfile.sh
new file mode 100644
index 0000000000..db50b91400
--- /dev/null
+++ b/rewriteFileTool/src/assembly/resources/sbin/validate-tsfile.sh
@@ -0,0 +1,48 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+echo ---------------------
+echo Starting Validating the TsFile
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+ export IOTDB_HOME="$(cd "`dirname "$0"`"/..; pwd)"
+fi
+
+if [ -n "$JAVA_HOME" ]; then
+ for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+ if [ -x "$java" ]; then
+ JAVA="$java"
+ break
+ fi
+ done
+else
+ JAVA=java
+fi
+
+CLASSPATH=""
+for f in ${IOTDB_HOME}/lib/*.jar; do
+ CLASSPATH=${CLASSPATH}":"$f
+done
+
+MAIN_CLASS=org.apache.iotdb.TsFileValidationTool
+
+"$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
+exit $?
\ No newline at end of file
diff --git a/rewriteFileTool/src/assembly/rewriteFileTool.xml
b/rewriteFileTool/src/assembly/rewriteFileTool.xml
new file mode 100644
index 0000000000..68b7510885
--- /dev/null
+++ b/rewriteFileTool/src/assembly/rewriteFileTool.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<assembly>
+ <!-- Distribution layout: dependency jars go to lib/, and everything under
+      src/assembly/resources (e.g. the sbin/ launch scripts) is copied to the root. -->
+ <id>RewriteFileTool</id>
+ <formats>
+ <format>dir</format>
+ <format>zip</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>lib</outputDirectory>
+ </dependencySet>
+ </dependencySets>
+ <fileSets>
+ <fileSet>
+ <directory>src/assembly/resources</directory>
+ <outputDirectory>${file.separator}</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git
a/rewriteFileTool/src/main/java/org/apache/iotdb/RewriteFileTool.java
b/rewriteFileTool/src/main/java/org/apache/iotdb/RewriteFileTool.java
new file mode 100644
index 0000000000..6ad579b025
--- /dev/null
+++ b/rewriteFileTool/src/main/java/org/apache/iotdb/RewriteFileTool.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This tool reads TsFiles and rewrites them chunk by chunk into a running IoTDB instance. For every
+ * page of a chunk it builds tablets and invokes insertTablet(), so a very large chunk is never sent
+ * in one request. The `unload` command is used to move the files out of IoTDB into the backup
+ * directory, and mods files are moved manually.
+ */
+public class RewriteFileTool {
+  // backup data dir path
+  private static String backUpDirPath;
+  // validation file path
+  private static String validationFilePath;
+  // tsfile list path
+  private static String tsfileListPath;
+  // output file path
+  private static String outputLogFilePath;
+
+  private static final String HostIP = "localhost";
+  private static final String rpcPort = "6667";
+  private static String user = "root";
+  private static String password = "root";
+
+  private static final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+  // a tablet is sent once its estimated payload (timestamps + values) reaches this many bytes
+  private static final long MAX_TABLET_SIZE = 1024 * 1024;
+
+  // prefix TsFileValidationTool (with -pd=true) prints in front of every bad file path
+  private static final String BAD_FILE_PREFIX = "-- Find the bad file ";
+
+  private static PrintWriter writer;
+
+  /**
+   * -b=[path of backUp directory] -vf=[path of validation file]/-f=[path of tsfile list] -o=[path
+   * of output log] -u=[username, default="root"] -pw=[password, default="root"]
+   */
+  public static void main(String[] args) throws IOException {
+    if (!checkArgs(args)) {
+      System.exit(1);
+    }
+    writer = new PrintWriter(new FileWriter(outputLogFilePath));
+    try {
+      if (validationFilePath != null) {
+        readValidationFile(validationFilePath);
+      }
+      if (tsfileListPath != null) {
+        readTsFileList(tsfileListPath);
+      }
+    } catch (Exception e) {
+      printBoth(e.getMessage());
+      e.printStackTrace();
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Unloads and rewrites every file reported by TsFileValidationTool, i.e. every line of the
+   * validation output that starts with "-- Find the bad file ".
+   */
+  public static void readValidationFile(String validationFilePath)
+      throws IOException, IoTDBConnectionException {
+    rewriteFilesInList(validationFilePath, true);
+  }
+
+  /** Unloads and rewrites every file listed (one path per line) in {@code tsfileListPath}. */
+  public static void readTsFileList(String tsfileListPath)
+      throws IoTDBConnectionException, IOException {
+    rewriteFilesInList(tsfileListPath, false);
+  }
+
+  /**
+   * Opens one session and unloads/rewrites every file named in {@code listPath}. The session and
+   * the list reader are released even if processing fails part way through.
+   *
+   * @param listPath file containing the paths of the files to rewrite
+   * @param onlyPrefixedLines if true, only lines starting with BAD_FILE_PREFIX are used (with the
+   *     prefix stripped); otherwise every line is a file path. Blank paths are skipped.
+   */
+  private static void rewriteFilesInList(String listPath, boolean onlyPrefixedLines)
+      throws IoTDBConnectionException, IOException {
+    Session session = new Session(HostIP, rpcPort, user, password);
+    session.open(false);
+    try (BufferedReader bufferedReader = new BufferedReader(new FileReader(listPath))) {
+      String line;
+      while ((line = bufferedReader.readLine()) != null) {
+        String badFilePath;
+        if (onlyPrefixedLines) {
+          if (!line.startsWith(BAD_FILE_PREFIX)) {
+            continue;
+          }
+          badFilePath = line.substring(BAD_FILE_PREFIX.length());
+        } else {
+          badFilePath = line;
+        }
+        if (badFilePath.trim().isEmpty()) {
+          // an empty path would otherwise be passed to `unload`
+          continue;
+        }
+        unloadAndReWriteWrongTsFile(badFilePath, session);
+      }
+    } finally {
+      session.close();
+    }
+    printBoth("Finish rewriting all bad files.");
+  }
+
+  /**
+   * Moves {@code filename} (and its mods file) into the backup dir, then re-inserts its data
+   * through {@code session}. The backup copy is renamed to "*.finish" only when the rewrite
+   * succeeded, so a failed file can be retried by running the tool again.
+   */
+  public static void unloadAndReWriteWrongTsFile(String filename, Session session) {
+    try {
+      // File#getName is portable across path separators, unlike splitting on "/"
+      String targetFilePath = backUpDirPath + File.separator + new File(filename).getName();
+      File targetFile = new File(targetFilePath);
+      // move mods file
+      File modsFile = new File(filename + ModificationFile.FILE_SUFFIX);
+      if (modsFile.exists()) {
+        fsFactory.moveFile(modsFile, new File(targetFilePath + ModificationFile.FILE_SUFFIX));
+      }
+      if (targetFile.exists()) {
+        printBoth(String.format("%s is already in the backup dir. Don't need to move.", filename));
+      } else {
+        printBoth(String.format("Start moving %s to backup dir.", filename));
+        session.executeNonQueryStatement(
+            String.format("unload '%s' '%s'", filename, backUpDirPath));
+      }
+      printBoth(String.format("Finish unloading %s.", filename));
+
+      // try to rewriteFile
+      printBoth(String.format("Start rewriting %s to iotdb.", filename));
+      if (!targetFile.exists()) {
+        printBoth("---- Meet error in rewriting, " + targetFilePath + " does not exist.");
+      } else if (tryRewriteWrongTsFile(targetFilePath, session)) {
+        File finishedFile = new File(targetFilePath + "." + "finish");
+        if (!targetFile.renameTo(finishedFile)) {
+          printBoth("---- Fail to rename " + targetFilePath + " to " + finishedFile.getPath());
+        }
+      }
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      printBoth("---- Meet error in unloading " + filename + ", " + e.getMessage());
+    }
+  }
+
+  /**
+   * Re-inserts all undeleted data points of the TsFile {@code filename} through {@code session}.
+   * Errors are reported and not rethrown.
+   */
+  public static void rewriteWrongTsFile(String filename, Session session) {
+    tryRewriteWrongTsFile(filename, session);
+  }
+
+  /**
+   * Same as {@link #rewriteWrongTsFile} but reports whether every chunk was rewritten.
+   *
+   * @return true if the whole file was read and inserted, false if an error was reported
+   */
+  private static boolean tryRewriteWrongTsFile(String filename, Session session) {
+    // read mods file
+    List<Modification> modifications = null;
+    if (fsFactory.getFile(filename + ModificationFile.FILE_SUFFIX).exists()) {
+      modifications =
+          (List<Modification>)
+              new ModificationFile(filename + ModificationFile.FILE_SUFFIX).getModifications();
+    }
+
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
+      // Sequential reading of one ChunkGroup follows this order: first the CHUNK_GROUP_HEADER,
+      // then SeriesChunks (headers and data). Because we do not know how many chunks a ChunkGroup
+      // may have, we read one byte (the marker) ahead and judge accordingly.
+      reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
+      byte marker;
+      String curDevice = null;
+      while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+            // the marker byte was already consumed, so the chunk starts one byte earlier
+            long chunkHeaderOffset = reader.position() - 1;
+            ChunkHeader header = reader.readChunkHeader(marker);
+            rewriteChunk(reader, header, curDevice, chunkHeaderOffset, modifications, session);
+            break;
+          case MetaMarker.CHUNK_GROUP_HEADER:
+            ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
+            curDevice = chunkGroupHeader.getDeviceID();
+            break;
+          case MetaMarker.OPERATION_INDEX_RANGE:
+            // carries no data points; read it only to advance past it
+            reader.readPlanIndex();
+            break;
+          default:
+            MetaMarker.handleUnexpectedMarker(marker);
+        }
+      }
+      return true;
+    } catch (IllegalPathException
+        | IOException
+        | IoTDBConnectionException
+        | StatementExecutionException
+        | UnSupportedDataTypeException e) {
+      printBoth("---- Meet error in rewriting " + filename + ", " + e.getMessage());
+      e.printStackTrace();
+      return false;
+    }
+  }
+
+  /** Reads the pages of one chunk (header already read) and inserts their undeleted points. */
+  private static void rewriteChunk(
+      TsFileSequenceReader reader,
+      ChunkHeader header,
+      String curDevice,
+      long chunkHeaderOffset,
+      List<Modification> modifications,
+      Session session)
+      throws IOException, IllegalPathException, IoTDBConnectionException,
+          StatementExecutionException {
+    Decoder defaultTimeDecoder =
+        Decoder.getDecoderByType(
+            TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+            TSDataType.INT64);
+    Decoder valueDecoder = Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+    // 1. construct MeasurementSchema from chunkHeader
+    MeasurementSchema measurementSchema =
+        new MeasurementSchema(
+            header.getMeasurementID(),
+            header.getDataType(),
+            header.getEncodingType(),
+            header.getCompressionType());
+    // deletions depend only on the series and the chunk offset, so compute them once per chunk
+    List<TimeRange> deleteIntervalList =
+        getOldSortedDeleteIntervals(curDevice, measurementSchema, chunkHeaderOffset, modifications);
+    // only multi-page chunks store statistics (and thus the value count) in their page headers
+    boolean hasStatistics = header.getChunkType() == MetaMarker.CHUNK_HEADER;
+
+    // 2. record data point of each page
+    int dataSize = header.getDataSize();
+    while (dataSize > 0) {
+      valueDecoder.reset();
+      PageHeader pageHeader = reader.readPageHeader(header.getDataType(), hasStatistics);
+      ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+      PageReader pageReader =
+          new PageReader(pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+      pageReader.setDeleteIntervalList(deleteIntervalList);
+      BatchData batchData = pageReader.getAllSatisfiedPageData();
+      int maxRow = hasStatistics ? (int) pageHeader.getNumOfValues() : batchData.length();
+      insertPage(batchData, curDevice, measurementSchema, maxRow, session);
+      dataSize -= pageHeader.getSerializedPageSize();
+    }
+  }
+
+  /**
+   * Inserts the points of one page as one or more tablets, sending a tablet whenever its estimated
+   * size reaches MAX_TABLET_SIZE and once more for any remaining rows.
+   */
+  private static void insertPage(
+      BatchData batchData,
+      String curDevice,
+      MeasurementSchema measurementSchema,
+      int maxRow,
+      Session session)
+      throws IoTDBConnectionException, StatementExecutionException {
+    String measurement = measurementSchema.getMeasurementId();
+    TSDataType dataType = measurementSchema.getType();
+    Tablet tablet = new Tablet(curDevice, Collections.singletonList(measurementSchema), maxRow);
+    long curTabletSize = 0;
+    while (batchData.hasCurrent()) {
+      tablet.addTimestamp(tablet.rowSize, batchData.currentTime());
+      tablet.addValue(measurement, tablet.rowSize, batchData.currentValue());
+      tablet.rowSize++;
+      // 8 bytes for the timestamp plus the value size
+      curTabletSize += 8 + estimateValueSize(dataType, batchData.currentValue());
+      // if curTabletSize is over the threshold
+      if (curTabletSize >= MAX_TABLET_SIZE) {
+        session.insertTablet(tablet);
+        curTabletSize = 0;
+        tablet.reset();
+      }
+      batchData.next();
+    }
+    if (tablet.rowSize > 0) {
+      session.insertTablet(tablet);
+    }
+  }
+
+  /**
+   * Estimates the serialized size in bytes of one value of {@code dataType}.
+   *
+   * @throws UnSupportedDataTypeException for data types other than the six basic ones
+   */
+  private static long estimateValueSize(TSDataType dataType, Object value) {
+    switch (dataType) {
+      case BOOLEAN:
+        return 1;
+      case INT32:
+      case FLOAT:
+        return 4;
+      case INT64:
+      case DOUBLE:
+        return 8;
+      case TEXT:
+        return 4 + ((Binary) value).getLength();
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported.", dataType));
+    }
+  }
+
+  /**
+   * Parses "-key=value" arguments into the static configuration.
+   *
+   * @return false (after printing the usage) if an argument is unknown or has no '=', or if a
+   *     required option is missing
+   */
+  private static boolean checkArgs(String[] args) {
+    String paramConfig =
+        "-b=[path of backUp directory] -vf=[path of validation file]/-f=[path of tsfile list] -o=[path of output log] -u=[username, default=\"root\"] -pw=[password, default=\"root\"]";
+    for (String arg : args) {
+      int equalsIndex = arg.indexOf('=');
+      if (equalsIndex < 0) {
+        // without '=' the whole flag (e.g. "-b") would be taken as the value
+        System.out.println("Param incorrect!" + paramConfig);
+        return false;
+      }
+      String value = arg.substring(equalsIndex + 1);
+      if (arg.startsWith("-b")) {
+        backUpDirPath = value;
+      } else if (arg.startsWith("-vf")) {
+        validationFilePath = value;
+      } else if (arg.startsWith("-f")) {
+        tsfileListPath = value;
+      } else if (arg.startsWith("-o")) {
+        outputLogFilePath = value;
+      } else if (arg.startsWith("-u")) {
+        user = value;
+      } else if (arg.startsWith("-pw")) {
+        password = value;
+      } else {
+        System.out.println("Param incorrect!" + paramConfig);
+        return false;
+      }
+    }
+    if (backUpDirPath == null
+        || (validationFilePath == null && tsfileListPath == null)
+        || outputLogFilePath == null) {
+      System.out.println("Param incorrect!" + paramConfig);
+      return false;
+    }
+    return true;
+  }
+
+  /** Prints {@code msg} to stdout and, when main() has opened it, to the output log. */
+  private static void printBoth(String msg) {
+    System.out.println(msg);
+    if (writer != null) {
+      writer.println(msg);
+    }
+  }
+
+  /**
+   * Collects, sorted, the time ranges of the deletions that apply to one chunk of series
+   * {@code deviceId}.{@code schema.getMeasurementId()}.
+   *
+   * @return the sorted delete intervals, or null when there are no modifications at all
+   */
+  private static List<TimeRange> getOldSortedDeleteIntervals(
+      String deviceId,
+      MeasurementSchema schema,
+      long chunkHeaderOffset,
+      List<Modification> modifications)
+      throws IllegalPathException {
+    if (modifications == null || modifications.isEmpty()) {
+      return null;
+    }
+    // the series path is the same for every deletion, so build it only once
+    PartialPath seriesPath = new PartialPath(deviceId + "." + schema.getMeasurementId());
+    ChunkMetadata chunkMetadata = new ChunkMetadata();
+    Iterator<Modification> modsIterator = modifications.iterator();
+    while (modsIterator.hasNext()) {
+      Deletion currentDeletion = (Deletion) modsIterator.next();
+      // a deletion counts for this chunk when its path matches the series and its recorded
+      // file offset lies beyond the chunk header
+      if (currentDeletion.getPath().matchFullPath(seriesPath)
+          && currentDeletion.getFileOffset() > chunkHeaderOffset) {
+        chunkMetadata.insertIntoSortedDeletions(
+            currentDeletion.getStartTime(), currentDeletion.getEndTime());
+      }
+    }
+    return chunkMetadata.getDeleteIntervalList();
+  }
+}
diff --git
a/rewriteFileTool/src/main/java/org/apache/iotdb/TsFileValidationTool.java
b/rewriteFileTool/src/main/java/org/apache/iotdb/TsFileValidationTool.java
new file mode 100644
index 0000000000..beea1a1075
--- /dev/null
+++ b/rewriteFileTool/src/main/java/org/apache/iotdb/TsFileValidationTool.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
+import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static
org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+/**
+ * This tool can be used to check the correctness of tsfile and point out
errors in specific
+ * timeseries or devices. The types of errors include the following:
+ *
+ * <p>Device overlap between files
+ *
+ * <p>Timeseries overlap between files
+ *
+ * <p>Timeseries overlap between chunks
+ *
+ * <p>Timeseries overlap between pages
+ *
+ * <p>Timeseries overlap within one page
+ */
+public class TsFileValidationTool {
+  private static final Logger logger = LoggerFactory.getLogger(TsFileValidationTool.class);
+
+  // ---- options, consumed by main() after checkArgs(args) ----
+  // report which kind of overlap (between files / chunks / pages / within a page) was found
+  private static boolean printDetails;
+  // also write the report to outFilePath
+  private static boolean printToFile;
+  private static String outFilePath = "TsFile_validation_view.txt";
+  // data dirs: files are checked individually and against each other
+  private static final List<File> seqDataDirList = new ArrayList<>();
+  // single tsfiles: each is only checked on its own
+  private static final List<File> fileList = new ArrayList<>();
+
+  // ---- run state ----
+  // report writer, created in main() when printToFile is set
+  private static PrintWriter pw;
+  // number of bad files found so far
+  private static int badFileNum;
+
+  /**
+   * The form of param is: [path of data dir or tsfile] [-pd = print details or not] [-f = path of
+   * outFile]. Eg: xxx/iotdb/data/data1 xxx/xxx.tsfile -pd=true -f=xxx/TsFile_validation_view.txt
+   *
+   * <p>The first parameter is required, the others are optional.
+   *
+   * <p>A single tsfile is only checked on its own. A data dir is walked as storage group / data
+   * region / time partition, and the seq files of each time partition are checked both
+   * individually and against each other in generation order.
+   */
+  public static void main(String[] args) throws IOException {
+    if (!checkArgs(args)) {
+      System.exit(1);
+    }
+    if (printToFile) {
+      pw = new PrintWriter(new FileWriter(outFilePath));
+    }
+    try {
+      if (printDetails) {
+        printBoth("Start checking seq files ...");
+      }
+
+      // check tsfile, which will only check for correctness inside a single tsfile
+      for (File f : fileList) {
+        findUncorrectFiles(Collections.singletonList(f));
+      }
+
+      // check tsfiles in data dir, which will check for correctness inside one single tsfile and
+      // between files
+      for (File seqDataDir : seqDataDirList) {
+        validateSeqDataDir(seqDataDir);
+      }
+      if (printDetails) {
+        printBoth("Finish checking successfully, totally find " + badFileNum + " bad files.");
+      }
+    } finally {
+      // flush and close the report even if checking aborts with an exception
+      if (pw != null) {
+        pw.close();
+      }
+    }
+  }
+
+  /** Walks one seq data dir (storage group / data region) and checks each data region. */
+  private static void validateSeqDataDir(File seqDataDir) {
+    // get sg data dirs
+    if (!checkIsDirectory(seqDataDir)) {
+      return;
+    }
+    for (File sgDir : Objects.requireNonNull(seqDataDir.listFiles())) {
+      if (!checkIsDirectory(sgDir)) {
+        continue;
+      }
+      if (printDetails) {
+        printBoth("- Check files in storage group: " + sgDir.getAbsolutePath());
+      }
+      // get data region dirs
+      for (File dataRegionDir : Objects.requireNonNull(sgDir.listFiles())) {
+        if (!checkIsDirectory(dataRegionDir)) {
+          continue;
+        }
+        validateDataRegionDir(dataRegionDir);
+      }
+    }
+  }
+
+  /** Checks the seq files of every time partition dir of one data region, in partition order. */
+  private static void validateDataRegionDir(File dataRegionDir) {
+    // get time partition dirs and sort them
+    List<File> timePartitionDirs = new ArrayList<>();
+    for (File file : Objects.requireNonNull(dataRegionDir.listFiles())) {
+      if (file != null && !file.getName().endsWith(".DS_Store")) {
+        timePartitionDirs.add(file);
+      }
+    }
+    // partition ids may be negative, so compare signed; stray non-numeric entries sort last
+    // instead of aborting the whole run with a NumberFormatException
+    timePartitionDirs.sort(
+        (f1, f2) -> Long.compare(parseLongOrMax(f1.getName()), parseLongOrMax(f2.getName())));
+    for (File timePartitionDir : timePartitionDirs) {
+      if (!checkIsDirectory(timePartitionDir)) {
+        continue;
+      }
+      // get all seq files under the time partition dir
+      List<File> tsFiles =
+          Arrays.asList(
+              Objects.requireNonNull(
+                  timePartitionDir.listFiles(file -> file.getName().endsWith(TSFILE_SUFFIX))));
+      // sort the seq files by timestamp, then by version
+      tsFiles.sort(TsFileValidationTool::compareTsFileNames);
+      findUncorrectFiles(tsFiles);
+    }
+  }
+
+  /**
+   * Orders tsfiles named "{time}-{version}-..." by time and then version. Names that do not follow
+   * this pattern sort last.
+   */
+  private static int compareTsFileNames(File f1, File f2) {
+    String[] parts1 = f1.getName().split("-");
+    String[] parts2 = f2.getName().split("-");
+    int byTime = Long.compare(numberAt(parts1, 0), numberAt(parts2, 0));
+    return byTime != 0 ? byTime : Long.compare(numberAt(parts1, 1), numberAt(parts2, 1));
+  }
+
+  /** Returns {@code parts[index]} parsed as a long, or Long.MAX_VALUE if absent or not a number. */
+  private static long numberAt(String[] parts, int index) {
+    return index < parts.length ? parseLongOrMax(parts[index]) : Long.MAX_VALUE;
+  }
+
+  /** Parses {@code s} as a long, returning Long.MAX_VALUE when it is not a valid number. */
+  private static long parseLongOrMax(String s) {
+    try {
+      return Long.parseLong(s);
+    } catch (NumberFormatException e) {
+      return Long.MAX_VALUE;
+    }
+  }
+
+ private static void findUncorrectFiles(List<File> tsFiles) {
+ // measurementID -> [lastTime, endTimeInLastFile]
+ Map<String, long[]> measurementLastTime = new HashMap<>();
+ // deviceID -> endTime, the endTime of device in the last seq file
+ Map<String, Long> deviceEndTime = new HashMap<>();
+
+ for (File tsFile : tsFiles) {
+ try {
+ TsFileResource resource = new TsFileResource(tsFile);
+ if (!new File(tsFile.getAbsolutePath() +
TsFileResource.RESOURCE_SUFFIX).exists()) {
+ // resource file does not exist, tsfile may not be flushed yet
+ logger.warn(
+ "{} does not exist ,skip it.",
+ tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+ continue;
+ } else {
+ resource.deserialize();
+ }
+ boolean isBadFile = false;
+ try (TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+ // deviceID -> has checked overlap or not
+ Map<String, Boolean> hasCheckedDeviceOverlap = new HashMap<>();
+ reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length +
1);
+ byte marker;
+ String deviceID = "";
+ Map<String, boolean[]> hasMeasurementPrintedDetails = new
HashMap<>();
+ // measurementId -> lastChunkEndTime in current file
+ Map<String, Long> lashChunkEndTime = new HashMap<>();
+
+ // start reading data points in sequence
+ while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
+ switch (marker) {
+ case MetaMarker.CHUNK_HEADER:
+ case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+ ChunkHeader header = reader.readChunkHeader(marker);
+ if (header.getDataSize() == 0) {
+ // empty value chunk
+ break;
+ }
+ long currentChunkEndTime = Long.MIN_VALUE;
+ String measurementID = deviceID + "." +
header.getMeasurementID();
+ hasMeasurementPrintedDetails.computeIfAbsent(measurementID, k
-> new boolean[4]);
+ measurementLastTime.computeIfAbsent(
+ measurementID,
+ k -> {
+ long[] arr = new long[2];
+ Arrays.fill(arr, Long.MIN_VALUE);
+ return arr;
+ });
+ Decoder defaultTimeDecoder =
+ Decoder.getDecoderByType(
+ TSEncoding.valueOf(
+
TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64);
+ Decoder valueDecoder =
+ Decoder.getDecoderByType(header.getEncodingType(),
header.getDataType());
+ int dataSize = header.getDataSize();
+ long lastPageEndTime = Long.MIN_VALUE;
+ while (dataSize > 0) {
+ valueDecoder.reset();
+ PageHeader pageHeader =
+ reader.readPageHeader(
+ header.getDataType(),
+ (header.getChunkType() & 0x3F) ==
MetaMarker.CHUNK_HEADER);
+ ByteBuffer pageData = reader.readPage(pageHeader,
header.getCompressionType());
+ long currentPageEndTime = Long.MIN_VALUE;
+ // NonAligned Chunk
+ PageReader pageReader =
+ new PageReader(
+ pageData, header.getDataType(), valueDecoder,
defaultTimeDecoder, null);
+ BatchData batchData = pageReader.getAllSatisfiedPageData();
+ while (batchData.hasCurrent()) {
+ long timestamp = batchData.currentTime();
+ if (timestamp <=
measurementLastTime.get(measurementID)[0]) {
+ // find bad file
+ if (!isBadFile) {
+ if (printDetails) {
+ printBoth("-- Find the bad file " +
tsFile.getAbsolutePath());
+ } else {
+ printBoth(tsFile.getAbsolutePath());
+ }
+ isBadFile = true;
+ badFileNum++;
+ }
+ if (printDetails) {
+ if (timestamp <=
measurementLastTime.get(measurementID)[1]) {
+ if
(!hasMeasurementPrintedDetails.get(measurementID)[0]) {
+ printBoth(
+ "-------- Timeseries " + measurementID + "
overlap between files");
+ hasMeasurementPrintedDetails.get(measurementID)[0]
= true;
+ }
+ } else if (timestamp
+ <= lashChunkEndTime.getOrDefault(measurementID,
Long.MIN_VALUE)) {
+ if
(!hasMeasurementPrintedDetails.get(measurementID)[1]) {
+ printBoth(
+ "-------- Timeseries " + measurementID + "
overlap between chunks");
+ hasMeasurementPrintedDetails.get(measurementID)[1]
= true;
+ }
+ } else if (timestamp <= lastPageEndTime) {
+ if
(!hasMeasurementPrintedDetails.get(measurementID)[2]) {
+ printBoth(
+ "-------- Timeseries " + measurementID + "
overlap between pages");
+ hasMeasurementPrintedDetails.get(measurementID)[2]
= true;
+ }
+ } else {
+ if
(!hasMeasurementPrintedDetails.get(measurementID)[3]) {
+ printBoth(
+ "-------- Timeseries "
+ + measurementID
+ + " overlap within one page");
+ hasMeasurementPrintedDetails.get(measurementID)[3]
= true;
+ }
+ }
+ }
+ } else {
+ measurementLastTime.get(measurementID)[0] = timestamp;
+ currentPageEndTime = timestamp;
+ currentChunkEndTime = timestamp;
+ }
+ batchData.next();
+ }
+ dataSize -= pageHeader.getSerializedPageSize();
+ lastPageEndTime = Math.max(lastPageEndTime,
currentPageEndTime);
+ }
+ lashChunkEndTime.put(
+ measurementID,
+ Math.max(
+ lashChunkEndTime.getOrDefault(measurementID,
Long.MIN_VALUE),
+ currentChunkEndTime));
+ break;
+ case MetaMarker.CHUNK_GROUP_HEADER:
+ if (!deviceID.equals("")) {
+ // record the end time of last device in current file
+ if (resource.getEndTime(deviceID)
+ > deviceEndTime.getOrDefault(deviceID, Long.MIN_VALUE)) {
+ deviceEndTime.put(deviceID, resource.getEndTime(deviceID));
+ }
+ }
+ ChunkGroupHeader chunkGroupHeader =
reader.readChunkGroupHeader();
+ deviceID = chunkGroupHeader.getDeviceID();
+ if (!hasCheckedDeviceOverlap.getOrDefault(deviceID, false)
+ && resource.getStartTime(deviceID)
+ <= deviceEndTime.getOrDefault(deviceID,
Long.MIN_VALUE)) {
+ // find bad file
+ if (!isBadFile) {
+ if (printDetails) {
+ printBoth("-- Find the bad file " +
tsFile.getAbsolutePath());
+ } else {
+ printBoth(tsFile.getAbsolutePath());
+ }
+ isBadFile = true;
+ badFileNum++;
+ }
+ if (printDetails) {
+ printBoth("---- Device " + deviceID + " overlap between
files");
+ }
+ }
+ hasCheckedDeviceOverlap.put(deviceID, true);
+ break;
+ case MetaMarker.OPERATION_INDEX_RANGE:
+ reader.readPlanIndex();
+ break;
+ default:
+ MetaMarker.handleUnexpectedMarker(marker);
+ }
+ }
+
+ // record the end time of each timeseries in current file
+ for (Map.Entry<String, Long> entry : lashChunkEndTime.entrySet()) {
+ Long endTime =
Math.max(measurementLastTime.get(entry.getKey())[1], entry.getValue());
+ measurementLastTime.get(entry.getKey())[1] = endTime;
+ }
+ }
+ } catch (Throwable e) {
+ logger.error("Meet errors in reading file {} , skip it.",
tsFile.getAbsolutePath(), e);
+ if (printDetails) {
+ printBoth("-- Meet errors in reading file " +
tsFile.getAbsolutePath());
+ }
+ }
+ }
+ }
+
+ public static boolean checkArgs(String[] args) {
+ if (args.length < 1) {
+ System.out.println(
+ "Please input correct param, which is [path of data dir] [-pd =
print details or not] [-f = path of outFile]. Eg: xxx/iotdb/data/data -pd=true
-f=xxx/TsFile_validation_view.txt");
+ return false;
+ } else {
+ for (String arg : args) {
+ if (arg.startsWith("-pd")) {
+ printDetails = Boolean.parseBoolean(arg.split("=")[1]);
+ } else if (arg.startsWith("-f")) {
+ printToFile = true;
+ outFilePath = arg.split("=")[1];
+ } else {
+ File f = new File(arg);
+ if (f.isDirectory()
+ && Objects.requireNonNull(
+ f.list(
+ (dir, name) ->
+ (name.equals("sequence") ||
name.equals("unsequence"))))
+ .length
+ == 2) {
+ File seqDataDir = new File(f, "sequence");
+ seqDataDirList.add(seqDataDir);
+ } else if (arg.endsWith(TSFILE_SUFFIX) && f.isFile()) {
+ fileList.add(f);
+ } else {
+ System.out.println(arg + " is not a correct data directory or
tsfile of IOTDB.");
+ return false;
+ }
+ }
+ }
+ if (seqDataDirList.size() == 0 && fileList.size() == 0) {
+ System.out.println(
+ "Please input correct param, which is [path of data dir] [-pd =
print details or not] [-f = path of outFile]. Eg: xxx/iotdb/data/data -pd=true
-f=xxx/TsFile_validation_view.txt");
+ return false;
+ }
+ return true;
+ }
+ }
+
+ private static boolean checkIsDirectory(File dir) {
+ boolean res = true;
+ if (!dir.isDirectory()) {
+ logger.error("{} is not a directory or does not exist, skip it.",
dir.getAbsolutePath());
+ res = false;
+ }
+ return res;
+ }
+
+ private static void printBoth(String msg) {
+ System.out.println(msg);
+ if (printToFile) {
+ pw.println(msg);
+ }
+ }
+}