This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch guonengtest
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7abac0a1b9ba50278f0555b15b24739459cfe6f2
Author: HTHou <[email protected]>
AuthorDate: Mon Apr 17 18:09:49 2023 +0800

    update SessionPoolExample
---
 .../java/org/apache/iotdb/SessionPoolExample.java  | 52 +++++++++++++++-------
 1 file changed, 36 insertions(+), 16 deletions(-)

diff --git 
a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java 
b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index 5263fb6faf..ecc04e8064 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb;
 
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.isession.SessionDataSet.DataIterator;
 import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -40,20 +41,23 @@ public class SessionPoolExample {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SessionPoolExample.class);
 
-  private static int ROW_NUMBER = 100;
-  private static int THREAD_NUMBER = 100;
+  private static int THREAD_NUMBER = 300;
 
   private static int DEVICE_NUMBER = 20000;
 
   private static int SENSOR_NUMBER = 500;
 
-  private static int WRITE_NUMBER = 10000;
+  private static int WRITE_NUMBER = 100;
 
   private static List<String> measurements;
 
   private static List<TSDataType> types;
 
-  private static AtomicInteger total = new AtomicInteger();
+  private static AtomicInteger totalRowNumber = new AtomicInteger();
+
+  private static AtomicInteger finishedThreadCount = new AtomicInteger();
+
+  private static AtomicLong insertTime = new AtomicLong();
 
   private static Random r;
 
@@ -90,23 +94,41 @@ public class SessionPoolExample {
 
     Thread[] threads = new Thread[THREAD_NUMBER];
     for (int i = 0; i < THREAD_NUMBER; i++) {
+      int index = i;
       threads[i] =
           new Thread(
               () -> {
                 for (int j = 0; j < WRITE_NUMBER; j++) {
                   long start = System.currentTimeMillis();
                   try {
-                    insertRecords();
+                    insertRecords(index);
                   } catch (Exception e) {
                     LOGGER.error("insert error:", e);
                   }
                   LOGGER.info(
-                      "insert {} rows cost {} ms", ROW_NUMBER, 
System.currentTimeMillis() - start);
-                  LOGGER.info("Total rows number: {}", 
total.addAndGet(ROW_NUMBER));
+                      "insert {} rows cost {} ms", (DEVICE_NUMBER / 
THREAD_NUMBER), System.currentTimeMillis() - start);
+                  LOGGER.info("Total rows number: {}", 
totalRowNumber.addAndGet(DEVICE_NUMBER / THREAD_NUMBER));
+                  finishedThreadCount.addAndGet(1);
+                  while (finishedThreadCount.get() <= THREAD_NUMBER) {
+                    try {
+                      TimeUnit.MILLISECONDS.sleep(1);
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                  finishedThreadCount.set(0);
+                  insertTime.set(System.currentTimeMillis());
                 }
               });
     }
     Thread.sleep(1000);
+    long startTime = System.currentTimeMillis();
+    // start write
+    insertTime.set(System.currentTimeMillis());
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
     Thread[] readThreads = new Thread[THREAD_NUMBER];
     for (int i = 0; i < THREAD_NUMBER; i++) {
       readThreads[i] =
@@ -121,10 +143,8 @@ public class SessionPoolExample {
                 }
               });
     }
-    long startTime = System.currentTimeMillis();
-    for (Thread thread : threads) {
-      thread.start();
-    }
+
+    // start read
     for (Thread thread : readThreads) {
       thread.start();
     }
@@ -149,22 +169,22 @@ public class SessionPoolExample {
                 long currentTime = System.nanoTime();
                 LOGGER.info(
                     "write rate: {} lines/minute",
-                    total.get() / ((currentTime - startTime1) / 
60_000_000_000L));
+                    totalRowNumber.get() / ((currentTime - startTime1) / 
60_000_000_000L));
               }
             })
         .start();
   }
 
   // more insert example, see SessionExample.java
-  private static void insertRecords() throws StatementExecutionException, 
IoTDBConnectionException {
-    long time = System.currentTimeMillis();
+  private static void insertRecords(int threadIndex) throws 
StatementExecutionException, IoTDBConnectionException {
+    long time = insertTime.get();
     List<String> deviceIds = new ArrayList<>();
     List<Long> times = new ArrayList<>();
     List<List<String>> measurementsList = new ArrayList<>();
     List<List<TSDataType>> typesList = new ArrayList<>();
     List<List<Object>> valuesList = new ArrayList<>();
-    for (int j = 0; j < ROW_NUMBER; j++) {
-      String deviceId = "root.test.g_0.d_" + r.nextInt(DEVICE_NUMBER);
+    for (int j = 0; j < DEVICE_NUMBER / THREAD_NUMBER; j++) {
+      String deviceId = "root.test.g_0.d_" + (DEVICE_NUMBER / THREAD_NUMBER * 
threadIndex + j);
       deviceIds.add(deviceId);
       times.add(time);
       List<Object> values = new ArrayList<>();

Reply via email to