This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a2aba23c06 Load: Support remote load in import-tsfile script (#13352)
8a2aba23c06 is described below

commit 8a2aba23c0673e0560b7aff2ffc9b3ce9a73a5ff
Author: YC27 <[email protected]>
AuthorDate: Fri Sep 6 19:41:07 2024 +0800

    Load: Support remote load in import-tsfile script (#13352)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 iotdb-client/cli/pom.xml                           |  10 +
 .../cli/src/assembly/resources/tools/backup.bat    |   2 +-
 .../src/assembly/resources/tools/export-data.bat   |   2 +-
 .../src/assembly/resources/tools/export-schema.bat |   2 +-
 .../src/assembly/resources/tools/export-tsfile.bat |   2 +-
 .../src/assembly/resources/tools/import-data.bat   |   2 +-
 .../src/assembly/resources/tools/import-schema.bat |   2 +-
 .../src/assembly/resources/tools/load-tsfile.bat   |   2 +-
 .../java/org/apache/iotdb/cli/AbstractCli.java     |   2 +-
 .../iotdb/tool/{ => backup}/IoTDBDataBackTool.java |   3 +-
 .../iotdb/tool/{ => data}/AbstractDataTool.java    |   2 +-
 .../apache/iotdb/tool/{ => data}/ExportData.java   |   2 +-
 .../apache/iotdb/tool/{ => data}/ImportData.java   |   2 +-
 .../tool/{ => schema}/AbstractSchemaTool.java      |   2 +-
 .../iotdb/tool/{ => schema}/ExportSchema.java      |   2 +-
 .../iotdb/tool/{ => schema}/ImportSchema.java      |   2 +-
 .../tool/{ => tsfile}/AbstractTsFileTool.java      |   2 +-
 .../iotdb/tool/{ => tsfile}/ExportTsFile.java      |   2 +-
 .../iotdb/tool/{ => tsfile}/ImportTsFile.java      | 302 ++++--------------
 .../apache/iotdb/tool/tsfile/ImportTsFileBase.java | 243 +++++++++++++++
 .../iotdb/tool/tsfile/ImportTsFileLocally.java     |  53 ++++
 .../iotdb/tool/tsfile/ImportTsFileRemotely.java    | 338 +++++++++++++++++++++
 .../iotdb/tool/tsfile/ImportTsFileScanTool.java    |  95 ++++++
 .../org/apache/iotdb/tool/WriteDataFileTest.java   |   2 +
 24 files changed, 818 insertions(+), 260 deletions(-)

diff --git a/iotdb-client/cli/pom.xml b/iotdb-client/cli/pom.xml
index faf10b630a1..a735626e9ea 100644
--- a/iotdb-client/cli/pom.xml
+++ b/iotdb-client/cli/pom.xml
@@ -84,6 +84,16 @@
             <artifactId>iotdb-thrift</artifactId>
             <version>2.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-thrift-commons</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>pipe-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
diff --git a/iotdb-client/cli/src/assembly/resources/tools/backup.bat b/iotdb-client/cli/src/assembly/resources/tools/backup.bat
index 3008a74733e..e7974164b08 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/backup.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/backup.bat
@@ -102,7 +102,7 @@ echo Starting IoTDB Client Data Back Script
 echo ------------------------------------------
 
 set CLASSPATH="%IOTDB_HOME%\lib\*"
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.IoTDBDataBackTool
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.backup.IoTDBDataBackTool
 
 set logsDir="%IOTDB_HOME%\logs"
 if not exist "%logsDir%" (
diff --git a/iotdb-client/cli/src/assembly/resources/tools/export-data.bat b/iotdb-client/cli/src/assembly/resources/tools/export-data.bat
index c3acfff5f92..6df7b11d00c 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/export-data.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/export-data.bat
@@ -31,7 +31,7 @@ pushd %~dp0..
 if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
 popd
 
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ExportData
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.data.ExportData
 if NOT DEFINED JAVA_HOME goto :err
 
 @REM 
-----------------------------------------------------------------------------
diff --git a/iotdb-client/cli/src/assembly/resources/tools/export-schema.bat b/iotdb-client/cli/src/assembly/resources/tools/export-schema.bat
index 8ed2b81f5c7..dab5dfaf667 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/export-schema.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/export-schema.bat
@@ -31,7 +31,7 @@ pushd %~dp0..
 if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
 popd
 
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ExportSchema
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.schema.ExportSchema
 if NOT DEFINED JAVA_HOME goto :err
 
 @REM 
-----------------------------------------------------------------------------
diff --git a/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.bat b/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.bat
index 350e806a769..2c85f42bd62 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.bat
@@ -31,7 +31,7 @@ pushd %~dp0..
 if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
 popd
 
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ExportTsFile
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.tsfile.ExportTsFile
 if NOT DEFINED JAVA_HOME goto :err
 
 @REM 
-----------------------------------------------------------------------------
diff --git a/iotdb-client/cli/src/assembly/resources/tools/import-data.bat b/iotdb-client/cli/src/assembly/resources/tools/import-data.bat
index cb34c5e897a..a4041015ccc 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/import-data.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/import-data.bat
@@ -31,7 +31,7 @@ pushd %~dp0..
 if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
 popd
 
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ImportData
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.data.ImportData
 if NOT DEFINED JAVA_HOME goto :err
 
 @REM 
-----------------------------------------------------------------------------
diff --git a/iotdb-client/cli/src/assembly/resources/tools/import-schema.bat b/iotdb-client/cli/src/assembly/resources/tools/import-schema.bat
index 46a8d5abf6e..fbf5236128b 100644
--- a/iotdb-client/cli/src/assembly/resources/tools/import-schema.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/import-schema.bat
@@ -31,7 +31,7 @@ pushd %~dp0..
 if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
 popd
 
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ImportSchema
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.schema.ImportSchema
 if NOT DEFINED JAVA_HOME goto :err
 
 @REM 
-----------------------------------------------------------------------------
diff --git a/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat b/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat
index bd1f4c9170c..52ae0a46b76 100755
--- a/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat
@@ -30,7 +30,7 @@ pushd %~dp0..
 if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
 popd
 
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ImportTsFile
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.tsfile.ImportTsFile
 if NOT DEFINED JAVA_HOME goto :err
 
 @REM 
-----------------------------------------------------------------------------
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
index a0a7ca8b14f..b803e0df55c 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.jdbc.IoTDBJDBCResultSet;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.tool.ImportData;
+import org.apache.iotdb.tool.data.ImportData;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/IoTDBDataBackTool.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/backup/IoTDBDataBackTool.java
similarity index 99%
rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/IoTDBDataBackTool.java
rename to iotdb-client/cli/src/main/java/org/apache/iotdb/tool/backup/IoTDBDataBackTool.java
index 7cc3556562b..62f27acbf62 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/IoTDBDataBackTool.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/backup/IoTDBDataBackTool.java
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.backup;
 
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tool.data.AbstractDataTool;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java
similarity index 99%
rename from 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java
rename to 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java
index 1e689e903c1..d60667676f8 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.data;
 
 import org.apache.iotdb.cli.utils.IoTPrinter;
 import org.apache.iotdb.exception.ArgsErrorException;
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
similarity index 99%
rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java
rename to 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
index bc09e2faa51..931686cced7 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.data;
 
 import org.apache.iotdb.cli.type.ExitType;
 import org.apache.iotdb.cli.utils.CliContext;
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ImportData.java
similarity index 99%
rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java
rename to 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ImportData.java
index 819190d2c2c..e8fbe063313 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ImportData.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.data;
 
 import org.apache.iotdb.cli.utils.IoTPrinter;
 import org.apache.iotdb.commons.exception.IllegalPathException;
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractSchemaTool.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java
similarity index 99%
rename from 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractSchemaTool.java
rename to 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java
index 9e91d404d12..00d926c7074 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractSchemaTool.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.schema;
 
 import org.apache.iotdb.cli.utils.IoTPrinter;
 import org.apache.iotdb.exception.ArgsErrorException;
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportSchema.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java
similarity index 99%
rename from 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportSchema.java
rename to 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java
index 9892da625e7..54453acd898 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportSchema.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.schema;
 
 import org.apache.iotdb.cli.type.ExitType;
 import org.apache.iotdb.cli.utils.CliContext;
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
similarity index 99%
rename from 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java
rename to 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
index 3a7b8ab60e5..a7709313b1b 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.schema;
 
 import org.apache.iotdb.cli.utils.IoTPrinter;
 import org.apache.iotdb.commons.exception.IllegalPathException;
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/AbstractTsFileTool.java
similarity index 99%
rename from 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java
rename to 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/AbstractTsFileTool.java
index fe8691ddf8b..f7a88eb56c5 100644
--- 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/AbstractTsFileTool.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.tsfile;
 
 import org.apache.iotdb.cli.utils.IoTPrinter;
 import org.apache.iotdb.exception.ArgsErrorException;
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
similarity index 99%
rename from 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
rename to 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
index 9ab3d6b3ec6..22cdcb907da 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.tsfile;
 
 import org.apache.iotdb.cli.type.ExitType;
 import org.apache.iotdb.cli.utils.CliContext;
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
similarity index 50%
rename from 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
rename to 
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
index 576d2927beb..497900a0371 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java
@@ -17,9 +17,10 @@
  * under the License.
  */
 
-package org.apache.iotdb.tool;
+package org.apache.iotdb.tool.tsfile;
 
 import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.session.pool.SessionPool;
 
 import org.apache.commons.cli.CommandLine;
@@ -31,18 +32,13 @@ import org.apache.commons.cli.ParseException;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
+import java.net.UnknownHostException;
 import java.nio.file.Files;
-import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.LongAdder;
 
 public class ImportTsFile extends AbstractTsFileTool {
 
@@ -64,15 +60,11 @@ public class ImportTsFile extends AbstractTsFileTool {
   private static final String THREAD_NUM_ARGS = "tn";
   private static final String THREAD_NUM_NAME = "thread_num";
 
-  private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
+  private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out);
 
   private static final String TS_FILE_CLI_PREFIX = "ImportTsFile";
 
-  private static final String RESOURCE = ".resource";
-  private static final String MODS = ".mods";
-
   private static String source;
-  private static String sourceFullPath;
 
   private static String successDir = "success/";
   private static String failDir = "fail/";
@@ -82,14 +74,7 @@ public class ImportTsFile extends AbstractTsFileTool {
 
   private static int threadNum = 8;
 
-  private static final LongAdder loadFileSuccessfulNum = new LongAdder();
-  private static final LongAdder loadFileFailedNum = new LongAdder();
-  private static final LongAdder processingLoadSuccessfulFileSuccessfulNum = new LongAdder();
-  private static final LongAdder processingLoadFailedFileSuccessfulNum = new LongAdder();
-
-  private static final LinkedBlockingQueue<String> tsfileQueue = new LinkedBlockingQueue<>();
-  private static final Set<String> tsfileSet = new HashSet<>();
-  private static final Set<String> resourceOrModsSet = new HashSet<>();
+  private static boolean isRemoteLoad = true;
 
   private static SessionPool sessionPool;
 
@@ -171,7 +156,7 @@ public class ImportTsFile extends AbstractTsFileTool {
     helpFormatter.setWidth(MAX_HELP_CONSOLE_WIDTH);
 
     if (args == null || args.length == 0) {
-      ioTPrinter.println("Too few arguments, please check the following hint.");
+      IOT_PRINTER.println("Too few arguments, please check the following hint.");
       helpFormatter.printHelp(TS_FILE_CLI_PREFIX, options, true);
       System.exit(CODE_ERROR);
     }
@@ -182,7 +167,7 @@ public class ImportTsFile extends AbstractTsFileTool {
         System.exit(CODE_OK);
       }
     } catch (ParseException e) {
-      ioTPrinter.println("Failed to parse the provided options: " + 
e.getMessage());
+      IOT_PRINTER.println("Failed to parse the provided options: " + 
e.getMessage());
       helpFormatter.printHelp(TS_FILE_CLI_PREFIX, options, true);
       System.exit(CODE_ERROR);
     }
@@ -191,7 +176,7 @@ public class ImportTsFile extends AbstractTsFileTool {
     try {
       commandLine = parser.parse(options, args, true);
     } catch (ParseException e) {
-      ioTPrinter.println("Failed to parse the provided options: " + 
e.getMessage());
+      IOT_PRINTER.println("Failed to parse the provided options: " + 
e.getMessage());
       helpFormatter.printHelp(TS_FILE_CLI_PREFIX, options, true);
       System.exit(CODE_ERROR);
     }
@@ -200,45 +185,30 @@ public class ImportTsFile extends AbstractTsFileTool {
       parseBasicParams(commandLine);
       parseSpecialParams(commandLine);
     } catch (Exception e) {
-      ioTPrinter.println("Encounter an error when parsing the provided 
options: " + e.getMessage());
+      IOT_PRINTER.println(
+          "Encounter an error when parsing the provided options: " + 
e.getMessage());
       System.exit(CODE_ERROR);
     }
 
+    IOT_PRINTER.println(isRemoteLoad ? "Load remotely." : "Load locally.");
+
     final int resultCode = importFromTargetPath();
-    ioTPrinter.println(
-        "Successfully load "
-            + loadFileSuccessfulNum.sum()
-            + " tsfile(s) (--on_success operation(s): "
-            + processingLoadSuccessfulFileSuccessfulNum.sum()
-            + " succeed, "
-            + (loadFileSuccessfulNum.sum() - 
processingLoadSuccessfulFileSuccessfulNum.sum())
-            + " failed)");
-    ioTPrinter.println(
-        "Failed to load "
-            + loadFileFailedNum.sum()
-            + " file(s) (--on_fail operation(s): "
-            + processingLoadFailedFileSuccessfulNum.sum()
-            + " succeed, "
-            + (loadFileFailedNum.sum() - 
processingLoadFailedFileSuccessfulNum.sum())
-            + " failed)");
-    ioTPrinter.println("For more details, please check the log.");
-    ioTPrinter.println(
-        "Total operation time: " + (System.currentTimeMillis() - startTime) + 
" ms.");
-    ioTPrinter.println("Work has been completed!");
+
+    ImportTsFileBase.printResult(startTime);
     System.exit(resultCode);
   }
 
   private static void parseSpecialParams(CommandLine commandLine) {
     source = commandLine.getOptionValue(SOURCE_ARGS);
     if (!Files.exists(Paths.get(source))) {
-      ioTPrinter.println(String.format("Source file or directory %s does not 
exist", source));
+      IOT_PRINTER.println(String.format("Source file or directory %s does not 
exist", source));
       System.exit(CODE_ERROR);
     }
 
     final String onSuccess = 
commandLine.getOptionValue(ON_SUCCESS_ARGS).trim().toLowerCase();
     final String onFail = 
commandLine.getOptionValue(ON_FAIL_ARGS).trim().toLowerCase();
     if (!Operation.isValidOperation(onSuccess) || 
!Operation.isValidOperation(onFail)) {
-      ioTPrinter.println("Args error: os/of must be one of none, mv, cp, 
delete");
+      IOT_PRINTER.println("Args error: os/of must be one of none, mv, cp, 
delete");
       System.exit(CODE_ERROR);
     }
 
@@ -262,6 +232,13 @@ public class ImportTsFile extends AbstractTsFileTool {
     if (commandLine.getOptionValue(THREAD_NUM_ARGS) != null) {
       threadNum = 
Integer.parseInt(commandLine.getOptionValue(THREAD_NUM_ARGS));
     }
+
+    try {
+      isRemoteLoad = !NodeUrlUtils.containsLocalAddress(Collections.singletonList(host));
+    } catch (UnknownHostException e) {
+      IOT_PRINTER.println(
+          "Unknown host: " + host + ". Exception: " + e.getMessage() + ". Will 
use remote load.");
+    }
   }
 
   public static boolean isFileStoreEquals(String pathString, File dir) {
@@ -269,7 +246,7 @@ public class ImportTsFile extends AbstractTsFileTool {
       return Objects.equals(
           Files.getFileStore(Paths.get(pathString)), 
Files.getFileStore(dir.toPath()));
     } catch (IOException e) {
-      ioTPrinter.println("IOException when checking file store: " + 
e.getMessage());
+      IOT_PRINTER.println("IOException when checking file store: " + 
e.getMessage());
       return false;
     }
   }
@@ -281,7 +258,7 @@ public class ImportTsFile extends AbstractTsFileTool {
     File file = new File(successDir);
     if (!file.isDirectory()) {
       if (!file.mkdirs()) {
-        ioTPrinter.println(String.format("Failed to create %s %s", 
SUCCESS_DIR_NAME, successDir));
+        IOT_PRINTER.println(String.format("Failed to create %s %s", 
SUCCESS_DIR_NAME, successDir));
         System.exit(CODE_ERROR);
       }
     }
@@ -295,7 +272,7 @@ public class ImportTsFile extends AbstractTsFileTool {
     File file = new File(failDir);
     if (!file.isDirectory()) {
       if (!file.mkdirs()) {
-        ioTPrinter.println(String.format("Failed to create %s %s", 
FAIL_DIR_NAME, failDir));
+        IOT_PRINTER.println(String.format("Failed to create %s %s", 
FAIL_DIR_NAME, failDir));
         System.exit(CODE_ERROR);
       }
     }
@@ -304,13 +281,6 @@ public class ImportTsFile extends AbstractTsFileTool {
 
   public static int importFromTargetPath() {
     try {
-      final File file = new File(source);
-      sourceFullPath = file.getAbsolutePath();
-      if (!file.isFile() && !file.isDirectory()) {
-        ioTPrinter.println(String.format("source file or directory %s does not 
exist", source));
-        return CODE_ERROR;
-      }
-
       sessionPool =
           new SessionPool.Builder()
               .host(host)
@@ -324,15 +294,22 @@ public class ImportTsFile extends AbstractTsFileTool {
               .build();
       sessionPool.setEnableQueryRedirection(false);
 
-      traverseAndCollectFiles(file);
-      addNoResourceOrModsToQueue();
-      ioTPrinter.println("Load file total number : " + tsfileQueue.size());
+      // set params
+      processSetParams();
+
+      ImportTsFileScanTool.traverseAndCollectFiles();
+      ImportTsFileScanTool.addNoResourceOrModsToQueue();
+
+      IOT_PRINTER.println("Load file total number : " + 
ImportTsFileScanTool.getTsFileQueueSize());
       asyncImportTsFiles();
       return CODE_OK;
     } catch (InterruptedException e) {
-      ioTPrinter.println(String.format("Import tsfile fail: %s", 
e.getMessage()));
+      IOT_PRINTER.println(String.format("Import tsfile fail: %s", 
e.getMessage()));
       Thread.currentThread().interrupt();
       return CODE_ERROR;
+    } catch (Exception e) {
+      IOT_PRINTER.println(String.format("Import tsfile fail: %s", 
e.getMessage()));
+      return CODE_ERROR;
     } finally {
       if (sessionPool != null) {
         sessionPool.close();
@@ -340,40 +317,32 @@ public class ImportTsFile extends AbstractTsFileTool {
     }
   }
 
-  public static void traverseAndCollectFiles(File file) throws 
InterruptedException {
-    if (file.isFile()) {
-      if (file.getName().endsWith(RESOURCE) || file.getName().endsWith(MODS)) {
-        resourceOrModsSet.add(file.getAbsolutePath());
-      } else {
-        tsfileSet.add(file.getAbsolutePath());
-        tsfileQueue.put(file.getAbsolutePath());
-      }
-    } else if (file.isDirectory()) {
-      final File[] files = file.listFiles();
-      if (files != null) {
-        for (File f : files) {
-          traverseAndCollectFiles(f);
-        }
-      }
+  // process other classes need param
+  private static void processSetParams() {
+    // ImportTsFileLocally
+    final File file = new File(source);
+    ImportTsFileScanTool.setSourceFullPath(file.getAbsolutePath());
+    if (!file.isFile() && !file.isDirectory()) {
+      IOT_PRINTER.println(String.format("Source file or directory %s does not 
exist", source));
+      System.exit(CODE_ERROR);
     }
-  }
 
-  public static void addNoResourceOrModsToQueue() throws InterruptedException {
-    for (final String filePath : resourceOrModsSet) {
-      final String tsfilePath =
-          filePath.endsWith(RESOURCE)
-              ? filePath.substring(0, filePath.length() - RESOURCE.length())
-              : filePath.substring(0, filePath.length() - MODS.length());
-      if (!tsfileSet.contains(tsfilePath)) {
-        tsfileQueue.put(filePath);
-      }
-    }
+    ImportTsFileLocally.setSessionPool(sessionPool);
+
+    // ImportTsFileRemotely
+    ImportTsFileRemotely.setHost(host);
+    ImportTsFileRemotely.setPort(port);
+
+    // ImportTsFileBase
+    ImportTsFileBase.setSuccessAndFailDirAndOperation(
+        successDir, successOperation, failDir, failOperation);
   }
 
   public static void asyncImportTsFiles() {
     final List<Thread> list = new ArrayList<>(threadNum);
     for (int i = 0; i < threadNum; i++) {
-      final Thread thread = new Thread(ImportTsFile::importTsFile);
+      final Thread thread =
+          new Thread(isRemoteLoad ? new ImportTsFileRemotely() : new ImportTsFileLocally());
       thread.start();
       list.add(thread);
     }
@@ -383,164 +352,11 @@ public class ImportTsFile extends AbstractTsFileTool {
             thread.join();
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            ioTPrinter.println("importTsFile thread join interrupted: " + 
e.getMessage());
+            IOT_PRINTER.println("ImportTsFile thread join interrupted: " + 
e.getMessage());
           }
         });
   }
 
-  public static void importTsFile() {
-    String filePath;
-    try {
-      while ((filePath = tsfileQueue.poll()) != null) {
-        final String sql = "load '" + filePath + "' onSuccess=none ";
-
-        try {
-          sessionPool.executeNonQueryStatement(sql);
-
-          loadFileSuccessfulNum.increment();
-          ioTPrinter.println("Imported [ " + filePath + " ] file 
successfully!");
-
-          try {
-            processingFile(filePath, successDir, successOperation);
-            processingLoadSuccessfulFileSuccessfulNum.increment();
-            ioTPrinter.println("Processed success file [ " + filePath + " ] 
successfully!");
-          } catch (Exception processSuccessException) {
-            ioTPrinter.println(
-                "Failed to process success file [ "
-                    + filePath
-                    + " ]: "
-                    + processSuccessException.getMessage());
-          }
-        } catch (Exception e) {
-          // Reject because of memory controls, do retry later
-          if (Objects.nonNull(e.getMessage()) && 
e.getMessage().contains("memory")) {
-            ioTPrinter.println(
-                "Rejecting file [ " + filePath + " ] due to memory 
constraints, will retry later.");
-            tsfileQueue.put(filePath);
-            continue;
-          }
-
-          loadFileFailedNum.increment();
-          ioTPrinter.println("Failed to import [ " + filePath + " ] file: " + 
e.getMessage());
-
-          try {
-            processingFile(filePath, failDir, failOperation);
-            processingLoadFailedFileSuccessfulNum.increment();
-            ioTPrinter.println("Processed fail file [ " + filePath + " ] 
successfully!");
-          } catch (Exception processFailException) {
-            ioTPrinter.println(
-                "Failed to process fail file [ "
-                    + filePath
-                    + " ]: "
-                    + processFailException.getMessage());
-          }
-        }
-      }
-    } catch (InterruptedException e) {
-      ioTPrinter.println("Unexpected error occurred: " + e.getMessage());
-      Thread.currentThread().interrupt();
-    } catch (Exception e) {
-      ioTPrinter.println("Unexpected error occurred: " + e.getMessage());
-    }
-  }
-
-  public static void processingFile(String filePath, String dir, Operation 
operation) {
-    String relativePath = filePath.substring(sourceFullPath.length() + 1);
-    Path sourcePath = Paths.get(filePath);
-
-    String target = dir + File.separator + 
relativePath.replace(File.separator, "_");
-    Path targetPath = Paths.get(target);
-
-    Path sourceResourcePath = Paths.get(sourcePath + RESOURCE);
-    sourceResourcePath = Files.exists(sourceResourcePath) ? sourceResourcePath 
: null;
-    Path targetResourcePath = Paths.get(target + RESOURCE);
-
-    Path sourceModsPath = Paths.get(sourcePath + MODS);
-    sourceModsPath = Files.exists(sourceModsPath) ? sourceModsPath : null;
-    Path targetModsPath = Paths.get(target + MODS);
-
-    switch (operation) {
-      case DELETE:
-        {
-          try {
-            Files.deleteIfExists(sourcePath);
-            if (null != sourceResourcePath) {
-              Files.deleteIfExists(sourceResourcePath);
-            }
-            if (null != sourceModsPath) {
-              Files.deleteIfExists(sourceModsPath);
-            }
-          } catch (Exception e) {
-            ioTPrinter.println(String.format("Failed to delete file: %s", 
e.getMessage()));
-          }
-          break;
-        }
-      case CP:
-        {
-          try {
-            Files.copy(sourcePath, targetPath, 
StandardCopyOption.REPLACE_EXISTING);
-            if (null != sourceResourcePath) {
-              Files.copy(
-                  sourceResourcePath, targetResourcePath, 
StandardCopyOption.REPLACE_EXISTING);
-            }
-            if (null != sourceModsPath) {
-              Files.copy(sourceModsPath, targetModsPath, 
StandardCopyOption.REPLACE_EXISTING);
-            }
-          } catch (Exception e) {
-            ioTPrinter.println(String.format("Failed to copy file: %s", 
e.getMessage()));
-          }
-          break;
-        }
-      case HARDLINK:
-        {
-          try {
-            Files.createLink(targetPath, sourcePath);
-          } catch (FileAlreadyExistsException e) {
-            ioTPrinter.println("Hardlink already exists: " + e.getMessage());
-          } catch (Exception e) {
-            try {
-              Files.copy(sourcePath, targetPath, 
StandardCopyOption.REPLACE_EXISTING);
-            } catch (Exception copyException) {
-              ioTPrinter.println(
-                  String.format("Failed to copy file: %s", 
copyException.getMessage()));
-            }
-          }
-
-          try {
-            if (null != sourceResourcePath) {
-              Files.copy(
-                  sourceResourcePath, targetResourcePath, 
StandardCopyOption.REPLACE_EXISTING);
-            }
-            if (null != sourceModsPath) {
-              Files.copy(sourceModsPath, targetModsPath, 
StandardCopyOption.REPLACE_EXISTING);
-            }
-          } catch (Exception e) {
-            ioTPrinter.println(
-                String.format("Failed to copy resource or mods file: %s", 
e.getMessage()));
-          }
-          break;
-        }
-      case MV:
-        {
-          try {
-            Files.move(sourcePath, targetPath, 
StandardCopyOption.REPLACE_EXISTING);
-            if (null != sourceResourcePath) {
-              Files.move(
-                  sourceResourcePath, targetResourcePath, 
StandardCopyOption.REPLACE_EXISTING);
-            }
-            if (null != sourceModsPath) {
-              Files.move(sourceModsPath, targetModsPath, 
StandardCopyOption.REPLACE_EXISTING);
-            }
-          } catch (Exception e) {
-            ioTPrinter.println(String.format("Failed to move file: %s", 
e.getMessage()));
-          }
-          break;
-        }
-      default:
-        break;
-    }
-  }
-
   public enum Operation {
     NONE,
     MV,
@@ -571,7 +387,7 @@ public class ImportTsFile extends AbstractTsFileTool {
         case "delete":
           return Operation.DELETE;
         default:
-          ioTPrinter.println("Args error: os/of must be one of none, mv, cp, 
delete");
+          IOT_PRINTER.println("Args error: os/of must be one of none, mv, cp, 
delete");
           System.exit(CODE_ERROR);
           return null;
       }
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileBase.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileBase.java
new file mode 100644
index 00000000000..82848d1357e
--- /dev/null
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileBase.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+
+import java.io.File;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Objects;
+import java.util.concurrent.atomic.LongAdder;
+
+public abstract class ImportTsFileBase implements Runnable {
+
+  private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out);
+
+  private static final LongAdder loadFileSuccessfulNum = new LongAdder();
+  private static final LongAdder loadFileFailedNum = new LongAdder();
+  private static final LongAdder processingLoadSuccessfulFileSuccessfulNum = 
new LongAdder();
+  private static final LongAdder processingLoadFailedFileSuccessfulNum = new 
LongAdder();
+
+  private static String successDir;
+  private static ImportTsFile.Operation successOperation;
+  private static String failDir;
+  private static ImportTsFile.Operation failOperation;
+
+  /** Runs this importer by delegating to {@link #loadTsFile()}. */
+  @Override
+  public void run() {
+    loadTsFile();
+  }
+
+  /** Performs the import work of a concrete importer; invoked by {@link #run()}. */
+  protected abstract void loadTsFile();
+
+  /**
+   * Handles a file whose load attempt ended with the exception {@code e}.
+   *
+   * <p>If the exception message contains {@code "memory"}, the file is put back on the scan queue
+   * and is not counted as failed. Otherwise the failure counter is incremented and the on-fail
+   * operation is applied through {@link #processingFile(String, boolean)}.
+   *
+   * @param filePath path of the file that could not be loaded
+   * @param e exception raised by the load attempt
+   */
+  protected void processFailFile(final String filePath, final Exception e) {
+    try {
+      final String reason = e.getMessage();
+
+      // Reject because of memory controls, do retry later
+      final boolean rejectedForMemory = reason != null && reason.contains("memory");
+      if (rejectedForMemory) {
+        IOT_PRINTER.println(
+            "Rejecting file [ " + filePath + " ] due to memory constraints, will retry later.");
+        ImportTsFileScanTool.putToQueue(filePath);
+        return;
+      }
+
+      loadFileFailedNum.increment();
+      IOT_PRINTER.println("Failed to import [ " + filePath + " ] file: " + reason);
+
+      try {
+        processingFile(filePath, false);
+        processingLoadFailedFileSuccessfulNum.increment();
+        IOT_PRINTER.println("Processed fail file [ " + filePath + " ] successfully!");
+      } catch (final Exception onFailError) {
+        IOT_PRINTER.println(
+            "Failed to process fail file [ "
+                + filePath
+                + " ]: "
+                + onFailError.getMessage());
+      }
+    } catch (final InterruptedException interrupted) {
+      // Re-queueing was interrupted: report it and restore the interrupt flag.
+      IOT_PRINTER.println("Unexpected error occurred: " + interrupted.getMessage());
+      Thread.currentThread().interrupt();
+    } catch (final Exception unexpected) {
+      IOT_PRINTER.println("Unexpected error occurred: " + unexpected.getMessage());
+    }
+  }
+
+  /**
+   * Records a successfully loaded file and applies the on-success operation to it.
+   *
+   * <p>The load-success counter is always incremented; the operation-success counter is only
+   * incremented when {@link #processingFile(String, boolean)} returns without throwing.
+   *
+   * @param filePath path of the file that was loaded
+   */
+  protected static void processSuccessFile(final String filePath) {
+    loadFileSuccessfulNum.increment();
+    IOT_PRINTER.println("Imported [ " + filePath + " ] file successfully!");
+
+    try {
+      processingFile(filePath, true);
+    } catch (final Exception onSuccessError) {
+      IOT_PRINTER.println(
+          "Failed to process success file [ "
+              + filePath
+              + " ]: "
+              + onSuccessError.getMessage());
+      return;
+    }
+
+    try {
+      processingLoadSuccessfulFileSuccessfulNum.increment();
+      IOT_PRINTER.println("Processed success file [ " + filePath + " ] successfully!");
+    } catch (final Exception onSuccessError) {
+      IOT_PRINTER.println(
+          "Failed to process success file [ "
+              + filePath
+              + " ]: "
+              + onSuccessError.getMessage());
+    }
+  }
+
+  /**
+   * Applies the configured on-success or on-fail operation to a loaded file and, when they exist
+   * on disk, to its sibling {@code .resource} and {@code .mods} files.
+   *
+   * <p>The target name is the file's path relative to the scanned source root with every path
+   * separator replaced by {@code _}, placed directly under the success or fail directory. I/O
+   * failures of the individual operations are printed and not rethrown, so callers cannot
+   * observe them.
+   *
+   * @param filePath path of the file, as produced by {@link ImportTsFileScanTool}
+   * @param isSuccess {@code true} to use the on-success directory and operation, {@code false} to
+   *     use the on-fail ones
+   */
+  public static void processingFile(final String filePath, final boolean isSuccess) {
+    final Path sourcePath = Paths.get(filePath);
+
+    // If the scanned source is itself a single file, filePath has nothing below the source root
+    // and substring() would throw; fall back to the bare file name in that case.
+    final int sourceRootLength = ImportTsFileScanTool.getSourceFullPathLength();
+    final String relativePath =
+        filePath.length() > sourceRootLength + 1
+            ? filePath.substring(sourceRootLength + 1)
+            : sourcePath.getFileName().toString();
+
+    // The parentheses matter: '+' binds tighter than '?:', so without them the file name was
+    // appended to failDir only and every successful file targeted the success directory itself.
+    final String target =
+        (isSuccess ? successDir : failDir)
+            + File.separator
+            + relativePath.replace(File.separator, "_");
+    final Path targetPath = Paths.get(target);
+
+    final String RESOURCE = ".resource";
+    Path sourceResourcePath = Paths.get(sourcePath + RESOURCE);
+    sourceResourcePath = Files.exists(sourceResourcePath) ? sourceResourcePath : null;
+    final Path targetResourcePath = Paths.get(target + RESOURCE);
+
+    final String MODS = ".mods";
+    Path sourceModsPath = Paths.get(sourcePath + MODS);
+    sourceModsPath = Files.exists(sourceModsPath) ? sourceModsPath : null;
+    final Path targetModsPath = Paths.get(target + MODS);
+
+    switch (isSuccess ? successOperation : failOperation) {
+      case DELETE:
+        {
+          try {
+            Files.deleteIfExists(sourcePath);
+            if (null != sourceResourcePath) {
+              Files.deleteIfExists(sourceResourcePath);
+            }
+            if (null != sourceModsPath) {
+              Files.deleteIfExists(sourceModsPath);
+            }
+          } catch (final Exception e) {
+            IOT_PRINTER.println(String.format("Failed to delete file: %s", e.getMessage()));
+          }
+          break;
+        }
+      case CP:
+        {
+          try {
+            Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
+            if (null != sourceResourcePath) {
+              Files.copy(
+                  sourceResourcePath, targetResourcePath, StandardCopyOption.REPLACE_EXISTING);
+            }
+            if (null != sourceModsPath) {
+              Files.copy(sourceModsPath, targetModsPath, StandardCopyOption.REPLACE_EXISTING);
+            }
+          } catch (final Exception e) {
+            IOT_PRINTER.println(String.format("Failed to copy file: %s", e.getMessage()));
+          }
+          break;
+        }
+      case HARDLINK:
+        {
+          // Link the data file; if linking is not possible fall back to a plain copy.
+          try {
+            Files.createLink(targetPath, sourcePath);
+          } catch (FileAlreadyExistsException e) {
+            IOT_PRINTER.println("Hardlink already exists: " + e.getMessage());
+          } catch (final Exception e) {
+            try {
+              Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
+            } catch (final Exception copyException) {
+              IOT_PRINTER.println(
+                  String.format("Failed to copy file: %s", copyException.getMessage()));
+            }
+          }
+
+          // The companion files are always copied, never linked.
+          try {
+            if (null != sourceResourcePath) {
+              Files.copy(
+                  sourceResourcePath, targetResourcePath, StandardCopyOption.REPLACE_EXISTING);
+            }
+            if (null != sourceModsPath) {
+              Files.copy(sourceModsPath, targetModsPath, StandardCopyOption.REPLACE_EXISTING);
+            }
+          } catch (final Exception e) {
+            IOT_PRINTER.println(
+                String.format("Failed to copy resource or mods file: %s", e.getMessage()));
+          }
+          break;
+        }
+      case MV:
+        {
+          try {
+            Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
+            if (null != sourceResourcePath) {
+              Files.move(
+                  sourceResourcePath, targetResourcePath, StandardCopyOption.REPLACE_EXISTING);
+            }
+            if (null != sourceModsPath) {
+              Files.move(sourceModsPath, targetModsPath, StandardCopyOption.REPLACE_EXISTING);
+            }
+          } catch (final Exception e) {
+            IOT_PRINTER.println(String.format("Failed to move file: %s", e.getMessage()));
+          }
+          break;
+        }
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Prints the final summary: how many files were loaded or failed, how many on-success and
+   * on-fail operations went through, how many paths are still queued, and the elapsed time.
+   *
+   * @param startTime start of the run, in the same clock as {@link System#currentTimeMillis()}
+   */
+  protected static void printResult(final long startTime) {
+    final long loadedCount = loadFileSuccessfulNum.sum();
+    final long onSuccessDoneCount = processingLoadSuccessfulFileSuccessfulNum.sum();
+    IOT_PRINTER.println(
+        "Successfully load "
+            + loadedCount
+            + " tsfile(s) (--on_success operation(s): "
+            + onSuccessDoneCount
+            + " succeed, "
+            + (loadedCount - onSuccessDoneCount)
+            + " failed)");
+
+    final long failedCount = loadFileFailedNum.sum();
+    final long onFailDoneCount = processingLoadFailedFileSuccessfulNum.sum();
+    IOT_PRINTER.println(
+        "Failed to load "
+            + failedCount
+            + " file(s) (--on_fail operation(s): "
+            + onFailDoneCount
+            + " succeed, "
+            + (failedCount - onFailDoneCount)
+            + " failed)");
+
+    final int stillQueued = ImportTsFileScanTool.getTsFileQueueSize();
+    IOT_PRINTER.println(
+        "Unprocessed "
+            + stillQueued
+            + " file(s), due to unexpected exceptions");
+    IOT_PRINTER.println("For more details, please check the log.");
+
+    final long elapsedMs = System.currentTimeMillis() - startTime;
+    IOT_PRINTER.println(
+        "Total operation time: " + elapsedMs + " ms.");
+    IOT_PRINTER.println("Work has been completed!");
+  }
+
+  /**
+   * Stores the directories and operations that {@link #processingFile(String, boolean)} applies
+   * after a load attempt. The values are kept in static fields, so they are shared by every
+   * importer instance.
+   *
+   * @param successDir directory used for files that loaded successfully
+   * @param successOperation operation applied to files that loaded successfully
+   * @param failDir directory used for files that failed to load
+   * @param failOperation operation applied to files that failed to load
+   */
+  public static void setSuccessAndFailDirAndOperation(
+      final String successDir,
+      final ImportTsFile.Operation successOperation,
+      final String failDir,
+      final ImportTsFile.Operation failOperation) {
+    ImportTsFileBase.successDir = successDir;
+    ImportTsFileBase.successOperation = successOperation;
+    ImportTsFileBase.failDir = failDir;
+    ImportTsFileBase.failOperation = failOperation;
+  }
+}
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
new file mode 100644
index 00000000000..8f65d26fd32
--- /dev/null
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.session.pool.SessionPool;
+
+/**
+ * Importer that loads each queued file by executing a {@code load} statement through the
+ * {@link SessionPool} installed with {@link #setSessionPool(SessionPool)}.
+ *
+ * <p>{@link Runnable} is inherited from {@link ImportTsFileBase}, so it is not declared again.
+ */
+public class ImportTsFileLocally extends ImportTsFileBase {
+
+  private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out);
+
+  private static SessionPool sessionPool;
+
+  /**
+   * Polls file paths from {@link ImportTsFileScanTool} until the queue is empty and executes one
+   * {@code load} statement per path. A successful statement is followed by
+   * {@code processSuccessFile}; any exception from the statement goes to {@code processFailFile}.
+   */
+  @Override
+  public void loadTsFile() {
+    try {
+      String filePath;
+      while ((filePath = ImportTsFileScanTool.pollFromQueue()) != null) {
+        // NOTE(review): the path is spliced into the statement unescaped, so a path containing a
+        // single quote yields a malformed statement -- confirm the escaping rules and apply them.
+        final String sql = "load '" + filePath + "' onSuccess=none ";
+        try {
+          sessionPool.executeNonQueryStatement(sql);
+
+          processSuccessFile(filePath);
+        } catch (final Exception e) {
+          processFailFile(filePath, e);
+        }
+      }
+    } catch (final Exception e) {
+      IOT_PRINTER.println("Unexpected error occurred: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Installs the session pool used by every instance of this importer.
+   *
+   * @param sessionPool pool on which the {@code load} statements are executed
+   */
+  public static void setSessionPool(final SessionPool sessionPool) {
+    ImportTsFileLocally.sessionPool = sessionPool;
+  }
+}
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
new file mode 100644
index 00000000000..8f3c22f9e0b
--- /dev/null
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
+import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
+import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+
+public class ImportTsFileRemotely extends ImportTsFileBase {
+
+  private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out);
+
+  private static final String MODS = ".mods";
+  private static final String LOAD_STRATEGY = "sync";
+  private static final Integer MAX_RETRY_COUNT = 3;
+
+  private static final AtomicInteger CONNECTION_TIMEOUT_MS =
+      new 
AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
+
+  private IoTDBSyncClient client;
+
+  private static String host;
+  private static String port;
+
+  /**
+   * Creates an importer with its own client: the client is created by {@code initClient()} and
+   * the handshake is performed by {@link #sendHandshake()}. Exceptions thrown by either step
+   * propagate to the caller.
+   */
+  public ImportTsFileRemotely() {
+    initClient();
+    sendHandshake();
+  }
+
+  /**
+   * Polls file paths from {@link ImportTsFileScanTool} and transfers each file to the target
+   * server until the queue is empty; the client is closed when the loop ends.
+   *
+   * <p>When a transfer throws, the client is re-created and the transfer retried up to
+   * {@code MAX_RETRY_COUNT} times, waiting {@code 2 * attempt} seconds before each attempt. If
+   * every retry fails, the file is passed to {@code processFailFile} together with the original
+   * exception and a fresh connection is opened for the next file.
+   */
+  @Override
+  public void loadTsFile() {
+    try {
+      String filePath;
+      while ((filePath = ImportTsFileScanTool.pollFromQueue()) != null) {
+        try {
+          transferWithOptionalMods(filePath);
+
+          processSuccessFile(filePath);
+        } catch (final Exception e) {
+          IOT_PRINTER.println(
+              "Connect is abort, try to reconnect, max retry count: " + MAX_RETRY_COUNT);
+
+          if (!retryWithReconnect(filePath)) {
+            processFailFile(filePath, e);
+
+            reconnect();
+          }
+        }
+      }
+    } catch (final Exception e) {
+      IOT_PRINTER.println("Unexpected error occurred: " + e.getMessage());
+    } finally {
+      close();
+    }
+  }
+
+  /** Transfers the file at {@code filePath}, together with its mods file if one was scanned. */
+  private void transferWithOptionalMods(final String filePath)
+      throws PipeException, IOException {
+    final String modsPath = filePath + MODS;
+    final File modsFile =
+        ImportTsFileScanTool.isContainModsFile(modsPath) ? new File(modsPath) : null;
+    doTransfer(new File(filePath), modsFile);
+  }
+
+  /** Closes the current client, creates a new one and performs the handshake again. */
+  private void reconnect() {
+    close();
+    initClient();
+    sendHandshake();
+  }
+
+  /**
+   * Reconnects and re-sends {@code filePath} until one attempt succeeds or the retry budget is
+   * used up.
+   *
+   * @return {@code true} if an attempt succeeded, {@code false} if all of them failed
+   */
+  private boolean retryWithReconnect(final String filePath) {
+    for (int attempt = 1; attempt <= MAX_RETRY_COUNT; attempt++) {
+      try {
+        IOT_PRINTER.println(
+            String.format("The %sth retry will after %s seconds.", attempt, attempt * 2));
+        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(attempt * 2L));
+
+        reconnect();
+        transferWithOptionalMods(filePath);
+
+        processSuccessFile(filePath);
+
+        IOT_PRINTER.println("Reconnect successful.");
+        return true;
+      } catch (final Exception retryException) {
+        // Print the reason as well; it used to be dropped silently.
+        IOT_PRINTER.println(
+            String.format(
+                "The %sth reconnect failed: %s", attempt, retryException.getMessage()));
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Performs the handshake with the target server on the current client.
+   *
+   * <p>A V2 handshake request is sent first; if the server answers with
+   * {@code PIPE_TYPE_ERROR}, a V1 request carrying only the timestamp precision is sent instead.
+   * On success the client timeout is switched to {@code CONNECTION_TIMEOUT_MS}.
+   *
+   * @throws PipeConnectionException if the final status is not a success status or if the
+   *     exchange itself fails; in the latter case the original exception is kept as the cause
+   */
+  public void sendHandshake() {
+    try {
+      final Map<String, String> params = constructParamsMap();
+      TPipeTransferResp resp =
+          client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params));
+
+      if (resp.getStatus().getCode() == TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) {
+        IOT_PRINTER.println(
+            String.format(
+                "Handshake error with target server ip: %s, port: %s, because: %s. "
+                    + "Retry to handshake by PipeTransferHandshakeV1Req.",
+                client.getIpAddress(), client.getPort(), resp.getStatus()));
+        resp =
+            client.pipeTransfer(
+                PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq(
+                    CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
+      }
+
+      if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new PipeConnectionException(
+            String.format(
+                "Handshake error with target server ip: %s, port: %s, because: %s.",
+                client.getIpAddress(), client.getPort(), resp.getStatus()));
+      }
+
+      client.setTimeout(CONNECTION_TIMEOUT_MS.get());
+      IOT_PRINTER.println(
+          String.format(
+              "Handshake success. Target server ip: %s, port: %s",
+              client.getIpAddress(), client.getPort()));
+    } catch (final PipeConnectionException e) {
+      // Already describes the handshake failure; wrapping it again only duplicated the message.
+      throw e;
+    } catch (final Exception e) {
+      // NOTE(review): relies on PipeConnectionException being a PipeException subtype, so that
+      // callers catching PipeException keep working -- confirm.
+      throw new PipeConnectionException(
+          String.format(
+              "Handshake error with target server ip: %s, port: %s, because: %s.",
+              client.getIpAddress(), client.getPort(), e.getMessage()),
+          e);
+    }
+  }
+
+  /**
+   * Builds the parameter map sent with the V2 handshake request: timestamp precision, the
+   * generated cluster id, convert-on-type-mismatch set to {@code true}, and the load strategy.
+   *
+   * @return a new mutable map with the four handshake entries
+   */
+  private Map<String, String> constructParamsMap() {
+    final String timestampPrecision =
+        CommonDescriptor.getInstance().getConfig().getTimestampPrecision();
+
+    final Map<String, String> handshakeParams = new HashMap<>();
+    handshakeParams.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, timestampPrecision);
+    handshakeParams.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, getClusterId());
+    handshakeParams.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH,
+        String.valueOf(true));
+    handshakeParams.put(
+        PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, LOAD_STRATEGY);
+    return handshakeParams;
+  }
+
+  /**
+   * Sends one TsFile, optionally preceded by its mods file, and then seals the transfer.
+   *
+   * @param tsFile the TsFile to send
+   * @param modFile the mods file to send before the TsFile, or {@code null} if there is none
+   * @throws PipeException if the seal request fails or the server answers with a status other
+   *     than success or redirection-recommend
+   * @throws IOException if reading one of the files fails
+   */
+  public void doTransfer(final File tsFile, final File modFile) throws PipeException, IOException {
+    final boolean withMods = modFile != null;
+
+    final TPipeTransferReq sealReq;
+    if (!withMods) {
+      transferFilePieces(tsFile, false);
+
+      sealReq = PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length());
+    } else {
+      // The mods file goes first, then the TsFile; both use the multi-file piece request.
+      transferFilePieces(modFile, true);
+      transferFilePieces(tsFile, true);
+
+      sealReq =
+          PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
+              modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length());
+    }
+
+    final TPipeTransferResp sealResp;
+    try {
+      sealResp = client.pipeTransfer(sealReq);
+    } catch (final Exception e) {
+      throw new PipeConnectionException(
+          String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()), e);
+    }
+
+    final TSStatus status = sealResp.getStatus();
+    final boolean accepted =
+        status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            || status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode();
+    if (!accepted) {
+      throw new PipeConnectionException(
+          String.format("Seal file %s error, result status %s.", tsFile, status));
+    }
+
+    IOT_PRINTER.println("Successfully transferred file " + tsFile);
+  }
+
+  /**
+   * Sends the content of {@code file} to the server piece by piece.
+   *
+   * <p>Each piece holds at most {@code getPipeConnectorReadFileBufferSize()} bytes and is sent
+   * together with the offset it starts at. If the server answers with
+   * {@code PIPE_TRANSFER_FILE_OFFSET_RESET}, reading continues from the offset reported by the
+   * server.
+   *
+   * @param file file to send
+   * @param isMultiFile {@code true} to use the with-mod piece request, {@code false} to use the
+   *     single-file piece request
+   * @throws PipeException if sending a piece fails or the server answers with a status other
+   *     than success or redirection-recommend
+   * @throws IOException if reading the file fails
+   */
+  private void transferFilePieces(final File file, final boolean isMultiFile)
+      throws PipeException, IOException {
+    final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
+    final byte[] readBuffer = new byte[readFileBufferSize];
+    // Offset in the file of the piece that is about to be sent.
+    long position = 0;
+    try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
+      while (true) {
+        final int readLength = reader.read(readBuffer);
+        if (readLength == -1) {
+          break;
+        }
+
+        // A full buffer is sent as is; a short read is copied so that only the bytes actually
+        // read are sent.
+        final byte[] payLoad =
+            readLength == readFileBufferSize
+                ? readBuffer
+                : Arrays.copyOfRange(readBuffer, 0, readLength);
+        final PipeTransferFilePieceResp resp;
+        try {
+          final TPipeTransferReq req =
+              isMultiFile
+                  ? getTransferMultiFilePieceReq(file.getName(), position, payLoad)
+                  : getTransferSingleFilePieceReq(file.getName(), position, payLoad);
+          resp = PipeTransferFilePieceResp.fromTPipeTransferResp(client.pipeTransfer(req));
+        } catch (final Exception e) {
+          throw new PipeConnectionException(
+              String.format(
+                  "Network error when transfer file %s, because %s.", file, e.getMessage()),
+              e);
+        }
+
+        position += readLength;
+
+        final TSStatus status = resp.getStatus();
+        if (status.getCode() == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
+          // The server reports where its copy ends: overwrite the local offset and re-read
+          // from there.
+          position = resp.getEndWritingOffset();
+          reader.seek(position);
+          IOT_PRINTER.println(String.format("Redirect file position to %s.", position));
+          continue;
+        }
+
+        // No continue here: after the handshake the status still goes through the failure
+        // check below.
+        if (status.getCode()
+            == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) {
+          sendHandshake();
+        }
+        // Only handle the failed statuses to avoid string format performance overhead
+        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+            && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+          throw new PipeException(
+              String.format("Transfer file %s error, result status %s.", file, status));
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds the piece request used when a TsFile is sent together with its mods file, by
+   * delegating to {@code PipeTransferTsFilePieceWithModReq.toTPipeTransferReq}.
+   *
+   * @param fileName name of the file the piece belongs to
+   * @param position offset of the piece within that file
+   * @param payLoad bytes of the piece
+   */
+  private PipeTransferFilePieceReq getTransferMultiFilePieceReq(
+      final String fileName, final long position, final byte[] payLoad) throws IOException {
+    return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(fileName, position, payLoad);
+  }
+
+  /**
+   * Builds the piece request used when a TsFile is sent on its own, by delegating to
+   * {@code PipeTransferTsFilePieceReq.toTPipeTransferReq}.
+   *
+   * @param fileName name of the file the piece belongs to
+   * @param position offset of the piece within that file
+   * @param payLoad bytes of the piece
+   */
+  private PipeTransferFilePieceReq getTransferSingleFilePieceReq(
+      final String fileName, final long position, final byte[] payLoad) throws IOException {
+    return PipeTransferTsFilePieceReq.toTPipeTransferReq(fileName, position, payLoad);
+  }
+
+  /**
+   * Creates a new {@code IoTDBSyncClient} for the configured host and port and stores it in
+   * {@code client}. The connection timeout is the handshake timeout from {@code PipeConfig}.
+   *
+   * @throws PipeConnectionException if the client cannot be created; the transport exception is
+   *     kept as the cause
+   */
+  private void initClient() {
+    // Resolve the end point once instead of parsing the port for each accessor call.
+    final TEndPoint endPoint = getEndPoint();
+    try {
+      this.client =
+          new IoTDBSyncClient(
+              new ThriftClientProperty.Builder()
+                  .setConnectionTimeoutMs(
+                      PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs())
+                  .setRpcThriftCompressionEnabled(
+                      PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
+                  .build(),
+              endPoint.getIp(),
+              endPoint.getPort(),
+              false,
+              "",
+              "");
+    } catch (final TTransportException e) {
+      // NOTE(review): relies on PipeConnectionException being a PipeException subtype, so that
+      // callers catching PipeException keep working -- confirm.
+      throw new PipeConnectionException("Sync client init error because " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Builds the target end point from the static {@code host} and {@code port} settings; the
+   * port string is parsed with {@link Integer#parseInt(String)}.
+   */
+  private TEndPoint getEndPoint() {
+    return new TEndPoint(host, Integer.parseInt(port));
+  }
+
+  /**
+   * Generates the random identifier that is sent as the cluster id in the handshake parameters.
+   *
+   * @return {@code "TSFILE-IMPORTER-"} followed by a random UUID
+   */
+  private String getClusterId() {
+    // UUID.randomUUID() already uses a cryptographically strong generator, so there is no need
+    // to create a SecureRandom per call and hash its output into a name-based UUID.
+    return "TSFILE-IMPORTER-" + UUID.randomUUID();
+  }
+
+  /** Closes the current client if there is one; a failure to close is printed, not thrown. */
+  private void close() {
+    try {
+      if (this.client == null) {
+        return;
+      }
+      this.client.close();
+    } catch (final Exception closeError) {
+      IOT_PRINTER.println("Failed to close client because " + closeError.getMessage());
+    }
+  }
+
+  /**
+   * Sets the host of the target server; the value is static and shared by all instances.
+   *
+   * @param host host used when a client is created
+   */
+  public static void setHost(final String host) {
+    ImportTsFileRemotely.host = host;
+  }
+
+  /**
+   * Sets the port of the target server; the value is static and shared by all instances.
+   *
+   * @param port port as a decimal string; it is parsed to an int when a client is created
+   */
+  public static void setPort(final String port) {
+    ImportTsFileRemotely.port = port;
+  }
+}
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileScanTool.java
 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileScanTool.java
new file mode 100644
index 00000000000..4ec9f229084
--- /dev/null
+++ 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileScanTool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.tsfile;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Static helper that walks the configured source path and fills the queue of file paths the
+ * importers poll from.
+ *
+ * <p>Files ending in {@code .resource} or {@code .mods} are remembered separately; every other
+ * file is queued for loading.
+ */
+public class ImportTsFileScanTool {
+
+  private static final String RESOURCE = ".resource";
+  private static final String MODS = ".mods";
+
+  private static final LinkedBlockingQueue<String> tsfileQueue = new LinkedBlockingQueue<>();
+  private static final Set<String> tsfileSet = new HashSet<>();
+  private static final Set<String> resourceOrModsSet = new HashSet<>();
+  private static String sourceFullPath;
+
+  /** Scans the configured source path, which may be a single file or a directory tree. */
+  public static void traverseAndCollectFiles() throws InterruptedException {
+    final File sourceRoot = new File(sourceFullPath);
+    traverseAndCollectFilesBySourceFullPath(sourceRoot);
+  }
+
+  /** Classifies a regular file, or recurses into the children of a directory. */
+  private static void traverseAndCollectFilesBySourceFullPath(final File file)
+      throws InterruptedException {
+    if (file.isFile()) {
+      final String fileName = file.getName();
+      final boolean isCompanion = fileName.endsWith(RESOURCE) || fileName.endsWith(MODS);
+      if (isCompanion) {
+        resourceOrModsSet.add(file.getAbsolutePath());
+        return;
+      }
+      tsfileSet.add(file.getAbsolutePath());
+      tsfileQueue.put(file.getAbsolutePath());
+      return;
+    }
+
+    if (!file.isDirectory()) {
+      return;
+    }
+
+    final File[] children = file.listFiles();
+    if (children == null) {
+      return;
+    }
+    for (final File child : children) {
+      traverseAndCollectFilesBySourceFullPath(child);
+    }
+  }
+
+  /**
+   * Queues every collected {@code .resource} / {@code .mods} path whose name, with that suffix
+   * removed, was not collected as a file during the scan.
+   */
+  public static void addNoResourceOrModsToQueue() throws InterruptedException {
+    for (final String companionPath : resourceOrModsSet) {
+      final int suffixLength =
+          companionPath.endsWith(RESOURCE) ? RESOURCE.length() : MODS.length();
+      final String ownerPath = companionPath.substring(0, companionPath.length() - suffixLength);
+      if (tsfileSet.contains(ownerPath)) {
+        continue;
+      }
+      tsfileQueue.put(companionPath);
+    }
+  }
+
+  /** Returns whether {@code modsFilePath} was collected as a companion file during the scan. */
+  public static boolean isContainModsFile(final String modsFilePath) {
+    return resourceOrModsSet.contains(modsFilePath);
+  }
+
+  /** Removes and returns the next queued path, or {@code null} if the queue is empty. */
+  public static String pollFromQueue() {
+    return tsfileQueue.poll();
+  }
+
+  /** Appends {@code filePath} to the queue. */
+  public static void putToQueue(final String filePath) throws InterruptedException {
+    tsfileQueue.put(filePath);
+  }
+
+  /** Sets the path that {@link #traverseAndCollectFiles()} scans. */
+  public static void setSourceFullPath(final String sourceFullPath) {
+    ImportTsFileScanTool.sourceFullPath = sourceFullPath;
+  }
+
+  /** Returns the length, in characters, of the configured source path. */
+  public static int getSourceFullPathLength() {
+    return sourceFullPath.length();
+  }
+
+  /** Returns the number of paths currently queued. */
+  public static int getTsFileQueueSize() {
+    return tsfileQueue.size();
+  }
+}
diff --git 
a/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java 
b/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java
index eb9e01d2937..f9b43b8a880 100644
--- 
a/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java
+++ 
b/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.tool;
 
+import org.apache.iotdb.tool.data.AbstractDataTool;
+
 import org.junit.Before;
 import org.junit.Test;
 

Reply via email to