Hi Dhruv,

It may be due to a concurrency problem.
ImportCsv is not thread-safe because of the following static fields:

> private static int i;
> private static int startIndex;

You could try using two processes instead of two threads.

Thanks,
--
Jialin Qiao
School of Software, Tsinghua University

乔嘉林
清华大学 软件学院

> -----Original Message-----
> From: "Dhruv Garg" <[email protected]>
> Sent: 2021-03-18 17:14:56 (Thursday)
> To: [email protected]
> Cc:
> Subject: Re: Parallel writes from CSV into IoTDB
>
> Hello Jialin,
>
> Here it is. Errors start after "Parallel-2" starts. These are the same
> blocks which were used for a single thread. And it succeeded in the
> sequential run. Hence I am thinking that it might not be due to a faulty
> CSV. I have also attached the parallel task code. Do let me know in case
> you feel something else might be going wrong.
>
> Starting parallel-1--------------------------
> Start to import data from: SF_1.csv
> Import from: SF_1.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Import from: SF_1.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 278
> Start to import data from: SF_2.csv
> Import from: SF_2.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Import from: SF_2.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 191
> Start to import data from: SF_3.csv
> Import from: SF_3.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Import from: SF_3.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 135
> Start to import data from: SF_4.csv
> Import from: SF_4.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Import from: SF_4.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 124
> Start to import data from: SF_5.csv
> Import from: SF_5.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Import from: SF_5.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 123
> Start to import data from: SF_6.csv
> Import from: SF_6.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Import from: SF_6.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 152
> Start to import data from: SF_7.csv
> Import from: SF_7.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Import from: SF_7.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 195
> Start to import data from: SF_8.csv
> Import from: SF_8.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Import from: SF_8.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 159
> Finished parallel-1, time: 1358
> Starting parallel-2--------------------------
> Start to import data from: SF_2.csv
> Start to import data from: SF_1.csv
> Import from: SF_1.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> │ 0/482 (0:00:00 / ?)
> Importing...
> Exception in thread "pool-4-thread-1" java.lang.ArrayIndexOutOfBoundsException: Index 41 out of bounds for length 1
>     at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:141)
>     at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>     at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>     at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Start to import data from: SF_3.csv
> Import from: SF_2.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Exception in thread "pool-4-thread-2" java.lang.StringIndexOutOfBoundsException: begin 104, end 103, length 867
>     at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3319)
>     at java.base/java.lang.String.substring(String.java:1874)
>     at org.apache.iotdb.tool.ImportCsv.splitCsvLine(ImportCsv.java:379)
>     at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:130)
>     at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>     at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>     at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Start to import data from: SF_4.csv
> Import from: SF_3.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Exception in thread "pool-4-thread-3" java.lang.StringIndexOutOfBoundsException: String index out of range: 1620
>     at java.base/java.lang.StringLatin1.charAt(StringLatin1.java:47)
>     at java.base/java.lang.String.charAt(String.java:693)
>     at org.apache.iotdb.tool.ImportCsv.splitCsvLine(ImportCsv.java:381)
>     at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:130)
>     at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>     at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>     at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Start to import data from: SF_5.csv
> Import from: SF_4.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Exception in thread "pool-4-thread-4" java.lang.StringIndexOutOfBoundsException: String index out of range: 1508
>     at java.base/java.lang.StringLatin1.charAt(StringLatin1.java:47)
>     at java.base/java.lang.String.charAt(String.java:693)
>     at org.apache.iotdb.tool.ImportCsv.splitCsvLine(ImportCsv.java:383)
>     at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:130)
>     at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>     at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>     at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Start to import data from: SF_6.csv
> Import from: SF_5.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Exception in thread "pool-4-thread-5" java.lang.ArrayIndexOutOfBoundsException: Index 111 out of bounds for length 109
>     at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:141)
>     at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>     at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>     at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Start to import data from: SF_7.csv
> Import from: SF_6.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Exception in thread "pool-4-thread-6" java.lang.ArrayIndexOutOfBoundsException: Index 140 out of bounds for length 121
>     at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:141)
>     at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>     at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>     at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Start to import data from: SF_8.csv
> Import from: SF_7.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Exception in thread "pool-4-thread-7" java.lang.StringIndexOutOfBoundsException: begin 149, end 148, length 920
>     at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3319)
>     at java.base/java.lang.String.substring(String.java:1874)
>     at org.apache.iotdb.tool.ImportCsv.splitCsvLine(ImportCsv.java:379)
>     at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:130)
>     at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>     at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>     at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> Import from: SF_8.csv 0% │ │ 0/482 (0:00:00 / ?) Importing...
> Meet error when insert csv because 601: null
> Data ingestion time: 120
> Finished parallel-2, time: 279
> --------------------------------------------------------------------------------------------------------------------
>
> Code:
>
> class TaskIoTDBInsert implements Runnable {
>     public Connection IoTDBconnection;
>     public String filepath;
>
>     public TaskIoTDBInsert(Connection IoTDBconnection, String filepath) throws SQLException {
>         this.IoTDBconnection = IoTDBconnection;
>         this.filepath = filepath;
>         Statement statement = IoTDBconnection.createStatement();
>         //Create storage group
>         try {
>             statement.execute("SET STORAGE GROUP TO root.SF");
>         } catch (IoTDBSQLException e) {
>             System.out.println(e.getMessage());
>         }
>     }
>
>     public void run() {
>         long ingest_start = System.currentTimeMillis();
>         ImportCsv.importCsvFromFile("0.0.0.0", "6667", "root", "root", filepath,"+05:30");
>         long ingest_end = System.currentTimeMillis();
>         System.out.println("Data ingestion time: " + (ingest_end - ingest_start));
>     }
> }
>
> -------------------------------------------------------------------
>
> Thanks in advance.
>
> Regards,
> dgargcs
>
> On Thu, 18 Mar 2021 at 14:26, Jialin Qiao <[email protected]> wrote:
>
> > Hi,
> >
> > It should work. Could you attach the error you encountered?
> >
> > Thanks,
> > --
> > Jialin Qiao
> > School of Software, Tsinghua University
> >
> > 乔嘉林
> > 清华大学 软件学院
> >
> > > -----Original Message-----
> > > From: "Dhruv Garg" <[email protected]>
> > > Sent: 2021-03-18 16:08:13 (Thursday)
> > > To: [email protected]
> > > Cc:
> > > Subject: Parallel writes from CSV into IoTDB
> > >
> > > Hello all,
> > >
> > > I have a few CSV files to write to IoTDB and I am using importCsvFromFile()
> > > from the ImportCsv class. I am able to create a connection to IoTDB using the
> > > jdbc client and the data ingestion using the Csv API is happening.
> > >
> > > If I give the csv files sequentially, it works well. However, I wanted to
> > > see if I can insert csv files in parallel into IoTDB and get better speed.
> > > Now, if I give it the same set of csv files in a thread-pool of 2 and try
> > > parallel inserts, it throws an error. I also tried giving both threads a
> > > separate IoTDB client connection, but got the same result.
> > >
> > > It would be helpful if someone can comment on this. Do let me know if there
> > > is another API to be used or if this functionality is not supported at all.
> > >
> > > Regards,
> > > dgargcs
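
To illustrate the "two processes instead of two threads" suggestion at the top of this thread, here is a minimal Java sketch that starts one import process per CSV file, so that each import runs in its own JVM with its own copy of the static fields. The class name ParallelCsvImport, the helper names, and the default script path tools/import-csv.sh are hypothetical. The -h/-p/-u/-pw/-f flags are an assumption about the IoTDB import-csv command-line script and should be checked against your IoTDB version. The host, port, user, and password are the values from the quoted code.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ParallelCsvImport {

    // Builds the command line for importing one CSV file in its own process.
    // ASSUMPTION: the script accepts -h/-p/-u/-pw/-f; verify for your IoTDB version.
    static List<String> buildCommand(String script, String csvFile) {
        return new ArrayList<>(Arrays.asList(
                script,
                "-h", "0.0.0.0",
                "-p", "6667",
                "-u", "root",
                "-pw", "root",
                "-f", csvFile));
    }

    // Starts every command as a separate OS process, then waits for all of them.
    // Returns the exit codes in the same order as the commands.
    static List<Integer> runAll(List<List<String>> commands)
            throws IOException, InterruptedException {
        List<Process> processes = new ArrayList<>();
        for (List<String> command : commands) {
            // inheritIO() forwards each child's output to this console,
            // so lines from different imports may interleave.
            processes.add(new ProcessBuilder(command).inheritIO().start());
        }
        List<Integer> exitCodes = new ArrayList<>();
        for (Process process : processes) {
            exitCodes.add(process.waitFor());
        }
        return exitCodes;
    }

    // Usage: java ParallelCsvImport <script> <file1.csv> <file2.csv> ...
    public static void main(String[] args) throws Exception {
        String script = args.length > 0 ? args[0] : "tools/import-csv.sh";
        List<List<String>> commands = new ArrayList<>();
        for (int k = 1; k < args.length; k++) {
            commands.add(buildCommand(script, args[k]));
        }
        System.out.println("Exit codes: " + runAll(commands));
    }
}
```

For example: java ParallelCsvImport tools/import-csv.sh SF_1.csv SF_2.csv. The timezone argument ("+05:30") from the quoted code is not carried over, because I am not sure which flag, if any, the script accepts for it.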
