This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch import_iot_csv_file_new
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f51e17b1f18ed03eea798c10155679ea9b37ece4
Author: HTHou <[email protected]>
AuthorDate: Wed May 8 12:13:42 2024 +0800

    Use Files.lines to read CSV files instead of the Commons CSV parser
---
 .../main/java/org/apache/iotdb/ImportCSVTool.java  | 406 +++++++++++----------
 1 file changed, 206 insertions(+), 200 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java
index 4a6ef8a6609..e630d6d7032 100644
--- a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java
+++ b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java
@@ -1,18 +1,10 @@
 package org.apache.iotdb;
 
-import java.io.FileInputStream;
-import java.util.LinkedList;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.pool.SessionPool;
 
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -22,19 +14,23 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
 public class ImportCSVTool {
@@ -62,8 +58,10 @@ public class ImportCSVTool {
     try {
       sessionPool.createDatabase("root.readings");
       sessionPool.createDatabase("root.diagnostics");
-      sessionPool.executeNonQueryStatement("create schema template readings 
aligned (latitude DOUBLE, longitude DOUBLE, elevation INT32, velocity INT32, 
heading INT32, grade INT32,fuel_consumption DOUBLE);");
-      sessionPool.executeNonQueryStatement("create schema template diagnostics 
aligned (fuel_state DOUBLE, current_load INT32, status INT32);");
+      sessionPool.executeNonQueryStatement(
+          "create schema template readings aligned (latitude DOUBLE, longitude 
DOUBLE, elevation INT32, velocity INT32, heading INT32, grade 
INT32,fuel_consumption DOUBLE);");
+      sessionPool.executeNonQueryStatement(
+          "create schema template diagnostics aligned (fuel_state DOUBLE, 
current_load INT32, status INT32);");
       sessionPool.executeNonQueryStatement("set schema template readings to 
root.readings;");
       sessionPool.executeNonQueryStatement("set schema template diagnostics to 
root.diagnostics;");
     } catch (Exception ignore) {
@@ -83,9 +81,12 @@ public class ImportCSVTool {
 
       for (File file : files) {
         if (file.getName().contains("reading")) {
-          futures.add(loaderService.submit(() -> importCsvReading(file, 
file.getName().contains("null"))));
+          futures.add(
+              loaderService.submit(() -> importCsvReading(file, 
file.getName().contains("null"))));
         } else {
-          futures.add(loaderService.submit(() -> importCsvDiagnostics(file, 
file.getName().contains("null"))));
+          futures.add(
+              loaderService.submit(
+                  () -> importCsvDiagnostics(file, 
file.getName().contains("null"))));
         }
       }
       for (Future<?> task : futures) {
@@ -95,12 +96,20 @@ public class ImportCSVTool {
           throw new RuntimeException(e);
         }
       }
-      System.out.println("Import " + folder + " finished. Total cost: " + 
(System.currentTimeMillis() - startTime));
+      System.out.println(
+          "Import "
+              + folder
+              + " finished. Total cost: "
+              + (System.currentTimeMillis() - startTime));
     } else {
       if (folderFile.getName().contains("reading")) {
-        futures.add(loaderService.submit(() -> importCsvReading(folderFile, 
folderFile.getName().contains("null"))));
+        futures.add(
+            loaderService.submit(
+                () -> importCsvReading(folderFile, 
folderFile.getName().contains("null"))));
       } else {
-        futures.add(loaderService.submit(() -> 
importCsvDiagnostics(folderFile, folderFile.getName().contains("null"))));
+        futures.add(
+            loaderService.submit(
+                () -> importCsvDiagnostics(folderFile, 
folderFile.getName().contains("null"))));
       }
       for (Future<?> task : futures) {
         try {
@@ -109,7 +118,11 @@ public class ImportCSVTool {
           throw new RuntimeException(e);
         }
       }
-      System.out.println("Import " + folder + " finished. Total cost: " + 
(System.currentTimeMillis() - startTime));
+      System.out.println(
+          "Import "
+              + folder
+              + " finished. Total cost: "
+              + (System.currentTimeMillis() - startTime));
     }
     loaderService.shutdown();
     sessionService.shutdown();
@@ -140,101 +153,103 @@ public class ImportCSVTool {
     schemas.add(new MeasurementSchema("grade", TSDataType.INT32));
     schemas.add(new MeasurementSchema("fuel_consumption", TSDataType.DOUBLE));
 
-    try (CSVParser csvRecords = readCsvFile(file.getAbsolutePath())) {
+    try (Stream<String> lines = 
Files.lines(Paths.get(file.getAbsolutePath()))) {
       long startTime = System.currentTimeMillis();
       long total = file.length();
       long progressInterval = total / 100;
-      Stream<CSVRecord> records = csvRecords.stream();
       AtomicInteger currentProgress = new AtomicInteger();
-
+      AtomicLong position = new AtomicLong();
       final String[] currentDeviceId = {null};
-      records.forEach(
-          recordObj -> {
-            String name = isNameNull ? "r_null" : recordObj.get(8);
-            String deviceId =
-                database
-                    + "."
-                    + recordObj.get(9)
-                    + ".`"
-                    + recordObj.get(11)
-                    + "`."
-                    + name
-                    + "."
-                    + recordObj.get(10);
-            if (!deviceId.equals(currentDeviceId[0])) {
-              if (!nameSet.contains(name)) {
-                nameSet.add(name);
-                String attrDevice = attributesDB + "." + name + ".attr";
-                Map<String, String> attr = new HashMap<>(4);
-                attr.put("device_version", recordObj.get(12));
-                attr.put("load_capacity", recordObj.get(13));
-                attr.put("fuel_capacity", recordObj.get(14));
-                attr.put("nominal_fuel_consumption", recordObj.get(15));
-                sessionService.submit(
-                    () -> {
-                      try {
-                        sessionPool.createTimeseries(
-                            attrDevice,
-                            TSDataType.BOOLEAN,
-                            TSEncoding.RLE,
-                            CompressionType.LZ4,
-                            null,
-                            null,
-                            attr,
-                            null);
-                      } catch (IoTDBConnectionException | 
StatementExecutionException e) {
-                        throw new RuntimeException(e);
-                      }
-                    });
+      lines.forEach(
+          line -> {
+            if (position.getAndAdd(line.length()) > 0) {
+              String[] recordObj = line.split(",");
+              String name = isNameNull ? "r_null" : recordObj[8];
+              String deviceId =
+                  database
+                      + "."
+                      + recordObj[9]
+                      + ".`"
+                      + recordObj[11]
+                      + "`."
+                      + name
+                      + "."
+                      + recordObj[10];
+              if (!deviceId.equals(currentDeviceId[0])) {
+                if (!nameSet.contains(name)) {
+                  nameSet.add(name);
+                  String attrDevice = attributesDB + "." + name + ".attr";
+                  Map<String, String> attr = new HashMap<>(4);
+                  attr.put("device_version", recordObj[12]);
+                  attr.put("load_capacity", recordObj[13]);
+                  attr.put("fuel_capacity", recordObj[14]);
+                  attr.put("nominal_fuel_consumption", recordObj[15]);
+                  sessionService.submit(
+                      () -> {
+                        try {
+                          sessionPool.createTimeseries(
+                              attrDevice,
+                              TSDataType.BOOLEAN,
+                              TSEncoding.RLE,
+                              CompressionType.LZ4,
+                              null,
+                              null,
+                              attr,
+                              null);
+                        } catch (IoTDBConnectionException | 
StatementExecutionException e) {
+                          throw new RuntimeException(e);
+                        }
+                      });
+                }
+                if (currentDeviceId[0] != null) {
+                  Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
+                  sessionService.submit(
+                      () -> {
+                        try {
+                          sessionPool.insertAlignedTablet(oldTablet);
+                        } catch (IoTDBConnectionException | 
StatementExecutionException e) {
+                          throw new RuntimeException(e);
+                        }
+                        oldTablet.reset();
+                      });
+                }
+                currentDeviceId[0] = deviceId;
+              } else {
+                Tablet tablet = tabletMap.get(currentDeviceId[0]);
+                if (tablet.rowSize == tablet.getMaxRowNumber()) {
+                  Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
+                  sessionService.submit(
+                      () -> {
+                        try {
+                          sessionPool.insertAlignedTablet(oldTablet);
+                        } catch (IoTDBConnectionException | 
StatementExecutionException e) {
+                          throw new RuntimeException(e);
+                        }
+                        oldTablet.reset();
+                      });
+                }
               }
-              if (currentDeviceId[0] != null) {
-                Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
-                sessionService.submit(
-                    () -> {
-                      try {
-                        sessionPool.insertAlignedTablet(oldTablet);
-                      } catch (IoTDBConnectionException | 
StatementExecutionException e) {
-                        throw new RuntimeException(e);
-                      }
-                      oldTablet.reset();
-                    });
+              Tablet tablet =
+                  tabletMap.computeIfAbsent(deviceId, k -> new 
Tablet(deviceId, schemas));
+              tablet.addTimestamp(tablet.rowSize, castTime(recordObj[0]));
+              for (int i = 0; i < schemas.size(); i++) {
+                MeasurementSchema schema = schemas.get(i);
+                tablet.addValue(
+                    schema.getMeasurementId(),
+                    tablet.rowSize,
+                    castValue(schema.getType(), recordObj[i + 1]));
               }
-              currentDeviceId[0] = deviceId;
-            } else {
-              Tablet tablet = tabletMap.get(currentDeviceId[0]);
-              if (tablet.rowSize == tablet.getMaxRowNumber()) {
-
-                Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
-                sessionService.submit(
-                    () -> {
-                      try {
-                        sessionPool.insertAlignedTablet(oldTablet);
-                      } catch (IoTDBConnectionException | 
StatementExecutionException e) {
-                        throw new RuntimeException(e);
-                      }
-                      oldTablet.reset();
-                    });
+              tablet.rowSize++;
+              if (position.get() / progressInterval > currentProgress.get()) {
+                currentProgress.set((int) (position.get() / progressInterval));
+                System.out.println(
+                    file.getName()
+                        + " Progress: "
+                        + currentProgress.get()
+                        + "% cost "
+                        + (System.currentTimeMillis() - startTime));
               }
             }
-            Tablet tablet = tabletMap.computeIfAbsent(deviceId, k -> new 
Tablet(deviceId, schemas));
-            tablet.addTimestamp(tablet.rowSize, castTime(recordObj.get(0)));
-            for (int i = 0; i < schemas.size(); i++) {
-              MeasurementSchema schema = schemas.get(i);
-              tablet.addValue(
-                  schema.getMeasurementId(),
-                  tablet.rowSize,
-                  castValue(schema.getType(), recordObj.get(i + 1)));
-            }
-            tablet.rowSize++;
-            if (recordObj.getCharacterPosition() / progressInterval > 
currentProgress.get()) {
-              currentProgress.set((int) (recordObj.getCharacterPosition() / 
progressInterval));
-              System.out.println(
-                  file.getName()
-                      + " Progress: "
-                      + currentProgress.get()
-                      + "% cost "
-                      + (System.currentTimeMillis() - startTime));
-            }
           });
       if (!tabletMap.isEmpty()) {
         for (Tablet tablet : tabletMap.values()) {
@@ -255,101 +270,103 @@ public class ImportCSVTool {
     schemas.add(new MeasurementSchema("current_load", TSDataType.INT32));
     schemas.add(new MeasurementSchema("status", TSDataType.INT32));
 
-    try (CSVParser csvRecords = readCsvFile(file.getAbsolutePath())) {
+    try (Stream<String> lines = 
Files.lines(Paths.get(file.getAbsolutePath()))) {
       long startTime = System.currentTimeMillis();
       long total = file.length();
       long progressInterval = total / 100;
-      Stream<CSVRecord> records = csvRecords.stream();
       AtomicInteger currentProgress = new AtomicInteger();
-
+      AtomicLong position = new AtomicLong();
       final String[] currentDeviceId = {null};
-      records.forEach(
-          recordObj -> {
-            String name = isNameNull ? "d_null" : recordObj.get(4);
-            String deviceId =
-                database
-                    + "."
-                    + recordObj.get(5)
-                    + ".`"
-                    + recordObj.get(7)
-                    + "`."
-                    + name
-                    + "."
-                    + recordObj.get(6);
-            if (!deviceId.equals(currentDeviceId[0])) {
-              if (!nameSet.contains(name)) {
-                nameSet.add(name);
-                String attrDevice = attributesDB + "." + name + ".attr";
-                Map<String, String> attr = new HashMap<>(4);
-                attr.put("device_version", recordObj.get(8));
-                attr.put("load_capacity", recordObj.get(9));
-                attr.put("fuel_capacity", recordObj.get(10));
-                attr.put("nominal_fuel_consumption", recordObj.get(11));
-                sessionService.submit(
-                    () -> {
-                      try {
-                        sessionPool.createTimeseries(
-                            attrDevice,
-                            TSDataType.BOOLEAN,
-                            TSEncoding.RLE,
-                            CompressionType.LZ4,
-                            null,
-                            null,
-                            attr,
-                            null);
-                      } catch (IoTDBConnectionException | 
StatementExecutionException e) {
-                        throw new RuntimeException(e);
-                      }
-                    });
+      lines.forEach(
+          line -> {
+            if (position.getAndAdd(line.length()) > 0) {
+              String[] recordObj = line.split(",");
+              String name = isNameNull ? "d_null" : recordObj[4];
+              String deviceId =
+                  database
+                      + "."
+                      + recordObj[5]
+                      + ".`"
+                      + recordObj[7]
+                      + "`."
+                      + name
+                      + "."
+                      + recordObj[6];
+              if (!deviceId.equals(currentDeviceId[0])) {
+                if (!nameSet.contains(name)) {
+                  nameSet.add(name);
+                  String attrDevice = attributesDB + "." + name + ".attr";
+                  Map<String, String> attr = new HashMap<>(4);
+                  attr.put("device_version", recordObj[8]);
+                  attr.put("load_capacity", recordObj[9]);
+                  attr.put("fuel_capacity", recordObj[10]);
+                  attr.put("nominal_fuel_consumption", recordObj[11]);
+                  sessionService.submit(
+                      () -> {
+                        try {
+                          sessionPool.createTimeseries(
+                              attrDevice,
+                              TSDataType.BOOLEAN,
+                              TSEncoding.RLE,
+                              CompressionType.LZ4,
+                              null,
+                              null,
+                              attr,
+                              null);
+                        } catch (IoTDBConnectionException | 
StatementExecutionException e) {
+                          throw new RuntimeException(e);
+                        }
+                      });
+                }
+                if (currentDeviceId[0] != null) {
+                  Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
+                  sessionService.submit(
+                      () -> {
+                        try {
+                          sessionPool.insertAlignedTablet(oldTablet);
+                        } catch (IoTDBConnectionException | 
StatementExecutionException e) {
+                          throw new RuntimeException(e);
+                        }
+                        oldTablet.reset();
+                      });
+                }
+                currentDeviceId[0] = deviceId;
+              } else {
+                Tablet tablet = tabletMap.get(currentDeviceId[0]);
+                if (tablet.rowSize == tablet.getMaxRowNumber()) {
+                  Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
+                  sessionService.submit(
+                      () -> {
+                        try {
+                          sessionPool.insertAlignedTablet(oldTablet);
+                        } catch (IoTDBConnectionException | 
StatementExecutionException e) {
+                          throw new RuntimeException(e);
+                        }
+                        oldTablet.reset();
+                      });
+                }
               }
-              if (currentDeviceId[0] != null) {
-                Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
-                sessionService.submit(
-                    () -> {
-                      try {
-                        sessionPool.insertAlignedTablet(oldTablet);
-                      } catch (IoTDBConnectionException | 
StatementExecutionException e) {
-                        throw new RuntimeException(e);
-                      }
-                      oldTablet.reset();
-                    });
+              Tablet tablet =
+                  tabletMap.computeIfAbsent(deviceId, k -> new 
Tablet(deviceId, schemas));
+              tablet.addTimestamp(tablet.rowSize, castTime(recordObj[0]));
+              for (int i = 0; i < schemas.size(); i++) {
+                MeasurementSchema schema = schemas.get(i);
+                tablet.addValue(
+                    schema.getMeasurementId(),
+                    tablet.rowSize,
+                    castValue(schema.getType(), recordObj[i + 1]));
               }
-              currentDeviceId[0] = deviceId;
-            } else {
-              Tablet tablet = tabletMap.get(currentDeviceId[0]);
-              if (tablet.rowSize == tablet.getMaxRowNumber()) {
-
-                Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
-                sessionService.submit(
-                    () -> {
-                      try {
-                        sessionPool.insertAlignedTablet(oldTablet);
-                      } catch (IoTDBConnectionException | 
StatementExecutionException e) {
-                        throw new RuntimeException(e);
-                      }
-                      oldTablet.reset();
-                    });
+              tablet.rowSize++;
+              if (position.get() / progressInterval > currentProgress.get()) {
+                currentProgress.set((int) (position.get() / progressInterval));
+                System.out.println(
+                    file.getName()
+                        + " Progress: "
+                        + currentProgress.get()
+                        + "% cost "
+                        + (System.currentTimeMillis() - startTime));
               }
             }
-            Tablet tablet = tabletMap.computeIfAbsent(deviceId, k -> new 
Tablet(deviceId, schemas));
-            tablet.addTimestamp(tablet.rowSize, castTime(recordObj.get(0)));
-            for (int i = 0; i < schemas.size(); i++) {
-              MeasurementSchema schema = schemas.get(i);
-              tablet.addValue(
-                  schema.getMeasurementId(),
-                  tablet.rowSize,
-                  castValue(schema.getType(), recordObj.get(i + 1)));
-            }
-            tablet.rowSize++;
-            if (recordObj.getCharacterPosition() / progressInterval > 
currentProgress.get()) {
-              currentProgress.set((int) (recordObj.getCharacterPosition() / 
progressInterval));
-              System.out.println(
-                  file.getName()
-                      + " Progress: "
-                      + currentProgress.get()
-                      + "% cost "
-                      + (System.currentTimeMillis() - startTime));
-            }
           });
       if (!tabletMap.isEmpty()) {
         for (Tablet tablet : tabletMap.values()) {
@@ -382,15 +399,4 @@ public class ImportCSVTool {
     return timeStringCache.computeIfAbsent(
         str, k -> OffsetDateTime.parse(str, 
formatter).toInstant().toEpochMilli());
   }
-
-  private static CSVParser readCsvFile(String path) throws IOException {
-    return CSVFormat.Builder.create(CSVFormat.DEFAULT)
-        .setHeader()
-        .setSkipHeaderRecord(true)
-        .setQuote('`')
-        .setEscape('\\')
-        .setIgnoreEmptyLines(true)
-        .build()
-        .parse(new InputStreamReader(new FileInputStream(path)));
-  }
 }

Reply via email to