This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch revert-remote-load in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bf7823085c54bdc3e1fdeef755e0c81e0d30894f Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Sep 6 20:41:36 2024 +0800 Revert "Load: Support remote load in import-tsfile script (#13352)" This reverts commit 8a2aba23c0673e0560b7aff2ffc9b3ce9a73a5ff. --- iotdb-client/cli/pom.xml | 10 - .../cli/src/assembly/resources/tools/backup.bat | 2 +- .../src/assembly/resources/tools/export-data.bat | 2 +- .../src/assembly/resources/tools/export-schema.bat | 2 +- .../src/assembly/resources/tools/export-tsfile.bat | 2 +- .../src/assembly/resources/tools/import-data.bat | 2 +- .../src/assembly/resources/tools/import-schema.bat | 2 +- .../src/assembly/resources/tools/load-tsfile.bat | 2 +- .../java/org/apache/iotdb/cli/AbstractCli.java | 2 +- .../iotdb/tool/{data => }/AbstractDataTool.java | 2 +- .../tool/{schema => }/AbstractSchemaTool.java | 2 +- .../tool/{tsfile => }/AbstractTsFileTool.java | 2 +- .../apache/iotdb/tool/{data => }/ExportData.java | 2 +- .../iotdb/tool/{schema => }/ExportSchema.java | 2 +- .../iotdb/tool/{tsfile => }/ExportTsFile.java | 2 +- .../apache/iotdb/tool/{data => }/ImportData.java | 2 +- .../iotdb/tool/{schema => }/ImportSchema.java | 2 +- .../iotdb/tool/{tsfile => }/ImportTsFile.java | 302 ++++++++++++++---- .../iotdb/tool/{backup => }/IoTDBDataBackTool.java | 3 +- .../apache/iotdb/tool/tsfile/ImportTsFileBase.java | 243 --------------- .../iotdb/tool/tsfile/ImportTsFileLocally.java | 53 ---- .../iotdb/tool/tsfile/ImportTsFileRemotely.java | 338 --------------------- .../iotdb/tool/tsfile/ImportTsFileScanTool.java | 95 ------ .../org/apache/iotdb/tool/WriteDataFileTest.java | 2 - 24 files changed, 260 insertions(+), 818 deletions(-) diff --git a/iotdb-client/cli/pom.xml b/iotdb-client/cli/pom.xml index a735626e9ea..faf10b630a1 100644 --- a/iotdb-client/cli/pom.xml +++ b/iotdb-client/cli/pom.xml @@ -84,16 +84,6 @@ <artifactId>iotdb-thrift</artifactId> <version>2.0.0-SNAPSHOT</version> </dependency> - <dependency> - <groupId>org.apache.iotdb</groupId> - <artifactId>iotdb-thrift-commons</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - <dependency> - <groupId>org.apache.iotdb</groupId> - <artifactId>pipe-api</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> diff --git a/iotdb-client/cli/src/assembly/resources/tools/backup.bat b/iotdb-client/cli/src/assembly/resources/tools/backup.bat index e7974164b08..3008a74733e 100644 --- a/iotdb-client/cli/src/assembly/resources/tools/backup.bat +++ b/iotdb-client/cli/src/assembly/resources/tools/backup.bat @@ -102,7 +102,7 @@ echo Starting IoTDB Client Data Back Script echo ------------------------------------------ set CLASSPATH="%IOTDB_HOME%\lib\*" -if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.backup.IoTDBDataBackTool +if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.IoTDBDataBackTool set logsDir="%IOTDB_HOME%\logs" if not exist "%logsDir%" ( diff --git a/iotdb-client/cli/src/assembly/resources/tools/export-data.bat b/iotdb-client/cli/src/assembly/resources/tools/export-data.bat index 6df7b11d00c..c3acfff5f92 100644 --- a/iotdb-client/cli/src/assembly/resources/tools/export-data.bat +++ b/iotdb-client/cli/src/assembly/resources/tools/export-data.bat @@ -31,7 +31,7 @@ pushd %~dp0.. if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD% popd -if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.data.ExportData +if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ExportData if NOT DEFINED JAVA_HOME goto :err @REM ----------------------------------------------------------------------------- diff --git a/iotdb-client/cli/src/assembly/resources/tools/export-schema.bat b/iotdb-client/cli/src/assembly/resources/tools/export-schema.bat index dab5dfaf667..8ed2b81f5c7 100644 --- a/iotdb-client/cli/src/assembly/resources/tools/export-schema.bat +++ b/iotdb-client/cli/src/assembly/resources/tools/export-schema.bat @@ -31,7 +31,7 @@ pushd %~dp0.. if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD% popd -if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.schema.ExportSchema +if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ExportSchema if NOT DEFINED JAVA_HOME goto :err @REM ----------------------------------------------------------------------------- diff --git a/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.bat b/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.bat index 2c85f42bd62..350e806a769 100644 --- a/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.bat +++ b/iotdb-client/cli/src/assembly/resources/tools/export-tsfile.bat @@ -31,7 +31,7 @@ pushd %~dp0.. if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD% popd -if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.tsfile.ExportTsFile +if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ExportTsFile if NOT DEFINED JAVA_HOME goto :err @REM ----------------------------------------------------------------------------- diff --git a/iotdb-client/cli/src/assembly/resources/tools/import-data.bat b/iotdb-client/cli/src/assembly/resources/tools/import-data.bat index a4041015ccc..cb34c5e897a 100644 --- a/iotdb-client/cli/src/assembly/resources/tools/import-data.bat +++ b/iotdb-client/cli/src/assembly/resources/tools/import-data.bat @@ -31,7 +31,7 @@ pushd %~dp0.. if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD% popd -if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.data.ImportData +if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ImportData if NOT DEFINED JAVA_HOME goto :err @REM ----------------------------------------------------------------------------- diff --git a/iotdb-client/cli/src/assembly/resources/tools/import-schema.bat b/iotdb-client/cli/src/assembly/resources/tools/import-schema.bat index fbf5236128b..46a8d5abf6e 100644 --- a/iotdb-client/cli/src/assembly/resources/tools/import-schema.bat +++ b/iotdb-client/cli/src/assembly/resources/tools/import-schema.bat @@ -31,7 +31,7 @@ pushd %~dp0.. if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD% popd -if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.schema.ImportSchema +if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ImportSchema if NOT DEFINED JAVA_HOME goto :err @REM ----------------------------------------------------------------------------- diff --git a/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat b/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat index 52ae0a46b76..bd1f4c9170c 100755 --- a/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat +++ b/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat @@ -30,7 +30,7 @@ pushd %~dp0.. if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD% popd -if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.tsfile.ImportTsFile +if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ImportTsFile if NOT DEFINED JAVA_HOME goto :err @REM ----------------------------------------------------------------------------- diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java index b803e0df55c..a0a7ca8b14f 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java @@ -26,7 +26,7 @@ import org.apache.iotdb.jdbc.IoTDBJDBCResultSet; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.service.rpc.thrift.ServerProperties; -import org.apache.iotdb.tool.data.ImportData; +import org.apache.iotdb.tool.ImportData; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java similarity index 99% rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java rename to iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java index d60667676f8..1e689e903c1 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractDataTool.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractDataTool.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.tool.data; +package org.apache.iotdb.tool; import org.apache.iotdb.cli.utils.IoTPrinter; import org.apache.iotdb.exception.ArgsErrorException; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractSchemaTool.java similarity index 99% rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java rename to iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractSchemaTool.java index 00d926c7074..9e91d404d12 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractSchemaTool.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.tool.schema; +package org.apache.iotdb.tool; import org.apache.iotdb.cli.utils.IoTPrinter; import org.apache.iotdb.exception.ArgsErrorException; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/AbstractTsFileTool.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java similarity index 99% rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/AbstractTsFileTool.java rename to iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java index f7a88eb56c5..fe8691ddf8b 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/AbstractTsFileTool.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.tool.tsfile; +package org.apache.iotdb.tool; import org.apache.iotdb.cli.utils.IoTPrinter; import org.apache.iotdb.exception.ArgsErrorException; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java similarity index 99% rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java rename to iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java index 931686cced7..bc09e2faa51 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportData.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.tool.data; +package org.apache.iotdb.tool; import org.apache.iotdb.cli.type.ExitType; import org.apache.iotdb.cli.utils.CliContext; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportSchema.java similarity index 99% rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java rename to iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportSchema.java index 54453acd898..9892da625e7 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportSchema.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.tool.schema; +package org.apache.iotdb.tool; import org.apache.iotdb.cli.type.ExitType; import org.apache.iotdb.cli.utils.CliContext; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java similarity index 99% rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java rename to iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java index 22cdcb907da..9ab3d6b3ec6 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.tool.tsfile; +package org.apache.iotdb.tool; import org.apache.iotdb.cli.type.ExitType; import org.apache.iotdb.cli.utils.CliContext; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ImportData.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java similarity index 99% rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ImportData.java rename to iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java index e8fbe063313..819190d2c2c 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ImportData.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportData.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.tool.data; +package org.apache.iotdb.tool; import org.apache.iotdb.cli.utils.IoTPrinter; import org.apache.iotdb.commons.exception.IllegalPathException; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java similarity index 99% rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java rename to iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java index a7709313b1b..3a7b8ab60e5 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportSchema.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.tool.schema; +package org.apache.iotdb.tool; import org.apache.iotdb.cli.utils.IoTPrinter; import org.apache.iotdb.commons.exception.IllegalPathException; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java similarity index 50% rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java rename to iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java index 497900a0371..576d2927beb 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFile.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java @@ -17,10 +17,9 @@ * under the License. */ -package org.apache.iotdb.tool.tsfile; +package org.apache.iotdb.tool; import org.apache.iotdb.cli.utils.IoTPrinter; -import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.session.pool.SessionPool; import org.apache.commons.cli.CommandLine; @@ -32,13 +31,18 @@ import org.apache.commons.cli.ParseException; import java.io.File; import java.io.IOException; -import java.net.UnknownHostException; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.LongAdder; public class ImportTsFile extends AbstractTsFileTool { @@ -60,11 +64,15 @@ public class ImportTsFile extends AbstractTsFileTool { private static final String THREAD_NUM_ARGS = "tn"; private static final String THREAD_NUM_NAME = "thread_num"; - private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out); + private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out); private static final String TS_FILE_CLI_PREFIX = "ImportTsFile"; + private static final String RESOURCE = ".resource"; + private static final String MODS = ".mods"; + private static String source; + private static String sourceFullPath; private static String successDir = "success/"; private static String failDir = "fail/"; @@ -74,7 +82,14 @@ public class ImportTsFile extends AbstractTsFileTool { private static int threadNum = 8; - private static boolean isRemoteLoad = true; + private static final LongAdder loadFileSuccessfulNum = new LongAdder(); + private static final LongAdder loadFileFailedNum = new LongAdder(); + private static final LongAdder processingLoadSuccessfulFileSuccessfulNum = new LongAdder(); + private static final LongAdder processingLoadFailedFileSuccessfulNum = new LongAdder(); + + private static final LinkedBlockingQueue<String> tsfileQueue = new LinkedBlockingQueue<>(); + private static final Set<String> tsfileSet = new HashSet<>(); + private static final Set<String> resourceOrModsSet = new HashSet<>(); private static SessionPool sessionPool; @@ -156,7 +171,7 @@ public class ImportTsFile extends AbstractTsFileTool { helpFormatter.setWidth(MAX_HELP_CONSOLE_WIDTH); if (args == null || args.length == 0) { - IOT_PRINTER.println("Too few arguments, please check the following hint."); + ioTPrinter.println("Too few arguments, please check the following hint."); helpFormatter.printHelp(TS_FILE_CLI_PREFIX, options, true); System.exit(CODE_ERROR); } @@ -167,7 +182,7 @@ public class ImportTsFile extends AbstractTsFileTool { System.exit(CODE_OK); } } catch (ParseException e) { - IOT_PRINTER.println("Failed to parse the provided options: " + e.getMessage()); + ioTPrinter.println("Failed to parse the provided options: " + e.getMessage()); helpFormatter.printHelp(TS_FILE_CLI_PREFIX, options, true); System.exit(CODE_ERROR); } @@ -176,7 +191,7 @@ public class ImportTsFile extends AbstractTsFileTool { try { commandLine = parser.parse(options, args, true); } catch (ParseException e) { - IOT_PRINTER.println("Failed to parse the provided options: " + e.getMessage()); + ioTPrinter.println("Failed to parse the provided options: " + e.getMessage()); helpFormatter.printHelp(TS_FILE_CLI_PREFIX, options, true); System.exit(CODE_ERROR); } @@ -185,30 +200,45 @@ public class ImportTsFile extends AbstractTsFileTool { parseBasicParams(commandLine); parseSpecialParams(commandLine); } catch (Exception e) { - IOT_PRINTER.println( - "Encounter an error when parsing the provided options: " + e.getMessage()); + ioTPrinter.println("Encounter an error when parsing the provided options: " + e.getMessage()); System.exit(CODE_ERROR); } - IOT_PRINTER.println(isRemoteLoad ? "Load remotely." : "Load locally."); - final int resultCode = importFromTargetPath(); - - ImportTsFileBase.printResult(startTime); + ioTPrinter.println( + "Successfully load " + + loadFileSuccessfulNum.sum() + + " tsfile(s) (--on_success operation(s): " + + processingLoadSuccessfulFileSuccessfulNum.sum() + + " succeed, " + + (loadFileSuccessfulNum.sum() - processingLoadSuccessfulFileSuccessfulNum.sum()) + + " failed)"); + ioTPrinter.println( + "Failed to load " + + loadFileFailedNum.sum() + + " file(s) (--on_fail operation(s): " + + processingLoadFailedFileSuccessfulNum.sum() + + " succeed, " + + (loadFileFailedNum.sum() - processingLoadFailedFileSuccessfulNum.sum()) + + " failed)"); + ioTPrinter.println("For more details, please check the log."); + ioTPrinter.println( + "Total operation time: " + (System.currentTimeMillis() - startTime) + " ms."); + ioTPrinter.println("Work has been completed!"); System.exit(resultCode); } private static void parseSpecialParams(CommandLine commandLine) { source = commandLine.getOptionValue(SOURCE_ARGS); if (!Files.exists(Paths.get(source))) { - IOT_PRINTER.println(String.format("Source file or directory %s does not exist", source)); + ioTPrinter.println(String.format("Source file or directory %s does not exist", source)); System.exit(CODE_ERROR); } final String onSuccess = commandLine.getOptionValue(ON_SUCCESS_ARGS).trim().toLowerCase(); final String onFail = commandLine.getOptionValue(ON_FAIL_ARGS).trim().toLowerCase(); if (!Operation.isValidOperation(onSuccess) || !Operation.isValidOperation(onFail)) { - IOT_PRINTER.println("Args error: os/of must be one of none, mv, cp, delete"); + ioTPrinter.println("Args error: os/of must be one of none, mv, cp, delete"); System.exit(CODE_ERROR); } @@ -232,13 +262,6 @@ public class ImportTsFile extends AbstractTsFileTool { if (commandLine.getOptionValue(THREAD_NUM_ARGS) != null) { threadNum = Integer.parseInt(commandLine.getOptionValue(THREAD_NUM_ARGS)); } - - try { - isRemoteLoad = !NodeUrlUtils.containsLocalAddress(Collections.singletonList(host)); - } catch (UnknownHostException e) { - IOT_PRINTER.println( - "Unknown host: " + host + ". Exception: " + e.getMessage() + ". Will use remote load."); - } } public static boolean isFileStoreEquals(String pathString, File dir) { @@ -246,7 +269,7 @@ public class ImportTsFile extends AbstractTsFileTool { return Objects.equals( Files.getFileStore(Paths.get(pathString)), Files.getFileStore(dir.toPath())); } catch (IOException e) { - IOT_PRINTER.println("IOException when checking file store: " + e.getMessage()); + ioTPrinter.println("IOException when checking file store: " + e.getMessage()); return false; } } @@ -258,7 +281,7 @@ public class ImportTsFile extends AbstractTsFileTool { File file = new File(successDir); if (!file.isDirectory()) { if (!file.mkdirs()) { - IOT_PRINTER.println(String.format("Failed to create %s %s", SUCCESS_DIR_NAME, successDir)); + ioTPrinter.println(String.format("Failed to create %s %s", SUCCESS_DIR_NAME, successDir)); System.exit(CODE_ERROR); } } @@ -272,7 +295,7 @@ public class ImportTsFile extends AbstractTsFileTool { File file = new File(failDir); if (!file.isDirectory()) { if (!file.mkdirs()) { - IOT_PRINTER.println(String.format("Failed to create %s %s", FAIL_DIR_NAME, failDir)); + ioTPrinter.println(String.format("Failed to create %s %s", FAIL_DIR_NAME, failDir)); System.exit(CODE_ERROR); } } @@ -281,6 +304,13 @@ public class ImportTsFile extends AbstractTsFileTool { public static int importFromTargetPath() { try { + final File file = new File(source); + sourceFullPath = file.getAbsolutePath(); + if (!file.isFile() && !file.isDirectory()) { + ioTPrinter.println(String.format("source file or directory %s does not exist", source)); + return CODE_ERROR; + } + sessionPool = new SessionPool.Builder() .host(host) @@ -294,22 +324,15 @@ public class ImportTsFile extends AbstractTsFileTool { .build(); sessionPool.setEnableQueryRedirection(false); - // set params - processSetParams(); - - ImportTsFileScanTool.traverseAndCollectFiles(); - ImportTsFileScanTool.addNoResourceOrModsToQueue(); - - IOT_PRINTER.println("Load file total number : " + ImportTsFileScanTool.getTsFileQueueSize()); + traverseAndCollectFiles(file); + addNoResourceOrModsToQueue(); + ioTPrinter.println("Load file total number : " + tsfileQueue.size()); asyncImportTsFiles(); return CODE_OK; } catch (InterruptedException e) { - IOT_PRINTER.println(String.format("Import tsfile fail: %s", e.getMessage())); + ioTPrinter.println(String.format("Import tsfile fail: %s", e.getMessage())); Thread.currentThread().interrupt(); return CODE_ERROR; - } catch (Exception e) { - IOT_PRINTER.println(String.format("Import tsfile fail: %s", e.getMessage())); - return CODE_ERROR; } finally { if (sessionPool != null) { sessionPool.close(); @@ -317,32 +340,40 @@ public class ImportTsFile extends AbstractTsFileTool { } } - // process other classes need param - private static void processSetParams() { - // ImportTsFileLocally - final File file = new File(source); - ImportTsFileScanTool.setSourceFullPath(file.getAbsolutePath()); - if (!file.isFile() && !file.isDirectory()) { - IOT_PRINTER.println(String.format("Source file or directory %s does not exist", source)); - System.exit(CODE_ERROR); + public static void traverseAndCollectFiles(File file) throws InterruptedException { + if (file.isFile()) { + if (file.getName().endsWith(RESOURCE) || file.getName().endsWith(MODS)) { + resourceOrModsSet.add(file.getAbsolutePath()); + } else { + tsfileSet.add(file.getAbsolutePath()); + tsfileQueue.put(file.getAbsolutePath()); + } + } else if (file.isDirectory()) { + final File[] files = file.listFiles(); + if (files != null) { + for (File f : files) { + traverseAndCollectFiles(f); + } + } } + } - ImportTsFileLocally.setSessionPool(sessionPool); - - // ImportTsFileRemotely - ImportTsFileRemotely.setHost(host); - ImportTsFileRemotely.setPort(port); - - // ImportTsFileBase - ImportTsFileBase.setSuccessAndFailDirAndOperation( - successDir, successOperation, failDir, failOperation); + public static void addNoResourceOrModsToQueue() throws InterruptedException { + for (final String filePath : resourceOrModsSet) { + final String tsfilePath = + filePath.endsWith(RESOURCE) + ? filePath.substring(0, filePath.length() - RESOURCE.length()) + : filePath.substring(0, filePath.length() - MODS.length()); + if (!tsfileSet.contains(tsfilePath)) { + tsfileQueue.put(filePath); + } + } } public static void asyncImportTsFiles() { final List<Thread> list = new ArrayList<>(threadNum); for (int i = 0; i < threadNum; i++) { - final Thread thread = - new Thread(isRemoteLoad ? new ImportTsFileRemotely() : new ImportTsFileLocally()); + final Thread thread = new Thread(ImportTsFile::importTsFile); thread.start(); list.add(thread); } @@ -352,11 +383,164 @@ public class ImportTsFile extends AbstractTsFileTool { thread.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - IOT_PRINTER.println("ImportTsFile thread join interrupted: " + e.getMessage()); + ioTPrinter.println("importTsFile thread join interrupted: " + e.getMessage()); } }); } + public static void importTsFile() { + String filePath; + try { + while ((filePath = tsfileQueue.poll()) != null) { + final String sql = "load '" + filePath + "' onSuccess=none "; + + try { + sessionPool.executeNonQueryStatement(sql); + + loadFileSuccessfulNum.increment(); + ioTPrinter.println("Imported [ " + filePath + " ] file successfully!"); + + try { + processingFile(filePath, successDir, successOperation); + processingLoadSuccessfulFileSuccessfulNum.increment(); + ioTPrinter.println("Processed success file [ " + filePath + " ] successfully!"); + } catch (Exception processSuccessException) { + ioTPrinter.println( + "Failed to process success file [ " + + filePath + + " ]: " + + processSuccessException.getMessage()); + } + } catch (Exception e) { + // Reject because of memory controls, do retry later + if (Objects.nonNull(e.getMessage()) && e.getMessage().contains("memory")) { + ioTPrinter.println( + "Rejecting file [ " + filePath + " ] due to memory constraints, will retry later."); + tsfileQueue.put(filePath); + continue; + } + + loadFileFailedNum.increment(); + ioTPrinter.println("Failed to import [ " + filePath + " ] file: " + e.getMessage()); + + try { + processingFile(filePath, failDir, failOperation); + processingLoadFailedFileSuccessfulNum.increment(); + ioTPrinter.println("Processed fail file [ " + filePath + " ] successfully!"); + } catch (Exception processFailException) { + ioTPrinter.println( + "Failed to process fail file [ " + + filePath + + " ]: " + + processFailException.getMessage()); + } + } + } + } catch (InterruptedException e) { + ioTPrinter.println("Unexpected error occurred: " + e.getMessage()); + Thread.currentThread().interrupt(); + } catch (Exception e) { + ioTPrinter.println("Unexpected error occurred: " + e.getMessage()); + } + } + + public static void processingFile(String filePath, String dir, Operation operation) { + String relativePath = filePath.substring(sourceFullPath.length() + 1); + Path sourcePath = Paths.get(filePath); + + String target = dir + File.separator + relativePath.replace(File.separator, "_"); + Path targetPath = Paths.get(target); + + Path sourceResourcePath = Paths.get(sourcePath + RESOURCE); + sourceResourcePath = Files.exists(sourceResourcePath) ? sourceResourcePath : null; + Path targetResourcePath = Paths.get(target + RESOURCE); + + Path sourceModsPath = Paths.get(sourcePath + MODS); + sourceModsPath = Files.exists(sourceModsPath) ? sourceModsPath : null; + Path targetModsPath = Paths.get(target + MODS); + + switch (operation) { + case DELETE: + { + try { + Files.deleteIfExists(sourcePath); + if (null != sourceResourcePath) { + Files.deleteIfExists(sourceResourcePath); + } + if (null != sourceModsPath) { + Files.deleteIfExists(sourceModsPath); + } + } catch (Exception e) { + ioTPrinter.println(String.format("Failed to delete file: %s", e.getMessage())); + } + break; + } + case CP: + { + try { + Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); + if (null != sourceResourcePath) { + Files.copy( + sourceResourcePath, targetResourcePath, StandardCopyOption.REPLACE_EXISTING); + } + if (null != sourceModsPath) { + Files.copy(sourceModsPath, targetModsPath, StandardCopyOption.REPLACE_EXISTING); + } + } catch (Exception e) { + ioTPrinter.println(String.format("Failed to copy file: %s", e.getMessage())); + } + break; + } + case HARDLINK: + { + try { + Files.createLink(targetPath, sourcePath); + } catch (FileAlreadyExistsException e) { + ioTPrinter.println("Hardlink already exists: " + e.getMessage()); + } catch (Exception e) { + try { + Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); + } catch (Exception copyException) { + ioTPrinter.println( + String.format("Failed to copy file: %s", copyException.getMessage())); + } + } + + try { + if (null != sourceResourcePath) { + Files.copy( + sourceResourcePath, targetResourcePath, StandardCopyOption.REPLACE_EXISTING); + } + if (null != sourceModsPath) { + Files.copy(sourceModsPath, targetModsPath, StandardCopyOption.REPLACE_EXISTING); + } + } catch (Exception e) { + ioTPrinter.println( + String.format("Failed to copy resource or mods file: %s", e.getMessage())); + } + break; + } + case MV: + { + try { + Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); + if (null != sourceResourcePath) { + Files.move( + sourceResourcePath, targetResourcePath, StandardCopyOption.REPLACE_EXISTING); + } + if (null != sourceModsPath) { + Files.move(sourceModsPath, targetModsPath, StandardCopyOption.REPLACE_EXISTING); + } + } catch (Exception e) { + ioTPrinter.println(String.format("Failed to move file: %s", e.getMessage())); + } + break; + } + default: + break; + } + } + public enum Operation { NONE, MV, @@ -387,7 +571,7 @@ public class ImportTsFile extends AbstractTsFileTool { case "delete": return Operation.DELETE; default: - IOT_PRINTER.println("Args error: os/of must be one of none, mv, cp, delete"); + ioTPrinter.println("Args error: os/of must be one of none, mv, cp, delete"); System.exit(CODE_ERROR); return null; } diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/backup/IoTDBDataBackTool.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/IoTDBDataBackTool.java similarity index 99% rename from iotdb-client/cli/src/main/java/org/apache/iotdb/tool/backup/IoTDBDataBackTool.java rename to iotdb-client/cli/src/main/java/org/apache/iotdb/tool/IoTDBDataBackTool.java index 62f27acbf62..7cc3556562b 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/backup/IoTDBDataBackTool.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/IoTDBDataBackTool.java @@ -17,12 +17,11 @@ * under the License. */ -package org.apache.iotdb.tool.backup; +package org.apache.iotdb.tool; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.tool.data.AbstractDataTool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileBase.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileBase.java deleted file mode 100644 index 82848d1357e..00000000000 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileBase.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.tool.tsfile; - -import org.apache.iotdb.cli.utils.IoTPrinter; - -import java.io.File; -import java.nio.file.FileAlreadyExistsException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; -import java.util.Objects; -import java.util.concurrent.atomic.LongAdder; - -public abstract class ImportTsFileBase implements Runnable { - - private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out); - - private static final LongAdder loadFileSuccessfulNum = new LongAdder(); - private static final LongAdder loadFileFailedNum = new LongAdder(); - private static final LongAdder processingLoadSuccessfulFileSuccessfulNum = new LongAdder(); - private static final LongAdder processingLoadFailedFileSuccessfulNum = new LongAdder(); - - private static String successDir; - private static ImportTsFile.Operation successOperation; - private static String failDir; - private static ImportTsFile.Operation failOperation; - - @Override - public void run() { - loadTsFile(); - } - - protected abstract void loadTsFile(); - - protected void processFailFile(final String filePath, final Exception e) { - // Reject because of memory controls, do retry later - try { - if (Objects.nonNull(e.getMessage()) && e.getMessage().contains("memory")) { - IOT_PRINTER.println( - "Rejecting file [ " + filePath + " ] due to memory constraints, will retry later."); - ImportTsFileScanTool.putToQueue(filePath); - return; - } - - loadFileFailedNum.increment(); - IOT_PRINTER.println("Failed to import [ " + filePath + " ] file: " + e.getMessage()); - - try { - processingFile(filePath, false); - processingLoadFailedFileSuccessfulNum.increment(); - IOT_PRINTER.println("Processed fail file [ " + filePath + " ] successfully!"); - } catch (final Exception processFailException) { - IOT_PRINTER.println( - "Failed to process fail file [ " - + filePath - + " ]: " - + processFailException.getMessage()); - } - } catch (final InterruptedException e1) { - IOT_PRINTER.println("Unexpected error occurred: " + e1.getMessage()); - Thread.currentThread().interrupt(); - } catch (final Exception e1) { - IOT_PRINTER.println("Unexpected error occurred: " + e1.getMessage()); - } - } - - protected static void processSuccessFile(final String filePath) { - loadFileSuccessfulNum.increment(); - IOT_PRINTER.println("Imported [ " + filePath + " ] file successfully!"); - - try { - processingFile(filePath, true); - processingLoadSuccessfulFileSuccessfulNum.increment(); - IOT_PRINTER.println("Processed success file [ " + filePath + " ] successfully!"); - } catch (final Exception processSuccessException) { - IOT_PRINTER.println( - "Failed to process success file [ " - + filePath - + " ]: " - + processSuccessException.getMessage()); - } - } - - public static void processingFile(final String filePath, final boolean isSuccess) { - final String relativePath = - filePath.substring(ImportTsFileScanTool.getSourceFullPathLength() + 1); - final Path sourcePath = Paths.get(filePath); - - final String target = - isSuccess - ? successDir - : failDir + File.separator + relativePath.replace(File.separator, "_"); - final Path targetPath = Paths.get(target); - - final String RESOURCE = ".resource"; - Path sourceResourcePath = Paths.get(sourcePath + RESOURCE); - sourceResourcePath = Files.exists(sourceResourcePath) ? sourceResourcePath : null; - final Path targetResourcePath = Paths.get(target + RESOURCE); - - final String MODS = ".mods"; - Path sourceModsPath = Paths.get(sourcePath + MODS); - sourceModsPath = Files.exists(sourceModsPath) ? sourceModsPath : null; - final Path targetModsPath = Paths.get(target + MODS); - - switch (isSuccess ? successOperation : failOperation) { - case DELETE: - { - try { - Files.deleteIfExists(sourcePath); - if (null != sourceResourcePath) { - Files.deleteIfExists(sourceResourcePath); - } - if (null != sourceModsPath) { - Files.deleteIfExists(sourceModsPath); - } - } catch (final Exception e) { - IOT_PRINTER.println(String.format("Failed to delete file: %s", e.getMessage())); - } - break; - } - case CP: - { - try { - Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); - if (null != sourceResourcePath) { - Files.copy( - sourceResourcePath, targetResourcePath, StandardCopyOption.REPLACE_EXISTING); - } - if (null != sourceModsPath) { - Files.copy(sourceModsPath, targetModsPath, StandardCopyOption.REPLACE_EXISTING); - } - } catch (final Exception e) { - IOT_PRINTER.println(String.format("Failed to copy file: %s", e.getMessage())); - } - break; - } - case HARDLINK: - { - try { - Files.createLink(targetPath, sourcePath); - } catch (FileAlreadyExistsException e) { - IOT_PRINTER.println("Hardlink already exists: " + e.getMessage()); - } catch (final Exception e) { - try { - Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); - } catch (final Exception copyException) { - IOT_PRINTER.println( - String.format("Failed to copy file: %s", copyException.getMessage())); - } - } - - try { - if (null != sourceResourcePath) { - Files.copy( - sourceResourcePath, targetResourcePath, StandardCopyOption.REPLACE_EXISTING); - } - if (null != sourceModsPath) { - Files.copy(sourceModsPath, targetModsPath, StandardCopyOption.REPLACE_EXISTING); - } - } catch (final Exception e) { - IOT_PRINTER.println( - String.format("Failed to copy resource or mods file: %s", e.getMessage())); - } - break; - } - case MV: - { - try { - Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); - if (null != sourceResourcePath) { - Files.move( - sourceResourcePath, targetResourcePath, StandardCopyOption.REPLACE_EXISTING); - } - if (null != sourceModsPath) { - Files.move(sourceModsPath, targetModsPath, StandardCopyOption.REPLACE_EXISTING); - } - } catch (final Exception e) { - IOT_PRINTER.println(String.format("Failed to move file: %s", e.getMessage())); - } - break; - } - default: - break; - } - } - - protected static void printResult(final long startTime) { - IOT_PRINTER.println( - "Successfully load " - + loadFileSuccessfulNum.sum() - + " tsfile(s) (--on_success operation(s): " - + processingLoadSuccessfulFileSuccessfulNum.sum() - + " succeed, " - + (loadFileSuccessfulNum.sum() - processingLoadSuccessfulFileSuccessfulNum.sum()) - + " failed)"); - IOT_PRINTER.println( - "Failed to load " - + loadFileFailedNum.sum() - + " file(s) (--on_fail operation(s): " - + processingLoadFailedFileSuccessfulNum.sum() - + " succeed, " - + (loadFileFailedNum.sum() - processingLoadFailedFileSuccessfulNum.sum()) - + " failed)"); - IOT_PRINTER.println( - "Unprocessed " - + ImportTsFileScanTool.getTsFileQueueSize() - + " file(s), due to unexpected exceptions"); - IOT_PRINTER.println("For more details, please check the log."); - IOT_PRINTER.println( - "Total operation time: " + (System.currentTimeMillis() - startTime) + " ms."); - IOT_PRINTER.println("Work has been completed!"); - } - - public static void setSuccessAndFailDirAndOperation( - final String successDir, - final ImportTsFile.Operation successOperation, - final String failDir, - final ImportTsFile.Operation failOperation) { - ImportTsFileBase.successDir = successDir; - ImportTsFileBase.successOperation = successOperation; - ImportTsFileBase.failDir = failDir; - ImportTsFileBase.failOperation = failOperation; - } -} diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java deleted file mode 100644 index 8f65d26fd32..00000000000 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileLocally.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.tool.tsfile; - -import org.apache.iotdb.cli.utils.IoTPrinter; -import org.apache.iotdb.session.pool.SessionPool; - -public class ImportTsFileLocally extends ImportTsFileBase implements Runnable { - - private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out); - - private static SessionPool sessionPool; - - @Override - public void loadTsFile() { - String filePath; - try { - while ((filePath = ImportTsFileScanTool.pollFromQueue()) != null) { - final String sql = "load '" + filePath + "' onSuccess=none "; - try { - sessionPool.executeNonQueryStatement(sql); - - processSuccessFile(filePath); - } catch (final Exception e) { - processFailFile(filePath, e); - } - } - } catch (final Exception e) { - ioTPrinter.println("Unexpected error occurred: " + e.getMessage()); - } - } - - public static void setSessionPool(SessionPool sessionPool) { - ImportTsFileLocally.sessionPool = sessionPool; - } -} diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java deleted file mode 100644 index 8f3c22f9e0b..00000000000 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.tool.tsfile; - -import org.apache.iotdb.cli.utils.IoTPrinter; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.client.property.ThriftClientProperty; -import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient; -import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant; -import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq; -import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp; -import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req; -import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req; -import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq; -import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq; -import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq; -import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq; -import org.apache.iotdb.pipe.api.exception.PipeConnectionException; -import org.apache.iotdb.pipe.api.exception.PipeException; -import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; -import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; - -import org.apache.thrift.transport.TTransportException; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.security.SecureRandom; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.LockSupport; - -public class ImportTsFileRemotely extends ImportTsFileBase { - - private static final IoTPrinter IOT_PRINTER = new IoTPrinter(System.out); - - private static final String MODS = ".mods"; - private static final String LOAD_STRATEGY = "sync"; - private static final Integer MAX_RETRY_COUNT = 3; - - private static final AtomicInteger CONNECTION_TIMEOUT_MS = - new AtomicInteger(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs()); - - private IoTDBSyncClient client; - - private static String host; - private static String port; - - public ImportTsFileRemotely() { - initClient(); - sendHandshake(); - } - - @Override - public void loadTsFile() { - try { - String filePath; - while ((filePath = ImportTsFileScanTool.pollFromQueue()) != null) { - final File tsFile = new File(filePath); - try { - if (ImportTsFileScanTool.isContainModsFile(filePath + MODS)) { - doTransfer(tsFile, new File(filePath + MODS)); - } else { - doTransfer(tsFile, null); - } - - processSuccessFile(filePath); - } catch (final Exception e) { - IOT_PRINTER.println( - "Connect is abort, try to reconnect, max retry count: " + MAX_RETRY_COUNT); - - boolean isReconnectAndLoadSuccessFul = false; - - for (int i = 1; i <= MAX_RETRY_COUNT; i++) { - try { - IOT_PRINTER.println(String.format("The %sth retry will after %s seconds.", i, i * 2)); - LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(i * 2L)); - - close(); - initClient(); - sendHandshake(); - - if (ImportTsFileScanTool.isContainModsFile(filePath + MODS)) { - doTransfer(tsFile, new File(filePath + MODS)); - } else { - doTransfer(tsFile, null); - } - - processSuccessFile(filePath); - isReconnectAndLoadSuccessFul = true; - - IOT_PRINTER.println("Reconnect successful."); - break; - } catch (final Exception e1) { - IOT_PRINTER.println(String.format("The %sth reconnect failed", i)); - } - } - - if (!isReconnectAndLoadSuccessFul) { - processFailFile(filePath, e); - - close(); - initClient(); - sendHandshake(); - } - } - } - } catch (final Exception e) { - IOT_PRINTER.println("Unexpected error occurred: " + e.getMessage()); - } finally { - close(); - } - } - - public void sendHandshake() { - try { - final Map<String, String> params = constructParamsMap(); - TPipeTransferResp resp = - client.pipeTransfer(PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params)); - - if (resp.getStatus().getCode() == TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) { - IOT_PRINTER.println( - String.format( - "Handshake error with target server ip: %s, port: %s, because: %s. " - + "Retry to handshake by PipeTransferHandshakeV1Req.", - client.getIpAddress(), client.getPort(), resp.getStatus())); - resp = - client.pipeTransfer( - PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq( - CommonDescriptor.getInstance().getConfig().getTimestampPrecision())); - } - - if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - throw new PipeConnectionException( - String.format( - "Handshake error with target server ip: %s, port: %s, because: %s.", - client.getIpAddress(), client.getPort(), resp.getStatus())); - } else { - client.setTimeout(CONNECTION_TIMEOUT_MS.get()); - IOT_PRINTER.println( - String.format( - "Handshake success. Target server ip: %s, port: %s", - client.getIpAddress(), client.getPort())); - } - } catch (final Exception e) { - throw new PipeException( - String.format( - "Handshake error with target server ip: %s, port: %s, because: %s.", - client.getIpAddress(), client.getPort(), e.getMessage())); - } - } - - private Map<String, String> constructParamsMap() { - final Map<String, String> params = new HashMap<>(); - params.put( - PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, - CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); - params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, getClusterId()); - params.put( - PipeTransferHandshakeConstant.HANDSHAKE_KEY_CONVERT_ON_TYPE_MISMATCH, - Boolean.toString(true)); - params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, LOAD_STRATEGY); - return params; - } - - public void doTransfer(final File tsFile, final File modFile) throws PipeException, IOException { - final TPipeTransferResp resp; - final TPipeTransferReq req; - - if (Objects.nonNull(modFile)) { - transferFilePieces(modFile, true); - transferFilePieces(tsFile, true); - - req = - PipeTransferTsFileSealWithModReq.toTPipeTransferReq( - modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()); - } else { - transferFilePieces(tsFile, false); - - req = PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()); - } - - try { - resp = client.pipeTransfer(req); - } catch (final Exception e) { - throw new PipeConnectionException( - String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()), e); - } - - final TSStatus status = resp.getStatus(); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - throw new PipeConnectionException( - String.format("Seal file %s error, result status %s.", tsFile, status)); - } - - IOT_PRINTER.println("Successfully transferred file " + tsFile); - } - - private void transferFilePieces(final File file, final boolean isMultiFile) - throws PipeException, IOException { - final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize(); - final byte[] readBuffer = new byte[readFileBufferSize]; - long position = 0; - try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { - while (true) { - final int readLength = reader.read(readBuffer); - if (readLength == -1) { - break; - } - - final byte[] payLoad = - readLength == readFileBufferSize - ? readBuffer - : Arrays.copyOfRange(readBuffer, 0, readLength); - final PipeTransferFilePieceResp resp; - try { - final TPipeTransferReq req = - isMultiFile - ? getTransferMultiFilePieceReq(file.getName(), position, payLoad) - : getTransferSingleFilePieceReq(file.getName(), position, payLoad); - resp = PipeTransferFilePieceResp.fromTPipeTransferResp(client.pipeTransfer(req)); - } catch (final Exception e) { - throw new PipeConnectionException( - String.format( - "Network error when transfer file %s, because %s.", file, e.getMessage()), - e); - } - - position += readLength; - - final TSStatus status = resp.getStatus(); - if (status.getCode() == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) { - position = resp.getEndWritingOffset(); - reader.seek(position); - IOT_PRINTER.println(String.format("Redirect file position to %s.", position)); - continue; - } - - if (status.getCode() - == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) { - sendHandshake(); - } - // Only handle the failed statuses to avoid string format performance overhead - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - throw new PipeException( - String.format("Transfer file %s error, result status %s.", file, status)); - } - } - } - } - - private PipeTransferFilePieceReq getTransferMultiFilePieceReq( - final String fileName, final long position, final byte[] payLoad) throws IOException { - return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(fileName, position, payLoad); - } - - private PipeTransferFilePieceReq getTransferSingleFilePieceReq( - final String fileName, final long position, final byte[] payLoad) throws IOException { - return PipeTransferTsFilePieceReq.toTPipeTransferReq(fileName, position, payLoad); - } - - private void initClient() { - try { - this.client = - new IoTDBSyncClient( - new ThriftClientProperty.Builder() - .setConnectionTimeoutMs( - PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs()) - .setRpcThriftCompressionEnabled( - PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled()) - .build(), - getEndPoint().getIp(), - getEndPoint().getPort(), - false, - "", - ""); - } catch (final TTransportException e) { - throw new PipeException("Sync client init error because " + e.getMessage()); - } - } - - private TEndPoint getEndPoint() { - return new TEndPoint(host, Integer.parseInt(port)); - } - - private String getClusterId() { - final SecureRandom random = new SecureRandom(); - final byte[] bytes = new byte[32]; // 32 bytes = 256 bits - random.nextBytes(bytes); - return "TSFILE-IMPORTER-" + UUID.nameUUIDFromBytes(bytes); - } - - private void close() { - try { - if (this.client != null) { - this.client.close(); - } - } catch (final Exception e) { - IOT_PRINTER.println("Failed to close client because " + e.getMessage()); - } - } - - public static void setHost(final String host) { - ImportTsFileRemotely.host = host; - } - - public static void setPort(final String port) { - ImportTsFileRemotely.port = port; - } -} diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileScanTool.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileScanTool.java deleted file mode 100644 index 4ec9f229084..00000000000 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileScanTool.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.tool.tsfile; - -import java.io.File; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.LinkedBlockingQueue; - -public class ImportTsFileScanTool { - - private static final String RESOURCE = ".resource"; - private static final String MODS = ".mods"; - - private static final LinkedBlockingQueue<String> tsfileQueue = new LinkedBlockingQueue<>(); - private static final Set<String> tsfileSet = new HashSet<>(); - private static final Set<String> resourceOrModsSet = new HashSet<>(); - private static String sourceFullPath; - - public static void traverseAndCollectFiles() throws InterruptedException { - traverseAndCollectFilesBySourceFullPath(new File(sourceFullPath)); - } - - private static void traverseAndCollectFilesBySourceFullPath(final File file) - throws InterruptedException { - if (file.isFile()) { - if (file.getName().endsWith(RESOURCE) || file.getName().endsWith(MODS)) { - resourceOrModsSet.add(file.getAbsolutePath()); - } else { - tsfileSet.add(file.getAbsolutePath()); - tsfileQueue.put(file.getAbsolutePath()); - } - } else if (file.isDirectory()) { - final File[] files = file.listFiles(); - if (files != null) { - for (File f : files) { - traverseAndCollectFilesBySourceFullPath(f); - } - } - } - } - - public static void addNoResourceOrModsToQueue() throws InterruptedException { - for (final String filePath : resourceOrModsSet) { - final String tsfilePath = - filePath.endsWith(RESOURCE) - ? filePath.substring(0, filePath.length() - RESOURCE.length()) - : filePath.substring(0, filePath.length() - MODS.length()); - if (!tsfileSet.contains(tsfilePath)) { - tsfileQueue.put(filePath); - } - } - } - - public static boolean isContainModsFile(final String modsFilePath) { - return ImportTsFileScanTool.resourceOrModsSet.contains(modsFilePath); - } - - public static String pollFromQueue() { - return ImportTsFileScanTool.tsfileQueue.poll(); - } - - public static void putToQueue(final String filePath) throws InterruptedException { - ImportTsFileScanTool.tsfileQueue.put(filePath); - } - - public static void setSourceFullPath(final String sourceFullPath) { - ImportTsFileScanTool.sourceFullPath = sourceFullPath; - } - - public static int getSourceFullPathLength() { - return ImportTsFileScanTool.sourceFullPath.length(); - } - - public static int getTsFileQueueSize() { - return ImportTsFileScanTool.tsfileQueue.size(); - } -} diff --git a/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java b/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java index f9b43b8a880..eb9e01d2937 100644 --- a/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java +++ b/iotdb-client/cli/src/test/java/org/apache/iotdb/tool/WriteDataFileTest.java @@ -19,8 +19,6 @@ package org.apache.iotdb.tool; -import org.apache.iotdb.tool.data.AbstractDataTool; - import org.junit.Before; import org.junit.Test;
