This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch import_iot_csv_file_new
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/import_iot_csv_file_new by 
this push:
     new 1e6188cf5c2 more log and support change concurrent num
1e6188cf5c2 is described below

commit 1e6188cf5c2ead196dff2f529d1bd592498ef635
Author: HTHou <[email protected]>
AuthorDate: Fri May 10 16:59:22 2024 +0800

    more log and support change concurrent num
---
 .../main/java/org/apache/iotdb/ImportCSVTool.java  | 75 ++++++++++++++++++----
 1 file changed, 62 insertions(+), 13 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java 
b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java
index e59d0bca386..2edc539aa90 100644
--- a/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java
+++ b/example/session/src/main/java/org/apache/iotdb/ImportCSVTool.java
@@ -16,9 +16,11 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
 import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -57,11 +59,19 @@ public class ImportCSVTool {
 
   private static final AtomicLong totalPoints = new AtomicLong();
 
+  private static final AtomicLong totalInsertTime = new AtomicLong();
+  private static final AtomicLong totalParseTime = new AtomicLong();
+  private static final AtomicLong totalInsertCount = new AtomicLong();
+
+  private static int tabletSize;
+
   public static void main(String[] args)
       throws IOException, IoTDBConnectionException, 
StatementExecutionException {
-    constructRedirectSessionPool(args[0]);
+    int concurrentNum = Integer.parseInt(args[1]);
+    constructRedirectSessionPool(args[0], concurrentNum);
+    tabletSize = Integer.parseInt(args[2]);
 
-    sessionService = Executors.newFixedThreadPool(22);
+    sessionService = Executors.newFixedThreadPool(concurrentNum);
     loaderService = Executors.newFixedThreadPool(22);
 
     try {
@@ -79,12 +89,15 @@ public class ImportCSVTool {
     }
 
     String folder = "/data/tsbs/csvFile";
-    if (args.length >= 2) {
-      folder = args[1];
+    if (args.length >= 4) {
+      folder = args[3];
     }
     List<Future<?>> futures = new LinkedList<>();
     File folderFile = SystemFileFactory.INSTANCE.getFile(folder);
-    System.out.println("Start importing!");
+    Date currentDate = new Date();
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    String currentTime = sdf.format(currentDate);
+    System.out.println("Start importing at " + currentTime);
     long startTime = System.currentTimeMillis();
 
     logger = Executors.newScheduledThreadPool(1);
@@ -93,9 +106,17 @@ public class ImportCSVTool {
           long points = totalPoints.get();
           long time = (System.currentTimeMillis() - startTime) / 1000;
           double throughout = (double) points / time;
-          System.out.println("Total points: " + points);
-          System.out.println("Total Time: " + points + " ms");
-          System.out.println("Throughout: " + throughout + " points/s");
+          System.out.println("Current points: " + points);
+          System.out.println("Current Time cost: " + time + " s");
+          System.out.println("Throughput: " + throughout + " points/s");
+          long insertTime = totalInsertTime.get();
+          long insertCount = totalInsertCount.get();
+          if (insertCount > 0) {
+            System.out.println("Avg insertTablet CPU time: " + (insertTime / 
insertCount) + " ms");
+            System.out.println("Total insertTablet CPU time: " + insertTime + 
" ms");
+          }
+          long parseTime = totalParseTime.get();
+          System.out.println("Total parse CPU time: " + parseTime + " ms");
         },
         0,
         1,
@@ -121,10 +142,14 @@ public class ImportCSVTool {
           throw new RuntimeException(e);
         }
       }
+      currentDate = new Date();
+      currentTime = sdf.format(currentDate);
       System.out.println(
           "Import "
               + folder
-              + " finished. Total cost: "
+              + " finished at "
+              + currentTime
+              + " Total cost: "
               + (System.currentTimeMillis() - startTime)
               + " ms");
     } else {
@@ -157,14 +182,14 @@ public class ImportCSVTool {
     logger.shutdown();
   }
 
-  private static void constructRedirectSessionPool(String host) {
+  private static void constructRedirectSessionPool(String host, int 
concurrentNum) {
     sessionPool =
         new SessionPool.Builder()
             .host(host)
             .port(6667)
             .user("root")
             .password("root")
-            .maxSize(22)
+            .maxSize(concurrentNum)
             .build();
   }
 
@@ -190,6 +215,7 @@ public class ImportCSVTool {
       lines.forEach(
           line -> {
             if (position.getAndAdd(line.length()) > 0) {
+              long parseStart = System.currentTimeMillis();
               String[] recordObj = line.split(",");
               String name = isNameNull ? "r_null" : recordObj[8];
               String deviceId =
@@ -238,9 +264,12 @@ public class ImportCSVTool {
                   sessionService.submit(
                       () -> {
                         try {
+                          long start = System.currentTimeMillis();
                           sessionPool.insertAlignedTablet(oldTablet);
+                          totalInsertTime.addAndGet(System.currentTimeMillis() 
- start);
                           totalPoints.addAndGet(
                               (long) oldTablet.rowSize * 
oldTablet.getSchemas().size());
+                          totalInsertCount.addAndGet(1);
                         } catch (IoTDBConnectionException | 
StatementExecutionException e) {
                           throw new RuntimeException(e);
                         } finally {
@@ -265,9 +294,12 @@ public class ImportCSVTool {
                   sessionService.submit(
                       () -> {
                         try {
+                          long start = System.currentTimeMillis();
                           sessionPool.insertAlignedTablet(oldTablet);
+                          totalInsertTime.addAndGet(System.currentTimeMillis() 
- start);
                           totalPoints.addAndGet(
                               (long) oldTablet.rowSize * 
oldTablet.getSchemas().size());
+                          totalInsertCount.addAndGet(1);
                         } catch (IoTDBConnectionException | 
StatementExecutionException e) {
                           throw new RuntimeException(e);
                         } finally {
@@ -281,7 +313,8 @@ public class ImportCSVTool {
                 }
               }
               Tablet tablet =
-                  tabletMap.computeIfAbsent(deviceId, k -> new 
Tablet(deviceId, schemas));
+                  tabletMap.computeIfAbsent(
+                      deviceId, k -> new Tablet(deviceId, schemas, 
tabletSize));
               tablet.addTimestamp(tablet.rowSize, castTime(recordObj[0]));
               for (int i = 0; i < schemas.size(); i++) {
                 MeasurementSchema schema = schemas.get(i);
@@ -301,12 +334,16 @@ public class ImportCSVTool {
                         + (System.currentTimeMillis() - startTime)
                         + " ms");
               }
+              totalParseTime.addAndGet(System.currentTimeMillis() - 
parseStart);
             }
           });
       if (!tabletMap.isEmpty()) {
         for (Tablet tablet : tabletMap.values()) {
+          long start = System.currentTimeMillis();
           sessionPool.insertAlignedTablet(tablet);
+          totalInsertTime.addAndGet(System.currentTimeMillis() - start);
           totalPoints.addAndGet((long) tablet.rowSize * 
tablet.getSchemas().size());
+          totalInsertCount.addAndGet(1);
         }
         tabletMap.clear();
       }
@@ -332,6 +369,7 @@ public class ImportCSVTool {
       final String[] currentDeviceId = {null};
       lines.forEach(
           line -> {
+            long parseStart = System.currentTimeMillis();
             if (position.getAndAdd(line.length()) > 0) {
               String[] recordObj = line.split(",");
               String name = isNameNull ? "d_null" : recordObj[4];
@@ -381,9 +419,12 @@ public class ImportCSVTool {
                   sessionService.submit(
                       () -> {
                         try {
+                          long start = System.currentTimeMillis();
                           sessionPool.insertAlignedTablet(oldTablet);
+                          totalInsertTime.addAndGet(System.currentTimeMillis() 
- start);
                           totalPoints.addAndGet(
                               (long) oldTablet.rowSize * 
oldTablet.getSchemas().size());
+                          totalInsertCount.addAndGet(1);
                         } catch (IoTDBConnectionException | 
StatementExecutionException e) {
                           throw new RuntimeException(e);
                         } finally {
@@ -408,9 +449,12 @@ public class ImportCSVTool {
                   sessionService.submit(
                       () -> {
                         try {
+                          long start = System.currentTimeMillis();
                           sessionPool.insertAlignedTablet(oldTablet);
+                          totalInsertTime.addAndGet(System.currentTimeMillis() 
- start);
                           totalPoints.addAndGet(
                               (long) oldTablet.rowSize * 
oldTablet.getSchemas().size());
+                          totalInsertCount.addAndGet(1);
                         } catch (IoTDBConnectionException | 
StatementExecutionException e) {
                           throw new RuntimeException(e);
                         } finally {
@@ -424,7 +468,8 @@ public class ImportCSVTool {
                 }
               }
               Tablet tablet =
-                  tabletMap.computeIfAbsent(deviceId, k -> new 
Tablet(deviceId, schemas));
+                  tabletMap.computeIfAbsent(
+                      deviceId, k -> new Tablet(deviceId, schemas, 
tabletSize));
               tablet.addTimestamp(tablet.rowSize, castTime(recordObj[0]));
               for (int i = 0; i < schemas.size(); i++) {
                 MeasurementSchema schema = schemas.get(i);
@@ -444,12 +489,16 @@ public class ImportCSVTool {
                         + (System.currentTimeMillis() - startTime)
                         + " ms");
               }
+              totalParseTime.addAndGet(System.currentTimeMillis() - 
parseStart);
             }
           });
       if (!tabletMap.isEmpty()) {
         for (Tablet tablet : tabletMap.values()) {
+          long start = System.currentTimeMillis();
           sessionPool.insertAlignedTablet(tablet);
+          totalInsertTime.addAndGet(System.currentTimeMillis() - start);
           totalPoints.addAndGet((long) tablet.rowSize * 
tablet.getSchemas().size());
+          totalInsertCount.addAndGet(1);
         }
         tabletMap.clear();
       }

Reply via email to