This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch import_iot_csv_file_new in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 536136009a996e5d46ffb1a59a8267c7da86a3dd Author: HTHou <[email protected]> AuthorDate: Tue May 7 17:37:19 2024 +0800 Dev ImportCSVTool --- example/session/pom.xml | 4 + .../main/java/org/apache/iotdb/ImportCSVTool.java | 366 +++++++++++++++++++++ 2 files changed, 370 insertions(+) diff --git a/example/session/pom.xml b/example/session/pom.xml index ac5d936f997..83144603ad2 100644 --- a/example/session/pom.xml +++ b/example/session/pom.xml @@ -39,5 +39,9 @@ <artifactId>node-commons</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-csv</artifactId> + </dependency> </dependencies> </project> diff --git a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java new file mode 100644 index 00000000000..3a7e702019f --- /dev/null +++ b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java @@ -0,0 +1,366 @@ +package org.apache.iotdb; + +import java.util.Set; +import org.apache.iotdb.commons.file.SystemFileFactory; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.pool.SessionPool; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.fileSystem.FSFactoryProducer; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; + +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +public class ImportCSVTool { + + private static final DateTimeFormatter formatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME; + + private static SessionPool sessionPool; + private static ExecutorService service; + + private static final String attributesDB = "root.attributes"; + + private static final Map<String, Long> timeStringCache = new ConcurrentHashMap<>(); + + private static final Set<String> nameSet = ConcurrentHashMap.newKeySet(); + + public static void main(String[] args) + throws IOException, IoTDBConnectionException, StatementExecutionException { + constructRedirectSessionPool(); + + service = Executors.newFixedThreadPool(20); + + try { + sessionPool.createDatabase("root.readings"); + sessionPool.createDatabase("root.diagnostics"); + sessionPool.executeNonQueryStatement("create schema template readings aligned (latitude DOUBLE, longitude DOUBLE, elevation INT32, velocity INT32, heading INT32, grade INT32,fuel_consumption DOUBLE);"); + sessionPool.executeNonQueryStatement("create schema template diagnostics aligned (fuel_state DOUBLE, current_load INT32, status INT32);"); + sessionPool.executeNonQueryStatement("set schema template readings to root.readings;"); + sessionPool.executeNonQueryStatement("set schema template diagnostics to root.diagnostics;"); + } catch (Exception ignore) { + // do nothing + } + + String folder = "/Volumes/ExtHD/ExtDocuments/csv/newCsv"; + if (args.length >= 1) { + folder = args[0]; + } + File folderFile = SystemFileFactory.INSTANCE.getFile(folder); + if (folderFile.isDirectory()) { + File[] files = + FSFactoryProducer.getFSFactory().listFilesBySuffix(folderFile.getAbsolutePath(), ".csv"); + + for (File file : files) { + if (file.getName().contains("reading")) { + importCsvReading(file, file.getName().contains("null")); + } else { + importCsvDiagnostics(file, file.getName().contains("null")); + } + } + System.out.println("Import " + folder + " finished."); + } else { + if (folderFile.getName().contains("reading")) { + importCsvReading(folderFile, folderFile.getName().contains("null")); + } else { + importCsvDiagnostics(folderFile, folderFile.getName().contains("null")); + } + System.out.println("Import " + folder + " finished."); + } + service.shutdown(); + } + + private static void constructRedirectSessionPool() { + List<String> nodeUrls = new ArrayList<>(); + nodeUrls.add("127.0.0.1:6667"); + // nodeUrls.add("127.0.0.1:6668"); + sessionPool = + new SessionPool.Builder() + .nodeUrls(nodeUrls) + .user("root") + .password("root") + .maxSize(20) + .build(); + } + + public static void importCsvReading(File file, boolean isNameNull) { + Map<String, Tablet> tabletMap = new ConcurrentHashMap<>(); + String database = "root.readings"; + List<MeasurementSchema> schemas = new ArrayList<>(); + schemas.add(new MeasurementSchema("latitude", TSDataType.DOUBLE)); + schemas.add(new MeasurementSchema("longitude", TSDataType.DOUBLE)); + schemas.add(new MeasurementSchema("elevation", TSDataType.INT32)); + schemas.add(new MeasurementSchema("velocity", TSDataType.INT32)); + schemas.add(new MeasurementSchema("heading", TSDataType.INT32)); + schemas.add(new MeasurementSchema("grade", TSDataType.INT32)); + schemas.add(new MeasurementSchema("fuel_consumption", TSDataType.DOUBLE)); + + try (CSVParser csvRecords = readCsvFile(file.getAbsolutePath())) { + long startTime = System.currentTimeMillis(); + long total = file.length(); + long progressInterval = total / 100; + Stream<CSVRecord> records = csvRecords.stream(); + AtomicInteger currentProgress = new AtomicInteger(); + + final String[] currentDeviceId = {null}; + records.forEach( + recordObj -> { + String name = isNameNull ? "r_null" : recordObj.get(8); + String deviceId = + database + + "." + + recordObj.get(9) + + ".`" + + recordObj.get(11) + + "`." + + name + + "." + + recordObj.get(10); + if (!deviceId.equals(currentDeviceId[0])) { + if (!nameSet.contains(name)) { + nameSet.add(name); + String attrDevice = attributesDB + "." + name + ".attr"; + Map<String, String> attr = new HashMap<>(4); + attr.put("device_version", recordObj.get(12)); + attr.put("load_capacity", recordObj.get(13)); + attr.put("fuel_capacity", recordObj.get(14)); + attr.put("nominal_fuel_consumption", recordObj.get(15)); + service.submit( + () -> { + try { + sessionPool.createTimeseries( + attrDevice, + TSDataType.BOOLEAN, + TSEncoding.RLE, + CompressionType.LZ4, + null, + null, + attr, + null); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + }); + } + if (currentDeviceId[0] != null) { + Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); + service.submit( + () -> { + try { + sessionPool.insertAlignedTablet(oldTablet); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + oldTablet.reset(); + }); + } + currentDeviceId[0] = deviceId; + } else if (tabletMap.get(currentDeviceId[0]).rowSize + == tabletMap.get(currentDeviceId[0]).getMaxRowNumber()) { + Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); + service.submit( + () -> { + try { + sessionPool.insertAlignedTablet(oldTablet); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + oldTablet.reset(); + }); + } + Tablet tablet = tabletMap.computeIfAbsent(deviceId, k -> new Tablet(deviceId, schemas)); + tablet.addTimestamp(tablet.rowSize, castTime(recordObj.get(0))); + for (int i = 0; i < schemas.size(); i++) { + MeasurementSchema schema = schemas.get(i); + tablet.addValue( + schema.getMeasurementId(), + tablet.rowSize, + castValue(schema.getType(), recordObj.get(i + 1))); + } + tablet.rowSize++; + if (recordObj.getCharacterPosition() / progressInterval > currentProgress.get()) { + currentProgress.set((int) (recordObj.getCharacterPosition() / progressInterval)); + System.out.println( + file.getName() + + " Progress: " + + currentProgress.get() + + "% cost " + + (System.currentTimeMillis() - startTime)); + } + }); + if (!tabletMap.isEmpty()) { + for (Tablet tablet : tabletMap.values()) { + sessionPool.insertAlignedTablet(tablet); + } + tabletMap.clear(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void importCsvDiagnostics(File file, boolean isNameNull) { + Map<String, Tablet> tabletMap = new ConcurrentHashMap<>(); + String database = "root.diagnostics"; + List<MeasurementSchema> schemas = new ArrayList<>(); + schemas.add(new MeasurementSchema("fuel_state", TSDataType.DOUBLE)); + schemas.add(new MeasurementSchema("current_load", TSDataType.INT32)); + schemas.add(new MeasurementSchema("status", TSDataType.INT32)); + + try (CSVParser csvRecords = readCsvFile(file.getAbsolutePath())) { + long startTime = System.currentTimeMillis(); + long total = file.length(); + long progressInterval = total / 100; + Stream<CSVRecord> records = csvRecords.stream(); + AtomicInteger currentProgress = new AtomicInteger(); + + final String[] currentDeviceId = {null}; + records.forEach( + recordObj -> { + String name = isNameNull ? "d_null" : recordObj.get(4); + String deviceId = + database + + "." + + recordObj.get(5) + + ".`" + + recordObj.get(7) + + "`." + + name + + "." + + recordObj.get(6); + if (!deviceId.equals(currentDeviceId[0])) { + if (!nameSet.contains(name)) { + nameSet.add(name); + String attrDevice = attributesDB + "." + name + ".attr"; + Map<String, String> attr = new HashMap<>(4); + attr.put("device_version", recordObj.get(8)); + attr.put("load_capacity", recordObj.get(9)); + attr.put("fuel_capacity", recordObj.get(10)); + attr.put("nominal_fuel_consumption", recordObj.get(11)); + service.submit( + () -> { + try { + sessionPool.createTimeseries( + attrDevice, + TSDataType.BOOLEAN, + TSEncoding.RLE, + CompressionType.LZ4, + null, + null, + attr, + null); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + }); + } + if (currentDeviceId[0] != null) { + Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); + service.submit( + () -> { + try { + sessionPool.insertAlignedTablet(oldTablet); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + oldTablet.reset(); + }); + } + currentDeviceId[0] = deviceId; + } else if (tabletMap.get(currentDeviceId[0]).rowSize + == tabletMap.get(currentDeviceId[0]).getMaxRowNumber()) { + Tablet oldTablet = tabletMap.remove(currentDeviceId[0]); + service.submit( + () -> { + try { + sessionPool.insertAlignedTablet(oldTablet); + } catch (IoTDBConnectionException | StatementExecutionException e) { + throw new RuntimeException(e); + } + oldTablet.reset(); + }); + } + Tablet tablet = tabletMap.computeIfAbsent(deviceId, k -> new Tablet(deviceId, schemas)); + tablet.addTimestamp(tablet.rowSize, castTime(recordObj.get(0))); + for (int i = 0; i < schemas.size(); i++) { + MeasurementSchema schema = schemas.get(i); + tablet.addValue( + schema.getMeasurementId(), + tablet.rowSize, + castValue(schema.getType(), recordObj.get(i + 1))); + } + tablet.rowSize++; + if (recordObj.getCharacterPosition() / progressInterval > currentProgress.get()) { + currentProgress.set((int) (recordObj.getCharacterPosition() / progressInterval)); + System.out.println( + file.getName() + + " Progress: " + + currentProgress.get() + + "% cost " + + (System.currentTimeMillis() - startTime)); + } + }); + if (!tabletMap.isEmpty()) { + for (Tablet tablet : tabletMap.values()) { + sessionPool.insertAlignedTablet(tablet); + } + tabletMap.clear(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private static CSVParser readCsvFile(String path) throws IOException { + return CSVFormat.Builder.create(CSVFormat.DEFAULT) + .setHeader() + .setSkipHeaderRecord(true) + .setQuote('`') + .setEscape('\\') + .setIgnoreEmptyLines(true) + .build() + .parse(new InputStreamReader(Files.newInputStream(Paths.get(path)))); + } + + private static Object castValue(TSDataType dataType, String str) { + try { + if (dataType == TSDataType.INT32) { + return Integer.parseInt(str); + } else if (dataType == TSDataType.DOUBLE) { + return Double.parseDouble(str); + } + } catch (NumberFormatException e) { + if ("NULL".equals(str)) { + return null; + } + throw e; + } + return str; + } + + private static long castTime(String str) { + return timeStringCache.computeIfAbsent( + str, k -> OffsetDateTime.parse(str, formatter).toInstant().toEpochMilli()); + } +}
