This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch import_iot_csv_file_new
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/import_iot_csv_file_new by
this push:
new 1e6188cf5c2 more log and support change concurrent num
1e6188cf5c2 is described below
commit 1e6188cf5c2ead196dff2f529d1bd592498ef635
Author: HTHou <[email protected]>
AuthorDate: Fri May 10 16:59:22 2024 +0800
more log and support change concurrent num
---
.../main/java/org/apache/iotdb/ImportCSVTool.java | 75 ++++++++++++++++++----
1 file changed, 62 insertions(+), 13 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java
index e59d0bca386..2edc539aa90 100644
--- a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java
+++ b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java
@@ -16,9 +16,11 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -57,11 +59,19 @@ public class ImportCSVTool {
private static final AtomicLong totalPoints = new AtomicLong();
+ private static final AtomicLong totalInsertTime = new AtomicLong();
+ private static final AtomicLong totalParseTime = new AtomicLong();
+ private static final AtomicLong totalInsertCount = new AtomicLong();
+
+ private static int tabletSize;
+
public static void main(String[] args)
throws IOException, IoTDBConnectionException,
StatementExecutionException {
- constructRedirectSessionPool(args[0]);
+ int concurrentNum = Integer.parseInt(args[1]);
+ constructRedirectSessionPool(args[0], concurrentNum);
+ tabletSize = Integer.parseInt(args[2]);
- sessionService = Executors.newFixedThreadPool(22);
+ sessionService = Executors.newFixedThreadPool(concurrentNum);
loaderService = Executors.newFixedThreadPool(22);
try {
@@ -79,12 +89,15 @@ public class ImportCSVTool {
}
String folder = "/data/tsbs/csvFile";
- if (args.length >= 2) {
- folder = args[1];
+ if (args.length >= 4) {
+ folder = args[3];
}
List<Future<?>> futures = new LinkedList<>();
File folderFile = SystemFileFactory.INSTANCE.getFile(folder);
- System.out.println("Start importing!");
+ Date currentDate = new Date();
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ String currentTime = sdf.format(currentDate);
+ System.out.println("Start importing at " + currentTime);
long startTime = System.currentTimeMillis();
logger = Executors.newScheduledThreadPool(1);
@@ -93,9 +106,17 @@ public class ImportCSVTool {
long points = totalPoints.get();
long time = (System.currentTimeMillis() - startTime) / 1000;
double throughout = (double) points / time;
- System.out.println("Total points: " + points);
- System.out.println("Total Time: " + points + " ms");
- System.out.println("Throughout: " + throughout + " points/s");
+ System.out.println("Current points: " + points);
+ System.out.println("Current Time cost: " + time + " s");
+ System.out.println("Throughput: " + throughout + " points/s");
+ long insertTime = totalInsertTime.get();
+ long insertCount = totalInsertCount.get();
+ if (insertCount > 0) {
+ System.out.println("Avg insertTablet CPU time: " + (insertTime /
insertCount) + " ms");
+ System.out.println("Total insertTablet CPU time: " + insertTime +
" ms");
+ }
+ long parseTime = totalParseTime.get();
+ System.out.println("Total parse CPU time: " + parseTime + " ms");
},
0,
1,
@@ -121,10 +142,14 @@ public class ImportCSVTool {
throw new RuntimeException(e);
}
}
+ currentDate = new Date();
+ currentTime = sdf.format(currentDate);
System.out.println(
"Import "
+ folder
- + " finished. Total cost: "
+ + " finished at "
+ + currentTime
+ + " Total cost: "
+ (System.currentTimeMillis() - startTime)
+ " ms");
} else {
@@ -157,14 +182,14 @@ public class ImportCSVTool {
logger.shutdown();
}
- private static void constructRedirectSessionPool(String host) {
+ private static void constructRedirectSessionPool(String host, int
concurrentNum) {
sessionPool =
new SessionPool.Builder()
.host(host)
.port(6667)
.user("root")
.password("root")
- .maxSize(22)
+ .maxSize(concurrentNum)
.build();
}
@@ -190,6 +215,7 @@ public class ImportCSVTool {
lines.forEach(
line -> {
if (position.getAndAdd(line.length()) > 0) {
+ long parseStart = System.currentTimeMillis();
String[] recordObj = line.split(",");
String name = isNameNull ? "r_null" : recordObj[8];
String deviceId =
@@ -238,9 +264,12 @@ public class ImportCSVTool {
sessionService.submit(
() -> {
try {
+ long start = System.currentTimeMillis();
sessionPool.insertAlignedTablet(oldTablet);
+ totalInsertTime.addAndGet(System.currentTimeMillis()
- start);
totalPoints.addAndGet(
(long) oldTablet.rowSize *
oldTablet.getSchemas().size());
+ totalInsertCount.addAndGet(1);
} catch (IoTDBConnectionException |
StatementExecutionException e) {
throw new RuntimeException(e);
} finally {
@@ -265,9 +294,12 @@ public class ImportCSVTool {
sessionService.submit(
() -> {
try {
+ long start = System.currentTimeMillis();
sessionPool.insertAlignedTablet(oldTablet);
+ totalInsertTime.addAndGet(System.currentTimeMillis()
- start);
totalPoints.addAndGet(
(long) oldTablet.rowSize *
oldTablet.getSchemas().size());
+ totalInsertCount.addAndGet(1);
} catch (IoTDBConnectionException |
StatementExecutionException e) {
throw new RuntimeException(e);
} finally {
@@ -281,7 +313,8 @@ public class ImportCSVTool {
}
}
Tablet tablet =
- tabletMap.computeIfAbsent(deviceId, k -> new
Tablet(deviceId, schemas));
+ tabletMap.computeIfAbsent(
+ deviceId, k -> new Tablet(deviceId, schemas,
tabletSize));
tablet.addTimestamp(tablet.rowSize, castTime(recordObj[0]));
for (int i = 0; i < schemas.size(); i++) {
MeasurementSchema schema = schemas.get(i);
@@ -301,12 +334,16 @@ public class ImportCSVTool {
+ (System.currentTimeMillis() - startTime)
+ " ms");
}
+ totalParseTime.addAndGet(System.currentTimeMillis() -
parseStart);
}
});
if (!tabletMap.isEmpty()) {
for (Tablet tablet : tabletMap.values()) {
+ long start = System.currentTimeMillis();
sessionPool.insertAlignedTablet(tablet);
+ totalInsertTime.addAndGet(System.currentTimeMillis() - start);
totalPoints.addAndGet((long) tablet.rowSize *
tablet.getSchemas().size());
+ totalInsertCount.addAndGet(1);
}
tabletMap.clear();
}
@@ -332,6 +369,7 @@ public class ImportCSVTool {
final String[] currentDeviceId = {null};
lines.forEach(
line -> {
+ long parseStart = System.currentTimeMillis();
if (position.getAndAdd(line.length()) > 0) {
String[] recordObj = line.split(",");
String name = isNameNull ? "d_null" : recordObj[4];
@@ -381,9 +419,12 @@ public class ImportCSVTool {
sessionService.submit(
() -> {
try {
+ long start = System.currentTimeMillis();
sessionPool.insertAlignedTablet(oldTablet);
+ totalInsertTime.addAndGet(System.currentTimeMillis()
- start);
totalPoints.addAndGet(
(long) oldTablet.rowSize *
oldTablet.getSchemas().size());
+ totalInsertCount.addAndGet(1);
} catch (IoTDBConnectionException |
StatementExecutionException e) {
throw new RuntimeException(e);
} finally {
@@ -408,9 +449,12 @@ public class ImportCSVTool {
sessionService.submit(
() -> {
try {
+ long start = System.currentTimeMillis();
sessionPool.insertAlignedTablet(oldTablet);
+ totalInsertTime.addAndGet(System.currentTimeMillis()
- start);
totalPoints.addAndGet(
(long) oldTablet.rowSize *
oldTablet.getSchemas().size());
+ totalInsertCount.addAndGet(1);
} catch (IoTDBConnectionException |
StatementExecutionException e) {
throw new RuntimeException(e);
} finally {
@@ -424,7 +468,8 @@ public class ImportCSVTool {
}
}
Tablet tablet =
- tabletMap.computeIfAbsent(deviceId, k -> new
Tablet(deviceId, schemas));
+ tabletMap.computeIfAbsent(
+ deviceId, k -> new Tablet(deviceId, schemas,
tabletSize));
tablet.addTimestamp(tablet.rowSize, castTime(recordObj[0]));
for (int i = 0; i < schemas.size(); i++) {
MeasurementSchema schema = schemas.get(i);
@@ -444,12 +489,16 @@ public class ImportCSVTool {
+ (System.currentTimeMillis() - startTime)
+ " ms");
}
+ totalParseTime.addAndGet(System.currentTimeMillis() -
parseStart);
}
});
if (!tabletMap.isEmpty()) {
for (Tablet tablet : tabletMap.values()) {
+ long start = System.currentTimeMillis();
sessionPool.insertAlignedTablet(tablet);
+ totalInsertTime.addAndGet(System.currentTimeMillis() - start);
totalPoints.addAndGet((long) tablet.rowSize *
tablet.getSchemas().size());
+ totalInsertCount.addAndGet(1);
}
tabletMap.clear();
}