This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 52affcff02f load-tsfile script: optimization for loading tsfiles in
dir and error handling (#12541)
52affcff02f is described below
commit 52affcff02fbec82773b6b2d30380e985a2487ac
Author: ppppoooo <[email protected]>
AuthorDate: Fri May 31 14:43:27 2024 +0800
load-tsfile script: optimization for loading tsfiles in dir and error
handling (#12541)
Co-authored-by: xz m <[email protected]>
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../src/assembly/resources/tools/load-tsfile.bat | 67 +--
.../src/assembly/resources/tools/load-tsfile.sh | 80 +---
.../org/apache/iotdb/tool/AbstractTsFileTool.java | 59 ++-
.../java/org/apache/iotdb/tool/ExportTsFile.java | 11 +-
.../java/org/apache/iotdb/tool/ImportTsFile.java | 529 +++++++++++++++++++++
5 files changed, 582 insertions(+), 164 deletions(-)
diff --git a/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat
b/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat
old mode 100644
new mode 100755
index 3dc5553ef52..bd1f4c9170c
--- a/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat
+++ b/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.bat
@@ -30,7 +30,7 @@ pushd %~dp0..
if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
popd
-if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.cli.Cli
+if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.ImportTsFile
if NOT DEFINED JAVA_HOME goto :err
@REM
-----------------------------------------------------------------------------
@@ -39,71 +39,12 @@ set JAVA_OPTS=-ea^
-DIOTDB_HOME="%IOTDB_HOME%"
REM For each jar in the IOTDB_HOME lib directory call append to build the
CLASSPATH variable.
-if EXIST "%IOTDB_HOME%\lib" (set CLASSPATH="%IOTDB_HOME%\lib\*") else set
CLASSPATH="%IOTDB_HOME%\..\lib\*"
+if EXIST %IOTDB_HOME%\lib (set CLASSPATH="%IOTDB_HOME%\lib\*") else set
CLASSPATH="%IOTDB_HOME%\..\lib\*"
-REM
-----------------------------------------------------------------------------
+set PARAMETERS=%*
-@REM set default parameters
-set pw_parameter=-pw root
-set u_parameter=-u root
-set p_parameter=-p 6667
-set h_parameter=-h 127.0.0.1
-set load_dir_parameter=-e "load '
-set sg_level_parameter=
-set verify_parameter=
-set on_success_parameter=
-
-echo %* | findstr /c:"-f">nul || (goto :load_err)
-
-@Rem get every param of input
-:loop
-set param=%1
-if %param%!== ! (
- goto :finally
-) else if "%param%"=="-pw" (
- set pw_parameter=%1 %2
-) else if "%param%"=="-u" (
- set u_parameter=%1 %2
-) else if "%param%"=="-p" (
- set p_parameter=%1 %2
-) else if "%param%"=="-h" (
- set h_parameter=%1 %2
-) else if "%param%"=="-f" (
- if "%2"=="" goto :load_err
- set load_dir_parameter=%load_dir_parameter%%2'
-) else if "%param%"=="--sgLevel" (
- set sg_level_parameter=sgLevel=%2
-) else if "%param%"=="--verify" (
- set verify_parameter=verify=%2
-) else if "%param%"=="--onSuccess" (
- set on_success_parameter=onSuccess=%2
-)
-shift
-goto :loop
-
-
-:err
-echo JAVA_HOME environment variable must be set!
-set ret_code=1
-ENDLOCAL
-EXIT /B %ret_code%
-
-:load_err
-echo -f option must be set!
-set ret_code=1
-ENDLOCAL
-EXIT /B %ret_code%
-
-@REM
-----------------------------------------------------------------------------
-:finally
-
-set PARAMETERS=%h_parameter% %p_parameter% %u_parameter% %pw_parameter%
%load_dir_parameter% %sg_level_parameter% %verify_parameter%
%on_success_parameter%"
-echo %PARAMETERS%
-
-echo start loading TsFiles, please wait...
+echo Starting...
"%JAVA_HOME%\bin\java" %JAVA_OPTS% -cp %CLASSPATH% %MAIN_CLASS% %PARAMETERS%
set ret_code=%ERRORLEVEL%
-ENDLOCAL
-
EXIT /B %ret_code%
diff --git a/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.sh
b/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.sh
old mode 100644
new mode 100755
index 8e2cbc8102c..b7ded896c02
--- a/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.sh
+++ b/iotdb-client/cli/src/assembly/resources/tools/load-tsfile.sh
@@ -29,83 +29,11 @@ checkAllVariables
export IOTDB_HOME="${IOTDB_HOME}"
eval set -- "$VARS"
-PARAMETERS=""
-HELP="Usage: $0 -f <file_path> [--sgLevel <sg_level>] [--verify <true/false>]
[--onSuccess <none/delete>] [-h <ip>] [-p <port>] [-u <username>] [-pw
<password>]"
-
-# Added parameters when default parameters are missing
-user_param="-u root"
-passwd_param="-pw root"
-host_param="-h 127.0.0.1"
-port_param="-p 6667"
-
-while true; do
- case "$1" in
- -u)
- user_param="-u $2"
- shift 2
- ;;
- -pw)
- passwd_param="-pw $2"
- shift 2
- ;;
- -h)
- host_param="-h $2"
- shift 2
- ;;
- -p)
- port_param="-p $2"
- shift 2
- ;;
- -f)
- load_dir_param="$2"
- shift 2
- ;;
- --sgLevel)
- sg_level_param="$2"
- shift 2
- ;;
- --verify)
- verify_param="$2"
- shift 2
- ;;
- --onSuccess)
- on_success_param="$2"
- shift 2
- ;;
- "")
- #if we do not use getopt, we then have to process the case that
there is no argument.
- #in some systems, when there is no argument, shift command may
throw error, so we skip directly
- break
- ;;
- *)
- echo "Unrecognized options:$1"
- echo "${HELP}"
- exit 0
- ;;
- esac
-done
-
-if [ -z "${load_dir_param}" ]; then
- echo "A Loading file path/directory path is required."
- echo "${HELP}"
-fi
-
-LOAD_SQL="load '${load_dir_param}'"
-if [ -n "${sg_level_param}" ]; then
- LOAD_SQL="${LOAD_SQL} sgLevel=${sg_level_param}"
-fi
-if [ -n "${verify_param}" ]; then
- LOAD_SQL="${LOAD_SQL} verify=${verify_param}"
-fi
-if [ -n "${on_success_param}" ]; then
- LOAD_SQL="${LOAD_SQL} onSuccess=${on_success_param}"
-fi
-
-PARAMETERS="$host_param $port_param $user_param $passwd_param $PARAMETERS -e
\"${LOAD_SQL}\""
+PARAMETERS=$@
IOTDB_CLI_CONF=${IOTDB_HOME}/conf
-MAIN_CLASS=org.apache.iotdb.cli.Cli
+MAIN_CLASS=org.apache.iotdb.tool.ImportTsFile
CLASSPATH=""
for f in ${IOTDB_HOME}/lib/*.jar; do
@@ -124,9 +52,9 @@ else
fi
set -o noglob
-iotdb_cli_params="-Dlogback.configurationFile=${IOTDB_CLI_CONF}/logback-cli.xml"
+iotdb_cli_params="-Dlogback.configurationFile=${IOTDB_CLI_CONF}/logback-tool.xml"
-echo "start loading TsFiles, please wait..."
+echo "Starting..."
exec "$JAVA" $iotdb_cli_params -cp "$CLASSPATH" "$MAIN_CLASS" $PARAMETERS
exit $?
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java
index 270ca079456..4a4ee1dbb11 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/AbstractTsFileTool.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.tool;
import org.apache.iotdb.cli.utils.IoTPrinter;
import org.apache.iotdb.exception.ArgsErrorException;
-import org.apache.iotdb.session.Session;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -51,11 +50,13 @@ public abstract class AbstractTsFileTool {
private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
- protected static String host;
- protected static String port;
- protected static String username;
- protected static String password;
- protected static Session session;
+ protected static Options options;
+ protected static Options helpOptions;
+
+ protected static String host = "127.0.0.1";
+ protected static String port = "6667";
+ protected static String username = "root";
+ protected static String password = "root";
protected AbstractTsFileTool() {}
@@ -72,42 +73,61 @@ public abstract class AbstractTsFileTool {
}
protected static void parseBasicParams(CommandLine commandLine) throws
ArgsErrorException {
- host = checkRequiredArg(HOST_ARGS, HOST_NAME, commandLine);
- port = checkRequiredArg(PORT_ARGS, PORT_NAME, commandLine);
- username = checkRequiredArg(USERNAME_ARGS, USERNAME_NAME, commandLine);
- password = commandLine.getOptionValue(PW_ARGS);
+ host =
+ null != commandLine.getOptionValue(HOST_ARGS)
+ ? commandLine.getOptionValue(HOST_ARGS)
+ : host;
+ port =
+ null != commandLine.getOptionValue(PORT_ARGS)
+ ? commandLine.getOptionValue(PORT_ARGS)
+ : port;
+ username =
+ null != commandLine.getOptionValue(USERNAME_ARGS)
+ ? commandLine.getOptionValue(USERNAME_ARGS)
+ : username;
+ password =
+ null != commandLine.getOptionValue(PW_ARGS)
+ ? commandLine.getOptionValue(PW_ARGS)
+ : password;
}
- protected static Options createNewOptions() {
- Options options = new Options();
+ protected static void createBaseOptions() {
+ options = new Options();
+ helpOptions = new Options();
+
+ Option opHelp =
+ Option.builder(HELP_ARGS)
+ .longOpt(HELP_ARGS)
+ .hasArg(false)
+ .desc("Display help information")
+ .build();
+ options.addOption(opHelp);
+ helpOptions.addOption(opHelp);
Option opHost =
Option.builder(HOST_ARGS)
.longOpt(HOST_NAME)
- .required()
.argName(HOST_NAME)
.hasArg()
- .desc("Host Name (required)")
+ .desc("Host Name")
.build();
options.addOption(opHost);
Option opPort =
Option.builder(PORT_ARGS)
.longOpt(PORT_NAME)
- .required()
.argName(PORT_NAME)
.hasArg()
- .desc("Port (required)")
+ .desc("Port")
.build();
options.addOption(opPort);
Option opUsername =
Option.builder(USERNAME_ARGS)
.longOpt(USERNAME_NAME)
- .required()
.argName(USERNAME_NAME)
.hasArg()
- .desc("Username (required)")
+ .desc("Username")
.build();
options.addOption(opUsername);
@@ -117,9 +137,8 @@ public abstract class AbstractTsFileTool {
.optionalArg(true)
.argName(PW_NAME)
.hasArg()
- .desc("Password (required)")
+ .desc("Password")
.build();
options.addOption(opPassword);
- return options;
}
}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
index 67f0ba01459..5815362ac1e 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
@@ -34,7 +34,6 @@ import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
@@ -73,6 +72,9 @@ public class ExportTsFile extends AbstractTsFileTool {
private static final String QUERY_COMMAND_NAME = "queryCommand";
private static final String DUMP_FILE_NAME_DEFAULT = "dump";
private static final String TSFILEDB_CLI_PREFIX = "ExportTsFile";
+
+ private static Session session;
+
private static String targetDirectory;
private static String targetFile = DUMP_FILE_NAME_DEFAULT;
private static String queryCommand;
@@ -87,7 +89,7 @@ public class ExportTsFile extends AbstractTsFileTool {
}) // Suppress high Cognitive Complexity warning, ignore try-with-resources
/* main function of export tsFile tool. */
public static void main(String[] args) {
- Options options = createOptions();
+ createOptions();
HelpFormatter hf = new HelpFormatter();
CommandLine commandLine = null;
CommandLineParser parser = new DefaultParser();
@@ -217,8 +219,8 @@ public class ExportTsFile extends AbstractTsFileTool {
*
* @return object Options
*/
- private static Options createOptions() {
- Options options = createNewOptions();
+ private static void createOptions() {
+ createBaseOptions();
Option opTargetFile =
Option.builder(TARGET_DIR_ARGS)
@@ -268,7 +270,6 @@ public class ExportTsFile extends AbstractTsFileTool {
.desc("Timeout for session query")
.build();
options.addOption(opTimeout);
- return options;
}
/**
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
new file mode 100644
index 00000000000..1451c40f828
--- /dev/null
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ImportTsFile.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.exception.ArgsErrorException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.ParseException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ImportTsFile extends AbstractTsFileTool {
+
+ // Short (-s) and long (--source) names of the option carrying the file or directory to import.
+ private static final String SOURCE_ARGS = "s";
+ private static final String SOURCE_NAME = "source";
+
+ // Option selecting what to do with a tsfile after it was loaded successfully.
+ private static final String ON_SUCCESS_ARGS = "os";
+ private static final String ON_SUCCESS_NAME = "on_success";
+
+ // Option naming the target folder used when on_success is mv or cp.
+ private static final String SUCCESS_DIR_ARGS = "sd";
+ private static final String SUCCESS_DIR_NAME = "success_dir";
+
+ // Option naming the target folder used when on_fail is mv or cp.
+ private static final String FAIL_DIR_ARGS = "fd";
+ private static final String FAIL_DIR_NAME = "fail_dir";
+
+ // Option selecting what to do with a tsfile whose load failed.
+ private static final String ON_FAIL_ARGS = "of";
+ private static final String ON_FAIL_NAME = "on_fail";
+
+ // Option setting how many import threads are started.
+ private static final String THREAD_NUM_ARGS = "tn";
+ private static final String THREAD_NUM_NAME = "thread_num";
+
+ // All user-facing messages of this tool are printed to standard output through this printer.
+ private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
+
+ // Command name shown in the usage/help text.
+ private static final String TS_FILE_CLI_PREFIX = "ImportTsFile";
+
+ // Suffixes of the companion files that may sit next to a tsfile.
+ private static final String RESOURCE = ".resource";
+ private static final String MODS = ".mods";
+
+ // Value of the source option as given by the user, and its absolute form.
+ private static String source;
+ private static String sourceFullPath;
+
+ // Default target folders; replaced when the success_dir / fail_dir options are supplied.
+ private static String successDir = "success/";
+ private static String failDir = "fail/";
+
+ // Post-processing applied to a file after a successful / failed load.
+ private static Operation successOperation;
+ private static Operation failOperation;
+
+ // Number of import threads; also used to size the session pool (threadNum + 1).
+ private static int threadNum = 8;
+
+ // Paths waiting to be loaded; filled during collection and drained by the import threads.
+ private static final LinkedBlockingQueue<String> tsfileQueue = new
LinkedBlockingQueue<>();
+ // Absolute paths of every collected file that is not a .resource or .mods file.
+ private static final Set<String> tsfileSet = new HashSet<>();
+ // Absolute paths of every collected .resource or .mods file.
+ private static final Set<String> resourceOrModsSet = new HashSet<>();
+
+ // Session pool shared by all import threads; created and closed in importFromTargetPath().
+ private static SessionPool sessionPool;
+
+  /**
+   * Registers every command line option understood by this tool in the inherited {@code options}
+   * collection.
+   *
+   * <p>The connection/help options are registered first by {@code createBaseOptions()}; this
+   * method then adds the import specific ones: source (required), on_success (required), on_fail
+   * (required), success_dir, fail_dir and thread_num.
+   */
+  private static void createOptions() {
+    createBaseOptions();
+
+    Option opSource =
+        Option.builder(SOURCE_ARGS)
+            .longOpt(SOURCE_NAME)
+            .argName(SOURCE_NAME)
+            .required()
+            .hasArg()
+            .desc(
+                "The source file or directory path, "
+                    + "which can be a tsfile or a directory containing tsfiles. (required)")
+            .build();
+    options.addOption(opSource);
+
+    Option opOnSuccess =
+        Option.builder(ON_SUCCESS_ARGS)
+            .longOpt(ON_SUCCESS_NAME)
+            .argName(ON_SUCCESS_NAME)
+            .required()
+            .hasArg()
+            .desc(
+                "When loading tsfile successfully, do operation on tsfile (and its .resource and .mods files), "
+                    + "optional parameters are none, mv, cp, delete. (required)")
+            .build();
+    options.addOption(opOnSuccess);
+
+    Option opOnFail =
+        Option.builder(ON_FAIL_ARGS)
+            .longOpt(ON_FAIL_NAME)
+            .argName(ON_FAIL_NAME)
+            .required()
+            .hasArg()
+            .desc(
+                "When loading tsfile fail, do operation on tsfile (and its .resource and .mods files), "
+                    + "optional parameters are none, mv, cp, delete. (required)")
+            .build();
+    options.addOption(opOnFail);
+
+    Option opSuccessDir =
+        Option.builder(SUCCESS_DIR_ARGS)
+            .longOpt(SUCCESS_DIR_NAME)
+            .argName(SUCCESS_DIR_NAME)
+            .hasArg()
+            .desc("The target folder when 'os' is 'mv' or 'cp'.")
+            .build();
+    options.addOption(opSuccessDir);
+
+    Option opFailDir =
+        Option.builder(FAIL_DIR_ARGS)
+            .longOpt(FAIL_DIR_NAME)
+            .argName(FAIL_DIR_NAME)
+            .hasArg()
+            .desc("The target folder when 'of' is 'mv' or 'cp'.")
+            .build();
+    options.addOption(opFailDir);
+
+    // thread_num carries exactly one integer, so it is declared with hasArg() (one value) rather
+    // than hasArgs() (unlimited values), which would let it swallow following non-option tokens.
+    Option opThreadNum =
+        Option.builder(THREAD_NUM_ARGS)
+            .longOpt(THREAD_NUM_NAME)
+            .argName(THREAD_NUM_NAME)
+            .hasArg()
+            .desc("The number of threads used to import tsfile, default is 8.")
+            .build();
+    options.addOption(opThreadNum);
+  }
+
+  /**
+   * Entry point of the tsfile import tool.
+   *
+   * <p>Builds the option set, prints the usage text when no argument or the help flag is given,
+   * parses the remaining options and finally exits the JVM with the code returned by
+   * {@link #importFromTargetPath()}. Every failure path prints a message and exits with
+   * {@code CODE_ERROR}.
+   *
+   * @param args raw command line arguments
+   */
+  public static void main(String[] args) {
+    createOptions();
+
+    final HelpFormatter usagePrinter = new HelpFormatter();
+    usagePrinter.setOptionComparator(null);
+    usagePrinter.setWidth(MAX_HELP_CONSOLE_WIDTH);
+
+    final boolean nothingGiven = args == null || args.length == 0;
+    if (nothingGiven) {
+      ioTPrinter.println("Too few arguments, please check the following hint.");
+      usagePrinter.printHelp(TS_FILE_CLI_PREFIX, options, true);
+      System.exit(CODE_ERROR);
+    }
+
+    final CommandLineParser cliParser = new DefaultParser();
+    CommandLine parsed = null;
+    try {
+      // First pass only looks for the help flag; System.exit never returns, so the second pass
+      // runs only when help was not requested.
+      final boolean helpRequested = cliParser.parse(helpOptions, args, true).hasOption(HELP_ARGS);
+      if (helpRequested) {
+        usagePrinter.printHelp(TS_FILE_CLI_PREFIX, options, true);
+        System.exit(CODE_OK);
+      }
+      parsed = cliParser.parse(options, args, true);
+    } catch (ParseException parseFailure) {
+      ioTPrinter.println("Failed to parse the provided options: " + parseFailure.getMessage());
+      usagePrinter.printHelp(TS_FILE_CLI_PREFIX, options, true);
+      System.exit(CODE_ERROR);
+    }
+
+    try {
+      parseBasicParams(parsed);
+      parseSpecialParams(parsed);
+    } catch (Exception invalidArgs) {
+      ioTPrinter.println(
+          "Encounter an error when parsing the provided options: " + invalidArgs.getMessage());
+      System.exit(CODE_ERROR);
+    }
+
+    final int exitCode = importFromTargetPath();
+    System.exit(exitCode);
+  }
+
+  /**
+   * Reads the import specific options (source, on_success, on_fail, success_dir, fail_dir,
+   * thread_num) from the parsed command line into the static configuration of this tool.
+   *
+   * <p>A missing source path or an unknown on_success / on_fail value prints a message and exits
+   * the JVM with {@code CODE_ERROR}. The thread number is validated before any target directory is
+   * created, so an invalid value does not leave a stray directory behind.
+   *
+   * @param commandLine the command line parsed against {@code options}
+   * @throws ArgsErrorException if thread_num is not a positive integer
+   */
+  private static void parseSpecialParams(CommandLine commandLine) throws ArgsErrorException {
+    source = commandLine.getOptionValue(SOURCE_ARGS);
+    if (!Files.exists(Paths.get(source))) {
+      ioTPrinter.println(String.format("Source file or directory %s does not exist", source));
+      System.exit(CODE_ERROR);
+    }
+
+    final String onSuccess = commandLine.getOptionValue(ON_SUCCESS_ARGS).trim().toLowerCase();
+    final String onFail = commandLine.getOptionValue(ON_FAIL_ARGS).trim().toLowerCase();
+    if (!Operation.isValidOperation(onSuccess) || !Operation.isValidOperation(onFail)) {
+      ioTPrinter.println("Args error: os/of must be one of none, mv, cp, delete");
+      System.exit(CODE_ERROR);
+    }
+
+    // Zero threads would import nothing yet report success, and a negative count would crash
+    // later while sizing the worker list, so reject both here with a clear message.
+    final String threadNumValue = commandLine.getOptionValue(THREAD_NUM_ARGS);
+    if (threadNumValue != null) {
+      final int parsedThreadNum;
+      try {
+        parsedThreadNum = Integer.parseInt(threadNumValue.trim());
+      } catch (NumberFormatException e) {
+        throw new ArgsErrorException(
+            String.format(
+                "%s must be a positive integer, but got: %s", THREAD_NUM_NAME, threadNumValue));
+      }
+      if (parsedThreadNum <= 0) {
+        throw new ArgsErrorException(
+            String.format(
+                "%s must be a positive integer, but got: %s", THREAD_NUM_NAME, threadNumValue));
+      }
+      threadNum = parsedThreadNum;
+    }
+
+    // Whether the target folder lives on the same file store as the source; getOperation() uses
+    // this to turn "cp" into a hard link.
+    boolean successDirOnSourceFileStore = false;
+    if (Operation.MV.name().equalsIgnoreCase(onSuccess)
+        || Operation.CP.name().equalsIgnoreCase(onSuccess)) {
+      final File dir = createSuccessDir(commandLine);
+      successDirOnSourceFileStore = isFileStoreEquals(source, dir);
+    }
+
+    boolean failDirOnSourceFileStore = false;
+    if (Operation.MV.name().equalsIgnoreCase(onFail)
+        || Operation.CP.name().equalsIgnoreCase(onFail)) {
+      final File dir = createFailDir(commandLine);
+      failDirOnSourceFileStore = isFileStoreEquals(source, dir);
+    }
+
+    successOperation = Operation.getOperation(onSuccess, successDirOnSourceFileStore);
+    failOperation = Operation.getOperation(onFail, failDirOnSourceFileStore);
+  }
+
+  /**
+   * Tells whether a path and a directory are located on the same file store.
+   *
+   * @param pathString path of the first location
+   * @param dir the second location
+   * @return {@code true} if both file stores are equal; {@code false} if they differ or if a file
+   *     store could not be determined (the I/O error is printed)
+   */
+  public static boolean isFileStoreEquals(String pathString, File dir) {
+    try {
+      final Path firstLocation = Paths.get(pathString);
+      final Object firstStore = Files.getFileStore(firstLocation);
+      final Path secondLocation = dir.toPath();
+      return Objects.equals(firstStore, Files.getFileStore(secondLocation));
+    } catch (IOException ioFailure) {
+      ioTPrinter.println("IOException when checking file store: " + ioFailure.getMessage());
+    }
+    return false;
+  }
+
+  /**
+   * Resolves the folder that receives successfully loaded files and makes sure it exists.
+   *
+   * <p>The success_dir option, when present, replaces the current {@code successDir}. If the
+   * folder is not an existing directory and cannot be created, a message is printed and the JVM
+   * exits with {@code CODE_ERROR}.
+   *
+   * @param commandLine the parsed command line
+   * @return the success folder
+   */
+  public static File createSuccessDir(CommandLine commandLine) {
+    final String requestedDir = commandLine.getOptionValue(SUCCESS_DIR_ARGS);
+    if (requestedDir != null) {
+      successDir = requestedDir;
+    }
+
+    final File folder = new File(successDir);
+    if (!folder.isDirectory() && !folder.mkdirs()) {
+      ioTPrinter.println(String.format("Failed to create %s %s", SUCCESS_DIR_NAME, successDir));
+      System.exit(CODE_ERROR);
+    }
+    return folder;
+  }
+
+  /**
+   * Resolves the folder that receives files whose load failed and makes sure it exists.
+   *
+   * <p>The fail_dir option, when present, replaces the current {@code failDir}. If the folder is
+   * not an existing directory and cannot be created, a message is printed and the JVM exits with
+   * {@code CODE_ERROR}.
+   *
+   * @param commandLine the parsed command line
+   * @return the fail folder
+   */
+  public static File createFailDir(CommandLine commandLine) {
+    final String requestedDir = commandLine.getOptionValue(FAIL_DIR_ARGS);
+    if (requestedDir != null) {
+      failDir = requestedDir;
+    }
+
+    final File folder = new File(failDir);
+    if (!folder.isDirectory() && !folder.mkdirs()) {
+      ioTPrinter.println(String.format("Failed to create %s %s", FAIL_DIR_NAME, failDir));
+      System.exit(CODE_ERROR);
+    }
+    return folder;
+  }
+
+  /**
+   * Runs the whole import: validates the source, opens the session pool, collects the files below
+   * the source and loads them with {@code threadNum} threads.
+   *
+   * <p>The session pool is sized {@code threadNum + 1} and is always closed before returning.
+   *
+   * @return {@code CODE_OK} once all collected files have been handed to the import threads and
+   *     those threads have finished; {@code CODE_ERROR} if the source is neither a file nor a
+   *     directory, the configured port is not a number, or the collection was interrupted
+   */
+  public static int importFromTargetPath() {
+    try {
+      final File file = new File(source);
+      sourceFullPath = file.getAbsolutePath();
+      if (!file.isFile() && !file.isDirectory()) {
+        ioTPrinter.println(String.format("source file or directory %s does not exist", source));
+        return CODE_ERROR;
+      }
+
+      // The port arrives as free text from the command line; report a bad value as a normal
+      // argument error instead of letting NumberFormatException escape with a stack trace.
+      final int rpcPort;
+      try {
+        rpcPort = Integer.parseInt(port.trim());
+      } catch (NumberFormatException e) {
+        ioTPrinter.println(String.format("Import tsfile fail: invalid port %s", port));
+        return CODE_ERROR;
+      }
+
+      sessionPool = new SessionPool(host, rpcPort, username, password, threadNum + 1);
+
+      traverseAndCollectFiles(file);
+      addNoResourceOrModsToQueue();
+      asyncImportTsFiles();
+      return CODE_OK;
+    } catch (InterruptedException e) {
+      ioTPrinter.println(String.format("Import tsfile fail: %s", e.getMessage()));
+      return CODE_ERROR;
+    } finally {
+      if (sessionPool != null) {
+        sessionPool.close();
+      }
+    }
+  }
+
+  /**
+   * Recursively walks {@code file} and records every regular file found.
+   *
+   * <p>Files ending in {@code .resource} or {@code .mods} are only remembered in
+   * {@code resourceOrModsSet}; every other file is remembered in {@code tsfileSet} and queued for
+   * import. Entries that are neither a regular file nor a directory are ignored.
+   *
+   * @param file the file or directory to visit
+   * @throws InterruptedException if interrupted while adding to the import queue
+   */
+  public static void traverseAndCollectFiles(File file) throws InterruptedException {
+    if (file.isFile()) {
+      final String fileName = file.getName();
+      final String absolutePath = file.getAbsolutePath();
+      final boolean isCompanion = fileName.endsWith(RESOURCE) || fileName.endsWith(MODS);
+      if (isCompanion) {
+        resourceOrModsSet.add(absolutePath);
+        return;
+      }
+      tsfileSet.add(absolutePath);
+      tsfileQueue.put(absolutePath);
+      return;
+    }
+
+    if (!file.isDirectory()) {
+      return;
+    }
+
+    final File[] children = file.listFiles();
+    if (children == null) {
+      return;
+    }
+    for (final File child : children) {
+      traverseAndCollectFiles(child);
+    }
+  }
+
+  /**
+   * Queues every collected {@code .resource} / {@code .mods} file whose path, with that suffix
+   * removed, was not collected as a file of its own.
+   *
+   * @throws InterruptedException if interrupted while adding to the import queue
+   */
+  public static void addNoResourceOrModsToQueue() throws InterruptedException {
+    for (final String companionPath : resourceOrModsSet) {
+      final int suffixLength =
+          companionPath.endsWith(RESOURCE) ? RESOURCE.length() : MODS.length();
+      final String ownerPath = companionPath.substring(0, companionPath.length() - suffixLength);
+      if (tsfileSet.contains(ownerPath)) {
+        continue;
+      }
+      tsfileQueue.put(companionPath);
+    }
+  }
+
+  /**
+   * Starts {@code threadNum} threads that each run {@link #importTsFile()} and waits for all of
+   * them to finish. An interruption while waiting for one thread is printed and the wait
+   * continues with the next thread.
+   */
+  public static void asyncImportTsFiles() {
+    final List<Thread> workers = new ArrayList<>(threadNum);
+    for (int started = 0; started < threadNum; started++) {
+      final Thread worker = new Thread(ImportTsFile::importTsFile);
+      worker.start();
+      workers.add(worker);
+    }
+
+    for (final Thread worker : workers) {
+      try {
+        worker.join();
+      } catch (InterruptedException interrupted) {
+        ioTPrinter.println("importTsFile thread join interrupted: " + interrupted.getMessage());
+      }
+    }
+  }
+
+ /**
+  * Body of one import thread: repeatedly takes a path from the import queue until the queue is
+  * empty, loads it through the shared session pool and then applies the configured success or
+  * fail operation to the file.
+  *
+  * <p>No exception leaves this method; every failure is reported through the printer.
+  */
+ public static void importTsFile() {
+ String filePath;
+ try {
+ // poll() does not block: the thread ends as soon as the queue is empty.
+ while ((filePath = tsfileQueue.poll()) != null) {
+ // The statement always asks for onSuccess=none; the follow-up action on the file is done by
+ // processingFile() below instead.
+ final String sql = "load '" + filePath + "' onSuccess=none ";
+
+ try {
+ ioTPrinter.println("Importing [ " + filePath + " ] file ...");
+ sessionPool.executeNonQueryStatement(sql);
+ ioTPrinter.println("Imported [ " + filePath + " ] file
successfully!");
+
+ // Post-processing has its own try/catch so that a failure here is not reported as a failed
+ // import by the outer catch.
+ try {
+ ioTPrinter.println("Processing success file [ " + filePath + " ]
...");
+ processingFile(filePath, successDir, successOperation);
+ ioTPrinter.println("Processed success file [ " + filePath + " ]
successfully!");
+ } catch (Exception processSuccessException) {
+ ioTPrinter.println(
+ "Failed to process success file [ "
+ + filePath
+ + " ]: "
+ + processSuccessException.getMessage());
+ }
+ } catch (Exception e) {
+ // Reached when the load itself failed: apply the fail operation to the file.
+ ioTPrinter.println("Failed to import [ " + filePath + " ] file: " +
e.getMessage());
+
+ try {
+ ioTPrinter.println("Processing fail file [ " + filePath + " ]
...");
+ processingFile(filePath, failDir, failOperation);
+ ioTPrinter.println("Processed fail file [ " + filePath + " ]
successfully!");
+ } catch (Exception processFailException) {
+ ioTPrinter.println(
+ "Failed to process fail file [ "
+ + filePath
+ + " ]: "
+ + processFailException.getMessage());
+ }
+ } finally {
+ // Printed for every file, whether the load succeeded or failed.
+ ioTPrinter.println("Processed file [ " + filePath + " ]
completely!");
+ }
+ }
+ } catch (Exception e) {
+ ioTPrinter.println("Unexpected error occurred: " + e.getMessage());
+ }
+ }
+
+  /**
+   * Applies {@code operation} to a file that has been handled by the import, together with its
+   * {@code .resource} and {@code .mods} companions when those exist next to it.
+   *
+   * <p>The target name inside {@code dir} is the file's path relative to the import source with
+   * every path separator replaced by {@code _}. When the import source is itself the file (a
+   * single file was given), the plain file name is used.
+   *
+   * <p>I/O failures of the individual operations are printed and not propagated.
+   *
+   * @param filePath absolute path of the file to process
+   * @param dir target folder for MV, CP and HARDLINK; unused for DELETE and NONE
+   * @param operation the operation to apply
+   */
+  public static void processingFile(String filePath, String dir, Operation operation) {
+    final Path sourcePath = Paths.get(filePath);
+    final Path sourceRoot = Paths.get(sourceFullPath);
+
+    // Slicing the string at sourceFullPath.length() + 1 overruns the string when the source is a
+    // single file (filePath equals sourceFullPath). Work on path elements instead and fall back
+    // to the file name when there is nothing to relativize.
+    final String relativePath =
+        sourcePath.startsWith(sourceRoot) && !sourcePath.equals(sourceRoot)
+            ? sourceRoot.relativize(sourcePath).toString()
+            : sourcePath.getFileName().toString();
+
+    final String target = dir + File.separator + relativePath.replace(File.separator, "_");
+    final Path targetPath = Paths.get(target);
+
+    // Companion paths are null when the companion file does not exist.
+    Path sourceResourcePath = Paths.get(sourcePath + RESOURCE);
+    sourceResourcePath = Files.exists(sourceResourcePath) ? sourceResourcePath : null;
+    final Path targetResourcePath = Paths.get(target + RESOURCE);
+
+    Path sourceModsPath = Paths.get(sourcePath + MODS);
+    sourceModsPath = Files.exists(sourceModsPath) ? sourceModsPath : null;
+    final Path targetModsPath = Paths.get(target + MODS);
+
+    switch (operation) {
+      case DELETE:
+        try {
+          Files.deleteIfExists(sourcePath);
+          if (null != sourceResourcePath) {
+            Files.deleteIfExists(sourceResourcePath);
+          }
+          if (null != sourceModsPath) {
+            Files.deleteIfExists(sourceModsPath);
+          }
+        } catch (Exception e) {
+          ioTPrinter.println(String.format("Failed to delete file: %s", e.getMessage()));
+        }
+        break;
+      case CP:
+        try {
+          Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
+          if (null != sourceResourcePath) {
+            Files.copy(
+                sourceResourcePath, targetResourcePath, StandardCopyOption.REPLACE_EXISTING);
+          }
+          if (null != sourceModsPath) {
+            Files.copy(sourceModsPath, targetModsPath, StandardCopyOption.REPLACE_EXISTING);
+          }
+        } catch (Exception e) {
+          ioTPrinter.println(String.format("Failed to copy file: %s", e.getMessage()));
+        }
+        break;
+      case HARDLINK:
+        // Only the main file is hard-linked; if linking fails for any reason other than an
+        // already existing target, fall back to a plain copy.
+        try {
+          Files.createLink(targetPath, sourcePath);
+        } catch (FileAlreadyExistsException e) {
+          ioTPrinter.println("Hardlink already exists: " + e.getMessage());
+        } catch (Exception e) {
+          try {
+            Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
+          } catch (Exception copyException) {
+            ioTPrinter.println(
+                String.format("Failed to copy file: %s", copyException.getMessage()));
+          }
+        }
+
+        // Companion files are always copied, never linked.
+        try {
+          if (null != sourceResourcePath) {
+            Files.copy(
+                sourceResourcePath, targetResourcePath, StandardCopyOption.REPLACE_EXISTING);
+          }
+          if (null != sourceModsPath) {
+            Files.copy(sourceModsPath, targetModsPath, StandardCopyOption.REPLACE_EXISTING);
+          }
+        } catch (Exception e) {
+          ioTPrinter.println(
+              String.format("Failed to copy resource or mods file: %s", e.getMessage()));
+        }
+        break;
+      case MV:
+        try {
+          Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING);
+          if (null != sourceResourcePath) {
+            Files.move(
+                sourceResourcePath, targetResourcePath, StandardCopyOption.REPLACE_EXISTING);
+          }
+          if (null != sourceModsPath) {
+            Files.move(sourceModsPath, targetModsPath, StandardCopyOption.REPLACE_EXISTING);
+          }
+        } catch (Exception e) {
+          ioTPrinter.println(String.format("Failed to move file: %s", e.getMessage()));
+        }
+        break;
+      default:
+        // NONE: leave the file where it is.
+        break;
+    }
+  }
+
+ public enum Operation {
+ NONE,
+ MV,
+ HARDLINK,
+ CP,
+ DELETE,
+ ;
+
+ public static boolean isValidOperation(String operation) {
+ return "none".equalsIgnoreCase(operation)
+ || "mv".equalsIgnoreCase(operation)
+ || "cp".equalsIgnoreCase(operation)
+ || "delete".equalsIgnoreCase(operation);
+ }
+
+ public static Operation getOperation(String operation, boolean
isFileStoreEquals) {
+ switch (operation.toLowerCase()) {
+ case "none":
+ return Operation.NONE;
+ case "mv":
+ return Operation.MV;
+ case "cp":
+ if (isFileStoreEquals) {
+ return Operation.HARDLINK;
+ } else {
+ return Operation.CP;
+ }
+ case "delete":
+ return Operation.DELETE;
+ default:
+ ioTPrinter.println("Args error: os/of must be one of none, mv, cp,
delete");
+ System.exit(CODE_ERROR);
+ return null;
+ }
+ }
+ }
+}