This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 3f9f367  [To rel/0.12][IOTDB-2604] batch size is invalid in import-csv 
tool (#5116)
3f9f367 is described below

commit 3f9f3670543e79613f40439e0ec7a54aae89e650
Author: Xuan Ronaldo <[email protected]>
AuthorDate: Fri Feb 25 18:39:28 2022 +0800

    [To rel/0.12][IOTDB-2604] batch size is invalid in import-csv tool (#5116)
---
 .../main/java/org/apache/iotdb/tool/ImportCsv.java | 31 +++++++++++++++++++---
 docs/UserGuide/System-Tools/CSV-Tool.md            |  4 +++
 docs/zh/UserGuide/System-Tools/CSV-Tool.md         | 16 ++++++++---
 3 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java 
b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
index 6300820..79c89d6 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
@@ -51,6 +51,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -72,6 +73,9 @@ public class ImportCsv extends AbstractCsvTool {
   private static final String FAILED_FILE_ARGS = "fd";
   private static final String FAILED_FILE_NAME = "failed file directory";
 
+  private static final String BATCH_POINT_SIZE_ARGS = "batch";
+  private static final String BATCH_POINT_SIZE_NAME = "batch point size";
+
   private static final String CSV_SUFFIXS = "csv";
   private static final String TXT_SUFFIXS = "txt";
 
@@ -84,7 +88,7 @@ public class ImportCsv extends AbstractCsvTool {
   private static String timeColumn = "Time";
   private static String deviceColumn = "Device";
 
-  private static final int BATCH_SIZE = 10000;
+  private static int batchPointSize = 100_000;
 
   /**
    * create the commandline options.
@@ -130,6 +134,14 @@ public class ImportCsv extends AbstractCsvTool {
             .build();
     options.addOption(opTimeZone);
 
+    Option opBatchPointSize =
+        Option.builder(BATCH_POINT_SIZE_ARGS)
+            .argName(BATCH_POINT_SIZE_NAME)
+            .hasArg()
+            .desc("100000 (optional)")
+            .build();
+    options.addOption(opBatchPointSize);
+
     return options;
   }
 
@@ -141,6 +153,9 @@ public class ImportCsv extends AbstractCsvTool {
   private static void parseSpecialParams(CommandLine commandLine) {
     timeZoneID = commandLine.getOptionValue(TIME_ZONE_ARGS);
     targetPath = commandLine.getOptionValue(FILE_ARGS);
+    if (commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS) != null) {
+      batchPointSize = 
Integer.parseInt(commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS));
+    }
     if (commandLine.getOptionValue(FAILED_FILE_ARGS) != null) {
       failedFileDirectory = commandLine.getOptionValue(FAILED_FILE_ARGS);
       File file = new File(failedFileDirectory);
@@ -304,6 +319,7 @@ public class ImportCsv extends AbstractCsvTool {
     List<List<String>> measurementsList = new ArrayList<>();
     List<List<TSDataType>> typesList = new ArrayList<>();
     List<List<Object>> valuesList = new ArrayList<>();
+    AtomicInteger pointSize = new AtomicInteger(0);
 
     AtomicReference<SimpleDateFormat> timeFormatter = new 
AtomicReference<>(null);
     AtomicReference<Boolean> hasStarted = new AtomicReference<>(false);
@@ -315,8 +331,9 @@ public class ImportCsv extends AbstractCsvTool {
           if (!hasStarted.get()) {
             hasStarted.set(true);
             timeFormatter.set(formatterInit(record.get(0)));
-          } else if ((record.getRecordNumber() - 1) % BATCH_SIZE == 0) {
+          } else if (pointSize.get() >= batchPointSize) {
             writeAndEmptyDataSet(deviceIds, times, typesList, valuesList, 
measurementsList, 3);
+            pointSize.set(0);
           }
 
           boolean isFail = false;
@@ -355,6 +372,7 @@ public class ImportCsv extends AbstractCsvTool {
                     
measurements.add(headerNameMap.get(header).replace(deviceId + '.', ""));
                     types.add(type);
                     values.add(valueTrans);
+                    pointSize.getAndIncrement();
                   }
                 }
               }
@@ -383,6 +401,7 @@ public class ImportCsv extends AbstractCsvTool {
         });
     if (!deviceIds.isEmpty()) {
       writeAndEmptyDataSet(deviceIds, times, typesList, valuesList, 
measurementsList, 3);
+      pointSize.set(0);
     }
 
     if (!failedRecords.isEmpty()) {
@@ -419,6 +438,8 @@ public class ImportCsv extends AbstractCsvTool {
     List<List<Object>> valuesList = new ArrayList<>();
     List<List<String>> measurementsList = new ArrayList<>();
 
+    AtomicInteger pointSize = new AtomicInteger(0);
+
     ArrayList<List<Object>> failedRecords = new ArrayList<>();
 
     records.forEach(
@@ -431,11 +452,13 @@ public class ImportCsv extends AbstractCsvTool {
             // if device changed
             writeAndEmptyDataSet(
                 deviceName.get(), times, typesList, valuesList, 
measurementsList, 3);
+            pointSize.set(0);
             deviceName.set(record.get(1));
-          } else if (record.getRecordNumber() - 1 % BATCH_SIZE == 0 && 
times.size() != 0) {
+          } else if (pointSize.get() >= batchPointSize) {
             // insert a batch
             writeAndEmptyDataSet(
                 deviceName.get(), times, typesList, valuesList, 
measurementsList, 3);
+            pointSize.set(0);
           }
 
           // the data of the record
@@ -485,6 +508,7 @@ public class ImportCsv extends AbstractCsvTool {
                   values.add(valueTrans);
                   measurements.add(headerNameMap.get(measurement));
                   types.add(type);
+                  pointSize.getAndIncrement();
                 }
               }
             }
@@ -511,6 +535,7 @@ public class ImportCsv extends AbstractCsvTool {
         });
     if (times.size() != 0) {
       writeAndEmptyDataSet(deviceName.get(), times, typesList, valuesList, 
measurementsList, 3);
+      pointSize.set(0);
     }
     if (!failedRecords.isEmpty()) {
       writeCsvFile(headerNames, failedRecords, failedFilePath);
diff --git a/docs/UserGuide/System-Tools/CSV-Tool.md 
b/docs/UserGuide/System-Tools/CSV-Tool.md
index eefc05c..2ddbddc 100644
--- a/docs/UserGuide/System-Tools/CSV-Tool.md
+++ b/docs/UserGuide/System-Tools/CSV-Tool.md
@@ -188,6 +188,10 @@ Description:
   - specifying a directory to save files which save failed lines. If you don't 
use this parameter, the failed file will be saved at original directory, and 
the filename will be the source filename with suffix `.failed`.
   - example: `-fd ./failed/`
 
+* `-batch`:
+  - specifying the number of points in a batch. If the program throws the 
exception `org.apache.thrift.transport.TTransportException: Frame size larger 
than protect max size`, you can lower this parameter as appropriate.
+  - example: `-batch 100000`, `100000` is the default value.
+
 ### Example
 
 ```sh
diff --git a/docs/zh/UserGuide/System-Tools/CSV-Tool.md 
b/docs/zh/UserGuide/System-Tools/CSV-Tool.md
index 1afec9b..5a23bef 100644
--- a/docs/zh/UserGuide/System-Tools/CSV-Tool.md
+++ b/docs/zh/UserGuide/System-Tools/CSV-Tool.md
@@ -153,13 +153,19 @@ 
Time,root.test.t1.str(TEXT),root.test.t2.str(TEXT),root.test.t2.int(INT32)
 通过设备对齐,并且header中不包含数据类型的数据。
 
 ```sql
-Time,Device,str,int1970-01-01T08:00:00.001+08:00,root.test.t1,"123hello 
world",1970-01-01T08:00:00.002+08:00,root.test.t1,"123",1970-01-01T08:00:00.001+08:00,root.test.t2,"123\,abc",100
+Time,Device,str,int
+1970-01-01T08:00:00.001+08:00,root.test.t1,"123hello world",
+1970-01-01T08:00:00.002+08:00,root.test.t1,"123",
+1970-01-01T08:00:00.001+08:00,root.test.t2,"123\,abc",100
 ```
 
 通过设备对齐,并且header中包含数据类型的数据。
 
 ```sql
-Time,Device,str(TEXT),int(INT32)1970-01-01T08:00:00.001+08:00,root.test.t1,"123hello
 
world",1970-01-01T08:00:00.002+08:00,root.test.t1,"123",1970-01-01T08:00:00.001+08:00,root.test.t2,"123\,abc",100
+Time,Device,str(TEXT),int(INT32)
+1970-01-01T08:00:00.001+08:00,root.test.t1,"123hello world",
+1970-01-01T08:00:00.002+08:00,root.test.t1,"123",
+1970-01-01T08:00:00.001+08:00,root.test.t2,"123\,abc",100
 ```
 
 ### 运行方法
@@ -180,7 +186,11 @@ tools\import-csv.bat -h <ip> -p <port> -u <username> -pw 
<password> -f <xxx.csv>
 
 * `-fd`:
   - 指定一个目录来存放保存失败的行的文件,如果你没有指定这个参数,失败的文件将会被保存到源数据的目录中,然后文件名是源文件名加上`.failed`的后缀。
-  - example: `-fd ./failed/`
+  - 例如: `-fd ./failed/`
+
+* `-batch`:
+  - 用于指定每一批插入的数据的点数。如果程序报了`org.apache.thrift.transport.TTransportException: 
Frame size larger than protect max size`这个错的话,就可以适当的调低这个参数。
+  - 例如: `-batch 100000`,`100000`是默认值。
 
 ### Example
 

Reply via email to