Hi Dhruv,

It may be due to a concurrency problem.

ImportCsv is not thread-safe because of the following static fields, which are shared by every thread that calls it:

> private static int i;
> private static int startIndex;
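
To illustrate why static cursor fields like these break under concurrency, here is a minimal sketch of a line splitter that keeps its cursors in local variables instead, so each call has its own copy. This is not the actual ImportCsv code; LocalStateSplitter, splitLine and splitAll are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class, not part of IoTDB.
class LocalStateSplitter {

    // Splits one CSV line on commas, ignoring commas inside double quotes.
    // The cursors i and startIndex are locals, so concurrent calls cannot
    // overwrite each other's position (unlike static fields).
    static List<String> splitLine(String line) {
        List<String> fields = new ArrayList<>();
        int startIndex = 0;
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                inQuotes = !inQuotes;
            } else if (c == ',' && !inQuotes) {
                fields.add(line.substring(startIndex, i));
                startIndex = i + 1;
            }
        }
        fields.add(line.substring(startIndex));
        return fields;
    }

    // Splits every line on a thread pool; results keep the input order.
    static List<List<String>> splitAll(List<String> lines, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (String line : lines) {
                futures.add(pool.submit(() -> splitLine(line)));
            }
            List<List<String>> result = new ArrayList<>();
            for (Future<List<String>> future : futures) {
                result.add(future.get());
            }
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("splitting failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With static cursors, two threads would advance the same i and startIndex while reading different lines, which would be consistent with the out-of-bounds exceptions in your log.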

You could try using two processes instead of two threads, so that each process gets its own copy of these static fields.
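
A rough sketch of that idea, assuming you wrap the import of a single file in a small main class of your own: SingleCsvImport below is a hypothetical name for such a class (it would call ImportCsv.importCsvFromFile for the one path it receives), and CsvImportLauncher, buildCommand and importAll are likewise made up for illustration. Each CSV file is handed to a separate JVM started with ProcessBuilder.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical launcher: starts one child JVM per CSV file.
class CsvImportLauncher {

    // Assumption: a main class you write yourself that imports exactly one file.
    static final String WORKER_MAIN = "SingleCsvImport";

    // Builds the command line for one child JVM.
    static List<String> buildCommand(String classpath, String csvPath) {
        String javaBin = System.getProperty("java.home")
                + File.separator + "bin" + File.separator + "java";
        List<String> command = new ArrayList<>();
        command.add(javaBin);
        command.add("-cp");
        command.add(classpath);
        command.add(WORKER_MAIN);
        command.add(csvPath);
        return command;
    }

    // Starts all children first, then waits for them.
    // Returns the number of children that exited with a non-zero code.
    static int importAll(String classpath, List<String> csvPaths)
            throws IOException, InterruptedException {
        List<Process> children = new ArrayList<>();
        for (String csvPath : csvPaths) {
            children.add(new ProcessBuilder(buildCommand(classpath, csvPath))
                    .inheritIO()   // child output goes to this console
                    .start());
        }
        int failures = 0;
        for (Process child : children) {
            if (child.waitFor() != 0) {
                failures++;
            }
        }
        return failures;
    }
}
```

Because every child is a separate JVM, the static fields are no longer shared. The price is JVM start-up time per file, so this probably only pays off for larger files.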

Thanks,
--
Jialin Qiao
School of Software, Tsinghua University

乔嘉林
清华大学 软件学院

> -----Original Message-----
> From: "Dhruv Garg" <[email protected]>
> Sent: 2021-03-18 17:14:56 (Thursday)
> To: [email protected]
> Cc:
> Subject: Re: Parallel writes from CSV into IoTDB
>
> Hello Jialin,
>
> Here it is. Errors start after "Parallel-2" starts. These are the same
> blocks which were used for a single thread. And it succeeded in the
> sequential run. Hence I am thinking that it might not be due to a faulty
> CSV. I have also attached the parallel task code. Do let me know in case
> you feel something else might be going wrong.
>
> Starting parallel-1--------------------------
> Start to import data from: SF_1.csv
> Import from: SF_1.csv   0% │                                           │   0/482 (0:00:00 / ?) Importing...
> Import from: SF_1.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 278
> Start to import data from: SF_2.csv
> Import from: SF_2.csv   0% │                                           │   0/482 (0:00:00 / ?) Importing...
> Import from: SF_2.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 191
> Start to import data from: SF_3.csv
> Import from: SF_3.csv   0% │                                           │   0/482 (0:00:00 / ?) Importing...
> Import from: SF_3.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 135
> Start to import data from: SF_4.csv
> Import from: SF_4.csv   0% │                                           │   0/482 (0:00:00 / ?) Importing...
> Import from: SF_4.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 124
> Start to import data from: SF_5.csv
> Import from: SF_5.csv   0% │                                           │   0/482 (0:00:00 / ?) Importing...
> Import from: SF_5.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 123
> Start to import data from: SF_6.csv
> Import from: SF_6.csv   0% │                                           │   0/482 (0:00:00 / ?) Importing...
> Import from: SF_6.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 152
> Start to import data from: SF_7.csv
> Import from: SF_7.csv   0% │                                           │   0/482 (0:00:00 / ?) Importing...
> Import from: SF_7.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 195
> Start to import data from: SF_8.csv
> Import from: SF_8.csv   0% │                                           │   0/482 (0:00:00 / ?) Importing...
> Import from: SF_8.csv 100% │█████████████████████████████████████████████████████████████████████████████████████│ 482/482 (0:00:00 / 0:00:00) Importing...
> Data ingestion time: 159
> Finished parallel-1, time: 1358
> Starting parallel-2--------------------------
> Start to import data from: SF_2.csv
> Start to import data from: SF_1.csv
> Import from: SF_1.csv   0% │     │   0/482 (0:00:00 / ?) Importing...
>                                             │   0/482 (0:00:00 / ?)
> Importing...
> Exception in thread "pool-4-thread-1" java.lang.ArrayIndexOutOfBoundsException: Index 41 out of bounds for length 1
>         at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:141)
>         at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>         at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>         at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> Start to import data from: SF_3.csv
> Import from: SF_2.csv   0% │     │   0/482 (0:00:00 / ?) Importing...
> Exception in thread "pool-4-thread-2" java.lang.StringIndexOutOfBoundsException: begin 104, end 103, length 867
>         at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3319)
>         at java.base/java.lang.String.substring(String.java:1874)
>         at org.apache.iotdb.tool.ImportCsv.splitCsvLine(ImportCsv.java:379)
>         at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:130)
>         at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>         at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>         at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> Start to import data from: SF_4.csv
> Import from: SF_3.csv   0% │     │   0/482 (0:00:00 / ?) Importing...
> Exception in thread "pool-4-thread-3" java.lang.StringIndexOutOfBoundsException: String index out of range: 1620
>         at java.base/java.lang.StringLatin1.charAt(StringLatin1.java:47)
>         at java.base/java.lang.String.charAt(String.java:693)
>         at org.apache.iotdb.tool.ImportCsv.splitCsvLine(ImportCsv.java:381)
>         at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:130)
>         at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>         at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>         at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> Start to import data from: SF_5.csv
> Import from: SF_4.csv   0% │     │   0/482 (0:00:00 / ?) Importing...
> Exception in thread "pool-4-thread-4" java.lang.StringIndexOutOfBoundsException: String index out of range: 1508
>         at java.base/java.lang.StringLatin1.charAt(StringLatin1.java:47)
>         at java.base/java.lang.String.charAt(String.java:693)
>         at org.apache.iotdb.tool.ImportCsv.splitCsvLine(ImportCsv.java:383)
>         at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:130)
>         at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>         at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>         at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> Start to import data from: SF_6.csv
> Import from: SF_5.csv   0% │     │   0/482 (0:00:00 / ?) Importing...
> Exception in thread "pool-4-thread-5" java.lang.ArrayIndexOutOfBoundsException: Index 111 out of bounds for length 109
>         at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:141)
>         at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>         at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>         at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> Start to import data from: SF_7.csv
> Import from: SF_6.csv   0% │     │   0/482 (0:00:00 / ?) Importing...
> Exception in thread "pool-4-thread-6" java.lang.ArrayIndexOutOfBoundsException: Index 140 out of bounds for length 121
>         at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:141)
>         at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>         at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>         at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> Start to import data from: SF_8.csv
> Import from: SF_7.csv   0% │     │   0/482 (0:00:00 / ?) Importing...
> Exception in thread "pool-4-thread-7" java.lang.StringIndexOutOfBoundsException: begin 149, end 148, length 920
>         at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3319)
>         at java.base/java.lang.String.substring(String.java:1874)
>         at org.apache.iotdb.tool.ImportCsv.splitCsvLine(ImportCsv.java:379)
>         at org.apache.iotdb.tool.ImportCsv.loadDataFromCSV(ImportCsv.java:130)
>         at org.apache.iotdb.tool.ImportCsv.importFromSingleFile(ImportCsv.java:292)
>         at org.apache.iotdb.tool.ImportCsv.importCsvFromFile(ImportCsv.java:269)
>         at TaskIoTDBInsert.run(benchmarkIoTDB.java:43)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> Import from: SF_8.csv   0% │     │   0/482 (0:00:00 / ?) Importing...
> Meet error when insert csv because 601: null
> Data ingestion time: 120
> Finished parallel-2, time: 279
>
> --------------------------------------------------------------------------------------------------------------------
>
> Code:
>
> class TaskIoTDBInsert implements Runnable {
>     public Connection IoTDBconnection;
>     public String filepath;
>
>     public TaskIoTDBInsert(Connection IoTDBconnection, String filepath) throws SQLException {
>         this.IoTDBconnection = IoTDBconnection;
>         this.filepath = filepath;
>         Statement statement = IoTDBconnection.createStatement();
>         //Create storage group
>         try {
>             statement.execute("SET STORAGE GROUP TO root.SF");
>         } catch (IoTDBSQLException e) {
>             System.out.println(e.getMessage());
>         }
>     }
>
>     public void run() {
>         long ingest_start = System.currentTimeMillis();
>         ImportCsv.importCsvFromFile("0.0.0.0", "6667", "root", "root", filepath, "+05:30");
>         long ingest_end = System.currentTimeMillis();
>         System.out.println("Data ingestion time: " + (ingest_end - ingest_start));
>     }
> }
>
> -------------------------------------------------------------------
>
> Thanks in advance.
>
> Regards,
>
> dgargcs
>
> On Thu, 18 Mar 2021 at 14:26, Jialin Qiao <[email protected]> wrote:
>
> > Hi,
> >
> > It should work. Could you attach the error you encountered?
> >
> > Thanks,
> > --
> > Jialin Qiao
> > School of Software, Tsinghua University
> >
> > 乔嘉林
> > 清华大学 软件学院
> >
> > > -----Original Message-----
> > > From: "Dhruv Garg" <[email protected]>
> > > Sent: 2021-03-18 16:08:13 (Thursday)
> > > To: [email protected]
> > > Cc:
> > > Subject: Parallel writes from CSV into IoTDB
> > >
> > > Hello all,
> > >
> > > I have a few CSV files to write to IoTDB and I am using
> > > importCsvFromFile() from the ImportCsv class. I am able to create a
> > > connection to IoTDB using the JDBC client, and data ingestion through
> > > the CSV API works.
> > >
> > > If I give it the CSV files sequentially, it works well. However, I
> > > wanted to see if I can insert CSV files into IoTDB in parallel and get
> > > better speed. Now, if I give it the same set of CSV files in a thread
> > > pool of 2 and try parallel inserts, it throws an error. I also tried
> > > giving each thread a separate IoTDB client connection, but got the
> > > same result.
> > >
> > > It would be helpful if someone could comment on this. Do let me know if
> > > there is another API to be used or if this functionality is not
> > > supported at all.
> > >
> > > Regards,
> > > dgargcs