This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch import_iot_csv_file_new
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0ba6264c453f85698bdf2668698114c9fe8a7fff
Author: HTHou <[email protected]>
AuthorDate: Wed May 8 11:55:40 2024 +0800

    concurrent
---
 .../main/java/org/apache/iotdb/ImportCSVTool.java  | 128 +++++++++++++--------
 1 file changed, 79 insertions(+), 49 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java 
b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java
index 3a7e702019f..4a6ef8a6609 100644
--- a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java
+++ b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java
@@ -1,6 +1,10 @@
 package org.apache.iotdb;
 
+import java.io.FileInputStream;
+import java.util.LinkedList;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -38,7 +42,9 @@ public class ImportCSVTool {
   private static final DateTimeFormatter formatter = 
DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 
   private static SessionPool sessionPool;
-  private static ExecutorService service;
+  private static ExecutorService sessionService;
+
+  private static ExecutorService loaderService;
 
   private static final String attributesDB = "root.attributes";
 
@@ -50,7 +56,8 @@ public class ImportCSVTool {
       throws IOException, IoTDBConnectionException, 
StatementExecutionException {
     constructRedirectSessionPool();
 
-    service = Executors.newFixedThreadPool(20);
+    sessionService = Executors.newFixedThreadPool(20);
+    loaderService = Executors.newFixedThreadPool(20);
 
     try {
       sessionPool.createDatabase("root.readings");
@@ -63,32 +70,49 @@ public class ImportCSVTool {
       // do nothing
     }
 
-    String folder = "/Volumes/ExtHD/ExtDocuments/csv/newCsv";
+    String folder = "/Volumes/ExtHD/ExtDocuments/csv";
     if (args.length >= 1) {
       folder = args[0];
     }
+    List<Future<?>> futures = new LinkedList<>();
     File folderFile = SystemFileFactory.INSTANCE.getFile(folder);
+    long startTime = System.currentTimeMillis();
     if (folderFile.isDirectory()) {
       File[] files =
           
FSFactoryProducer.getFSFactory().listFilesBySuffix(folderFile.getAbsolutePath(),
 ".csv");
 
       for (File file : files) {
         if (file.getName().contains("reading")) {
-          importCsvReading(file, file.getName().contains("null"));
+          futures.add(loaderService.submit(() -> importCsvReading(file, 
file.getName().contains("null"))));
         } else {
-          importCsvDiagnostics(file, file.getName().contains("null"));
+          futures.add(loaderService.submit(() -> importCsvDiagnostics(file, 
file.getName().contains("null"))));
+        }
+      }
+      for (Future<?> task : futures) {
+        try {
+          task.get();
+        } catch (ExecutionException | InterruptedException e) {
+          throw new RuntimeException(e);
         }
       }
-      System.out.println("Import " + folder + " finished.");
+      System.out.println("Import " + folder + " finished. Total cost: " + 
(System.currentTimeMillis() - startTime));
     } else {
       if (folderFile.getName().contains("reading")) {
-        importCsvReading(folderFile, folderFile.getName().contains("null"));
+        futures.add(loaderService.submit(() -> importCsvReading(folderFile, 
folderFile.getName().contains("null"))));
       } else {
-        importCsvDiagnostics(folderFile, 
folderFile.getName().contains("null"));
+        futures.add(loaderService.submit(() -> 
importCsvDiagnostics(folderFile, folderFile.getName().contains("null"))));
       }
-      System.out.println("Import " + folder + " finished.");
+      for (Future<?> task : futures) {
+        try {
+          task.get();
+        } catch (ExecutionException | InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      System.out.println("Import " + folder + " finished. Total cost: " + 
(System.currentTimeMillis() - startTime));
     }
-    service.shutdown();
+    loaderService.shutdown();
+    sessionService.shutdown();
   }
 
   private static void constructRedirectSessionPool() {
@@ -146,7 +170,7 @@ public class ImportCSVTool {
                 attr.put("load_capacity", recordObj.get(13));
                 attr.put("fuel_capacity", recordObj.get(14));
                 attr.put("nominal_fuel_consumption", recordObj.get(15));
-                service.submit(
+                sessionService.submit(
                     () -> {
                       try {
                         sessionPool.createTimeseries(
@@ -165,7 +189,7 @@ public class ImportCSVTool {
               }
               if (currentDeviceId[0] != null) {
                 Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
-                service.submit(
+                sessionService.submit(
                     () -> {
                       try {
                         sessionPool.insertAlignedTablet(oldTablet);
@@ -176,18 +200,21 @@ public class ImportCSVTool {
                     });
               }
               currentDeviceId[0] = deviceId;
-            } else if (tabletMap.get(currentDeviceId[0]).rowSize
-                == tabletMap.get(currentDeviceId[0]).getMaxRowNumber()) {
-              Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
-              service.submit(
-                  () -> {
-                    try {
-                      sessionPool.insertAlignedTablet(oldTablet);
-                    } catch (IoTDBConnectionException | 
StatementExecutionException e) {
-                      throw new RuntimeException(e);
-                    }
-                    oldTablet.reset();
-                  });
+            } else {
+              Tablet tablet = tabletMap.get(currentDeviceId[0]);
+              if (tablet.rowSize == tablet.getMaxRowNumber()) {
+
+                Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
+                sessionService.submit(
+                    () -> {
+                      try {
+                        sessionPool.insertAlignedTablet(oldTablet);
+                      } catch (IoTDBConnectionException | 
StatementExecutionException e) {
+                        throw new RuntimeException(e);
+                      }
+                      oldTablet.reset();
+                    });
+              }
             }
             Tablet tablet = tabletMap.computeIfAbsent(deviceId, k -> new 
Tablet(deviceId, schemas));
             tablet.addTimestamp(tablet.rowSize, castTime(recordObj.get(0)));
@@ -258,7 +285,7 @@ public class ImportCSVTool {
                 attr.put("load_capacity", recordObj.get(9));
                 attr.put("fuel_capacity", recordObj.get(10));
                 attr.put("nominal_fuel_consumption", recordObj.get(11));
-                service.submit(
+                sessionService.submit(
                     () -> {
                       try {
                         sessionPool.createTimeseries(
@@ -277,7 +304,7 @@ public class ImportCSVTool {
               }
               if (currentDeviceId[0] != null) {
                 Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
-                service.submit(
+                sessionService.submit(
                     () -> {
                       try {
                         sessionPool.insertAlignedTablet(oldTablet);
@@ -288,18 +315,21 @@ public class ImportCSVTool {
                     });
               }
               currentDeviceId[0] = deviceId;
-            } else if (tabletMap.get(currentDeviceId[0]).rowSize
-                == tabletMap.get(currentDeviceId[0]).getMaxRowNumber()) {
-              Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
-              service.submit(
-                  () -> {
-                    try {
-                      sessionPool.insertAlignedTablet(oldTablet);
-                    } catch (IoTDBConnectionException | 
StatementExecutionException e) {
-                      throw new RuntimeException(e);
-                    }
-                    oldTablet.reset();
-                  });
+            } else {
+              Tablet tablet = tabletMap.get(currentDeviceId[0]);
+              if (tablet.rowSize == tablet.getMaxRowNumber()) {
+
+                Tablet oldTablet = tabletMap.remove(currentDeviceId[0]);
+                sessionService.submit(
+                    () -> {
+                      try {
+                        sessionPool.insertAlignedTablet(oldTablet);
+                      } catch (IoTDBConnectionException | 
StatementExecutionException e) {
+                        throw new RuntimeException(e);
+                      }
+                      oldTablet.reset();
+                    });
+              }
             }
             Tablet tablet = tabletMap.computeIfAbsent(deviceId, k -> new 
Tablet(deviceId, schemas));
             tablet.addTimestamp(tablet.rowSize, castTime(recordObj.get(0)));
@@ -332,17 +362,6 @@ public class ImportCSVTool {
     }
   }
 
-  private static CSVParser readCsvFile(String path) throws IOException {
-    return CSVFormat.Builder.create(CSVFormat.DEFAULT)
-        .setHeader()
-        .setSkipHeaderRecord(true)
-        .setQuote('`')
-        .setEscape('\\')
-        .setIgnoreEmptyLines(true)
-        .build()
-        .parse(new InputStreamReader(Files.newInputStream(Paths.get(path))));
-  }
-
   private static Object castValue(TSDataType dataType, String str) {
     try {
       if (dataType == TSDataType.INT32) {
@@ -363,4 +382,15 @@ public class ImportCSVTool {
     return timeStringCache.computeIfAbsent(
         str, k -> OffsetDateTime.parse(str, 
formatter).toInstant().toEpochMilli());
   }
+
+  private static CSVParser readCsvFile(String path) throws IOException {
+    return CSVFormat.Builder.create(CSVFormat.DEFAULT)
+        .setHeader()
+        .setSkipHeaderRecord(true)
+        .setQuote('`')
+        .setEscape('\\')
+        .setIgnoreEmptyLines(true)
+        .build()
+        .parse(new InputStreamReader(new FileInputStream(path)));
+  }
 }

Reply via email to