This is an automated email from the ASF dual-hosted git repository.
critas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9132389e4fd Import/Export schema script adapt table model (#15192)
9132389e4fd is described below
commit 9132389e4fdf3a250eface68f9dbf3b0b86a1ff8
Author: Summer <[email protected]>
AuthorDate: Wed Apr 2 14:10:12 2025 +0800
Import/Export schema script adapt table model (#15192)
* export schema script adapt table model
* export schema script adapt table model
---------
Co-authored-by: 2b3c511 <[email protected]>
---
.../apache/iotdb/tools/it/ExportDataTestIT.java | 13 +-
.../apache/iotdb/tools/it/ExportSchemaTestIT.java | 5 +-
.../apache/iotdb/tools/it/ImportSchemaTestIT.java | 8 +-
.../org/apache/iotdb/tool/common/Constants.java | 45 +-
.../org/apache/iotdb/tool/common/OptionsUtil.java | 156 +++++-
.../apache/iotdb/tool/data/AbstractImportData.java | 2 +-
.../org/apache/iotdb/tool/data/ExportData.java | 1 -
.../apache/iotdb/tool/data/ExportDataTable.java | 2 +-
.../org/apache/iotdb/tool/data/ExportDataTree.java | 2 +-
.../iotdb/tool/schema/AbstractExportSchema.java | 33 ++
.../iotdb/tool/schema/AbstractImportSchema.java | 94 ++++
.../iotdb/tool/schema/AbstractSchemaTool.java | 120 ++--
.../org/apache/iotdb/tool/schema/ExportSchema.java | 325 +++--------
.../iotdb/tool/schema/ExportSchemaTable.java | 252 +++++++++
.../apache/iotdb/tool/schema/ExportSchemaTree.java | 116 ++++
.../org/apache/iotdb/tool/schema/ImportSchema.java | 613 +++------------------
.../iotdb/tool/schema/ImportSchemaTable.java | 188 +++++++
.../{ImportSchema.java => ImportSchemaTree.java} | 417 ++++----------
18 files changed, 1167 insertions(+), 1225 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportDataTestIT.java
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportDataTestIT.java
index f082391769d..2a30077f487 100644
---
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportDataTestIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportDataTestIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tool.common.Constants;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -75,7 +76,7 @@ public class ExportDataTestIT extends AbstractScriptIT {
@Override
protected void testOnWindows() throws IOException {
- final String[] output = {"Export completely!"};
+ final String[] output = {Constants.EXPORT_COMPLETELY};
ProcessBuilder builder =
new ProcessBuilder(
"cmd.exe",
@@ -103,7 +104,7 @@ public class ExportDataTestIT extends AbstractScriptIT {
prepareData();
- final String[] output1 = {"Export completely!"};
+ final String[] output1 = {Constants.EXPORT_COMPLETELY};
ProcessBuilder builder1 =
new ProcessBuilder(
"cmd.exe",
@@ -131,7 +132,7 @@ public class ExportDataTestIT extends AbstractScriptIT {
prepareData();
- final String[] output2 = {"Export completely!"};
+ final String[] output2 = {Constants.EXPORT_COMPLETELY};
ProcessBuilder builder2 =
new ProcessBuilder(
"cmd.exe",
@@ -160,7 +161,7 @@ public class ExportDataTestIT extends AbstractScriptIT {
@Override
protected void testOnUnix() throws IOException {
- final String[] output = {"Export completely!"};
+ final String[] output = {Constants.EXPORT_COMPLETELY};
// -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -q "select * from root.**"
ProcessBuilder builder =
new ProcessBuilder(
@@ -185,7 +186,7 @@ public class ExportDataTestIT extends AbstractScriptIT {
prepareData();
- final String[] output1 = {"Export completely!"};
+ final String[] output1 = {Constants.EXPORT_COMPLETELY};
// -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -q "select * from root.**"
ProcessBuilder builder1 =
new ProcessBuilder(
@@ -210,7 +211,7 @@ public class ExportDataTestIT extends AbstractScriptIT {
prepareData();
- final String[] output2 = {"Export completely!"};
+ final String[] output2 = {Constants.EXPORT_COMPLETELY};
// -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -q "select * from root.**"
ProcessBuilder builder2 =
new ProcessBuilder(
diff --git
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportSchemaTestIT.java
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportSchemaTestIT.java
index cc520b5e809..fd40f9962f7 100644
---
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportSchemaTestIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportSchemaTestIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tool.common.Constants;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
@@ -81,7 +82,7 @@ public class ExportSchemaTestIT extends AbstractScriptIT {
@Override
protected void testOnWindows() throws IOException {
prepareSchema();
- final String[] output = {"Export completely!"};
+ final String[] output = {Constants.EXPORT_COMPLETELY};
ProcessBuilder builder =
new ProcessBuilder(
"cmd.exe",
@@ -109,7 +110,7 @@ public class ExportSchemaTestIT extends AbstractScriptIT {
@Override
protected void testOnUnix() throws IOException {
prepareSchema();
- final String[] output = {"Export completely!"};
+ final String[] output = {Constants.EXPORT_COMPLETELY};
ProcessBuilder builder =
new ProcessBuilder(
"bash",
diff --git
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ImportSchemaTestIT.java
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ImportSchemaTestIT.java
index 06eb3f4eb34..74c7c69c8c7 100644
---
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ImportSchemaTestIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ImportSchemaTestIT.java
@@ -77,7 +77,7 @@ public class ImportSchemaTestIT extends AbstractScriptIT {
@Override
protected void testOnWindows() throws IOException {
final String[] output = {
- "The file name must end with \"csv\"!",
+ "Import completely!",
};
ProcessBuilder builder =
new ProcessBuilder(
@@ -92,6 +92,8 @@ public class ImportSchemaTestIT extends AbstractScriptIT {
"root",
"-pw",
"root",
+ "-db",
+ "test",
"-s",
"./",
"&",
@@ -104,7 +106,7 @@ public class ImportSchemaTestIT extends AbstractScriptIT {
@Override
protected void testOnUnix() throws IOException {
final String[] output = {
- "The file name must end with \"csv\"!",
+ "Import completely!",
};
ProcessBuilder builder =
new ProcessBuilder(
@@ -118,6 +120,8 @@ public class ImportSchemaTestIT extends AbstractScriptIT {
"root",
"-pw",
"root",
+ "-db",
+ "test",
"-s",
"./");
builder.environment().put("IOTDB_HOME", homePath);
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
index cd17f05d5a9..787cd6b3d18 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.tool.common;
import org.apache.tsfile.enums.TSDataType;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class Constants {
@@ -96,7 +98,7 @@ public class Constants {
public static final String DB_ARGS = "db";
public static final String DB_NAME = "database";
public static final String DB_DESC =
- "The database to be exported,only takes effect when sql_dialect is
table.(optional)";
+ "The database to be exported,only takes effect and required when
sql_dialect is table .(optional)";
public static final String TABLE_ARGS = "table";
public static final String TABLE_DESC =
@@ -159,8 +161,12 @@ public class Constants {
public static final String COLON = ": ";
public static final String MINUS = "-";
+ public static final List<String> HEAD_COLUMNS =
+ Arrays.asList("Timeseries", "Alias", "DataType", "Encoding",
"Compression");
+
// export constants
public static final String EXPORT_CLI_PREFIX = "Export Data";
+ public static final String EXPORT_SCHEMA_CLI_PREFIX = "ExportSchema";
public static final String EXPORT_CLI_HEAD =
"Please obtain help information for the corresponding data type based on
different parameters, for example:\n"
@@ -168,6 +174,8 @@ public class Constants {
+ "./export_data.sh -help sql\n"
+ "./export_data.sh -help csv";
+ public static final String SCHEMA_CLI_CHECK_IN_HEAD =
+ "Too few params input, please check the following hint.";
public static final String START_TIME_ARGS = "start_time";
public static final String START_TIME_DESC = "The start time to be exported
(optional)";
@@ -181,6 +189,11 @@ public class Constants {
public static final String TARGET_DIR_SUBSCRIPTION_DESC =
"Target file directory.default ./target (optional)";
+ public static final String TARGET_PATH_ARGS = "path";
+ public static final String TARGET_PATH_ARGS_NAME = "path_pattern";
+ public static final String TARGET_PATH_NAME = "exportPathPattern";
+ public static final String TARGET_PATH_DESC = "Export Path Pattern
(optional)";
+
public static final String QUERY_COMMAND_ARGS = "q";
public static final String QUERY_COMMAND_NAME = "query";
public static final String QUERY_COMMAND_ARGS_NAME = "query_command";
@@ -200,13 +213,17 @@ public class Constants {
public static final String LINES_PER_FILE_ARGS = "lpf";
public static final String LINES_PER_FILE_NAME = "lines_per_file";
- public static final String LINES_PER_FILE_DESC = "Lines per dump
file.(optional)";
+ public static final String LINES_PER_FILE_DESC =
+ "Lines per dump file,only effective in tree model.(optional)";
public static final String DUMP_FILE_NAME_DEFAULT = "dump";
public static final String queryTableParamRequired =
"Either '-q' or '-table' is required when 'sql-dialect' is' table '";
public static final String INSERT_CSV_MEET_ERROR_MSG = "Meet error when
insert csv because ";
+ public static final String INSERT_SQL_MEET_ERROR_MSG = "Meet error when
insert sql because ";
+ public static final String COLUMN_SQL_MEET_ERROR_MSG =
+ "Meet error when get table columns information because ";
public static final String TARGET_DATABASE_NOT_EXIST_MSG =
"The target database %s does not exist";
public static final String TARGET_TABLE_NOT_EXIST_MSG =
@@ -275,8 +292,19 @@ public class Constants {
public static final String HANDLER = "TsFileHandler";
public static final String CONSUMER_NAME_PREFIX = "consumer_";
public static final SimpleDateFormat DATE_FORMAT_VIEW = new
SimpleDateFormat("yyyyMMddHHmmssSSS");
+ public static final String BASE_VIEW_TYPE = "BASE";
+ public static final String HEADER_VIEW_TYPE = "ViewType";
+ public static final String HEADER_TIMESERIES = "Timeseries";
+ public static final String EXPORT_COMPLETELY = "Export completely!";
+ public static final String EXPORT_SCHEMA_TABLES_SELECT =
+ "select * from information_schema.tables where database = '%s'";
+ public static final String EXPORT_SCHEMA_TABLES_SHOW = "show tables details
from %s";
+ public static final String EXPORT_SCHEMA_COLUMNS_SELECT =
+ "select * from information_schema.columns where database like '%s' and
table_name like '%s'";
+ public static final String EXPORT_SCHEMA_COLUMNS_DESC = "desc %s.%s details";
// import constants
+ public static final String IMPORT_SCHEMA_CLI_PREFIX = "ImportSchema";
public static final String IMPORT_CLI_PREFIX = "Import Data";
public static final String IMPORT_CLI_HEAD =
@@ -290,6 +318,12 @@ public class Constants {
public static final String FILE_DESC =
"The local directory path of the script file (folder) to be loaded.
(required)";
+ public static final String FAILED_FILE_ARGS = "fd";
+ public static final String FAILED_FILE_NAME = "fail_dir";
+ public static final String FAILED_FILE_ARGS_NAME = "failDir";
+ public static final String FAILED_FILE_DESC =
+ "Specifying a directory to save failed file, default YOUR_CSV_FILE_PATH
(optional)";
+
public static final String ON_SUCCESS_ARGS = "os";
public static final String ON_SUCCESS_NAME = "on_success";
public static final String ON_SUCCESS_DESC =
@@ -325,6 +359,8 @@ public class Constants {
public static final String BATCH_POINT_SIZE_NAME = "batch_size";
public static final String BATCH_POINT_SIZE_ARGS_NAME = "batch_size";
public static final String BATCH_POINT_SIZE_DESC = "100000 (optional)";
+ public static final String BATCH_POINT_SIZE_LIMIT_DESC =
+ "10000 (only not aligned and sql_dialect tree optional)";
public static final String TIMESTAMP_PRECISION_ARGS = "tp";
public static final String TIMESTAMP_PRECISION_NAME = "timestamp_precision";
@@ -338,5 +374,8 @@ public class Constants {
public static final String LINES_PER_FAILED_FILE_ARGS = "lpf";
public static final String LINES_PER_FAILED_FILE_ARGS_NAME =
"lines_per_failed_file";
- public static final String LINES_PER_FAILED_FILE_DESC = "Lines per failed
file.(optional)";
+ public static final String LINES_PER_FAILED_FILE_DESC =
+ "Lines per failed file,only takes effect and required when sql_dialect
is table .(option)";
+ public static final String IMPORT_COMPLETELY = "Import completely!";
+ public static final int BATCH_POINT_SIZE = 10000;
}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
index 722f4996983..b076ed1d114 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java
@@ -56,19 +56,7 @@ public class OptionsUtil extends Constants {
return options;
}
- public static Options createCommonOptions() {
- Options options = new Options();
-
- Option opFileType =
- Option.builder(FILE_TYPE_ARGS)
- .longOpt(FILE_TYPE_NAME)
- .argName(FILE_TYPE_ARGS_NAME)
- .required()
- .hasArg()
- .desc(isImport ? FILE_TYPE_DESC_IMPORT : FILE_TYPE_DESC_EXPORT)
- .build();
- options.addOption(opFileType);
-
+ public static Options createCommonOptions(Options options) {
Option opSqlDialect =
Option.builder(SQL_DIALECT_ARGS)
.longOpt(SQL_DIALECT_ARGS)
@@ -118,12 +106,28 @@ public class OptionsUtil extends Constants {
return options;
}
+ public static Options createImportCommonOptions() {
+ Options options = new Options();
+
+ Option opFileType =
+ Option.builder(FILE_TYPE_ARGS)
+ .longOpt(FILE_TYPE_NAME)
+ .argName(FILE_TYPE_ARGS_NAME)
+ .required()
+ .hasArg()
+ .desc(isImport ? FILE_TYPE_DESC_IMPORT : FILE_TYPE_DESC_EXPORT)
+ .build();
+ options.addOption(opFileType);
+
+ return createCommonOptions(options);
+ }
+
public static Options createTreeImportCommonOptions() {
- return createCommonOptions();
+ return createImportCommonOptions();
}
public static Options createTableImportCommonOptions() {
- Options options = createCommonOptions();
+ Options options = createImportCommonOptions();
Option opDatabase =
Option.builder(DB_ARGS)
@@ -139,7 +143,7 @@ public class OptionsUtil extends Constants {
}
public static Options createTreeExportCommonOptions() {
- Options options = createCommonOptions();
+ Options options = createImportCommonOptions();
Option opFile =
Option.builder(TARGET_DIR_ARGS)
@@ -181,7 +185,7 @@ public class OptionsUtil extends Constants {
}
public static Options createTableExportCommonOptions() {
- final Options options = createCommonOptions();
+ final Options options = createImportCommonOptions();
Option opFile =
Option.builder(TARGET_DIR_ARGS)
@@ -1073,4 +1077,122 @@ public class OptionsUtil extends Constants {
options.addOption(opHelp);
return options;
}
+
+ public static Options createExportSchemaOptions() {
+ Options options = createCommonOptions(new Options());
+ Option opTargetFile =
+ Option.builder(TARGET_DIR_ARGS)
+ .required()
+ .longOpt(TARGET_DIR_ARGS_NAME)
+ .hasArg()
+ .argName(TARGET_DIR_NAME)
+ .desc(TARGET_DIR_DESC)
+ .build();
+ options.addOption(opTargetFile);
+
+ Option targetPathPattern =
+ Option.builder(TARGET_PATH_ARGS)
+ .longOpt(TARGET_PATH_ARGS_NAME)
+ .hasArg()
+ .argName(TARGET_PATH_NAME)
+ .desc(TARGET_PATH_DESC)
+ .build();
+ options.addOption(targetPathPattern);
+
+ Option targetFileName =
+ Option.builder(TARGET_FILE_ARGS)
+ .longOpt(TARGET_FILE_NAME)
+ .hasArg()
+ .argName(TARGET_FILE_NAME)
+ .desc(TARGET_FILE_DESC)
+ .build();
+ options.addOption(targetFileName);
+
+ Option opLinesPerFile =
+ Option.builder(LINES_PER_FILE_ARGS)
+ .longOpt(LINES_PER_FILE_NAME)
+ .hasArg()
+ .argName(LINES_PER_FILE_NAME)
+ .desc(LINES_PER_FILE_DESC)
+ .build();
+ options.addOption(opLinesPerFile);
+
+ Option opTimeout =
+ Option.builder(TIMEOUT_ARGS)
+ .longOpt(TIMEOUT_NAME)
+ .hasArg()
+ .argName(TIMEOUT_ARGS)
+ .desc(TIMEOUT_DESC)
+ .build();
+ options.addOption(opTimeout);
+
+ Option opDatabase =
+
Option.builder(DB_ARGS).longOpt(DB_NAME).argName(DB_ARGS).hasArg().desc(DB_DESC).build();
+ options.addOption(opDatabase);
+
+ Option opTable =
+ Option.builder(TABLE_ARGS)
+ .longOpt(TABLE_ARGS)
+ .argName(TABLE_ARGS)
+ .hasArg()
+ .desc(TABLE_DESC_EXPORT)
+ .build();
+ options.addOption(opTable);
+
+ Option opHelp =
Option.builder(HELP_ARGS).longOpt(HELP_ARGS).desc(HELP_DESC).build();
+ options.addOption(opHelp);
+
+ return options;
+ }
+
+ public static Options createImportSchemaOptions() {
+ Options options = createCommonOptions(new Options());
+
+ Option opFile =
+ Option.builder(FILE_ARGS)
+ .required()
+ .longOpt(FILE_NAME)
+ .argName(FILE_NAME)
+ .hasArg()
+ .desc(FILE_DESC)
+ .build();
+ options.addOption(opFile);
+
+ Option opFailedFile =
+ Option.builder(FAILED_FILE_ARGS)
+ .longOpt(FAILED_FILE_NAME)
+ .hasArg()
+ .argName(FAILED_FILE_ARGS_NAME)
+ .desc(FAILED_FILE_DESC)
+ .build();
+ options.addOption(opFailedFile);
+
+ Option opBatchPointSize =
+ Option.builder(BATCH_POINT_SIZE_ARGS)
+ .longOpt(BATCH_POINT_SIZE_NAME)
+ .hasArg()
+ .argName(BATCH_POINT_SIZE_ARGS_NAME)
+ .desc(BATCH_POINT_SIZE_LIMIT_DESC)
+ .build();
+ options.addOption(opBatchPointSize);
+
+ Option opFailedLinesPerFile =
+ Option.builder(LINES_PER_FAILED_FILE_ARGS)
+ .longOpt(LINES_PER_FAILED_FILE_ARGS_NAME)
+ .hasArg()
+ .argName(LINES_PER_FAILED_FILE_ARGS_NAME)
+ .desc(LINES_PER_FAILED_FILE_DESC)
+ .build();
+ options.addOption(opFailedLinesPerFile);
+
+ Option opDatabase =
+
Option.builder(DB_ARGS).longOpt(DB_NAME).argName(DB_ARGS).hasArg().desc(DB_DESC).build();
+ options.addOption(opDatabase);
+
+ Option opHelp =
+
Option.builder(Constants.HELP_ARGS).longOpt(Constants.HELP_ARGS).desc(HELP_DESC).build();
+ options.addOption(opHelp);
+
+ return options;
+ }
}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractImportData.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractImportData.java
index 769d5fd14a0..fa80c8feb20 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractImportData.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/AbstractImportData.java
@@ -89,7 +89,7 @@ public abstract class AbstractImportData extends
AbstractDataTool implements Run
ioTPrinter.println("ImportData thread join interrupted: " +
e.getMessage());
}
});
- ioTPrinter.println("Import completely!");
+ ioTPrinter.println(Constants.IMPORT_COMPLETELY);
}
}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
index 8bd215768cd..b31bc8d90b9 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportData.java
@@ -220,7 +220,6 @@ public class ExportData extends AbstractDataTool {
System.exit(Constants.CODE_ERROR);
}
AbstractExportData exportData;
- // check timePrecision by session
exportData = new ExportDataTree();
exportData.init();
if (!sqlDialectTree) {
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTable.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTable.java
index 8f4ffd1aa51..afe7f1dbd48 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTable.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTable.java
@@ -141,7 +141,7 @@ public class ExportDataTable extends AbstractExportData {
exportToCsvFile(sessionDataSet, path);
}
sessionDataSet.closeOperationHandle();
- ioTPrinter.println("Export completely!");
+ ioTPrinter.println(Constants.EXPORT_COMPLETELY);
} catch (StatementExecutionException
| IoTDBConnectionException
| IOException
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTree.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTree.java
index 2777389d3f5..c9ff2d28dd1 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTree.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/data/ExportDataTree.java
@@ -108,7 +108,7 @@ public class ExportDataTree extends AbstractExportData {
exportToCsvFile(sessionDataSet, path);
}
sessionDataSet.closeOperationHandle();
- ioTPrinter.println("Export completely!");
+ ioTPrinter.println(Constants.EXPORT_COMPLETELY);
} catch (StatementExecutionException
| IoTDBConnectionException
| IOException
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractExportSchema.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractExportSchema.java
new file mode 100644
index 00000000000..902198a7ad2
--- /dev/null
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractExportSchema.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.schema;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+public abstract class AbstractExportSchema extends AbstractSchemaTool {
+
+ public abstract void init()
+ throws InterruptedException, IoTDBConnectionException,
StatementExecutionException;
+
+ protected abstract void exportSchemaToSqlFile();
+
+ protected abstract void exportSchemaToCsvFile(String pathPattern, int index);
+}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractImportSchema.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractImportSchema.java
new file mode 100644
index 00000000000..881cc08bdfd
--- /dev/null
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractImportSchema.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.schema;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tool.common.Constants;
+import org.apache.iotdb.tool.data.ImportDataScanTool;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class AbstractImportSchema extends AbstractSchemaTool
implements Runnable {
+
+ private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
+
+ public abstract void init()
+ throws InterruptedException, IoTDBConnectionException,
StatementExecutionException;
+
+ @Override
+ public void run() {
+ String filePath;
+ try {
+ while ((filePath = ImportDataScanTool.pollFromQueue()) != null) {
+ File file = new File(filePath);
+ if (!sqlDialectTree && file.getName().endsWith(Constants.SQL_SUFFIXS))
{
+ importSchemaFromSqlFile(file);
+ } else if (sqlDialectTree &&
file.getName().endsWith(Constants.CSV_SUFFIXS)) {
+ importSchemaFromCsvFile(file);
+ } else {
+ ioTPrinter.println(
+ file.getName()
+ + " : The file name must end with \"csv\" when sql_dialect
tree or \"sql\" when sql_dialect table!");
+ }
+ }
+ } catch (Exception e) {
+ ioTPrinter.println("Unexpected error occurred: " + e.getMessage());
+ }
+ }
+
+ protected abstract Runnable getAsyncImportRunnable();
+
+ protected class ThreadManager {
+ public void asyncImportSchemaFiles() {
+ List<Thread> list = new ArrayList<>(threadNum);
+ for (int i = 0; i < threadNum; i++) {
+ Thread thread = new Thread(getAsyncImportRunnable());
+ thread.start();
+ list.add(thread);
+ }
+ list.forEach(
+ thread -> {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ ioTPrinter.println("ImportData thread join interrupted: " +
e.getMessage());
+ }
+ });
+ ioTPrinter.println(Constants.IMPORT_COMPLETELY);
+ }
+ }
+
+ public static void init(AbstractImportSchema instance) {
+ instance.new ThreadManager().asyncImportSchemaFiles();
+ }
+
+ protected abstract void importSchemaFromSqlFile(File file);
+
+ protected abstract void importSchemaFromCsvFile(File file);
+
+ protected void processSuccessFile() {
+ loadFileSuccessfulNum.increment();
+ }
+}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java
index 00d926c7074..23a9127a427 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/AbstractSchemaTool.java
@@ -22,10 +22,9 @@ package org.apache.iotdb.tool.schema;
import org.apache.iotdb.cli.utils.IoTPrinter;
import org.apache.iotdb.exception.ArgsErrorException;
import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tool.common.Constants;
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.QuoteMode;
@@ -35,48 +34,33 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintWriter;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
public abstract class AbstractSchemaTool {
- protected static final String HOST_ARGS = "h";
- protected static final String HOST_NAME = "host";
- protected static final String HOST_DEFAULT_VALUE = "127.0.0.1";
-
- protected static final String HELP_ARGS = "help";
-
- protected static final String PORT_ARGS = "p";
- protected static final String PORT_NAME = "port";
- protected static final String PORT_DEFAULT_VALUE = "6667";
-
- protected static final String PW_ARGS = "pw";
- protected static final String PW_NAME = "password";
- protected static final String PW_DEFAULT_VALUE = "root";
-
- protected static final String USERNAME_ARGS = "u";
- protected static final String USERNAME_NAME = "username";
- protected static final String USERNAME_DEFAULT_VALUE = "root";
-
- protected static final String TIMEOUT_ARGS = "timeout";
- protected static final String TIMEOUT_ARGS_NAME = "queryTimeout";
-
- protected static final int MAX_HELP_CONSOLE_WIDTH = 92;
-
- protected static final int CODE_OK = 0;
- protected static final int CODE_ERROR = 1;
-
protected static String host;
protected static String port;
+ protected static String table;
+ protected static String database;
protected static String username;
protected static String password;
-
- protected static String aligned;
protected static Session session;
+ protected static String queryPath;
+ protected static int threadNum = 8;
+ protected static String targetPath;
+ protected static Boolean sqlDialectTree = true;
+ protected static long timeout = 60000;
+ protected static String targetDirectory;
+ protected static Boolean aligned = false;
+ protected static int linesPerFile = 10000;
+ protected static String failedFileDirectory = null;
+ protected static int batchPointSize = Constants.BATCH_POINT_SIZE;
+ protected static int linesPerFailedFile = Constants.BATCH_POINT_SIZE;
+ protected static String targetFile = Constants.DUMP_FILE_NAME_DEFAULT;
+ protected static final LongAdder loadFileSuccessfulNum = new LongAdder();
- protected static final List<String> HEAD_COLUMNS =
- Arrays.asList("Timeseries", "Alias", "DataType", "Encoding",
"Compression");
private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractSchemaTool.class);
@@ -91,63 +75,27 @@ public abstract class AbstractSchemaTool {
return defaultValue;
}
String msg = String.format("Required values for option '%s' not
provided", name);
- LOGGER.info(msg);
- LOGGER.info("Use -help for more information");
throw new ArgsErrorException(msg);
}
return str;
}
protected static void parseBasicParams(CommandLine commandLine) throws
ArgsErrorException {
- host = checkRequiredArg(HOST_ARGS, HOST_NAME, commandLine,
HOST_DEFAULT_VALUE);
- port = checkRequiredArg(PORT_ARGS, PORT_NAME, commandLine,
PORT_DEFAULT_VALUE);
- username = checkRequiredArg(USERNAME_ARGS, USERNAME_NAME, commandLine,
USERNAME_DEFAULT_VALUE);
- password = checkRequiredArg(PW_ARGS, PW_NAME, commandLine,
PW_DEFAULT_VALUE);
- }
-
- protected static Options createNewOptions() {
- Options options = new Options();
-
- Option opHost =
- Option.builder(HOST_ARGS)
- .longOpt(HOST_NAME)
- .optionalArg(true)
- .argName(HOST_NAME)
- .hasArg()
- .desc("Host Name (optional)")
- .build();
- options.addOption(opHost);
-
- Option opPort =
- Option.builder(PORT_ARGS)
- .longOpt(PORT_NAME)
- .optionalArg(true)
- .argName(PORT_NAME)
- .hasArg()
- .desc("Port (optional)")
- .build();
- options.addOption(opPort);
-
- Option opUsername =
- Option.builder(USERNAME_ARGS)
- .longOpt(USERNAME_NAME)
- .optionalArg(true)
- .argName(USERNAME_NAME)
- .hasArg()
- .desc("Username (optional)")
- .build();
- options.addOption(opUsername);
-
- Option opPassword =
- Option.builder(PW_ARGS)
- .longOpt(PW_NAME)
- .optionalArg(true)
- .argName(PW_NAME)
- .hasArg()
- .desc("Password (optional)")
- .build();
- options.addOption(opPassword);
- return options;
+ host =
+ checkRequiredArg(
+ Constants.HOST_ARGS, Constants.HOST_NAME, commandLine,
Constants.HOST_DEFAULT_VALUE);
+ port =
+ checkRequiredArg(
+ Constants.PORT_ARGS, Constants.PORT_NAME, commandLine,
Constants.PORT_DEFAULT_VALUE);
+ username =
+ checkRequiredArg(
+ Constants.USERNAME_ARGS,
+ Constants.USERNAME_NAME,
+ commandLine,
+ Constants.USERNAME_DEFAULT_VALUE);
+ password =
+ checkRequiredArg(
+ Constants.PW_ARGS, Constants.PW_NAME, commandLine,
Constants.PW_DEFAULT_VALUE);
}
/**
@@ -160,7 +108,7 @@ public abstract class AbstractSchemaTool {
try {
final CSVPrinterWrapper csvPrinterWrapper = new
CSVPrinterWrapper(filePath);
for (List<Object> CsvRecord : records) {
- csvPrinterWrapper.printRecordln(CsvRecord);
+ csvPrinterWrapper.printRecordLn(CsvRecord);
}
csvPrinterWrapper.flush();
csvPrinterWrapper.close();
@@ -194,7 +142,7 @@ public abstract class AbstractSchemaTool {
csvPrinter.printRecord(values);
}
- public void printRecordln(final Iterable<?> values) throws IOException {
+ public void printRecordLn(final Iterable<?> values) throws IOException {
if (csvPrinter == null) {
csvPrinter = csvFormat.print(new PrintWriter(filePath));
}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java
index 54453acd898..9db300ddd12 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchema.java
@@ -24,316 +24,167 @@ import org.apache.iotdb.cli.utils.CliContext;
import org.apache.iotdb.cli.utils.IoTPrinter;
import org.apache.iotdb.cli.utils.JlineUtils;
import org.apache.iotdb.exception.ArgsErrorException;
-import org.apache.iotdb.isession.SessionDataSet;
-import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tool.common.Constants;
+import org.apache.iotdb.tool.common.OptionsUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
-import org.apache.tsfile.read.common.Field;
-import org.apache.tsfile.read.common.RowRecord;
import org.jline.reader.LineReader;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
-import java.util.List;
-
-import static org.apache.iotdb.commons.schema.SchemaConstant.SYSTEM_DATABASE;
/** Export Schema CSV file. */
public class ExportSchema extends AbstractSchemaTool {
- private static final String TARGET_DIR_ARGS = "t";
- private static final String TARGET_DIR_ARGS_NAME = "target";
- private static final String TARGET_DIR_NAME = "targetDir";
-
- private static final String TARGET_PATH_ARGS = "path";
- private static final String TARGET_PATH_ARGS_NAME = "path_pattern";
- private static final String TARGET_PATH_NAME = "exportPathPattern";
- private static String queryPath;
-
- private static final String TARGET_FILE_ARGS = "pf";
- private static final String TARGET_FILE_ARGS_NAME = "path_pattern_file";
- private static final String TARGET_FILE_NAME = "exportPathPatternFile";
-
- private static final String LINES_PER_FILE_ARGS = "lpf";
- private static final String LINES_PER_FILE_ARGS_NAME = "lines_per_file";
- private static final String LINES_PER_FILE_NAME = "linesPerFile";
- private static int linesPerFile = 10000;
-
- private static final String EXPORT_SCHEMA_CLI_PREFIX = "ExportSchema";
-
- private static final String DUMP_FILE_NAME_DEFAULT = "dump";
- private static String targetFile = DUMP_FILE_NAME_DEFAULT;
-
- private static String targetDirectory;
-
- private static long timeout = 60000;
-
private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
- private static final String BASE_VIEW_TYPE = "BASE";
- private static final String HEADER_VIEW_TYPE = "ViewType";
- private static final String HEADER_TIMESERIES = "Timeseries";
-
@SuppressWarnings({
"squid:S3776",
"squid:S2093"
}) // Suppress high Cognitive Complexity warning, ignore try-with-resources
/* main function of export csv tool. */
public static void main(String[] args) {
- Options options = createOptions();
+ Options options = OptionsUtil.createExportSchemaOptions();
HelpFormatter hf = new HelpFormatter();
CommandLine commandLine = null;
CommandLineParser parser = new DefaultParser();
hf.setOptionComparator(null); // avoid reordering
- hf.setWidth(MAX_HELP_CONSOLE_WIDTH);
+ hf.setWidth(org.apache.iotdb.tool.common.Constants.MAX_HELP_CONSOLE_WIDTH);
if (args == null || args.length == 0) {
- ioTPrinter.println("Too few params input, please check the following
hint.");
- hf.printHelp(EXPORT_SCHEMA_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
+ ioTPrinter.println(Constants.SCHEMA_CLI_CHECK_IN_HEAD);
+ hf.printHelp(Constants.EXPORT_SCHEMA_CLI_PREFIX, options, true);
+ System.exit(Constants.CODE_ERROR);
}
try {
commandLine = parser.parse(options, args);
} catch (ParseException e) {
ioTPrinter.println(e.getMessage());
- hf.printHelp(EXPORT_SCHEMA_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
+ hf.printHelp(Constants.EXPORT_SCHEMA_CLI_PREFIX, options, true);
+ System.exit(Constants.CODE_ERROR);
}
- if (commandLine.hasOption(HELP_ARGS)) {
- hf.printHelp(EXPORT_SCHEMA_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
+ if (commandLine.hasOption(Constants.HELP_ARGS)) {
+ hf.printHelp(Constants.EXPORT_SCHEMA_CLI_PREFIX, options, true);
+ System.exit(Constants.CODE_ERROR);
}
- int exitCode = CODE_OK;
try {
parseBasicParams(commandLine);
parseSpecialParams(commandLine);
- session = new Session(host, Integer.parseInt(port), username, password);
- session.open(false);
- if (queryPath == null) {
- String pathFile = commandLine.getOptionValue(TARGET_FILE_ARGS);
- String path;
- if (pathFile == null) {
+ } catch (ArgsErrorException e) {
+ ioTPrinter.println("Args args: " + e.getMessage());
+ ioTPrinter.println("Use -help for more information");
+ System.exit(Constants.CODE_ERROR);
+ } catch (Exception e) {
+ ioTPrinter.println("Encounter an error, because " + e.getMessage());
+ System.exit(Constants.CODE_ERROR);
+ }
+ System.exit(exportToTargetPath());
+ }
+
+ private static int exportToTargetPath() {
+ AbstractExportSchema exportSchema;
+ try {
+ if (sqlDialectTree) {
+ exportSchema = new ExportSchemaTree();
+ exportSchema.init();
+ if (sqlDialectTree && queryPath == null) {
LineReader lineReader =
JlineUtils.getLineReader(
new CliContext(System.in, System.out, System.err,
ExitType.EXCEPTION),
username,
host,
port);
- path = lineReader.readLine(EXPORT_SCHEMA_CLI_PREFIX + "> please
input path pattern: ");
- ioTPrinter.println(path);
- String[] values = path.trim().split(";");
+ String pathPattern =
+ lineReader.readLine(Constants.EXPORT_CLI_PREFIX + "> please
input path pattern: ");
+ ioTPrinter.println(pathPattern);
+ String[] values = pathPattern.trim().split(";");
for (int i = 0; i < values.length; i++) {
if (StringUtils.isBlank(values[i])) {
continue;
} else {
- dumpResult(values[i], i);
+ exportSchema.exportSchemaToCsvFile(values[i], i);
}
}
- } else if (!pathFile.endsWith(".txt")) {
- ioTPrinter.println("The file name must end with \"txt\"!");
- hf.printHelp(EXPORT_SCHEMA_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
} else {
- dumpFromPathFile(pathFile);
+ exportSchema.exportSchemaToCsvFile(queryPath, 0);
}
} else {
- dumpResult(queryPath, 0);
- }
- } catch (IOException e) {
- ioTPrinter.println("Failed to operate on file, because " +
e.getMessage());
- exitCode = CODE_ERROR;
- } catch (ArgsErrorException e) {
- ioTPrinter.println("Invalid args: " + e.getMessage());
- exitCode = CODE_ERROR;
- } catch (IoTDBConnectionException e) {
- ioTPrinter.println("Connect failed because " + e.getMessage());
- exitCode = CODE_ERROR;
- } finally {
- if (session != null) {
- try {
- session.close();
- } catch (IoTDBConnectionException e) {
- exitCode = CODE_ERROR;
- ioTPrinter.println(
- "Encounter an error when closing session, error is: " +
e.getMessage());
- }
+ exportSchema = new ExportSchemaTable();
+ exportSchema.init();
+ exportSchema.exportSchemaToSqlFile();
}
+ return Constants.CODE_OK;
+ } catch (InterruptedException e) {
+ ioTPrinter.println(String.format("Export schema fail: %s",
e.getMessage()));
+ Thread.currentThread().interrupt();
+ return Constants.CODE_ERROR;
+ } catch (Exception e) {
+ ioTPrinter.println(String.format("Export schema fail: %s",
e.getMessage()));
+ return Constants.CODE_ERROR;
}
- System.exit(exitCode);
}
- private static void parseSpecialParams(CommandLine commandLine) throws
ArgsErrorException {
- targetDirectory = checkRequiredArg(TARGET_DIR_ARGS, TARGET_DIR_ARGS_NAME,
commandLine, null);
- queryPath = commandLine.getOptionValue(TARGET_PATH_ARGS);
- String timeoutString = commandLine.getOptionValue(TIMEOUT_ARGS);
- if (timeoutString != null) {
- timeout = Long.parseLong(timeoutString);
- }
- if (targetFile == null) {
- targetFile = DUMP_FILE_NAME_DEFAULT;
- }
- if (!targetDirectory.endsWith("/") && !targetDirectory.endsWith("\\")) {
- targetDirectory += File.separator;
- }
- if (commandLine.getOptionValue(LINES_PER_FILE_ARGS) != null) {
- linesPerFile =
Integer.parseInt(commandLine.getOptionValue(LINES_PER_FILE_ARGS));
- }
- }
-
- /**
- * commandline option create.
- *
- * @return object Options
- */
- private static Options createOptions() {
- Options options = createNewOptions();
-
- Option opTargetFile =
- Option.builder(TARGET_DIR_ARGS)
- .required()
- .longOpt(TARGET_DIR_ARGS_NAME)
- .hasArg()
- .argName(TARGET_DIR_NAME)
- .desc("Target File Directory (required)")
- .build();
- options.addOption(opTargetFile);
-
- Option targetPathPattern =
- Option.builder(TARGET_PATH_ARGS)
- .longOpt(TARGET_PATH_ARGS_NAME)
- .hasArg()
- .argName(TARGET_PATH_NAME)
- .desc("Export Path Pattern (optional)")
- .build();
- options.addOption(targetPathPattern);
-
- Option targetFileName =
- Option.builder(TARGET_FILE_ARGS)
- .longOpt(TARGET_FILE_ARGS_NAME)
- .hasArg()
- .argName(TARGET_FILE_NAME)
- .desc("Export File Name (optional)")
- .build();
- options.addOption(targetFileName);
-
- Option opLinesPerFile =
- Option.builder(LINES_PER_FILE_ARGS)
- .longOpt(LINES_PER_FILE_ARGS_NAME)
- .hasArg()
- .argName(LINES_PER_FILE_NAME)
- .desc("Lines per dump file.")
- .build();
- options.addOption(opLinesPerFile);
-
- Option opTimeout =
- Option.builder(TIMEOUT_ARGS)
- .longOpt(TIMEOUT_ARGS_NAME)
- .hasArg()
- .argName(TIMEOUT_ARGS)
- .desc(timeout + " Timeout for session query")
- .build();
- options.addOption(opTimeout);
-
- Option opHelp =
- Option.builder(HELP_ARGS).longOpt(HELP_ARGS).desc("Display help
information").build();
- options.addOption(opHelp);
- return options;
- }
-
- /**
- * This method will be called, if the query commands are written in a sql
file.
- *
- * @param pathFile sql file path
- * @throws IOException exception
- */
- private static void dumpFromPathFile(String pathFile) throws IOException {
+ private static void dumpFromPathFile(AbstractExportSchema exportSchema,
String pathFile)
+ throws IOException {
try (BufferedReader reader = new BufferedReader(new FileReader(pathFile)))
{
String path;
int index = 0;
while ((path = reader.readLine()) != null) {
- dumpResult(path, index);
+ exportSchema.exportSchemaToCsvFile(path, index);
index++;
}
}
}
- /**
- * Dump files from database to CSV file.
- *
- * @param pattern used to be export schema
- * @param index used to create dump file name
- */
- private static void dumpResult(String pattern, int index) {
- File file = new File(targetDirectory);
- if (!file.isDirectory()) {
- file.mkdir();
+ private static void parseSpecialParams(CommandLine commandLine) throws
ArgsErrorException {
+ targetDirectory =
+ checkRequiredArg(Constants.TARGET_DIR_ARGS, Constants.TARGET_DIR_NAME,
commandLine, null);
+ queryPath = commandLine.getOptionValue(Constants.TARGET_PATH_ARGS);
+ targetFile = commandLine.getOptionValue(Constants.TARGET_FILE_ARGS);
+ targetFile =
+ checkRequiredArg(
+ Constants.TARGET_FILE_ARGS,
+ Constants.TARGET_FILE_NAME,
+ commandLine,
+ Constants.DUMP_FILE_NAME_DEFAULT);
+ String timeoutString = commandLine.getOptionValue(Constants.TIMEOUT_ARGS);
+ if (timeoutString != null) {
+ timeout = Long.parseLong(timeoutString);
}
- final String path = targetDirectory + targetFile + index;
- try {
- SessionDataSet sessionDataSet =
- session.executeQueryStatement("show timeseries " + pattern, timeout);
- writeCsvFile(sessionDataSet, path, sessionDataSet.getColumnNames(),
linesPerFile);
- sessionDataSet.closeOperationHandle();
- ioTPrinter.println("Export completely!");
- } catch (StatementExecutionException | IoTDBConnectionException |
IOException e) {
- ioTPrinter.println("Cannot dump result because: " + e.getMessage());
+ if (!targetDirectory.endsWith("/") && !targetDirectory.endsWith("\\")) {
+ targetDirectory += File.separator;
}
- }
-
- @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
- public static void writeCsvFile(
- SessionDataSet sessionDataSet, String filePath, List<String> headers,
int linesPerFile)
- throws IOException, IoTDBConnectionException,
StatementExecutionException {
- int viewTypeIndex = headers.indexOf(HEADER_VIEW_TYPE);
- int timeseriesIndex = headers.indexOf(HEADER_TIMESERIES);
-
- int fileIndex = 0;
- boolean hasNext = true;
- while (hasNext) {
- int i = 0;
- final String finalFilePath = filePath + "_" + fileIndex + ".csv";
- final CSVPrinterWrapper csvPrinterWrapper = new
CSVPrinterWrapper(finalFilePath);
- while (i++ < linesPerFile) {
- if (sessionDataSet.hasNext()) {
- if (i == 1) {
- csvPrinterWrapper.printRecord(HEAD_COLUMNS);
- }
- RowRecord rowRecord = sessionDataSet.next();
- List<Field> fields = rowRecord.getFields();
- if
(fields.get(timeseriesIndex).getStringValue().startsWith(SYSTEM_DATABASE + ".")
- ||
!fields.get(viewTypeIndex).getStringValue().equals(BASE_VIEW_TYPE)) {
- continue;
- }
- HEAD_COLUMNS.forEach(
- column -> {
- Field field = fields.get(headers.indexOf(column));
- String fieldStringValue = field.getStringValue();
- if (!"null".equals(field.getStringValue())) {
- csvPrinterWrapper.print(fieldStringValue);
- } else {
- csvPrinterWrapper.print("");
- }
- });
- csvPrinterWrapper.println();
- } else {
- hasNext = false;
- break;
- }
+ if (commandLine.getOptionValue(Constants.LINES_PER_FILE_ARGS) != null) {
+ linesPerFile =
Integer.parseInt(commandLine.getOptionValue(Constants.LINES_PER_FILE_ARGS));
+ }
+ database = commandLine.getOptionValue(Constants.DB_ARGS);
+ table = commandLine.getOptionValue(Constants.TABLE_ARGS);
+ String sqlDialectValue =
+ checkRequiredArg(
+ Constants.SQL_DIALECT_ARGS,
+ Constants.SQL_DIALECT_ARGS,
+ commandLine,
+ Constants.SQL_DIALECT_VALUE_TREE);
+ if (Constants.SQL_DIALECT_VALUE_TABLE.equalsIgnoreCase(sqlDialectValue)) {
+ sqlDialectTree = false;
+ if (StringUtils.isBlank(database)) {
+ ioTPrinter.println(
+ String.format("The database param is required when sql_dialect is
table "));
+ System.exit(Constants.CODE_ERROR);
+ } else if (StringUtils.isNotBlank(database)
+ && "information_schema".equalsIgnoreCase(database)) {
+ ioTPrinter.println(
+ String.format("Does not support exporting system databases %s",
database));
+ System.exit(Constants.CODE_ERROR);
}
- fileIndex++;
- csvPrinterWrapper.flush();
- csvPrinterWrapper.close();
}
}
}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchemaTable.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchemaTable.java
new file mode 100644
index 00000000000..b7fc6a5ca2c
--- /dev/null
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchemaTable.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.schema;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.pool.ITableSessionPool;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.TableSessionPoolBuilder;
+import org.apache.iotdb.tool.common.Constants;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.read.common.RowRecord;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ExportSchemaTable extends AbstractExportSchema {
+
+ private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
+ private static ITableSessionPool sessionPool;
+ private static Map<String, String> tableCommentList = new HashMap<>();
+
+ public void init() throws InterruptedException {
+ sessionPool =
+ new TableSessionPoolBuilder()
+ .nodeUrls(Collections.singletonList(host + ":" + port))
+ .user(username)
+ .password(password)
+ .maxSize(threadNum + 1)
+ .enableCompression(false)
+ .enableRedirection(false)
+ .enableAutoFetch(false)
+ .database(database)
+ .build();
+ SessionDataSet sessionDataSet = null;
+ try (ITableSession session = sessionPool.getSession()) {
+ sessionDataSet =
+ session.executeQueryStatement(
+ String.format(Constants.EXPORT_SCHEMA_TABLES_SELECT, database));
+ checkDataBaseAndParseTablesBySelectSchema(sessionDataSet);
+ } catch (StatementExecutionException | IoTDBConnectionException e) {
+ try {
+ sessionDataSet =
+ session.executeQueryStatement(
+ String.format(Constants.EXPORT_SCHEMA_TABLES_SHOW, database));
+ checkDataBaseAndParseTablesByShowSchema(sessionDataSet);
+ } catch (IoTDBConnectionException | StatementExecutionException e1) {
+ ioTPrinter.println(Constants.INSERT_SQL_MEET_ERROR_MSG +
e.getMessage());
+ System.exit(1);
+ }
+ } finally {
+ if (ObjectUtils.isNotEmpty(sessionDataSet)) {
+ try {
+ sessionDataSet.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+ }
+
+ private static void checkDataBaseAndParseTablesBySelectSchema(SessionDataSet
sessionDataSet)
+ throws StatementExecutionException, IoTDBConnectionException {
+ Set<String> databases = new HashSet<>();
+ HashMap<String, String> tables = new HashMap<>();
+ while (sessionDataSet.hasNext()) {
+ RowRecord rowRecord = sessionDataSet.next();
+ databases.add(rowRecord.getField(0).getStringValue());
+ String comment = rowRecord.getField(4).getStringValue();
+ tables.putIfAbsent(
+ rowRecord.getField(1).getStringValue(), comment.equals("null") ?
null : comment);
+ }
+ if (!databases.contains(database)) {
+
ioTPrinter.println(String.format(Constants.TARGET_DATABASE_NOT_EXIST_MSG,
database));
+ System.exit(1);
+ }
+ if (StringUtils.isNotBlank(table)) {
+ if (!tables.containsKey(table)) {
+ ioTPrinter.println(String.format(Constants.TARGET_TABLE_NOT_EXIST_MSG,
database));
+ System.exit(1);
+ }
+ tableCommentList.put(table, tables.get(table));
+ } else {
+ tableCommentList.putAll(tables);
+ }
+ }
+
+ private static void checkDataBaseAndParseTablesByShowSchema(SessionDataSet
sessionDataSet)
+ throws StatementExecutionException, IoTDBConnectionException {
+ HashMap<String, String> tables = new HashMap<>();
+ while (sessionDataSet.hasNext()) {
+ RowRecord rowRecord = sessionDataSet.next();
+ String comment = rowRecord.getField(3).getStringValue();
+ tables.putIfAbsent(
+ rowRecord.getField(0).getStringValue(), comment.equals("null") ?
null : comment);
+ }
+ if (MapUtils.isNotEmpty(tables)) {
+ if (!tables.containsKey(table)) {
+ ioTPrinter.println(String.format(Constants.TARGET_TABLE_NOT_EXIST_MSG,
database));
+ System.exit(1);
+ }
+ tableCommentList.put(table, tables.get(table));
+ } else {
+ tableCommentList.putAll(tables);
+ }
+ }
+
+ @Override
+ protected void exportSchemaToSqlFile() {
+ File file = new File(targetDirectory);
+ if (!file.isDirectory()) {
+ file.mkdir();
+ }
+ String fileName = targetDirectory + targetFile + "_" + database + ".sql";
+ final Iterator<String> iterator = tableCommentList.keySet().iterator();
+ while (iterator.hasNext()) {
+ String tableName = iterator.next();
+ String comment = tableCommentList.get(tableName);
+ SessionDataSet sessionDataSet = null;
+ try (ITableSession session = sessionPool.getSession()) {
+ sessionDataSet =
+ session.executeQueryStatement(
+ String.format(Constants.EXPORT_SCHEMA_COLUMNS_SELECT,
database, tableName));
+ exportSchemaBySelect(sessionDataSet, fileName, tableName, comment);
+ } catch (IoTDBConnectionException | StatementExecutionException |
IOException e) {
+ try {
+ sessionDataSet =
+ session.executeQueryStatement(
+ String.format(Constants.EXPORT_SCHEMA_COLUMNS_DESC,
database, tableName));
+ exportSchemaByDesc(sessionDataSet, fileName, tableName, comment);
+ } catch (IoTDBConnectionException | StatementExecutionException |
IOException e1) {
+ ioTPrinter.println(Constants.COLUMN_SQL_MEET_ERROR_MSG +
e.getMessage());
+ }
+ } finally {
+ if (ObjectUtils.isNotEmpty(sessionDataSet)) {
+ try {
+ sessionDataSet.close();
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ ;
+ }
+ }
+ }
+ }
+ }
+
+ private void exportSchemaByDesc(
+ SessionDataSet sessionDataSet, String fileName, String tableName, String
tableComment)
+ throws IoTDBConnectionException, StatementExecutionException,
IOException {
+ String dropSql = "DROP TABLE IF EXISTS " + tableName + ";\n";
+ StringBuilder sb = new StringBuilder(dropSql);
+ sb.append("CREATE TABLE " + tableName + "(\n");
+ try (FileWriter writer = new FileWriter(fileName)) {
+ boolean hasNext = sessionDataSet.hasNext();
+ while (hasNext) {
+ RowRecord rowRecord = sessionDataSet.next();
+ hasNext = sessionDataSet.hasNext();
+ List<Field> fields = rowRecord.getFields();
+ String columnName = fields.get(0).getStringValue();
+ String dataType = fields.get(1).getStringValue();
+ String category = fields.get(2).getStringValue();
+ String comment = fields.get(4).getStringValue();
+ comment = comment.equals("null") ? null : comment;
+ sb.append("\t" + columnName + " " + dataType + " " + category);
+ if (ObjectUtils.isNotEmpty(comment)) {
+ sb.append(" COMMENT '" + comment + "'");
+ }
+ if (hasNext) {
+ sb.append(",");
+ }
+ sb.append("\n");
+ }
+ sb.append("\n)");
+ if (StringUtils.isNotBlank(tableComment)) {
+ sb.append(" COMMENT '" + tableComment + "'\n");
+ }
+ sb.append(";\n");
+ writer.write(sb.toString());
+ writer.flush();
+ }
+ }
+
+ private void exportSchemaBySelect(
+ SessionDataSet sessionDataSet, String fileName, String tableName, String
tableComment)
+ throws IoTDBConnectionException, StatementExecutionException,
IOException {
+ String dropSql = "DROP TABLE IF EXISTS " + tableName + ";\n";
+ StringBuilder sb = new StringBuilder(dropSql);
+ sb.append("CREATE TABLE " + tableName + "(\n");
+ try (FileWriter writer = new FileWriter(fileName, true)) {
+ boolean hasNext = sessionDataSet.hasNext();
+ while (hasNext) {
+ RowRecord rowRecord = sessionDataSet.next();
+ hasNext = sessionDataSet.hasNext();
+ List<Field> fields = rowRecord.getFields();
+ String columnName = fields.get(2).getStringValue();
+ String dataType = fields.get(3).getStringValue();
+ String category = fields.get(4).getStringValue();
+ String comment = fields.get(6).getStringValue();
+ comment = comment.equals("null") ? null : comment;
+ sb.append("\t" + columnName + " " + dataType + " " + category);
+ if (ObjectUtils.isNotEmpty(comment)) {
+ sb.append(" COMMENT '" + comment + "'");
+ }
+ if (hasNext) {
+ sb.append(",");
+ }
+ sb.append("\n");
+ }
+ sb.append(")");
+ if (StringUtils.isNotBlank(tableComment)) {
+ sb.append(" COMMENT '" + tableComment + "'");
+ }
+ sb.append(";\n");
+ writer.append(sb.toString());
+ writer.flush();
+ }
+ }
+
+ @Override
+ protected void exportSchemaToCsvFile(String pathPattern, int index) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchemaTree.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchemaTree.java
new file mode 100644
index 00000000000..7b8c63a4914
--- /dev/null
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ExportSchemaTree.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.schema;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tool.common.Constants;
+
+import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.read.common.RowRecord;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.iotdb.commons.schema.SchemaConstant.SYSTEM_DATABASE;
+
+public class ExportSchemaTree extends AbstractExportSchema {
+
+ private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
+ private static Session session;
+
+ public void init()
+ throws InterruptedException, IoTDBConnectionException,
StatementExecutionException {
+ session = new Session(host, Integer.parseInt(port), username, password);
+ session.open(false);
+ }
+
+ @Override
+ protected void exportSchemaToSqlFile() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ protected void exportSchemaToCsvFile(String pathPattern, int index) {
+ File file = new File(targetDirectory);
+ if (!file.isDirectory()) {
+ file.mkdir();
+ }
+ final String path = targetDirectory + targetFile + index;
+ try {
+ SessionDataSet sessionDataSet =
+ session.executeQueryStatement("show timeseries " + pathPattern,
timeout);
+ writeCsvFile(sessionDataSet, path, sessionDataSet.getColumnNames(),
linesPerFile);
+ sessionDataSet.closeOperationHandle();
+ ioTPrinter.println(Constants.EXPORT_COMPLETELY);
+ } catch (StatementExecutionException | IoTDBConnectionException |
IOException e) {
+ ioTPrinter.println("Cannot dump result because: " + e.getMessage());
+ }
+ }
+
+ private static void writeCsvFile(
+ SessionDataSet sessionDataSet, String filePath, List<String> headers,
int linesPerFile)
+ throws IOException, IoTDBConnectionException,
StatementExecutionException {
+ int viewTypeIndex = headers.indexOf(Constants.HEADER_VIEW_TYPE);
+ int timeseriesIndex = headers.indexOf(Constants.HEADER_TIMESERIES);
+
+ int fileIndex = 0;
+ boolean hasNext = true;
+ while (hasNext) {
+ int i = 0;
+ final String finalFilePath = filePath + "_" + fileIndex + ".csv";
+ final CSVPrinterWrapper csvPrinterWrapper = new
CSVPrinterWrapper(finalFilePath);
+ while (i++ < linesPerFile) {
+ if (sessionDataSet.hasNext()) {
+ if (i == 1) {
+ csvPrinterWrapper.printRecord(Constants.HEAD_COLUMNS);
+ }
+ RowRecord rowRecord = sessionDataSet.next();
+ List<Field> fields = rowRecord.getFields();
+ if
(fields.get(timeseriesIndex).getStringValue().startsWith(SYSTEM_DATABASE + ".")
+ ||
!fields.get(viewTypeIndex).getStringValue().equals(Constants.BASE_VIEW_TYPE)) {
+ continue;
+ }
+ Constants.HEAD_COLUMNS.forEach(
+ column -> {
+ Field field = fields.get(headers.indexOf(column));
+ String fieldStringValue = field.getStringValue();
+ if (!"null".equals(field.getStringValue())) {
+ csvPrinterWrapper.print(fieldStringValue);
+ } else {
+ csvPrinterWrapper.print("");
+ }
+ });
+ csvPrinterWrapper.println();
+ } else {
+ hasNext = false;
+ break;
+ }
+ }
+ fileIndex++;
+ csvPrinterWrapper.flush();
+ csvPrinterWrapper.close();
+ }
+ }
+}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
index a7709313b1b..21e19055b73 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
@@ -20,145 +20,38 @@
package org.apache.iotdb.tool.schema;
import org.apache.iotdb.cli.utils.IoTPrinter;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.exception.ArgsErrorException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tool.common.Constants;
+import org.apache.iotdb.tool.common.OptionsUtil;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.file.metadata.enums.CompressionType;
-import org.apache.tsfile.file.metadata.enums.TSEncoding;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.apache.iotdb.commons.schema.SchemaConstant.SYSTEM_DATABASE;
/** Import Schema CSV file. */
public class ImportSchema extends AbstractSchemaTool {
- private static final String FILE_ARGS = "s";
- private static final String FILE_NAME = "source";
- private static final String FILE_ARGS_NAME = "sourceDir/sourceFile";
-
- private static final String FAILED_FILE_ARGS = "fd";
- private static final String FAILED_FILE_NAME = "fail_dir";
- private static final String FAILED_FILE_ARGS_NAME = "failDir";
-
- private static final String ALIGNED_ARGS = "aligned";
- private static Boolean aligned = false;
-
- private static final String BATCH_POINT_SIZE_ARGS = "batch";
- private static final String BATCH_POINT_SIZE_NAME = "batch_size";
- private static final String BATCH_POINT_SIZE_ARGS_NAME = "batchSize";
- private static int batchPointSize = 10_000;
-
- private static final String CSV_SUFFIXS = "csv";
-
- private static final String LINES_PER_FAILED_FILE_ARGS = "lpf";
- private static final String LINES_PER_FAILED_FILE_NAME = "lines_per_file";
- private static final String LINES_PER_FAILED_FILE_ARGS_NAME = "linesPerFile";
- private static final String IMPORT_SCHEMA_CLI_PREFIX = "ImportSchema";
- private static int linesPerFailedFile = 10000;
-
- private static String targetPath;
- private static String failedFileDirectory = null;
-
- private static final String INSERT_CSV_MEET_ERROR_MSG = "Meet error when
insert csv because ";
-
private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
- /**
- * create the commandline options.
- *
- * @return object Options
- */
- private static Options createOptions() {
- Options options = createNewOptions();
-
- Option opFile =
- Option.builder(FILE_ARGS)
- .required()
- .longOpt(FILE_NAME)
- .hasArg()
- .argName(FILE_ARGS_NAME)
- .desc(
- "If input a file path, load a csv file, "
- + "otherwise load all csv file under this directory
(required)")
- .build();
- options.addOption(opFile);
-
- Option opFailedFile =
- Option.builder(FAILED_FILE_ARGS)
- .longOpt(FAILED_FILE_NAME)
- .hasArg()
- .argName(FAILED_FILE_ARGS_NAME)
- .desc(
- "Specifying a directory to save failed file, default
YOUR_CSV_FILE_PATH (optional)")
- .build();
- options.addOption(opFailedFile);
-
- Option opBatchPointSize =
- Option.builder(BATCH_POINT_SIZE_ARGS)
- .longOpt(BATCH_POINT_SIZE_NAME)
- .hasArg()
- .argName(BATCH_POINT_SIZE_ARGS_NAME)
- .desc("10000 (only not aligned optional)")
- .build();
- options.addOption(opBatchPointSize);
-
- Option opFailedLinesPerFile =
- Option.builder(LINES_PER_FAILED_FILE_ARGS)
- .longOpt(LINES_PER_FAILED_FILE_NAME)
- .hasArg()
- .argName(LINES_PER_FAILED_FILE_ARGS_NAME)
- .desc("Lines per failed file")
- .build();
- options.addOption(opFailedLinesPerFile);
-
- Option opHelp =
- Option.builder(HELP_ARGS).longOpt(HELP_ARGS).desc("Display help
information").build();
- options.addOption(opHelp);
- return options;
- }
-
/**
* parse optional params
*
* @param commandLine
*/
private static void parseSpecialParams(CommandLine commandLine) throws
ArgsErrorException {
- targetPath = commandLine.getOptionValue(FILE_ARGS);
- if (commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS) != null) {
- batchPointSize =
Integer.parseInt(commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS));
+ targetPath = commandLine.getOptionValue(Constants.FILE_ARGS);
+ if (commandLine.getOptionValue(Constants.BATCH_POINT_SIZE_ARGS) != null) {
+ batchPointSize =
+
Integer.parseInt(commandLine.getOptionValue(Constants.BATCH_POINT_SIZE_ARGS));
}
- if (commandLine.getOptionValue(FAILED_FILE_ARGS) != null) {
- failedFileDirectory = commandLine.getOptionValue(FAILED_FILE_ARGS);
+ if (commandLine.getOptionValue(Constants.FAILED_FILE_ARGS) != null) {
+ failedFileDirectory =
commandLine.getOptionValue(Constants.FAILED_FILE_ARGS);
File file = new File(failedFileDirectory);
if (!file.isDirectory()) {
file.mkdir();
@@ -167,481 +60,103 @@ public class ImportSchema extends AbstractSchemaTool {
failedFileDirectory += File.separator;
}
}
- if (commandLine.getOptionValue(ALIGNED_ARGS) != null) {
- aligned = Boolean.valueOf(commandLine.getOptionValue(ALIGNED_ARGS));
+ if (commandLine.getOptionValue(Constants.ALIGNED_ARGS) != null) {
+ aligned =
Boolean.valueOf(commandLine.getOptionValue(Constants.ALIGNED_ARGS));
+ }
+ if (commandLine.getOptionValue(Constants.LINES_PER_FAILED_FILE_ARGS) !=
null) {
+ linesPerFailedFile =
+
Integer.parseInt(commandLine.getOptionValue(Constants.LINES_PER_FAILED_FILE_ARGS));
}
- if (commandLine.getOptionValue(LINES_PER_FAILED_FILE_ARGS) != null) {
- linesPerFailedFile =
Integer.parseInt(commandLine.getOptionValue(LINES_PER_FAILED_FILE_ARGS));
+ database = checkRequiredArg(Constants.DB_ARGS, Constants.DB_NAME,
commandLine, null);
+ String sqlDialectValue =
+ checkRequiredArg(
+ Constants.SQL_DIALECT_ARGS,
+ Constants.SQL_DIALECT_ARGS,
+ commandLine,
+ Constants.SQL_DIALECT_VALUE_TREE);
+ if (Constants.SQL_DIALECT_VALUE_TABLE.equalsIgnoreCase(sqlDialectValue)) {
+ sqlDialectTree = false;
+ if (StringUtils.isBlank(database)) {
+ ioTPrinter.println(
+ String.format("The database param is required when sql_dialect is
table "));
+ System.exit(Constants.CODE_ERROR);
+ } else if (StringUtils.isNotBlank(database)
+ && "information_schema".equalsIgnoreCase(database)) {
+ ioTPrinter.println(
+ String.format("Does not support exporting system databases %s",
database));
+ System.exit(Constants.CODE_ERROR);
+ }
}
}
public static void main(String[] args) throws IoTDBConnectionException {
- Options options = createOptions();
+ Options options = OptionsUtil.createImportSchemaOptions();
HelpFormatter hf = new HelpFormatter();
hf.setOptionComparator(null);
- hf.setWidth(MAX_HELP_CONSOLE_WIDTH);
+ hf.setWidth(Constants.MAX_HELP_CONSOLE_WIDTH);
CommandLine commandLine = null;
CommandLineParser parser = new DefaultParser();
if (args == null || args.length == 0) {
- ioTPrinter.println("Too few params input, please check the following
hint.");
- hf.printHelp(IMPORT_SCHEMA_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
+ ioTPrinter.println(Constants.SCHEMA_CLI_CHECK_IN_HEAD);
+ hf.printHelp(Constants.IMPORT_SCHEMA_CLI_PREFIX, options, true);
+ System.exit(Constants.CODE_ERROR);
}
try {
commandLine = parser.parse(options, args);
} catch (org.apache.commons.cli.ParseException e) {
ioTPrinter.println("Parse error: " + e.getMessage());
- hf.printHelp(IMPORT_SCHEMA_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
+ hf.printHelp(Constants.IMPORT_SCHEMA_CLI_PREFIX, options, true);
+ System.exit(Constants.CODE_ERROR);
}
- if (commandLine.hasOption(HELP_ARGS)) {
- hf.printHelp(IMPORT_SCHEMA_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
+ if (commandLine.hasOption(Constants.HELP_ARGS)) {
+ hf.printHelp(Constants.IMPORT_SCHEMA_CLI_PREFIX, options, true);
+ System.exit(Constants.CODE_ERROR);
}
try {
parseBasicParams(commandLine);
- String filename = commandLine.getOptionValue(FILE_ARGS);
+ String filename = commandLine.getOptionValue(Constants.FILE_ARGS);
if (filename == null) {
- hf.printHelp(IMPORT_SCHEMA_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
+ hf.printHelp(Constants.IMPORT_SCHEMA_CLI_PREFIX, options, true);
+ System.exit(Constants.CODE_ERROR);
}
parseSpecialParams(commandLine);
} catch (ArgsErrorException e) {
ioTPrinter.println("Args error: " + e.getMessage());
- System.exit(CODE_ERROR);
+ System.exit(Constants.CODE_ERROR);
} catch (Exception e) {
ioTPrinter.println("Encounter an error, because: " + e.getMessage());
- System.exit(CODE_ERROR);
+ System.exit(Constants.CODE_ERROR);
}
- System.exit(importFromTargetPath(host, Integer.parseInt(port), username,
password, targetPath));
+ System.exit(importFromTargetPath());
}
/**
* Specifying a CSV file or a directory including CSV files that you want to
import. This method
* can be offered to console cli to implement importing CSV file by command.
*
- * @param host
- * @param port
- * @param username
- * @param password
- * @param targetPath a CSV file or a directory including CSV files
* @return the status code
* @throws IoTDBConnectionException
*/
@SuppressWarnings({"squid:S2093"}) // ignore try-with-resources
- public static int importFromTargetPath(
- String host, int port, String username, String password, String
targetPath) {
+ private static int importFromTargetPath() {
+ AbstractImportSchema importSchema;
try {
- session = new Session(host, port, username, password, false);
- session.open(false);
- File file = new File(targetPath);
- if (file.isFile()) {
- importFromSingleFile(file);
- } else if (file.isDirectory()) {
- File[] files = file.listFiles();
- if (files == null) {
- return CODE_OK;
- }
- // 按文件名排序
- Arrays.sort(files, (f1, f2) -> f1.getName().compareTo(f2.getName()));
- for (File subFile : files) {
- if (subFile.isFile()) {
- importFromSingleFile(subFile);
- }
- }
+ if (sqlDialectTree) {
+ importSchema = new ImportSchemaTree();
} else {
- ioTPrinter.println("File not found!");
- return CODE_ERROR;
- }
- } catch (IoTDBConnectionException e) {
- ioTPrinter.println("Encounter an error when connecting to server,
because " + e.getMessage());
- return CODE_ERROR;
- } finally {
- if (session != null) {
- try {
- session.close();
- } catch (IoTDBConnectionException e) {
- ;
- }
- }
- }
- return CODE_OK;
- }
-
- /**
- * import the CSV file and load headers and records.
- *
- * @param file the File object of the CSV file that you want to import.
- */
- private static void importFromSingleFile(File file) {
- if (file.getName().endsWith(CSV_SUFFIXS)) {
- try {
- CSVParser csvRecords = readCsvFile(file.getAbsolutePath());
- List<String> headerNames = csvRecords.getHeaderNames();
- Stream<CSVRecord> records = csvRecords.stream();
- if (headerNames.isEmpty()) {
- ioTPrinter.println(file.getName() + " : Empty file!");
- return;
- }
- if (!checkHeader(headerNames)) {
- return;
- }
- String failedFilePath = null;
- if (failedFileDirectory == null) {
- failedFilePath = file.getAbsolutePath() + ".failed";
- } else {
- failedFilePath = failedFileDirectory + file.getName() + ".failed";
- }
- writeScheme(file.getName(), headerNames, records, failedFilePath);
- } catch (IOException | IllegalPathException e) {
- ioTPrinter.println(
- file.getName() + " : CSV file read exception because: " +
e.getMessage());
- }
- } else {
- ioTPrinter.println(file.getName() + " : The file name must end with
\"csv\"!");
- }
- }
-
- /**
- * if the data is aligned by time, the data will be written by this method.
- *
- * @param headerNames the header names of CSV file
- * @param records the records of CSV file
- * @param failedFilePath the directory to save the failed files
- */
- @SuppressWarnings("squid:S3776")
- private static void writeScheme(
- String fileName, List<String> headerNames, Stream<CSVRecord> records,
String failedFilePath)
- throws IllegalPathException {
- List<String> paths = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
- List<TSEncoding> encodings = new ArrayList<>();
- List<CompressionType> compressors = new ArrayList<>();
- List<String> pathsWithAlias = new ArrayList<>();
- List<TSDataType> dataTypesWithAlias = new ArrayList<>();
- List<TSEncoding> encodingsWithAlias = new ArrayList<>();
- List<CompressionType> compressorsWithAlias = new ArrayList<>();
- List<String> measurementAlias = new ArrayList<>();
-
- AtomicReference<Boolean> hasStarted = new AtomicReference<>(false);
- AtomicInteger pointSize = new AtomicInteger(0);
- ArrayList<List<Object>> failedRecords = new ArrayList<>();
- records.forEach(
- recordObj -> {
- boolean failed = false;
- if (!aligned) {
- if (Boolean.FALSE.equals(hasStarted.get())) {
- hasStarted.set(true);
- } else if (pointSize.get() >= batchPointSize) {
- try {
- if (CollectionUtils.isNotEmpty(paths)) {
- writeAndEmptyDataSet(
- paths, dataTypes, encodings, compressors, null, null,
null, null, 3);
- }
- } catch (Exception e) {
- paths.forEach(t ->
failedRecords.add(Collections.singletonList(t)));
- }
- try {
- if (CollectionUtils.isNotEmpty(pathsWithAlias)) {
- writeAndEmptyDataSet(
- pathsWithAlias,
- dataTypesWithAlias,
- encodingsWithAlias,
- compressorsWithAlias,
- null,
- null,
- null,
- measurementAlias,
- 3);
- }
- } catch (Exception e) {
- paths.forEach(t ->
failedRecords.add(Collections.singletonList(t)));
- }
- paths.clear();
- dataTypes.clear();
- encodings.clear();
- compressors.clear();
- measurementAlias.clear();
- pointSize.set(0);
- }
- } else {
- paths.clear();
- dataTypes.clear();
- encodings.clear();
- compressors.clear();
- measurementAlias.clear();
- }
- String path =
recordObj.get(headerNames.indexOf(HEAD_COLUMNS.get(0)));
- String alias =
recordObj.get(headerNames.indexOf(HEAD_COLUMNS.get(1)));
- String dataTypeRaw =
recordObj.get(headerNames.indexOf(HEAD_COLUMNS.get(2)));
- TSDataType dataType = typeInfer(dataTypeRaw);
- String encodingTypeRaw =
recordObj.get(headerNames.indexOf(HEAD_COLUMNS.get(3)));
- TSEncoding encodingType = encodingInfer(encodingTypeRaw);
- String compressionTypeRaw =
recordObj.get(headerNames.indexOf(HEAD_COLUMNS.get(4)));
- CompressionType compressionType = compressInfer(compressionTypeRaw);
- if (StringUtils.isBlank(path) ||
path.trim().startsWith(SYSTEM_DATABASE)) {
- ioTPrinter.println(
- String.format(
- "Line '%s', column '%s': illegal path %s",
- recordObj.getRecordNumber(), headerNames, path));
- failedRecords.add(recordObj.stream().collect(Collectors.toList()));
- failed = true;
- } else if (ObjectUtils.isEmpty(dataType)) {
- ioTPrinter.println(
- String.format(
- "Line '%s', column '%s': '%s' unknown dataType %n",
- recordObj.getRecordNumber(), path, dataTypeRaw));
- failedRecords.add(recordObj.stream().collect(Collectors.toList()));
- failed = true;
- } else if (ObjectUtils.isEmpty(encodingType)) {
- ioTPrinter.println(
- String.format(
- "Line '%s', column '%s': '%s' unknown encodingType %n",
- recordObj.getRecordNumber(), path, encodingTypeRaw));
- failedRecords.add(recordObj.stream().collect(Collectors.toList()));
- failed = true;
- } else if (ObjectUtils.isEmpty(compressionType)) {
- ioTPrinter.println(
- String.format(
- "Line '%s', column '%s': '%s' unknown compressionType %n",
- recordObj.getRecordNumber(), path, compressionTypeRaw));
- failedRecords.add(recordObj.stream().collect(Collectors.toList()));
- failed = true;
- } else {
- if (StringUtils.isBlank(alias)) {
- paths.add(path);
- dataTypes.add(dataType);
- encodings.add(encodingType);
- compressors.add(compressionType);
- } else {
- pathsWithAlias.add(path);
- dataTypesWithAlias.add(dataType);
- encodingsWithAlias.add(encodingType);
- compressorsWithAlias.add(compressionType);
- measurementAlias.add(alias);
- }
- pointSize.getAndIncrement();
- }
- if (!failed && aligned) {
- String deviceId = path.substring(0, path.lastIndexOf("."));
- paths.add(0, path.substring(deviceId.length() + 1));
- writeAndEmptyDataSetAligned(
- deviceId, paths, dataTypes, encodings, compressors,
measurementAlias, 3);
- }
- });
- try {
- if (CollectionUtils.isNotEmpty(paths)) {
- writeAndEmptyDataSet(paths, dataTypes, encodings, compressors, null,
null, null, null, 3);
- }
- } catch (Exception e) {
- paths.forEach(t -> failedRecords.add(Collections.singletonList(t)));
- }
- try {
- if (CollectionUtils.isNotEmpty(pathsWithAlias)) {
- writeAndEmptyDataSet(
- pathsWithAlias,
- dataTypesWithAlias,
- encodingsWithAlias,
- compressorsWithAlias,
- null,
- null,
- null,
- measurementAlias,
- 3);
- }
+ importSchema = new ImportSchemaTable();
+ }
+ importSchema.init();
+ AbstractImportSchema.init(importSchema);
+ return Constants.CODE_OK;
+ } catch (InterruptedException e) {
+ ioTPrinter.println(String.format("Import schema fail: %s",
e.getMessage()));
+ Thread.currentThread().interrupt();
+ return Constants.CODE_ERROR;
} catch (Exception e) {
- pathsWithAlias.forEach(t ->
failedRecords.add(Collections.singletonList(t)));
- }
- pointSize.set(0);
- if (!failedRecords.isEmpty()) {
- writeFailedLinesFile(failedFilePath, failedRecords);
- }
- if (Boolean.TRUE.equals(hasStarted.get())) {
- if (!failedRecords.isEmpty()) {
- ioTPrinter.println(fileName + " : Import completely fail!");
- } else {
- ioTPrinter.println(fileName + " : Import completely successful!");
- }
- } else {
- ioTPrinter.println(fileName + " : No records!");
- }
- }
-
- private static boolean checkHeader(List<String> headerNames) {
- if (CollectionUtils.isNotEmpty(headerNames)
- && new HashSet<>(headerNames).size() == HEAD_COLUMNS.size()) {
- List<String> strangers =
- headerNames.stream().filter(t ->
!HEAD_COLUMNS.contains(t)).collect(Collectors.toList());
- if (CollectionUtils.isNotEmpty(strangers)) {
- ioTPrinter.println(
- "The header of the CSV file to be imported is illegal. The correct
format is \"Timeseries, Alibaba, DataType, Encoding, Compression\"!");
- return false;
- }
- }
- return true;
- }
-
- private static void writeFailedLinesFile(
- String failedFilePath, ArrayList<List<Object>> failedRecords) {
- int fileIndex = 0;
- int from = 0;
- int failedRecordsSize = failedRecords.size();
- int restFailedRecords = failedRecordsSize;
- while (from < failedRecordsSize) {
- int step = Math.min(restFailedRecords, linesPerFailedFile);
- writeCsvFile(failedRecords.subList(from, from + step), failedFilePath +
"_" + fileIndex++);
- from += step;
- restFailedRecords -= step;
- }
- }
-
- private static void writeAndEmptyDataSet(
- List<String> paths,
- List<TSDataType> dataTypes,
- List<TSEncoding> encodings,
- List<CompressionType> compressors,
- List<Map<String, String>> propsList,
- List<Map<String, String>> tagsList,
- List<Map<String, String>> attributesList,
- List<String> measurementAliasList,
- int retryTime)
- throws StatementExecutionException {
- try {
- session.createMultiTimeseries(
- paths,
- dataTypes,
- encodings,
- compressors,
- propsList,
- tagsList,
- attributesList,
- measurementAliasList);
- } catch (IoTDBConnectionException e) {
- if (retryTime > 0) {
- try {
- session.open();
- } catch (IoTDBConnectionException ex) {
- ioTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
- }
- writeAndEmptyDataSet(
- paths,
- dataTypes,
- encodings,
- compressors,
- propsList,
- tagsList,
- attributesList,
- measurementAliasList,
- --retryTime);
- }
- } catch (StatementExecutionException e) {
- try {
- session.close();
- } catch (IoTDBConnectionException ex) {
- // do nothing
- }
- throw e;
- }
- }
-
- private static void writeAndEmptyDataSetAligned(
- String deviceId,
- List<String> measurements,
- List<TSDataType> dataTypes,
- List<TSEncoding> encodings,
- List<CompressionType> compressors,
- List<String> measurementAliasList,
- int retryTime) {
- try {
- session.createAlignedTimeseries(
- deviceId, measurements, dataTypes, encodings, compressors,
measurementAliasList);
- } catch (IoTDBConnectionException e) {
- if (retryTime > 0) {
- try {
- session.open();
- } catch (IoTDBConnectionException ex) {
- ioTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
- }
- writeAndEmptyDataSetAligned(
- deviceId,
- measurements,
- dataTypes,
- encodings,
- compressors,
- measurementAliasList,
- --retryTime);
- }
- } catch (StatementExecutionException e) {
- ioTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
- try {
- session.close();
- } catch (IoTDBConnectionException ex) {
- // do nothing
- }
- System.exit(1);
- } finally {
- deviceId = null;
- measurements.clear();
- dataTypes.clear();
- encodings.clear();
- compressors.clear();
- measurementAliasList.clear();
- }
- }
-
- /**
- * read data from the CSV file
- *
- * @param path
- * @return CSVParser csv parser
- * @throws IOException when reading the csv file failed.
- */
- private static CSVParser readCsvFile(String path) throws IOException {
- return CSVFormat.Builder.create(CSVFormat.DEFAULT)
- .setHeader()
- .setSkipHeaderRecord(true)
- .setQuote('`')
- .setEscape('\\')
- .setIgnoreEmptyLines(true)
- .build()
- .parse(new InputStreamReader(new FileInputStream(path)));
- }
-
- /**
- * @param typeStr
- * @return
- */
- private static TSDataType typeInfer(String typeStr) {
- try {
- if (StringUtils.isNotBlank(typeStr)) {
- return TSDataType.valueOf(typeStr);
- }
- } catch (IllegalArgumentException e) {
- ;
- }
- return null;
- }
-
- private static CompressionType compressInfer(String compressionType) {
- try {
- if (StringUtils.isNotBlank(compressionType)) {
- return CompressionType.valueOf(compressionType);
- }
- } catch (IllegalArgumentException e) {
- ;
- }
- return null;
- }
-
- private static TSEncoding encodingInfer(String encodingType) {
- try {
- if (StringUtils.isNotBlank(encodingType)) {
- return TSEncoding.valueOf(encodingType);
- }
- } catch (IllegalArgumentException e) {
- ;
+ ioTPrinter.println(String.format("Import schema fail: %s",
e.getMessage()));
+ return Constants.CODE_ERROR;
}
- return null;
}
}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchemaTable.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchemaTable.java
new file mode 100644
index 00000000000..25119e48623
--- /dev/null
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchemaTable.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tool.schema;
+
+import org.apache.iotdb.cli.utils.IoTPrinter;
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.pool.ITableSessionPool;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.TableSessionPoolBuilder;
+import org.apache.iotdb.tool.common.Constants;
+import org.apache.iotdb.tool.data.ImportDataScanTool;
+
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.tsfile.read.common.RowRecord;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class ImportSchemaTable extends AbstractImportSchema {
+
+ private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
+ private static ITableSessionPool sessionPool;
+
+ public void init() throws InterruptedException {
+ sessionPool =
+ new TableSessionPoolBuilder()
+ .nodeUrls(Collections.singletonList(host + ":" + port))
+ .user(username)
+ .password(password)
+ .maxSize(threadNum + 1)
+ .enableCompression(false)
+ .enableRedirection(false)
+ .enableAutoFetch(false)
+ .database(database)
+ .build();
+ final File file = new File(targetPath);
+ if (!file.isFile() && !file.isDirectory()) {
+ ioTPrinter.println(String.format("Source file or directory %s does not
exist", targetPath));
+ System.exit(Constants.CODE_ERROR);
+ }
+ SessionDataSet sessionDataSet = null;
+ try (ITableSession session = sessionPool.getSession()) {
+ List<String> databases = new ArrayList<>();
+ sessionDataSet = session.executeQueryStatement("show databases");
+ while (sessionDataSet.hasNext()) {
+ RowRecord rowRecord = sessionDataSet.next();
+ databases.add(rowRecord.getField(0).getStringValue());
+ }
+ if (!databases.contains(database)) {
+
ioTPrinter.println(String.format(Constants.TARGET_DATABASE_NOT_EXIST_MSG,
database));
+ System.exit(1);
+ }
+ } catch (StatementExecutionException e) {
+ ioTPrinter.println(Constants.INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
+ System.exit(1);
+ } catch (IoTDBConnectionException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (ObjectUtils.isNotEmpty(sessionDataSet)) {
+ try {
+ sessionDataSet.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+ ImportDataScanTool.setSourceFullPath(targetPath);
+ ImportDataScanTool.traverseAndCollectFiles();
+ }
+
+ @Override
+ protected Runnable getAsyncImportRunnable() {
+ return new ImportSchemaTable();
+ }
+
+ @Override
+ protected void importSchemaFromSqlFile(File file) {
+ ArrayList<List<Object>> failedRecords = new ArrayList<>();
+ String failedFilePath;
+ if (failedFileDirectory == null) {
+ failedFilePath = file.getAbsolutePath() + ".failed";
+ } else {
+ failedFilePath = failedFileDirectory + file.getName() + ".failed";
+ }
+ try (BufferedReader br = new BufferedReader(new
FileReader(file.getAbsolutePath()))) {
+ StringBuilder sqlBuilder = new StringBuilder();
+ String sql;
+ while ((sql = br.readLine()) != null) {
+ if (!sql.contains(";")) {
+ sqlBuilder.append(sql);
+ } else {
+ String[] sqls = sql.split(";");
+ boolean addBuilder = false;
+ if (sqls.length > 0) {
+ if (!sql.endsWith(";")) {
+ addBuilder = true;
+ }
+ for (int i = 0; i < sqls.length; i++) {
+ sqlBuilder.append(sqls[i]);
+ if (i == sqls.length - 1 && addBuilder) {
+ break;
+ }
+ String builderString = sqlBuilder.toString();
+ try (ITableSession session = sessionPool.getSession()) {
+ session.executeNonQueryStatement(builderString);
+ sqlBuilder = new StringBuilder();
+ } catch (IoTDBConnectionException | StatementExecutionException
e) {
+ failedRecords.add(Collections.singletonList(builderString));
+ sqlBuilder = new StringBuilder();
+ }
+ }
+ } else {
+ String builderString = sqlBuilder.toString();
+ try (ITableSession session = sessionPool.getSession()) {
+ session.executeNonQueryStatement(builderString);
+ sqlBuilder = new StringBuilder();
+ } catch (IoTDBConnectionException | StatementExecutionException e)
{
+ failedRecords.add(Collections.singletonList(builderString));
+ sqlBuilder = new StringBuilder();
+ }
+ }
+ }
+ }
+ String builderString = sqlBuilder.toString();
+ if (StringUtils.isNotBlank(builderString)) {
+ try (ITableSession session = sessionPool.getSession()) {
+ session.executeNonQueryStatement(sqlBuilder.toString());
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ failedRecords.add(Collections.singletonList(builderString));
+ sqlBuilder = new StringBuilder();
+ }
+ }
+ processSuccessFile();
+ } catch (IOException e) {
+ ioTPrinter.println("SQL file read exception because: " + e.getMessage());
+ }
+ if (!failedRecords.isEmpty()) {
+ FileWriter writer = null;
+ try {
+ writer = new FileWriter(failedFilePath);
+ for (List<Object> failedRecord : failedRecords) {
+ writer.write(failedRecord.get(0).toString() + ";\n");
+ }
+ } catch (IOException e) {
+ ioTPrinter.println("Cannot dump fail result because: " +
e.getMessage());
+ } finally {
+ if (ObjectUtils.isNotEmpty(writer)) {
+ try {
+ writer.flush();
+ writer.close();
+ } catch (IOException e) {
+ ;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void importSchemaFromCsvFile(File file) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+}
diff --git
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchemaTree.java
similarity index 53%
copy from
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
copy to
iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchemaTree.java
index a7709313b1b..e9732287673 100644
---
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchema.java
+++
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/schema/ImportSchemaTree.java
@@ -21,17 +21,12 @@ package org.apache.iotdb.tool.schema;
import org.apache.iotdb.cli.utils.IoTPrinter;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.exception.ArgsErrorException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tool.common.Constants;
+import org.apache.iotdb.tool.data.ImportDataScanTool;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
@@ -47,7 +42,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -59,258 +53,97 @@ import java.util.stream.Stream;
import static org.apache.iotdb.commons.schema.SchemaConstant.SYSTEM_DATABASE;
-/** Import Schema CSV file. */
-public class ImportSchema extends AbstractSchemaTool {
-
- private static final String FILE_ARGS = "s";
- private static final String FILE_NAME = "source";
- private static final String FILE_ARGS_NAME = "sourceDir/sourceFile";
-
- private static final String FAILED_FILE_ARGS = "fd";
- private static final String FAILED_FILE_NAME = "fail_dir";
- private static final String FAILED_FILE_ARGS_NAME = "failDir";
-
- private static final String ALIGNED_ARGS = "aligned";
- private static Boolean aligned = false;
-
- private static final String BATCH_POINT_SIZE_ARGS = "batch";
- private static final String BATCH_POINT_SIZE_NAME = "batch_size";
- private static final String BATCH_POINT_SIZE_ARGS_NAME = "batchSize";
- private static int batchPointSize = 10_000;
-
- private static final String CSV_SUFFIXS = "csv";
-
- private static final String LINES_PER_FAILED_FILE_ARGS = "lpf";
- private static final String LINES_PER_FAILED_FILE_NAME = "lines_per_file";
- private static final String LINES_PER_FAILED_FILE_ARGS_NAME = "linesPerFile";
- private static final String IMPORT_SCHEMA_CLI_PREFIX = "ImportSchema";
- private static int linesPerFailedFile = 10000;
-
- private static String targetPath;
- private static String failedFileDirectory = null;
-
- private static final String INSERT_CSV_MEET_ERROR_MSG = "Meet error when
insert csv because ";
+public class ImportSchemaTree extends AbstractImportSchema {
private static final IoTPrinter ioTPrinter = new IoTPrinter(System.out);
-
- /**
- * create the commandline options.
- *
- * @return object Options
- */
- private static Options createOptions() {
- Options options = createNewOptions();
-
- Option opFile =
- Option.builder(FILE_ARGS)
- .required()
- .longOpt(FILE_NAME)
- .hasArg()
- .argName(FILE_ARGS_NAME)
- .desc(
- "If input a file path, load a csv file, "
- + "otherwise load all csv file under this directory
(required)")
- .build();
- options.addOption(opFile);
-
- Option opFailedFile =
- Option.builder(FAILED_FILE_ARGS)
- .longOpt(FAILED_FILE_NAME)
- .hasArg()
- .argName(FAILED_FILE_ARGS_NAME)
- .desc(
- "Specifying a directory to save failed file, default
YOUR_CSV_FILE_PATH (optional)")
- .build();
- options.addOption(opFailedFile);
-
- Option opBatchPointSize =
- Option.builder(BATCH_POINT_SIZE_ARGS)
- .longOpt(BATCH_POINT_SIZE_NAME)
- .hasArg()
- .argName(BATCH_POINT_SIZE_ARGS_NAME)
- .desc("10000 (only not aligned optional)")
+ private static SessionPool sessionPool;
+
+ public void init()
+ throws InterruptedException, IoTDBConnectionException,
StatementExecutionException {
+ sessionPool =
+ new SessionPool.Builder()
+ .host(host)
+ .port(Integer.parseInt(port))
+ .user(username)
+ .password(password)
+ .maxSize(threadNum + 1)
+ .enableCompression(false)
+ .enableRedirection(false)
+ .enableAutoFetch(false)
.build();
- options.addOption(opBatchPointSize);
-
- Option opFailedLinesPerFile =
- Option.builder(LINES_PER_FAILED_FILE_ARGS)
- .longOpt(LINES_PER_FAILED_FILE_NAME)
- .hasArg()
- .argName(LINES_PER_FAILED_FILE_ARGS_NAME)
- .desc("Lines per failed file")
- .build();
- options.addOption(opFailedLinesPerFile);
-
- Option opHelp =
- Option.builder(HELP_ARGS).longOpt(HELP_ARGS).desc("Display help
information").build();
- options.addOption(opHelp);
- return options;
- }
-
- /**
- * parse optional params
- *
- * @param commandLine
- */
- private static void parseSpecialParams(CommandLine commandLine) throws
ArgsErrorException {
- targetPath = commandLine.getOptionValue(FILE_ARGS);
- if (commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS) != null) {
- batchPointSize =
Integer.parseInt(commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS));
- }
- if (commandLine.getOptionValue(FAILED_FILE_ARGS) != null) {
- failedFileDirectory = commandLine.getOptionValue(FAILED_FILE_ARGS);
- File file = new File(failedFileDirectory);
- if (!file.isDirectory()) {
- file.mkdir();
- failedFileDirectory = file.getAbsolutePath() + File.separator;
- } else if (!failedFileDirectory.endsWith("/") &&
!failedFileDirectory.endsWith("\\")) {
- failedFileDirectory += File.separator;
- }
- }
- if (commandLine.getOptionValue(ALIGNED_ARGS) != null) {
- aligned = Boolean.valueOf(commandLine.getOptionValue(ALIGNED_ARGS));
- }
- if (commandLine.getOptionValue(LINES_PER_FAILED_FILE_ARGS) != null) {
- linesPerFailedFile =
Integer.parseInt(commandLine.getOptionValue(LINES_PER_FAILED_FILE_ARGS));
+ sessionPool.setEnableQueryRedirection(false);
+ final File file = new File(targetPath);
+ if (!file.isFile() && !file.isDirectory()) {
+ ioTPrinter.println(String.format("Source file or directory %s does not
exist", targetPath));
+ System.exit(Constants.CODE_ERROR);
}
+ ImportDataScanTool.setSourceFullPath(targetPath);
+ ImportDataScanTool.traverseAndCollectFiles();
}
- public static void main(String[] args) throws IoTDBConnectionException {
- Options options = createOptions();
- HelpFormatter hf = new HelpFormatter();
- hf.setOptionComparator(null);
- hf.setWidth(MAX_HELP_CONSOLE_WIDTH);
- CommandLine commandLine = null;
- CommandLineParser parser = new DefaultParser();
+ @Override
+ protected Runnable getAsyncImportRunnable() {
+ return new ImportSchemaTree();
+ }
- if (args == null || args.length == 0) {
- ioTPrinter.println("Too few params input, please check the following
hint.");
- hf.printHelp(IMPORT_SCHEMA_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
- }
- try {
- commandLine = parser.parse(options, args);
- } catch (org.apache.commons.cli.ParseException e) {
- ioTPrinter.println("Parse error: " + e.getMessage());
- hf.printHelp(IMPORT_SCHEMA_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
- }
- if (commandLine.hasOption(HELP_ARGS)) {
- hf.printHelp(IMPORT_SCHEMA_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
- }
- try {
- parseBasicParams(commandLine);
- String filename = commandLine.getOptionValue(FILE_ARGS);
- if (filename == null) {
- hf.printHelp(IMPORT_SCHEMA_CLI_PREFIX, options, true);
- System.exit(CODE_ERROR);
- }
- parseSpecialParams(commandLine);
- } catch (ArgsErrorException e) {
- ioTPrinter.println("Args error: " + e.getMessage());
- System.exit(CODE_ERROR);
- } catch (Exception e) {
- ioTPrinter.println("Encounter an error, because: " + e.getMessage());
- System.exit(CODE_ERROR);
- }
- System.exit(importFromTargetPath(host, Integer.parseInt(port), username,
password, targetPath));
+ @Override
+ protected void importSchemaFromSqlFile(File file) {
+ throw new UnsupportedOperationException("Not supported yet.");
}
- /**
- * Specifying a CSV file or a directory including CSV files that you want to
import. This method
- * can be offered to console cli to implement importing CSV file by command.
- *
- * @param host
- * @param port
- * @param username
- * @param password
- * @param targetPath a CSV file or a directory including CSV files
- * @return the status code
- * @throws IoTDBConnectionException
- */
- @SuppressWarnings({"squid:S2093"}) // ignore try-with-resources
- public static int importFromTargetPath(
- String host, int port, String username, String password, String
targetPath) {
+ @Override
+ protected void importSchemaFromCsvFile(File file) {
try {
- session = new Session(host, port, username, password, false);
- session.open(false);
- File file = new File(targetPath);
- if (file.isFile()) {
- importFromSingleFile(file);
- } else if (file.isDirectory()) {
- File[] files = file.listFiles();
- if (files == null) {
- return CODE_OK;
- }
- // 按文件名排序
- Arrays.sort(files, (f1, f2) -> f1.getName().compareTo(f2.getName()));
- for (File subFile : files) {
- if (subFile.isFile()) {
- importFromSingleFile(subFile);
- }
- }
- } else {
- ioTPrinter.println("File not found!");
- return CODE_ERROR;
+ CSVParser csvRecords = readCsvFile(file.getAbsolutePath());
+ List<String> headerNames = csvRecords.getHeaderNames();
+ Stream<CSVRecord> records = csvRecords.stream();
+ if (headerNames.isEmpty()) {
+ ioTPrinter.println(file.getName() + " : Empty file!");
+ return;
}
- } catch (IoTDBConnectionException e) {
- ioTPrinter.println("Encounter an error when connecting to server,
because " + e.getMessage());
- return CODE_ERROR;
- } finally {
- if (session != null) {
- try {
- session.close();
- } catch (IoTDBConnectionException e) {
- ;
- }
+ if (!checkHeader(headerNames)) {
+ return;
+ }
+ String failedFilePath = null;
+ if (failedFileDirectory == null) {
+ failedFilePath = file.getAbsolutePath() + ".failed";
+ } else {
+ failedFilePath = failedFileDirectory + file.getName() + ".failed";
}
+ writeScheme(file.getName(), headerNames, records, failedFilePath);
+ processSuccessFile();
+ } catch (IOException | IllegalPathException e) {
+ ioTPrinter.println(file.getName() + " : CSV file read exception because:
" + e.getMessage());
}
- return CODE_OK;
}
- /**
- * import the CSV file and load headers and records.
- *
- * @param file the File object of the CSV file that you want to import.
- */
- private static void importFromSingleFile(File file) {
- if (file.getName().endsWith(CSV_SUFFIXS)) {
- try {
- CSVParser csvRecords = readCsvFile(file.getAbsolutePath());
- List<String> headerNames = csvRecords.getHeaderNames();
- Stream<CSVRecord> records = csvRecords.stream();
- if (headerNames.isEmpty()) {
- ioTPrinter.println(file.getName() + " : Empty file!");
- return;
- }
- if (!checkHeader(headerNames)) {
- return;
- }
- String failedFilePath = null;
- if (failedFileDirectory == null) {
- failedFilePath = file.getAbsolutePath() + ".failed";
- } else {
- failedFilePath = failedFileDirectory + file.getName() + ".failed";
- }
- writeScheme(file.getName(), headerNames, records, failedFilePath);
- } catch (IOException | IllegalPathException e) {
+ private static CSVParser readCsvFile(String path) throws IOException {
+ return CSVFormat.Builder.create(CSVFormat.DEFAULT)
+ .setHeader()
+ .setSkipHeaderRecord(true)
+ .setQuote('`')
+ .setEscape('\\')
+ .setIgnoreEmptyLines(true)
+ .build()
+ .parse(new InputStreamReader(new FileInputStream(path)));
+ }
+
+ private static boolean checkHeader(List<String> headerNames) {
+ if (CollectionUtils.isNotEmpty(headerNames)
+ && new HashSet<>(headerNames).size() == Constants.HEAD_COLUMNS.size())
{
+ List<String> strangers =
+ headerNames.stream()
+ .filter(t -> !Constants.HEAD_COLUMNS.contains(t))
+ .collect(Collectors.toList());
+ if (CollectionUtils.isNotEmpty(strangers)) {
ioTPrinter.println(
- file.getName() + " : CSV file read exception because: " +
e.getMessage());
+ "The header of the CSV file to be imported is illegal. The correct
format is \"Timeseries, Alibaba, DataType, Encoding, Compression\"!");
+ return false;
}
- } else {
- ioTPrinter.println(file.getName() + " : The file name must end with
\"csv\"!");
}
+ return true;
}
- /**
- * if the data is aligned by time, the data will be written by this method.
- *
- * @param headerNames the header names of CSV file
- * @param records the records of CSV file
- * @param failedFilePath the directory to save the failed files
- */
- @SuppressWarnings("squid:S3776")
private static void writeScheme(
String fileName, List<String> headerNames, Stream<CSVRecord> records,
String failedFilePath)
throws IllegalPathException {
@@ -372,13 +205,15 @@ public class ImportSchema extends AbstractSchemaTool {
compressors.clear();
measurementAlias.clear();
}
- String path =
recordObj.get(headerNames.indexOf(HEAD_COLUMNS.get(0)));
- String alias =
recordObj.get(headerNames.indexOf(HEAD_COLUMNS.get(1)));
- String dataTypeRaw =
recordObj.get(headerNames.indexOf(HEAD_COLUMNS.get(2)));
+ String path =
recordObj.get(headerNames.indexOf(Constants.HEAD_COLUMNS.get(0)));
+ String alias =
recordObj.get(headerNames.indexOf(Constants.HEAD_COLUMNS.get(1)));
+ String dataTypeRaw =
recordObj.get(headerNames.indexOf(Constants.HEAD_COLUMNS.get(2)));
TSDataType dataType = typeInfer(dataTypeRaw);
- String encodingTypeRaw =
recordObj.get(headerNames.indexOf(HEAD_COLUMNS.get(3)));
+ String encodingTypeRaw =
+
recordObj.get(headerNames.indexOf(Constants.HEAD_COLUMNS.get(3)));
TSEncoding encodingType = encodingInfer(encodingTypeRaw);
- String compressionTypeRaw =
recordObj.get(headerNames.indexOf(HEAD_COLUMNS.get(4)));
+ String compressionTypeRaw =
+
recordObj.get(headerNames.indexOf(Constants.HEAD_COLUMNS.get(4)));
CompressionType compressionType = compressInfer(compressionTypeRaw);
if (StringUtils.isBlank(path) ||
path.trim().startsWith(SYSTEM_DATABASE)) {
ioTPrinter.println(
@@ -468,34 +303,6 @@ public class ImportSchema extends AbstractSchemaTool {
}
}
- private static boolean checkHeader(List<String> headerNames) {
- if (CollectionUtils.isNotEmpty(headerNames)
- && new HashSet<>(headerNames).size() == HEAD_COLUMNS.size()) {
- List<String> strangers =
- headerNames.stream().filter(t ->
!HEAD_COLUMNS.contains(t)).collect(Collectors.toList());
- if (CollectionUtils.isNotEmpty(strangers)) {
- ioTPrinter.println(
- "The header of the CSV file to be imported is illegal. The correct
format is \"Timeseries, Alibaba, DataType, Encoding, Compression\"!");
- return false;
- }
- }
- return true;
- }
-
- private static void writeFailedLinesFile(
- String failedFilePath, ArrayList<List<Object>> failedRecords) {
- int fileIndex = 0;
- int from = 0;
- int failedRecordsSize = failedRecords.size();
- int restFailedRecords = failedRecordsSize;
- while (from < failedRecordsSize) {
- int step = Math.min(restFailedRecords, linesPerFailedFile);
- writeCsvFile(failedRecords.subList(from, from + step), failedFilePath +
"_" + fileIndex++);
- from += step;
- restFailedRecords -= step;
- }
- }
-
private static void writeAndEmptyDataSet(
List<String> paths,
List<TSDataType> dataTypes,
@@ -508,7 +315,7 @@ public class ImportSchema extends AbstractSchemaTool {
int retryTime)
throws StatementExecutionException {
try {
- session.createMultiTimeseries(
+ sessionPool.createMultiTimeseries(
paths,
dataTypes,
encodings,
@@ -519,11 +326,6 @@ public class ImportSchema extends AbstractSchemaTool {
measurementAliasList);
} catch (IoTDBConnectionException e) {
if (retryTime > 0) {
- try {
- session.open();
- } catch (IoTDBConnectionException ex) {
- ioTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
- }
writeAndEmptyDataSet(
paths,
dataTypes,
@@ -536,11 +338,6 @@ public class ImportSchema extends AbstractSchemaTool {
--retryTime);
}
} catch (StatementExecutionException e) {
- try {
- session.close();
- } catch (IoTDBConnectionException ex) {
- // do nothing
- }
throw e;
}
}
@@ -554,15 +351,10 @@ public class ImportSchema extends AbstractSchemaTool {
List<String> measurementAliasList,
int retryTime) {
try {
- session.createAlignedTimeseries(
+ sessionPool.createAlignedTimeseries(
deviceId, measurements, dataTypes, encodings, compressors,
measurementAliasList);
} catch (IoTDBConnectionException e) {
if (retryTime > 0) {
- try {
- session.open();
- } catch (IoTDBConnectionException ex) {
- ioTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
- }
writeAndEmptyDataSetAligned(
deviceId,
measurements,
@@ -573,12 +365,7 @@ public class ImportSchema extends AbstractSchemaTool {
--retryTime);
}
} catch (StatementExecutionException e) {
- ioTPrinter.println(INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
- try {
- session.close();
- } catch (IoTDBConnectionException ex) {
- // do nothing
- }
+ ioTPrinter.println(Constants.INSERT_CSV_MEET_ERROR_MSG + e.getMessage());
System.exit(1);
} finally {
deviceId = null;
@@ -590,28 +377,6 @@ public class ImportSchema extends AbstractSchemaTool {
}
}
- /**
- * read data from the CSV file
- *
- * @param path
- * @return CSVParser csv parser
- * @throws IOException when reading the csv file failed.
- */
- private static CSVParser readCsvFile(String path) throws IOException {
- return CSVFormat.Builder.create(CSVFormat.DEFAULT)
- .setHeader()
- .setSkipHeaderRecord(true)
- .setQuote('`')
- .setEscape('\\')
- .setIgnoreEmptyLines(true)
- .build()
- .parse(new InputStreamReader(new FileInputStream(path)));
- }
-
- /**
- * @param typeStr
- * @return
- */
private static TSDataType typeInfer(String typeStr) {
try {
if (StringUtils.isNotBlank(typeStr)) {
@@ -644,4 +409,18 @@ public class ImportSchema extends AbstractSchemaTool {
}
return null;
}
+
+ private static void writeFailedLinesFile(
+ String failedFilePath, ArrayList<List<Object>> failedRecords) {
+ int fileIndex = 0;
+ int from = 0;
+ int failedRecordsSize = failedRecords.size();
+ int restFailedRecords = failedRecordsSize;
+ while (from < failedRecordsSize) {
+ int step = Math.min(restFailedRecords, linesPerFailedFile);
+ writeCsvFile(failedRecords.subList(from, from + step), failedFilePath +
"_" + fileIndex++);
+ from += step;
+ restFailedRecords -= step;
+ }
+ }
}