This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch import_iot_csv_file_new in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0ba6264c453f85698bdf2668698114c9fe8a7fff Author: HTHou <[email protected]> AuthorDate: Wed May 8 11:55:40 2024 +0800 concurrent --- .../main/java/org/apache/iotdb/ImportCSVTool.java | 128 +++++++++++++-------- 1 file changed, 79 insertions(+), 49 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java index 3a7e702019f..4a6ef8a6609 100644 --- a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java +++ b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java @@ -1,6 +1,10 @@ package org.apache.iotdb; +import java.io.FileInputStream; +import java.util.LinkedList; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; @@ -38,7 +42,9 @@ public class ImportCSVTool { private static final DateTimeFormatter formatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME; private static SessionPool sessionPool; - private static ExecutorService service; + private static ExecutorService sessionService; + + private static ExecutorService loaderService; private static final String attributesDB = "root.attributes"; @@ -50,7 +56,8 @@ public class ImportCSVTool { throws IOException, IoTDBConnectionException, StatementExecutionException { constructRedirectSessionPool(); - service = Executors.newFixedThreadPool(20); + sessionService = Executors.newFixedThreadPool(20); + loaderService = Executors.newFixedThreadPool(20); try { sessionPool.createDatabase("root.readings"); @@ -63,32 +70,49 @@ public class ImportCSVTool { // do nothing } - String folder = "/Volumes/ExtHD/ExtDocuments/csv/newCsv"; + String folder = "/Volumes/ExtHD/ExtDocuments/csv"; if (args.length >= 1) { folder = args[0]; } + List<Future<?>> futures = new LinkedList<>(); File folderFile = SystemFileFactory.INSTANCE.getFile(folder); + long startTime = System.currentTimeMillis(); if (folderFile.isDirectory()) { File[] files = FSFactoryProducer.getFSFactory().listFilesBySuffix(folderFile.getAbsolutePath(), ".csv"); for (File file : files) { if (file.getName().contains("reading")) { - importCsvReading(file, file.getName().contains("null")); + futures.add(loaderService.submit(() -> importCsvReading(file, file.getName().contains("null")))); } else { - importCsvDiagnostics(file, file.getName().contains("null")); + futures.add(loaderService.submit(() -> importCsvDiagnostics(file, file.getName().contains("null")))); + } + } + for (Future<?> task : futures) { + try { + task.get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); } } - System.out.println("Import " + folder + " finished."); + System.out.println("Import " + folder + " finished. Total cost: " + (System.currentTimeMillis() - startTime)); } else { if (folderFile.getName().contains("reading")) { - importCsvReading(folderFile, folderFile.getName().contains("null")); + futures.add(loaderService.submit(() -> importCsvReading(folderFile, folderFile.getName().contains("null")))); } else { - importCsvDiagnostics(folderFile, folderFile.getName().contains("null")); + futures.add(loaderService.submit(() -> importCsvDiagnostics(folderFile, folderFile.getName().contains("null")))); } - System.out.println("Import " + folder + " finished."); + for (Future<?> task : futures) { + try { + task.get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + System.out.println("Import " + folder + " finished. Total cost: " + (System.currentTimeMillis() - startTime)); } - service.shutdown(); + loaderService.shutdown(); + sessionService.shutdown(); } private static void constructRedirectSessionPool() { @@ -146,7 +170,7 @@ public class ImportCSVTool { attr.put("load_capacity", recordObj.get(13)); attr.put("fuel_capacity", recordObj.get(14)); attr.put("nominal_fuel_consumption", recordObj.get(15)); - service.submit( + sessionService.submit( () -> { try { sessionPool.createTimeseries( @@ -165,7 +189,7 @@ public class ImportCSVTool { } if (currentDeviceId[0] != null) { Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); - service.submit( + sessionService.submit( () -> { try { sessionPool.insertAlignedTablet(oldTablet); @@ -176,18 +200,21 @@ public class ImportCSVTool { }); } currentDeviceId[0] = deviceId; - } else if (tabletMap.get(currentDeviceId[0]).rowSize - == tabletMap.get(currentDeviceId[0]).getMaxRowNumber()) { - Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); - service.submit( - () -> { - try { - sessionPool.insertAlignedTablet(oldTablet); - } catch (IoTDBConnectionException | StatementExecutionException e) { - throw new RuntimeException(e); - } - oldTablet.reset(); - }); + } else { + Tablet tablet = tabletMap.get(currentDeviceId[0]); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + + Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); + sessionService.submit( + () -> { + try { + sessionPool.insertAlignedTablet(oldTablet); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + oldTablet.reset(); + }); + } } Tablet tablet = tabletMap.computeIfAbsent(deviceId, k -> new Tablet(deviceId, schemas)); tablet.addTimestamp(tablet.rowSize, castTime(recordObj.get(0))); @@ -258,7 +285,7 @@ public class ImportCSVTool { attr.put("load_capacity", recordObj.get(9)); attr.put("fuel_capacity", recordObj.get(10)); attr.put("nominal_fuel_consumption", recordObj.get(11)); - service.submit( + sessionService.submit( () -> { try { sessionPool.createTimeseries( @@ -277,7 +304,7 @@ public class ImportCSVTool { } if (currentDeviceId[0] != null) { Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); - service.submit( + sessionService.submit( () -> { try { sessionPool.insertAlignedTablet(oldTablet); @@ -288,18 +315,21 @@ public class ImportCSVTool { }); } currentDeviceId[0] = deviceId; - } else if (tabletMap.get(currentDeviceId[0]).rowSize - == tabletMap.get(currentDeviceId[0]).getMaxRowNumber()) { - Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); - service.submit( - () -> { - try { - sessionPool.insertAlignedTablet(oldTablet); - } catch (IoTDBConnectionException | StatementExecutionException e) { - throw new RuntimeException(e); - } - oldTablet.reset(); - }); + } else { + Tablet tablet = tabletMap.get(currentDeviceId[0]); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + + Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); + sessionService.submit( + () -> { + try { + sessionPool.insertAlignedTablet(oldTablet); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + oldTablet.reset(); + }); + } } Tablet tablet = tabletMap.computeIfAbsent(deviceId, k -> new Tablet(deviceId, schemas)); tablet.addTimestamp(tablet.rowSize, castTime(recordObj.get(0))); @@ -332,17 +362,6 @@ public class ImportCSVTool { } } - private static CSVParser readCsvFile(String path) throws IOException { - return CSVFormat.Builder.create(CSVFormat.DEFAULT) - .setHeader() - .setSkipHeaderRecord(true) - .setQuote('`') - .setEscape('\\') - .setIgnoreEmptyLines(true) - .build() - .parse(new InputStreamReader(Files.newInputStream(Paths.get(path)))); - } - private static Object castValue(TSDataType dataType, String str) { try { if (dataType == TSDataType.INT32) { @@ -363,4 +382,15 @@ public class ImportCSVTool { return timeStringCache.computeIfAbsent( str, k -> OffsetDateTime.parse(str, formatter).toInstant().toEpochMilli()); } + + private static CSVParser readCsvFile(String path) throws IOException { + return CSVFormat.Builder.create(CSVFormat.DEFAULT) + .setHeader() + .setSkipHeaderRecord(true) + .setQuote('`') + .setEscape('\\') + .setIgnoreEmptyLines(true) + .build() + .parse(new InputStreamReader(new FileInputStream(path))); + } }
