This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch import_iot_csv_file_new
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f51e17b1f18ed03eea798c10155679ea9b37ece4 Author: HTHou <[email protected]> AuthorDate: Wed May 8 12:13:42 2024 +0800 use files lines --- .../main/java/org/apache/iotdb/ImportCSVTool.java | 406 +++++++++++---------- 1 file changed, 206 insertions(+), 200 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java index 4a6ef8a6609..e630d6d7032 100644 --- a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java +++ b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java @@ -1,18 +1,10 @@ package org.apache.iotdb; -import java.io.FileInputStream; -import java.util.LinkedList; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.pool.SessionPool; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVParser; -import org.apache.commons.csv.CSVRecord; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; @@ -22,19 +14,23 @@ import org.apache.tsfile.write.schema.MeasurementSchema; import java.io.File; import java.io.IOException; -import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Paths; import java.time.OffsetDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import 
java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; public class ImportCSVTool { @@ -62,8 +58,10 @@ public class ImportCSVTool { try { sessionPool.createDatabase("root.readings"); sessionPool.createDatabase("root.diagnostics"); - sessionPool.executeNonQueryStatement("create schema template readings aligned (latitude DOUBLE, longitude DOUBLE, elevation INT32, velocity INT32, heading INT32, grade INT32,fuel_consumption DOUBLE);"); - sessionPool.executeNonQueryStatement("create schema template diagnostics aligned (fuel_state DOUBLE, current_load INT32, status INT32);"); + sessionPool.executeNonQueryStatement( + "create schema template readings aligned (latitude DOUBLE, longitude DOUBLE, elevation INT32, velocity INT32, heading INT32, grade INT32,fuel_consumption DOUBLE);"); + sessionPool.executeNonQueryStatement( + "create schema template diagnostics aligned (fuel_state DOUBLE, current_load INT32, status INT32);"); sessionPool.executeNonQueryStatement("set schema template readings to root.readings;"); sessionPool.executeNonQueryStatement("set schema template diagnostics to root.diagnostics;"); } catch (Exception ignore) { @@ -83,9 +81,12 @@ public class ImportCSVTool { for (File file : files) { if (file.getName().contains("reading")) { - futures.add(loaderService.submit(() -> importCsvReading(file, file.getName().contains("null")))); + futures.add( + loaderService.submit(() -> importCsvReading(file, file.getName().contains("null")))); } else { - futures.add(loaderService.submit(() -> importCsvDiagnostics(file, file.getName().contains("null")))); + futures.add( + loaderService.submit( + () -> importCsvDiagnostics(file, file.getName().contains("null")))); } } for (Future<?> task : futures) { @@ -95,12 +96,20 @@ public class ImportCSVTool { throw new RuntimeException(e); } } - System.out.println("Import " + folder + " finished. 
Total cost: " + (System.currentTimeMillis() - startTime)); + System.out.println( + "Import " + + folder + + " finished. Total cost: " + + (System.currentTimeMillis() - startTime)); } else { if (folderFile.getName().contains("reading")) { - futures.add(loaderService.submit(() -> importCsvReading(folderFile, folderFile.getName().contains("null")))); + futures.add( + loaderService.submit( + () -> importCsvReading(folderFile, folderFile.getName().contains("null")))); } else { - futures.add(loaderService.submit(() -> importCsvDiagnostics(folderFile, folderFile.getName().contains("null")))); + futures.add( + loaderService.submit( + () -> importCsvDiagnostics(folderFile, folderFile.getName().contains("null")))); } for (Future<?> task : futures) { try { @@ -109,7 +118,11 @@ public class ImportCSVTool { throw new RuntimeException(e); } } - System.out.println("Import " + folder + " finished. Total cost: " + (System.currentTimeMillis() - startTime)); + System.out.println( + "Import " + + folder + + " finished. Total cost: " + + (System.currentTimeMillis() - startTime)); } loaderService.shutdown(); sessionService.shutdown(); @@ -140,101 +153,103 @@ public class ImportCSVTool { schemas.add(new MeasurementSchema("grade", TSDataType.INT32)); schemas.add(new MeasurementSchema("fuel_consumption", TSDataType.DOUBLE)); - try (CSVParser csvRecords = readCsvFile(file.getAbsolutePath())) { + try (Stream<String> lines = Files.lines(Paths.get(file.getAbsolutePath()))) { long startTime = System.currentTimeMillis(); long total = file.length(); long progressInterval = total / 100; - Stream<CSVRecord> records = csvRecords.stream(); AtomicInteger currentProgress = new AtomicInteger(); - + AtomicLong position = new AtomicLong(); final String[] currentDeviceId = {null}; - records.forEach( - recordObj -> { - String name = isNameNull ? "r_null" : recordObj.get(8); - String deviceId = - database - + "." - + recordObj.get(9) - + ".`" - + recordObj.get(11) - + "`." - + name - + "." 
- + recordObj.get(10); - if (!deviceId.equals(currentDeviceId[0])) { - if (!nameSet.contains(name)) { - nameSet.add(name); - String attrDevice = attributesDB + "." + name + ".attr"; - Map<String, String> attr = new HashMap<>(4); - attr.put("device_version", recordObj.get(12)); - attr.put("load_capacity", recordObj.get(13)); - attr.put("fuel_capacity", recordObj.get(14)); - attr.put("nominal_fuel_consumption", recordObj.get(15)); - sessionService.submit( - () -> { - try { - sessionPool.createTimeseries( - attrDevice, - TSDataType.BOOLEAN, - TSEncoding.RLE, - CompressionType.LZ4, - null, - null, - attr, - null); - } catch (IoTDBConnectionException | StatementExecutionException e) { - throw new RuntimeException(e); - } - }); + lines.forEach( + line -> { + if (position.getAndAdd(line.length()) > 0) { + String[] recordObj = line.split(","); + String name = isNameNull ? "r_null" : recordObj[8]; + String deviceId = + database + + "." + + recordObj[9] + + ".`" + + recordObj[11] + + "`." + + name + + "." + + recordObj[10]; + if (!deviceId.equals(currentDeviceId[0])) { + if (!nameSet.contains(name)) { + nameSet.add(name); + String attrDevice = attributesDB + "." 
+ name + ".attr"; + Map<String, String> attr = new HashMap<>(4); + attr.put("device_version", recordObj[12]); + attr.put("load_capacity", recordObj[13]); + attr.put("fuel_capacity", recordObj[14]); + attr.put("nominal_fuel_consumption", recordObj[15]); + sessionService.submit( + () -> { + try { + sessionPool.createTimeseries( + attrDevice, + TSDataType.BOOLEAN, + TSEncoding.RLE, + CompressionType.LZ4, + null, + null, + attr, + null); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + }); + } + if (currentDeviceId[0] != null) { + Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); + sessionService.submit( + () -> { + try { + sessionPool.insertAlignedTablet(oldTablet); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + oldTablet.reset(); + }); + } + currentDeviceId[0] = deviceId; + } else { + Tablet tablet = tabletMap.get(currentDeviceId[0]); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); + sessionService.submit( + () -> { + try { + sessionPool.insertAlignedTablet(oldTablet); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + oldTablet.reset(); + }); + } } - if (currentDeviceId[0] != null) { - Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); - sessionService.submit( - () -> { - try { - sessionPool.insertAlignedTablet(oldTablet); - } catch (IoTDBConnectionException | StatementExecutionException e) { - throw new RuntimeException(e); - } - oldTablet.reset(); - }); + Tablet tablet = + tabletMap.computeIfAbsent(deviceId, k -> new Tablet(deviceId, schemas)); + tablet.addTimestamp(tablet.rowSize, castTime(recordObj[0])); + for (int i = 0; i < schemas.size(); i++) { + MeasurementSchema schema = schemas.get(i); + tablet.addValue( + schema.getMeasurementId(), + tablet.rowSize, + castValue(schema.getType(), recordObj[i + 1])); } 
- currentDeviceId[0] = deviceId; - } else { - Tablet tablet = tabletMap.get(currentDeviceId[0]); - if (tablet.rowSize == tablet.getMaxRowNumber()) { - - Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); - sessionService.submit( - () -> { - try { - sessionPool.insertAlignedTablet(oldTablet); - } catch (IoTDBConnectionException | StatementExecutionException e) { - throw new RuntimeException(e); - } - oldTablet.reset(); - }); + tablet.rowSize++; + if (position.get() / progressInterval > currentProgress.get()) { + currentProgress.set((int) (position.get() / progressInterval)); + System.out.println( + file.getName() + + " Progress: " + + currentProgress.get() + + "% cost " + + (System.currentTimeMillis() - startTime)); } } - Tablet tablet = tabletMap.computeIfAbsent(deviceId, k -> new Tablet(deviceId, schemas)); - tablet.addTimestamp(tablet.rowSize, castTime(recordObj.get(0))); - for (int i = 0; i < schemas.size(); i++) { - MeasurementSchema schema = schemas.get(i); - tablet.addValue( - schema.getMeasurementId(), - tablet.rowSize, - castValue(schema.getType(), recordObj.get(i + 1))); - } - tablet.rowSize++; - if (recordObj.getCharacterPosition() / progressInterval > currentProgress.get()) { - currentProgress.set((int) (recordObj.getCharacterPosition() / progressInterval)); - System.out.println( - file.getName() - + " Progress: " - + currentProgress.get() - + "% cost " - + (System.currentTimeMillis() - startTime)); - } }); if (!tabletMap.isEmpty()) { for (Tablet tablet : tabletMap.values()) { @@ -255,101 +270,103 @@ public class ImportCSVTool { schemas.add(new MeasurementSchema("current_load", TSDataType.INT32)); schemas.add(new MeasurementSchema("status", TSDataType.INT32)); - try (CSVParser csvRecords = readCsvFile(file.getAbsolutePath())) { + try (Stream<String> lines = Files.lines(Paths.get(file.getAbsolutePath()))) { long startTime = System.currentTimeMillis(); long total = file.length(); long progressInterval = total / 100; - Stream<CSVRecord> records = 
csvRecords.stream(); AtomicInteger currentProgress = new AtomicInteger(); - + AtomicLong position = new AtomicLong(); final String[] currentDeviceId = {null}; - records.forEach( - recordObj -> { - String name = isNameNull ? "d_null" : recordObj.get(4); - String deviceId = - database - + "." - + recordObj.get(5) - + ".`" - + recordObj.get(7) - + "`." - + name - + "." - + recordObj.get(6); - if (!deviceId.equals(currentDeviceId[0])) { - if (!nameSet.contains(name)) { - nameSet.add(name); - String attrDevice = attributesDB + "." + name + ".attr"; - Map<String, String> attr = new HashMap<>(4); - attr.put("device_version", recordObj.get(8)); - attr.put("load_capacity", recordObj.get(9)); - attr.put("fuel_capacity", recordObj.get(10)); - attr.put("nominal_fuel_consumption", recordObj.get(11)); - sessionService.submit( - () -> { - try { - sessionPool.createTimeseries( - attrDevice, - TSDataType.BOOLEAN, - TSEncoding.RLE, - CompressionType.LZ4, - null, - null, - attr, - null); - } catch (IoTDBConnectionException | StatementExecutionException e) { - throw new RuntimeException(e); - } - }); + lines.forEach( + line -> { + if (position.getAndAdd(line.length()) > 0) { + String[] recordObj = line.split(","); + String name = isNameNull ? "d_null" : recordObj[4]; + String deviceId = + database + + "." + + recordObj[5] + + ".`" + + recordObj[7] + + "`." + + name + + "." + + recordObj[6]; + if (!deviceId.equals(currentDeviceId[0])) { + if (!nameSet.contains(name)) { + nameSet.add(name); + String attrDevice = attributesDB + "." 
+ name + ".attr"; + Map<String, String> attr = new HashMap<>(4); + attr.put("device_version", recordObj[8]); + attr.put("load_capacity", recordObj[9]); + attr.put("fuel_capacity", recordObj[10]); + attr.put("nominal_fuel_consumption", recordObj[11]); + sessionService.submit( + () -> { + try { + sessionPool.createTimeseries( + attrDevice, + TSDataType.BOOLEAN, + TSEncoding.RLE, + CompressionType.LZ4, + null, + null, + attr, + null); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + }); + } + if (currentDeviceId[0] != null) { + Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); + sessionService.submit( + () -> { + try { + sessionPool.insertAlignedTablet(oldTablet); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + oldTablet.reset(); + }); + } + currentDeviceId[0] = deviceId; + } else { + Tablet tablet = tabletMap.get(currentDeviceId[0]); + if (tablet.rowSize == tablet.getMaxRowNumber()) { + Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); + sessionService.submit( + () -> { + try { + sessionPool.insertAlignedTablet(oldTablet); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + oldTablet.reset(); + }); + } } - if (currentDeviceId[0] != null) { - Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); - sessionService.submit( - () -> { - try { - sessionPool.insertAlignedTablet(oldTablet); - } catch (IoTDBConnectionException | StatementExecutionException e) { - throw new RuntimeException(e); - } - oldTablet.reset(); - }); + Tablet tablet = + tabletMap.computeIfAbsent(deviceId, k -> new Tablet(deviceId, schemas)); + tablet.addTimestamp(tablet.rowSize, castTime(recordObj[0])); + for (int i = 0; i < schemas.size(); i++) { + MeasurementSchema schema = schemas.get(i); + tablet.addValue( + schema.getMeasurementId(), + tablet.rowSize, + castValue(schema.getType(), recordObj[i + 1])); } - 
currentDeviceId[0] = deviceId; - } else { - Tablet tablet = tabletMap.get(currentDeviceId[0]); - if (tablet.rowSize == tablet.getMaxRowNumber()) { - - Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); - sessionService.submit( - () -> { - try { - sessionPool.insertAlignedTablet(oldTablet); - } catch (IoTDBConnectionException | StatementExecutionException e) { - throw new RuntimeException(e); - } - oldTablet.reset(); - }); + tablet.rowSize++; + if (position.get() / progressInterval > currentProgress.get()) { + currentProgress.set((int) (position.get() / progressInterval)); + System.out.println( + file.getName() + + " Progress: " + + currentProgress.get() + + "% cost " + + (System.currentTimeMillis() - startTime)); } } - Tablet tablet = tabletMap.computeIfAbsent(deviceId, k -> new Tablet(deviceId, schemas)); - tablet.addTimestamp(tablet.rowSize, castTime(recordObj.get(0))); - for (int i = 0; i < schemas.size(); i++) { - MeasurementSchema schema = schemas.get(i); - tablet.addValue( - schema.getMeasurementId(), - tablet.rowSize, - castValue(schema.getType(), recordObj.get(i + 1))); - } - tablet.rowSize++; - if (recordObj.getCharacterPosition() / progressInterval > currentProgress.get()) { - currentProgress.set((int) (recordObj.getCharacterPosition() / progressInterval)); - System.out.println( - file.getName() - + " Progress: " - + currentProgress.get() - + "% cost " - + (System.currentTimeMillis() - startTime)); - } }); if (!tabletMap.isEmpty()) { for (Tablet tablet : tabletMap.values()) { @@ -382,15 +399,4 @@ public class ImportCSVTool { return timeStringCache.computeIfAbsent( str, k -> OffsetDateTime.parse(str, formatter).toInstant().toEpochMilli()); } - - private static CSVParser readCsvFile(String path) throws IOException { - return CSVFormat.Builder.create(CSVFormat.DEFAULT) - .setHeader() - .setSkipHeaderRecord(true) - .setQuote('`') - .setEscape('\\') - .setIgnoreEmptyLines(true) - .build() - .parse(new InputStreamReader(new FileInputStream(path))); - } }
