This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 80eb11ccb1c Load: Support remote load in import-tsfile script & Fix:
remote load in import-tsfile script (#13438) (#13439)
80eb11ccb1c is described below
commit 80eb11ccb1cbb2ed3dd927205edd997bb24fffa9
Author: YC27 <[email protected]>
AuthorDate: Mon Sep 9 12:04:08 2024 +0800
Load: Support remote load in import-tsfile script & Fix: remote load in
import-tsfile script (#13438) (#13439)
Co-authored-by: Steve Yurong Su <[email protected]>
---
iotdb-client/cli/pom.xml | 10 +
.../cli/src/assembly/resources/tools/backup.bat | 2 +-
.../cli/src/assembly/resources/tools/backup.sh | 2 +-
.../src/assembly/resources/tools/export-data.bat | 2 +-
.../src/assembly/resources/tools/export-data.sh | 2 +-
.../src/assembly/resources/tools/export-schema.bat | 2 +-
.../src/assembly/resources/tools/export-schema.sh | 2 +-
.../src/assembly/resources/tools/export-tsfile.bat | 2 +-
.../src/assembly/resources/tools/export-tsfile.sh | 2 +-
.../src/assembly/resources/tools/import-data.bat | 2 +-
.../src/assembly/resources/tools/import-data.sh | 2 +-
.../src/assembly/resources/tools/import-schema.bat | 2 +-
.../src/assembly/resources/tools/import-schema.sh | 2 +-
.../src/assembly/resources/tools/load-tsfile.bat | 2 +-
.../src/assembly/resources/tools/load-tsfile.sh | 2 +-
.../java/org/apache/iotdb/cli/AbstractCli.java | 2 +-
.../iotdb/tool/{ => backup}/IoTDBDataBackTool.java | 3 +-
.../iotdb/tool/{ => data}/AbstractDataTool.java | 2 +-
.../apache/iotdb/tool/{ => data}/ExportData.java | 2 +-
.../apache/iotdb/tool/{ => data}/ImportData.java | 2 +-
.../tool/{ => schema}/AbstractSchemaTool.java | 2 +-
.../iotdb/tool/{ => schema}/ExportSchema.java | 2 +-
.../iotdb/tool/{ => schema}/ImportSchema.java | 2 +-
.../tool/{ => tsfile}/AbstractTsFileTool.java | 2 +-
.../iotdb/tool/{ => tsfile}/ExportTsFile.java | 2 +-
.../iotdb/tool/{ => tsfile}/ImportTsFile.java | 302 ++++--------------
.../apache/iotdb/tool/tsfile/ImportTsFileBase.java | 243 +++++++++++++++
.../iotdb/tool/tsfile/ImportTsFileLocally.java | 53 ++++
.../iotdb/tool/tsfile/ImportTsFileRemotely.java | 338 +++++++++++++++++++++
.../iotdb/tool/tsfile/ImportTsFileScanTool.java | 95 ++++++
.../org/apache/iotdb/tool/WriteDataFileTest.java | 2 +
31 files changed, 825 insertions(+), 267 deletions(-)
diff --git a/iotdb-client/cli/pom.xml b/iotdb-client/cli/pom.xml
index cc62e9e8d8d..6b90834c9f0 100644
--- a/iotdb-client/cli/pom.xml
+++ b/iotdb-client/cli/pom.xml
@@ -84,6 +84,16 @@
<artifactId>iotdb-thrift</artifactId>
<version>1.3.3-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-thrift-commons</artifactId>
+ <version>1.3.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>pipe-api</artifactId>
+ <version>1.3.3-SNAPSHOT</version>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
diff --git a/iotdb-client/cli/src/assembly/resources/tools/backup.bat
b/iotdb-client/cli/src/assembly/resources/tools/backup.bat
index 3008a74733e..e7974164b08 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/backup.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/backup.bat
@@ -102,7 +102,7 @@ echo Starting IoTDB Client Data Back Script
echo ------------------------------------------
set CLASSPATH="%IOTDB_HOME%\lib\*"
-if NOT DEFINED MAIN_CLASS set
MAIN_CLASS=org.apache.iotdb.tool.IoTDBDataBackTool
+if NOT DEFINED MAIN_CLASS set
MAIN_CLASS=org.apache.iotdb.tool.backup.IoTDBDataBackTool
set logsDir="%IOTDB_HOME%\logs"
if not exist "%logsDir%" (
diff --git a/iotdb-client/cli/src/assembly/resources/tools/backup.sh
b/iotdb-client/cli/src/assembly/resources/tools/backup.sh
index 7ef5cdc5777..d65392cdcca 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/backup.sh
+++ b/iotdb-client/cli/src/assembly/resources/tools/backup.sh
@@ -126,7 +126,7 @@ for f in ${IOTDB_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
done
-MAIN_CLASS=org.apache.iotdb.tool.IoTDBDataBackTool
+MAIN_CLASS=org.apache.iotdb.tool.backup.IoTDBDataBackTool
logs_dir="${IOTDB_HOME}/logs"
diff --git a/iotdb-client/cli/src/assembly/resources/tools/export-data.bat
b/iotdb-client/cli/src/assembly/resources/tools/export-data.bat
index c3acfff5f92..6df7b11d00c 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/export-data.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/export-data.bat
@@ -31,7 +31,7 @@ pushd %~dp0..
if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
popd
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ExportData
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.data.ExportData
if NOT DEFINED JAVA_HOME goto :err
@REM
-----------------------------------------------------------------------------
diff --git a/iotdb-client/cli/src/assembly/resources/tools/export-data.sh
b/iotdb-client/cli/src/assembly/resources/tools/export-data.sh
index 8aa7491b945..fbf81c67616 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/export-data.sh
+++ b/iotdb-client/cli/src/assembly/resources/tools/export-data.sh
@@ -53,7 +53,7 @@ for f in ${IOTDB_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
done
-MAIN_CLASS=org.apache.iotdb.tool.ExportData
+MAIN_CLASS=org.apache.iotdb.tool.data.ExportData
"$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
exit $?
\ No newline at end of file
diff --git a/iotdb-client/cli/src/assembly/resources/tools/export-schema.bat
b/iotdb-client/cli/src/assembly/resources/tools/export-schema.bat
index 8ed2b81f5c7..dab5dfaf667 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/export-schema.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/export-schema.bat
@@ -31,7 +31,7 @@ pushd %~dp0..
if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
popd
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ExportSchema
+if NOT DEFINED MAIN_CLASS set
MAIN_CLASS=org.apache.iotdb.tool.schema.ExportSchema
if NOT DEFINED JAVA_HOME goto :err
@REM
-----------------------------------------------------------------------------
diff --git a/iotdb-client/cli/src/assembly/resources/tools/export-schema.sh
b/iotdb-client/cli/src/assembly/resources/tools/export-schema.sh
index b65a7d3950a..e4d18590799 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/export-schema.sh
+++ b/iotdb-client/cli/src/assembly/resources/tools/export-schema.sh
@@ -51,7 +51,7 @@ fi
CLASSPATH=${IOTDB_HOME}/lib/*
-MAIN_CLASS=org.apache.iotdb.tool.ExportSchema
+MAIN_CLASS=org.apache.iotdb.tool.schema.ExportSchema
"$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
exit $?
\ No newline at end of file
diff --git a/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.bat
b/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.bat
index 350e806a769..2c85f42bd62 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.bat
@@ -31,7 +31,7 @@ pushd %~dp0..
if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
popd
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ExportTsFile
+if NOT DEFINED MAIN_CLASS set
MAIN_CLASS=org.apache.iotdb.tool.tsfile.ExportTsFile
if NOT DEFINED JAVA_HOME goto :err
@REM
-----------------------------------------------------------------------------
diff --git a/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.sh
b/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.sh
index a19eec09f4f..ccc9df3e3a4 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.sh
+++ b/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.sh
@@ -53,7 +53,7 @@ for f in ${IOTDB_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
done
-MAIN_CLASS=org.apache.iotdb.tool.ExportTsFile
+MAIN_CLASS=org.apache.iotdb.tool.tsfile.ExportTsFile
"$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
exit $?
\ No newline at end of file
diff --git a/iotdb-client/cli/src/assembly/resources/tools/import-data.bat
b/iotdb-client/cli/src/assembly/resources/tools/import-data.bat
index cb34c5e897a..a4041015ccc 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/import-data.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/import-data.bat
@@ -31,7 +31,7 @@ pushd %~dp0..
if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
popd
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ImportData
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.data.ImportData
if NOT DEFINED JAVA_HOME goto :err
@REM
-----------------------------------------------------------------------------
diff --git a/iotdb-client/cli/src/assembly/resources/tools/import-data.sh
b/iotdb-client/cli/src/assembly/resources/tools/import-data.sh
index 97102b854c7..2cebfea59bf 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/import-data.sh
+++ b/iotdb-client/cli/src/assembly/resources/tools/import-data.sh
@@ -53,7 +53,7 @@ for f in ${IOTDB_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
done
-MAIN_CLASS=org.apache.iotdb.tool.ImportData
+MAIN_CLASS=org.apache.iotdb.tool.data.ImportData
"$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
exit $?
\ No newline at end of file
diff --git a/iotdb-client/cli/src/assembly/resources/tools/import-schema.bat
b/iotdb-client/cli/src/assembly/resources/tools/import-schema.bat
index 46a8d5abf6e..fbf5236128b 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/import-schema.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/import-schema.bat
@@ -31,7 +31,7 @@ pushd %~dp0..
if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
popd
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ImportSchema
+if NOT DEFINED MAIN_CLASS set
MAIN_CLASS=org.apache.iotdb.tool.schema.ImportSchema
if NOT DEFINED JAVA_HOME goto :err
@REM
-----------------------------------------------------------------------------
diff --git a/iotdb-client/cli/src/assembly/resources/tools/import-schema.sh
b/iotdb-client/cli/src/assembly/resources/tools/import-schema.sh
index ca91293e235..3954446beb2 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/import-schema.sh
+++ b/iotdb-client/cli/src/assembly/resources/tools/import-schema.sh
@@ -51,7 +51,7 @@ fi
CLASSPATH=${IOTDB_HOME}/lib/*
-MAIN_CLASS=org.apache.iotdb.tool.ImportSchema
+MAIN_CLASS=org.apache.iotdb.tool.schema.ImportSchema
"$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
exit $?
\ No newline at end of file
diff --git a/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat
b/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat
index bd1f4c9170c..52ae0a46b76 100755
--- a/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat
@@ -30,7 +30,7 @@ pushd %~dp0..
if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
popd
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ImportTsFile
+if NOT DEFINED MAIN_CLASS set
MAIN_CLASS=org.apache.iotdb.tool.tsfile.ImportTsFile
if NOT DEFINED JAVA_HOME goto :err
@REM
-----------------------------------------------------------------------------
diff --git a/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.sh
b/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.sh
index b7ded896c02..820ca5fcc2d 100755
--- a/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.sh
+++ b/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.sh
@@ -33,7 +33,7 @@ PARAMETERS=$@
IOTDB_CLI_CONF=${IOTDB_HOME}/conf
-MAIN_CLASS=org.apache.iotdb.tool.ImportTsFile
+MAIN_CLASS=org.apache.iotdb.tool.tsfile.ImportTsFile
CLASSPATH=""
for f in ${IOTDB_HOME}/lib/*.jar; do
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index 0da407e00f0..4fddf30abf0 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.jdbc.IoTDBJDBCResultSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.tool.ImportData;
+import org.apache.iotdb.tool.data.ImportData;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/IoTDBDataBackTool.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/backup/IoTDBDataBackTool.java
similarity index 99%
rename from
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/IoTDBDataBackTool.java
rename to
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/backup/IoTDBDataBackTool.java
index 7cc3556562b..62f27acbf62 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/IoTDBDataBackTool.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/backup/IoTDBDataBackTool.java
@@ -17,11 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.backup;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tool.data.AbstractDataTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java
similarity index 99%
rename from
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java
rename to
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java
index 1e689e903c1..d60667676f8 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.data;
import org.apache.iotdb.cli.utils.IoTPrinter;
import org.apache.iotdb.exception.ArgsErrorException;
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
similarity index 99%
rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java
rename to
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
index 08918bcf2af..0c068e04bf2 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.data;
import org.apache.iotdb.cli.type.ExitType;
import org.apache.iotdb.cli.utils.CliContext;
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ImportData.java
similarity index 99%
rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java
rename to
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ImportData.java
index 819190d2c2c..e8fbe063313 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ImportData.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.data;
import org.apache.iotdb.cli.utils.IoTPrinter;
import org.apache.iotdb.commons.exception.IllegalPathException;
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractSchemaTool.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java
similarity index 99%
rename from
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractSchemaTool.java
rename to
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java
index 9e91d404d12..00d926c7074 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractSchemaTool.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.schema;
import org.apache.iotdb.cli.utils.IoTPrinter;
import org.apache.iotdb.exception.ArgsErrorException;
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportSchema.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java
similarity index 99%
rename from
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportSchema.java
rename to
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java
index 9892da625e7..54453acd898 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportSchema.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.schema;
import org.apache.iotdb.cli.type.ExitType;
import org.apache.iotdb.cli.utils.CliContext;
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
similarity index 99%
rename from
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java
rename to
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
index 3a7b8ab60e5..a7709313b1b 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.schema;
import org.apache.iotdb.cli.utils.IoTPrinter;
import org.apache.iotdb.commons.exception.IllegalPathException;
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/AbstractTsFileTool.java
similarity index 99%
rename from
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java
rename to
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/AbstractTsFileTool.java
index fe8691ddf8b..f7a88eb56c5 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/AbstractTsFileTool.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.tsfile;
import org.apache.iotdb.cli.utils.IoTPrinter;
import org.apache.iotdb.exception.ArgsErrorException;
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
similarity index 99%
rename from
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
rename to
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
index 723e410741e..99df139a76c 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.tsfile;
import org.apache.iotdb.cli.type.ExitType;
import org.apache.iotdb.cli.utils.CliContext;
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
similarity index 50%
rename from
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
rename to
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
index 576d2927beb..497900a0371 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.tsfile;
import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.commons.cli.CommandLine;
@@ -31,18 +32,13 @@ import org.apache.commons.cli.ParseException;
import java.io.File;
import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
+import java.net.UnknownHostException;
import java.nio.file.Files;
-import java.nio.file.Path;
import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.LongAdder;
public class ImportTsFile extends AbstractTsFileTool {
@@ -64,15 +60,11 @@ public class ImportTsFile extends AbstractTsFileTool {
private static final String THREAD_NUM_ARGS = "tn";
private static final String THREAD_NUM_NAME = "thread_num";
- private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
+ private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out);
private static final String TS_FILE_CLI_PREFIX = "ImportTsFile";
- private static final String RESOURCE = ".resource";
- private static final String MODS = ".mods";
-
private static String source;
- private static String sourceFullPath;
private static String successDir = "success/";
private static String failDir = "fail/";
@@ -82,14 +74,7 @@ public class ImportTsFile extends AbstractTsFileTool {
private static int threadNum = 8;
- private static final LongAdder loadFileSuccessfulNum = new LongAdder();
- private static final LongAdder loadFileFailedNum = new LongAdder();
- private static final LongAdder processingLoadSuccessfulFileSuccessfulNum =
new LongAdder();
- private static final LongAdder processingLoadFailedFileSuccessfulNum = new
LongAdder();
-
- private static final LinkedBlockingQueue<String> tsfileQueue = new
LinkedBlockingQueue<>();
- private static final Set<String> tsfileSet = new HashSet<>();
- private static final Set<String> resourceOrModsSet = new HashSet<>();
+ private static boolean isRemoteLoad = true;
private static SessionPool sessionPool;
@@ -171,7 +156,7 @@ public class ImportTsFile extends AbstractTsFileTool {
helpFormatter.setWidth(MAX_HELP_CONSOLE_WIDTH);
if (args == null || args.length == 0) {
- ioTPrinter.println("Too few arguments, please check the following
hint.");
+ IOT_PRINTER.println("Too few arguments, please check the following
hint.");
helpFormatter.printHelp(TS_FILE_CLI_PREFIX, options, true);
System.exit(CODE_ERROR);
}
@@ -182,7 +167,7 @@ public class ImportTsFile extends AbstractTsFileTool {
System.exit(CODE_OK);
}
} catch (ParseException e) {
- ioTPrinter.println("Failed to parse the provided options: " +
e.getMessage());
+ IOT_PRINTER.println("Failed to parse the provided options: " +
e.getMessage());
helpFormatter.printHelp(TS_FILE_CLI_PREFIX, options, true);
System.exit(CODE_ERROR);
}
@@ -191,7 +176,7 @@ public class ImportTsFile extends AbstractTsFileTool {
try {
commandLine = parser.parse(options, args, true);
} catch (ParseException e) {
- ioTPrinter.println("Failed to parse the provided options: " +
e.getMessage());
+ IOT_PRINTER.println("Failed to parse the provided options: " +
e.getMessage());
helpFormatter.printHelp(TS_FILE_CLI_PREFIX, options, true);
System.exit(CODE_ERROR);
}
@@ -200,45 +185,30 @@ public class ImportTsFile extends AbstractTsFileTool {
parseBasicParams(commandLine);
parseSpecialParams(commandLine);
} catch (Exception e) {
- ioTPrinter.println("Encounter an error when parsing the provided
options: " + e.getMessage());
+ IOT_PRINTER.println(
+ "Encounter an error when parsing the provided options: " +
e.getMessage());
System.exit(CODE_ERROR);
}
+ IOT_PRINTER.println(isRemoteLoad ? "Load remotely." : "Load locally.");
+
final int resultCode = importFromTargetPath();
- ioTPrinter.println(
- "Successfully load "
- + loadFileSuccessfulNum.sum()
- + " tsfile(s) (--on_success operation(s): "
- + processingLoadSuccessfulFileSuccessfulNum.sum()
- + " succeed, "
- + (loadFileSuccessfulNum.sum() -
processingLoadSuccessfulFileSuccessfulNum.sum())
- + " failed)");
- ioTPrinter.println(
- "Failed to load "
- + loadFileFailedNum.sum()
- + " file(s) (--on_fail operation(s): "
- + processingLoadFailedFileSuccessfulNum.sum()
- + " succeed, "
- + (loadFileFailedNum.sum() -
processingLoadFailedFileSuccessfulNum.sum())
- + " failed)");
- ioTPrinter.println("For more details, please check the log.");
- ioTPrinter.println(
- "Total operation time: " + (System.currentTimeMillis() - startTime) +
" ms.");
- ioTPrinter.println("Work has been completed!");
+
+ ImportTsFileBase.printResult(startTime);
System.exit(resultCode);
}
private static void parseSpecialParams(CommandLine commandLine) {
source = commandLine.getOptionValue(SOURCE_ARGS);
if (!Files.exists(Paths.get(source))) {
- ioTPrinter.println(String.format("Source file or directory %s does not
exist", source));
+ IOT_PRINTER.println(String.format("Source file or directory %s does not
exist", source));
System.exit(CODE_ERROR);
}
final String onSuccess =
commandLine.getOptionValue(ON_SUCCESS_ARGS).trim().toLowerCase();
final String onFail =
commandLine.getOptionValue(ON_FAIL_ARGS).trim().toLowerCase();
if (!Operation.isValidOperation(onSuccess) ||
!Operation.isValidOperation(onFail)) {
- ioTPrinter.println("Args error: os/of must be one of none, mv, cp,
delete");
+ IOT_PRINTER.println("Args error: os/of must be one of none, mv, cp,
delete");
System.exit(CODE_ERROR);
}
@@ -262,6 +232,13 @@ public class ImportTsFile extends AbstractTsFileTool {
if (commandLine.getOptionValue(THREAD_NUM_ARGS) != null) {
threadNum =
Integer.parseInt(commandLine.getOptionValue(THREAD_NUM_ARGS));
}
+
+ try {
+ isRemoteLoad =
!NodeUrlUtils.containsLocalAddress(Collections.singletonList(host));
+ } catch (UnknownHostException e) {
+ IOT_PRINTER.println(
+ "Unknown host: " + host + ". Exception: " + e.getMessage() + ". Will
use remote load.");
+ }
}
public static boolean isFileStoreEquals(String pathString, File dir) {
@@ -269,7 +246,7 @@ public class ImportTsFile extends AbstractTsFileTool {
return Objects.equals(
Files.getFileStore(Paths.get(pathString)),
Files.getFileStore(dir.toPath()));
} catch (IOException e) {
- ioTPrinter.println("IOException when checking file store: " +
e.getMessage());
+ IOT_PRINTER.println("IOException when checking file store: " +
e.getMessage());
return false;
}
}
@@ -281,7 +258,7 @@ public class ImportTsFile extends AbstractTsFileTool {
File file = new File(successDir);
if (!file.isDirectory()) {
if (!file.mkdirs()) {
- ioTPrinter.println(String.format("Failed to create %s %s",
SUCCESS_DIR_NAME, successDir));
+ IOT_PRINTER.println(String.format("Failed to create %s %s",
SUCCESS_DIR_NAME, successDir));
System.exit(CODE_ERROR);
}
}
@@ -295,7 +272,7 @@ public class ImportTsFile extends AbstractTsFileTool {
File file = new File(failDir);
if (!file.isDirectory()) {
if (!file.mkdirs()) {
- ioTPrinter.println(String.format("Failed to create %s %s",
FAIL_DIR_NAME, failDir));
+ IOT_PRINTER.println(String.format("Failed to create %s %s",
FAIL_DIR_NAME, failDir));
System.exit(CODE_ERROR);
}
}
@@ -304,13 +281,6 @@ public class ImportTsFile extends AbstractTsFileTool {
public static int importFromTargetPath() {
try {
- final File file = new File(source);
- sourceFullPath = file.getAbsolutePath();
- if (!file.isFile() && !file.isDirectory()) {
- ioTPrinter.println(String.format("source file or directory %s does not
exist", source));
- return CODE_ERROR;
- }
-
sessionPool =
new SessionPool.Builder()
.host(host)
@@ -324,15 +294,22 @@ public class ImportTsFile extends AbstractTsFileTool {
.build();
sessionPool.setEnableQueryRedirection(false);
- traverseAndCollectFiles(file);
- addNoResourceOrModsToQueue();
- ioTPrinter.println("Load file total number : " + tsfileQueue.size());
+ // set params
+ processSetParams();
+
+ ImportTsFileScanTool.traverseAndCollectFiles();
+ ImportTsFileScanTool.addNoResourceOrModsToQueue();
+
+ IOT_PRINTER.println("Load file total number : " +
ImportTsFileScanTool.getTsFileQueueSize());
asyncImportTsFiles();
return CODE_OK;
} catch (InterruptedException e) {
- ioTPrinter.println(String.format("Import tsfile fail: %s",
e.getMessage()));
+ IOT_PRINTER.println(String.format("Import tsfile fail: %s",
e.getMessage()));
Thread.currentThread().interrupt();
return CODE_ERROR;
+ } catch (Exception e) {
+ IOT_PRINTER.println(String.format("Import tsfile fail: %s",
e.getMessage()));
+ return CODE_ERROR;
} finally {
if (sessionPool != null) {
sessionPool.close();
@@ -340,40 +317,32 @@ public class ImportTsFile extends AbstractTsFileTool {
}
}
- public static void traverseAndCollectFiles(File file) throws
InterruptedException {
- if (file.isFile()) {
- if (file.getName().endsWith(RESOURCE) || file.getName().endsWith(MODS)) {
- resourceOrModsSet.add(file.getAbsolutePath());
- } else {
- tsfileSet.add(file.getAbsolutePath());
- tsfileQueue.put(file.getAbsolutePath());
- }
- } else if (file.isDirectory()) {
- final File[] files = file.listFiles();
- if (files != null) {
- for (File f : files) {
- traverseAndCollectFiles(f);
- }
- }
+ // process other classes need param
+ private static void processSetParams() {
+ // ImportTsFileLocally
+ final File file = new File(source);
+ ImportTsFileScanTool.setSourceFullPath(file.getAbsolutePath());
+ if (!file.isFile() && !file.isDirectory()) {
+ IOT_PRINTER.println(String.format("Source file or directory %s does not
exist", source));
+ System.exit(CODE_ERROR);
}
- }
- public static void addNoResourceOrModsToQueue() throws InterruptedException {
- for (final String filePath : resourceOrModsSet) {
- final String tsfilePath =
- filePath.endsWith(RESOURCE)
- ? filePath.substring(0, filePath.length() - RESOURCE.length())
- : filePath.substring(0, filePath.length() - MODS.length());
- if (!tsfileSet.contains(tsfilePath)) {
- tsfileQueue.put(filePath);
- }
- }
+ ImportTsFileLocally.setSessionPool(sessionPool);
+
+ // ImportTsFileRemotely
+ ImportTsFileRemotely.setHost(host);
+ ImportTsFileRemotely.setPort(port);
+
+ // ImportTsFileBase
+ ImportTsFileBase.setSuccessAndFailDirAndOperation(
+ successDir, successOperation, failDir, failOperation);
}
public static void asyncImportTsFiles() {
final List<Thread> list = new ArrayList<>(threadNum);
for (int i = 0; i < threadNum; i++) {
- final Thread thread = new Thread(ImportTsFile::importTsFile);
+ final Thread thread =
+ new Thread(isRemoteLoad ? new ImportTsFileRemotely() : new
ImportTsFileLocally());
thread.start();
list.add(thread);
}
@@ -383,164 +352,11 @@ public class ImportTsFile extends AbstractTsFileTool {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- ioTPrinter.println("importTsFile thread join interrupted: " +
e.getMessage());
+ IOT_PRINTER.println("ImportTsFile thread join interrupted: " +
e.getMessage());
}
});
}
- public static void importTsFile() {
- String filePath;
- try {
- while ((filePath = tsfileQueue.poll()) != null) {
- final String sql = "load '" + filePath + "' onSuccess=none ";
-
- try {
- sessionPool.executeNonQueryStatement(sql);
-
- loadFileSuccessfulNum.increment();
- ioTPrinter.println("Imported [ " + filePath + " ] file
successfully!");
-
- try {
- processingFile(filePath, successDir, successOperation);
- processingLoadSuccessfulFileSuccessfulNum.increment();
- ioTPrinter.println("Processed success file [ " + filePath + " ]
successfully!");
- } catch (Exception processSuccessException) {
- ioTPrinter.println(
- "Failed to process success file [ "
- + filePath
- + " ]: "
- + processSuccessException.getMessage());
- }
- } catch (Exception e) {
- // Reject because of memory controls, do retry later
- if (Objects.nonNull(e.getMessage()) &&
e.getMessage().contains("memory")) {
- ioTPrinter.println(
- "Rejecting file [ " + filePath + " ] due to memory
constraints, will retry later.");
- tsfileQueue.put(filePath);
- continue;
- }
-
- loadFileFailedNum.increment();
- ioTPrinter.println("Failed to import [ " + filePath + " ] file: " +
e.getMessage());
-
- try {
- processingFile(filePath, failDir, failOperation);
- processingLoadFailedFileSuccessfulNum.increment();
- ioTPrinter.println("Processed fail file [ " + filePath + " ]
successfully!");
- } catch (Exception processFailException) {
- ioTPrinter.println(
- "Failed to process fail file [ "
- + filePath
- + " ]: "
- + processFailException.getMessage());
- }
- }
- }
- } catch (InterruptedException e) {
- ioTPrinter.println("Unexpected error occurred: " + e.getMessage());
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- ioTPrinter.println("Unexpected error occurred: " + e.getMessage());
- }
- }
-
- public static void processingFile(String filePath, String dir, Operation
operation) {
- String relativePath = filePath.substring(sourceFullPath.length() + 1);
- Path sourcePath = Paths.get(filePath);
-
- String target = dir + File.separator +
relativePath.replace(File.separator, "_");
- Path targetPath = Paths.get(target);
-
- Path sourceResourcePath = Paths.get(sourcePath + RESOURCE);
- sourceResourcePath = Files.exists(sourceResourcePath) ? sourceResourcePath
: null;
- Path targetResourcePath = Paths.get(target + RESOURCE);
-
- Path sourceModsPath = Paths.get(sourcePath + MODS);
- sourceModsPath = Files.exists(sourceModsPath) ? sourceModsPath : null;
- Path targetModsPath = Paths.get(target + MODS);
-
- switch (operation) {
- case DELETE:
- {
- try {
- Files.deleteIfExists(sourcePath);
- if (null != sourceResourcePath) {
- Files.deleteIfExists(sourceResourcePath);
- }
- if (null != sourceModsPath) {
- Files.deleteIfExists(sourceModsPath);
- }
- } catch (Exception e) {
- ioTPrinter.println(String.format("Failed to delete file: %s",
e.getMessage()));
- }
- break;
- }
- case CP:
- {
- try {
- Files.copy(sourcePath, targetPath,
StandardCopyOption.REPLACE_EXISTING);
- if (null != sourceResourcePath) {
- Files.copy(
- sourceResourcePath, targetResourcePath,
StandardCopyOption.REPLACE_EXISTING);
- }
- if (null != sourceModsPath) {
- Files.copy(sourceModsPath, targetModsPath,
StandardCopyOption.REPLACE_EXISTING);
- }
- } catch (Exception e) {
- ioTPrinter.println(String.format("Failed to copy file: %s",
e.getMessage()));
- }
- break;
- }
- case HARDLINK:
- {
- try {
- Files.createLink(targetPath, sourcePath);
- } catch (FileAlreadyExistsException e) {
- ioTPrinter.println("Hardlink already exists: " + e.getMessage());
- } catch (Exception e) {
- try {
- Files.copy(sourcePath, targetPath,
StandardCopyOption.REPLACE_EXISTING);
- } catch (Exception copyException) {
- ioTPrinter.println(
- String.format("Failed to copy file: %s",
copyException.getMessage()));
- }
- }
-
- try {
- if (null != sourceResourcePath) {
- Files.copy(
- sourceResourcePath, targetResourcePath,
StandardCopyOption.REPLACE_EXISTING);
- }
- if (null != sourceModsPath) {
- Files.copy(sourceModsPath, targetModsPath,
StandardCopyOption.REPLACE_EXISTING);
- }
- } catch (Exception e) {
- ioTPrinter.println(
- String.format("Failed to copy resource or mods file: %s",
e.getMessage()));
- }
- break;
- }
- case MV:
- {
- try {
- Files.move(sourcePath, targetPath,
StandardCopyOption.REPLACE_EXISTING);
- if (null != sourceResourcePath) {
- Files.move(
- sourceResourcePath, targetResourcePath,
StandardCopyOption.REPLACE_EXISTING);
- }
- if (null != sourceModsPath) {
- Files.move(sourceModsPath, targetModsPath,
StandardCopyOption.REPLACE_EXISTING);
- }
- } catch (Exception e) {
- ioTPrinter.println(String.format("Failed to move file: %s",
e.getMessage()));
- }
- break;
- }
- default:
- break;
- }
- }
-
public enum Operation {
NONE,
MV,
@@ -571,7 +387,7 @@ public class ImportTsFile extends AbstractTsFileTool {
case "delete":
return Operation.DELETE;
default:
- ioTPrinter.println("Args error: os/of must be one of none, mv, cp,
delete");
+ IOT_PRINTER.println("Args error: os/of must be one of none, mv, cp,
delete");
System.exit(CODE_ERROR);
return null;
}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileBase.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileBase.java
new file mode 100644
index 00000000000..82848d1357e
--- /dev/null
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileBase.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+
+import java.io.File;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Objects;
+import java.util.concurrent.atomic.LongAdder;
+
+public abstract class ImportTsFileBase implements Runnable {
+
+ private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out);
+
+ private static final LongAdder loadFileSuccessfulNum = new LongAdder();
+ private static final LongAdder loadFileFailedNum = new LongAdder();
+ private static final LongAdder processingLoadSuccessfulFileSuccessfulNum =
new LongAdder();
+ private static final LongAdder processingLoadFailedFileSuccessfulNum = new
LongAdder();
+
+ private static String successDir;
+ private static ImportTsFile.Operation successOperation;
+ private static String failDir;
+ private static ImportTsFile.Operation failOperation;
+
+ @Override
+ public void run() {
+ loadTsFile();
+ }
+
+ protected abstract void loadTsFile();
+
+ protected void processFailFile(final String filePath, final Exception e) {
+ // Reject because of memory controls, do retry later
+ try {
+ if (Objects.nonNull(e.getMessage()) &&
e.getMessage().contains("memory")) {
+ IOT_PRINTER.println(
+ "Rejecting file [ " + filePath + " ] due to memory constraints,
will retry later.");
+ ImportTsFileScanTool.putToQueue(filePath);
+ return;
+ }
+
+ loadFileFailedNum.increment();
+ IOT_PRINTER.println("Failed to import [ " + filePath + " ] file: " +
e.getMessage());
+
+ try {
+ processingFile(filePath, false);
+ processingLoadFailedFileSuccessfulNum.increment();
+ IOT_PRINTER.println("Processed fail file [ " + filePath + " ]
successfully!");
+ } catch (final Exception processFailException) {
+ IOT_PRINTER.println(
+ "Failed to process fail file [ "
+ + filePath
+ + " ]: "
+ + processFailException.getMessage());
+ }
+ } catch (final InterruptedException e1) {
+ IOT_PRINTER.println("Unexpected error occurred: " + e1.getMessage());
+ Thread.currentThread().interrupt();
+ } catch (final Exception e1) {
+ IOT_PRINTER.println("Unexpected error occurred: " + e1.getMessage());
+ }
+ }
+
+ protected static void processSuccessFile(final String filePath) {
+ loadFileSuccessfulNum.increment();
+ IOT_PRINTER.println("Imported [ " + filePath + " ] file successfully!");
+
+ try {
+ processingFile(filePath, true);
+ processingLoadSuccessfulFileSuccessfulNum.increment();
+ IOT_PRINTER.println("Processed success file [ " + filePath + " ]
successfully!");
+ } catch (final Exception processSuccessException) {
+ IOT_PRINTER.println(
+ "Failed to process success file [ "
+ + filePath
+ + " ]: "
+ + processSuccessException.getMessage());
+ }
+ }
+
+ public static void processingFile(final String filePath, final boolean
isSuccess) {
+ final String relativePath =
+ filePath.substring(ImportTsFileScanTool.getSourceFullPathLength() + 1);
+ final Path sourcePath = Paths.get(filePath);
+
+ final String target =
+ isSuccess
+ ? successDir
+ : failDir + File.separator + relativePath.replace(File.separator,
"_");
+ final Path targetPath = Paths.get(target);
+
+ final String RESOURCE = ".resource";
+ Path sourceResourcePath = Paths.get(sourcePath + RESOURCE);
+ sourceResourcePath = Files.exists(sourceResourcePath) ? sourceResourcePath
: null;
+ final Path targetResourcePath = Paths.get(target + RESOURCE);
+
+ final String MODS = ".mods";
+ Path sourceModsPath = Paths.get(sourcePath + MODS);
+ sourceModsPath = Files.exists(sourceModsPath) ? sourceModsPath : null;
+ final Path targetModsPath = Paths.get(target + MODS);
+
+ switch (isSuccess ? successOperation : failOperation) {
+ case DELETE:
+ {
+ try {
+ Files.deleteIfExists(sourcePath);
+ if (null != sourceResourcePath) {
+ Files.deleteIfExists(sourceResourcePath);
+ }
+ if (null != sourceModsPath) {
+ Files.deleteIfExists(sourceModsPath);
+ }
+ } catch (final Exception e) {
+ IOT_PRINTER.println(String.format("Failed to delete file: %s",
e.getMessage()));
+ }
+ break;
+ }
+ case CP:
+ {
+ try {
+ Files.copy(sourcePath, targetPath,
StandardCopyOption.REPLACE_EXISTING);
+ if (null != sourceResourcePath) {
+ Files.copy(
+ sourceResourcePath, targetResourcePath,
StandardCopyOption.REPLACE_EXISTING);
+ }
+ if (null != sourceModsPath) {
+ Files.copy(sourceModsPath, targetModsPath,
StandardCopyOption.REPLACE_EXISTING);
+ }
+ } catch (final Exception e) {
+ IOT_PRINTER.println(String.format("Failed to copy file: %s",
e.getMessage()));
+ }
+ break;
+ }
+ case HARDLINK:
+ {
+ try {
+ Files.createLink(targetPath, sourcePath);
+ } catch (FileAlreadyExistsException e) {
+ IOT_PRINTER.println("Hardlink already exists: " + e.getMessage());
+ } catch (final Exception e) {
+ try {
+ Files.copy(sourcePath, targetPath,
StandardCopyOption.REPLACE_EXISTING);
+ } catch (final Exception copyException) {
+ IOT_PRINTER.println(
+ String.format("Failed to copy file: %s",
copyException.getMessage()));
+ }
+ }
+
+ try {
+ if (null != sourceResourcePath) {
+ Files.copy(
+ sourceResourcePath, targetResourcePath,
StandardCopyOption.REPLACE_EXISTING);
+ }
+ if (null != sourceModsPath) {
+ Files.copy(sourceModsPath, targetModsPath,
StandardCopyOption.REPLACE_EXISTING);
+ }
+ } catch (final Exception e) {
+ IOT_PRINTER.println(
+ String.format("Failed to copy resource or mods file: %s",
e.getMessage()));
+ }
+ break;
+ }
+ case MV:
+ {
+ try {
+ Files.move(sourcePath, targetPath,
StandardCopyOption.REPLACE_EXISTING);
+ if (null != sourceResourcePath) {
+ Files.move(
+ sourceResourcePath, targetResourcePath,
StandardCopyOption.REPLACE_EXISTING);
+ }
+ if (null != sourceModsPath) {
+ Files.move(sourceModsPath, targetModsPath,
StandardCopyOption.REPLACE_EXISTING);
+ }
+ } catch (final Exception e) {
+ IOT_PRINTER.println(String.format("Failed to move file: %s",
e.getMessage()));
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ }
+
+ protected static void printResult(final long startTime) {
+ IOT_PRINTER.println(
+ "Successfully load "
+ + loadFileSuccessfulNum.sum()
+ + " tsfile(s) (--on_success operation(s): "
+ + processingLoadSuccessfulFileSuccessfulNum.sum()
+ + " succeed, "
+ + (loadFileSuccessfulNum.sum() -
processingLoadSuccessfulFileSuccessfulNum.sum())
+ + " failed)");
+ IOT_PRINTER.println(
+ "Failed to load "
+ + loadFileFailedNum.sum()
+ + " file(s) (--on_fail operation(s): "
+ + processingLoadFailedFileSuccessfulNum.sum()
+ + " succeed, "
+ + (loadFileFailedNum.sum() -
processingLoadFailedFileSuccessfulNum.sum())
+ + " failed)");
+ IOT_PRINTER.println(
+ "Unprocessed "
+ + ImportTsFileScanTool.getTsFileQueueSize()
+ + " file(s), due to unexpected exceptions");
+ IOT_PRINTER.println("For more details, please check the log.");
+ IOT_PRINTER.println(
+ "Total operation time: " + (System.currentTimeMillis() - startTime) +
" ms.");
+ IOT_PRINTER.println("Work has been completed!");
+ }
+
+ public static void setSuccessAndFailDirAndOperation(
+ final String successDir,
+ final ImportTsFile.Operation successOperation,
+ final String failDir,
+ final ImportTsFile.Operation failOperation) {
+ ImportTsFileBase.successDir = successDir;
+ ImportTsFileBase.successOperation = successOperation;
+ ImportTsFileBase.failDir = failDir;
+ ImportTsFileBase.failOperation = failOperation;
+ }
+}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
new file mode 100644
index 00000000000..8f65d26fd32
--- /dev/null
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.session.pool.SessionPool;
+
+public class ImportTsFileLocally extends ImportTsFileBase implements Runnable {
+
+ private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
+
+ private static SessionPool sessionPool;
+
+ @Override
+ public void loadTsFile() {
+ String filePath;
+ try {
+ while ((filePath = ImportTsFileScanTool.pollFromQueue()) != null) {
+ final String sql = "load '" + filePath + "' onSuccess=none ";
+ try {
+ sessionPool.executeNonQueryStatement(sql);
+
+ processSuccessFile(filePath);
+ } catch (final Exception e) {
+ processFailFile(filePath, e);
+ }
+ }
+ } catch (final Exception e) {
+ ioTPrinter.println("Unexpected error occurred: " + e.getMessage());
+ }
+ }
+
+ public static void setSessionPool(SessionPool sessionPool) {
+ ImportTsFileLocally.sessionPool = sessionPool;
+ }
+}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
new file mode 100644
index 00000000000..8f3c22f9e0b
--- /dev/null
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
+import
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
+import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+
+public class ImportTsFileRemotely extends ImportTsFileBase {
+
+ private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out);
+
+ private static final String MODS = ".mods";
+ private static final String LOAD_STRATEGY = "sync";
+ private static final Integer MAX_RETRY_COUNT = 3;
+
+ private static final AtomicInteger CONNECTION_TIMEOUT_MS =
+ new
AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+
+ private IoTDBSyncClient client;
+
+ private static String host;
+ private static String port;
+
+ public ImportTsFileRemotely() {
+ initClient();
+ sendHandshake();
+ }
+
+ @Override
+ public void loadTsFile() {
+ try {
+ String filePath;
+ while ((filePath = ImportTsFileScanTool.pollFromQueue()) != null) {
+ final File tsFile = new File(filePath);
+ try {
+ if (ImportTsFileScanTool.isContainModsFile(filePath + MODS)) {
+ doTransfer(tsFile, new File(filePath + MODS));
+ } else {
+ doTransfer(tsFile, null);
+ }
+
+ processSuccessFile(filePath);
+ } catch (final Exception e) {
+ IOT_PRINTER.println(
+ "Connect is abort, try to reconnect, max retry count: " +
MAX_RETRY_COUNT);
+
+ boolean isReconnectAndLoadSuccessFul = false;
+
+ for (int i = 1; i <= MAX_RETRY_COUNT; i++) {
+ try {
+ IOT_PRINTER.println(String.format("The %sth retry will after %s
seconds.", i, i * 2));
+ LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(i * 2L));
+
+ close();
+ initClient();
+ sendHandshake();
+
+ if (ImportTsFileScanTool.isContainModsFile(filePath + MODS)) {
+ doTransfer(tsFile, new File(filePath + MODS));
+ } else {
+ doTransfer(tsFile, null);
+ }
+
+ processSuccessFile(filePath);
+ isReconnectAndLoadSuccessFul = true;
+
+ IOT_PRINTER.println("Reconnect successful.");
+ break;
+ } catch (final Exception e1) {
+ IOT_PRINTER.println(String.format("The %sth reconnect failed",
i));
+ }
+ }
+
+ if (!isReconnectAndLoadSuccessFul) {
+ processFailFile(filePath, e);
+
+ close();
+ initClient();
+ sendHandshake();
+ }
+ }
+ }
+ } catch (final Exception e) {
+ IOT_PRINTER.println("Unexpected error occurred: " + e.getMessage());
+ } finally {
+ close();
+ }
+ }
+
+ public void sendHandshake() {
+ try {
+ final Map<String, String> params = constructParamsMap();
+ TPipeTransferResp resp =
+
client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params));
+
+ if (resp.getStatus().getCode() ==
TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
+ IOT_PRINTER.println(
+ String.format(
+ "Handshake error with target server ip: %s, port: %s, because:
%s. "
+ + "Retry to handshake by PipeTransferHandshakeV1Req.",
+ client.getIpAddress(), client.getPort(), resp.getStatus()));
+ resp =
+ client.pipeTransfer(
+ PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
+
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+ }
+
+ if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeConnectionException(
+ String.format(
+ "Handshake error with target server ip: %s, port: %s, because:
%s.",
+ client.getIpAddress(), client.getPort(), resp.getStatus()));
+ } else {
+ client.setTimeout(CONNECTION_TIMEOUT_MS.get());
+ IOT_PRINTER.println(
+ String.format(
+ "Handshake success. Target server ip: %s, port: %s",
+ client.getIpAddress(), client.getPort()));
+ }
+ } catch (final Exception e) {
+ throw new PipeException(
+ String.format(
+ "Handshake error with target server ip: %s, port: %s, because:
%s.",
+ client.getIpAddress(), client.getPort(), e.getMessage()));
+ }
+ }
+
+ private Map<String, String> constructParamsMap() {
+ final Map<String, String> params = new HashMap<>();
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
+ CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
+ params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
getClusterId());
+ params.put(
+ PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+ Boolean.toString(true));
+
params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY,
LOAD_STRATEGY);
+ return params;
+ }
+
+ public void doTransfer(final File tsFile, final File modFile) throws
PipeException, IOException {
+ final TPipeTransferResp resp;
+ final TPipeTransferReq req;
+
+ if (Objects.nonNull(modFile)) {
+ transferFilePieces(modFile, true);
+ transferFilePieces(tsFile, true);
+
+ req =
+ PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+ modFile.getName(), modFile.length(), tsFile.getName(),
tsFile.length());
+ } else {
+ transferFilePieces(tsFile, false);
+
+ req = PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(),
tsFile.length());
+ }
+
+ try {
+ resp = client.pipeTransfer(req);
+ } catch (final Exception e) {
+ throw new PipeConnectionException(
+ String.format("Network error when seal file %s, because %s.",
tsFile, e.getMessage()), e);
+ }
+
+ final TSStatus status = resp.getStatus();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ throw new PipeConnectionException(
+ String.format("Seal file %s error, result status %s.", tsFile,
status));
+ }
+
+ IOT_PRINTER.println("Successfully transferred file " + tsFile);
+ }
+
+ private void transferFilePieces(final File file, final boolean isMultiFile)
+ throws PipeException, IOException {
+ final int readFileBufferSize =
PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+ final byte[] readBuffer = new byte[readFileBufferSize];
+ long position = 0;
+ try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
+ while (true) {
+ final int readLength = reader.read(readBuffer);
+ if (readLength == -1) {
+ break;
+ }
+
+ final byte[] payLoad =
+ readLength == readFileBufferSize
+ ? readBuffer
+ : Arrays.copyOfRange(readBuffer, 0, readLength);
+ final PipeTransferFilePieceResp resp;
+ try {
+ final TPipeTransferReq req =
+ isMultiFile
+ ? getTransferMultiFilePieceReq(file.getName(), position,
payLoad)
+ : getTransferSingleFilePieceReq(file.getName(), position,
payLoad);
+ resp =
PipeTransferFilePieceResp.fromTPipeTransferResp(client.pipeTransfer(req));
+ } catch (final Exception e) {
+ throw new PipeConnectionException(
+ String.format(
+ "Network error when transfer file %s, because %s.", file,
e.getMessage()),
+ e);
+ }
+
+ position += readLength;
+
+ final TSStatus status = resp.getStatus();
+ if (status.getCode() ==
TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
+ position = resp.getEndWritingOffset();
+ reader.seek(position);
+ IOT_PRINTER.println(String.format("Redirect file position to %s.",
position));
+ continue;
+ }
+
+ if (status.getCode()
+ ==
TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
+ sendHandshake();
+ }
+ // Only handle the failed statuses to avoid string format performance
overhead
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && status.getCode() !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ throw new PipeException(
+ String.format("Transfer file %s error, result status %s.", file,
status));
+ }
+ }
+ }
+ }
+
+ private PipeTransferFilePieceReq getTransferMultiFilePieceReq(
+ final String fileName, final long position, final byte[] payLoad) throws
IOException {
+ return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(fileName,
position, payLoad);
+ }
+
+ private PipeTransferFilePieceReq getTransferSingleFilePieceReq(
+ final String fileName, final long position, final byte[] payLoad) throws
IOException {
+ return PipeTransferTsFilePieceReq.toTPipeTransferReq(fileName, position,
payLoad);
+ }
+
+ private void initClient() {
+ try {
+ this.client =
+ new IoTDBSyncClient(
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs(
+
PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs())
+ .setRpcThriftCompressionEnabled(
+
PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
+ .build(),
+ getEndPoint().getIp(),
+ getEndPoint().getPort(),
+ false,
+ "",
+ "");
+ } catch (final TTransportException e) {
+ throw new PipeException("Sync client init error because " +
e.getMessage());
+ }
+ }
+
+ private TEndPoint getEndPoint() {
+ return new TEndPoint(host, Integer.parseInt(port));
+ }
+
+ private String getClusterId() {
+ final SecureRandom random = new SecureRandom();
+ final byte[] bytes = new byte[32]; // 32 bytes = 256 bits
+ random.nextBytes(bytes);
+ return "TSFILE-IMPORTER-" + UUID.nameUUIDFromBytes(bytes);
+ }
+
+ private void close() {
+ try {
+ if (this.client != null) {
+ this.client.close();
+ }
+ } catch (final Exception e) {
+ IOT_PRINTER.println("Failed to close client because " + e.getMessage());
+ }
+ }
+
+ public static void setHost(final String host) {
+ ImportTsFileRemotely.host = host;
+ }
+
+ public static void setPort(final String port) {
+ ImportTsFileRemotely.port = port;
+ }
+}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileScanTool.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileScanTool.java
new file mode 100644
index 00000000000..4ec9f229084
--- /dev/null
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileScanTool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ImportTsFileScanTool {
+
+ private static final String RESOURCE = ".resource";
+ private static final String MODS = ".mods";
+
+ private static final LinkedBlockingQueue<String> tsfileQueue = new
LinkedBlockingQueue<>();
+ private static final Set<String> tsfileSet = new HashSet<>();
+ private static final Set<String> resourceOrModsSet = new HashSet<>();
+ private static String sourceFullPath;
+
+ public static void traverseAndCollectFiles() throws InterruptedException {
+ traverseAndCollectFilesBySourceFullPath(new File(sourceFullPath));
+ }
+
+ private static void traverseAndCollectFilesBySourceFullPath(final File file)
+ throws InterruptedException {
+ if (file.isFile()) {
+ if (file.getName().endsWith(RESOURCE) || file.getName().endsWith(MODS)) {
+ resourceOrModsSet.add(file.getAbsolutePath());
+ } else {
+ tsfileSet.add(file.getAbsolutePath());
+ tsfileQueue.put(file.getAbsolutePath());
+ }
+ } else if (file.isDirectory()) {
+ final File[] files = file.listFiles();
+ if (files != null) {
+ for (File f : files) {
+ traverseAndCollectFilesBySourceFullPath(f);
+ }
+ }
+ }
+ }
+
+ public static void addNoResourceOrModsToQueue() throws InterruptedException {
+ for (final String filePath : resourceOrModsSet) {
+ final String tsfilePath =
+ filePath.endsWith(RESOURCE)
+ ? filePath.substring(0, filePath.length() - RESOURCE.length())
+ : filePath.substring(0, filePath.length() - MODS.length());
+ if (!tsfileSet.contains(tsfilePath)) {
+ tsfileQueue.put(filePath);
+ }
+ }
+ }
+
+ public static boolean isContainModsFile(final String modsFilePath) {
+ return ImportTsFileScanTool.resourceOrModsSet.contains(modsFilePath);
+ }
+
+ public static String pollFromQueue() {
+ return ImportTsFileScanTool.tsfileQueue.poll();
+ }
+
+ public static void putToQueue(final String filePath) throws
InterruptedException {
+ ImportTsFileScanTool.tsfileQueue.put(filePath);
+ }
+
+ public static void setSourceFullPath(final String sourceFullPath) {
+ ImportTsFileScanTool.sourceFullPath = sourceFullPath;
+ }
+
+ public static int getSourceFullPathLength() {
+ return ImportTsFileScanTool.sourceFullPath.length();
+ }
+
+ public static int getTsFileQueueSize() {
+ return ImportTsFileScanTool.tsfileQueue.size();
+ }
+}
diff --git
a/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java
b/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java
index eb9e01d2937..f9b43b8a880 100644
---
a/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java
+++
b/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.tool;
+import org.apache.iotdb.tool.data.AbstractDataTool;
+
import org.junit.Before;
import org.junit.Test;