This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch guonengtest
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/guonengtest by this push:
     new 1e0cc9d4d0 add countDownLatch sync write
1e0cc9d4d0 is described below

commit 1e0cc9d4d0429ba592bb34f380d1cc44c4f48fbd
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Apr 17 20:33:12 2023 +0800

    add countDownLatch sync write
---
 .../java/org/apache/iotdb/SessionPoolExample.java  | 115 +++++++++++++--------
 1 file changed, 72 insertions(+), 43 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index ecc04e8064..e3a82c75ea 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CountDownLatch;
 
 public class SessionPoolExample {
 
@@ -47,7 +48,7 @@ public class SessionPoolExample {
 
   private static int SENSOR_NUMBER = 500;
 
-  private static int WRITE_NUMBER = 100;
+  private static int TOTAL_BATCH_COUNT_PER_DEVICE = 100;
 
   private static List<String> measurements;
 
@@ -55,8 +56,6 @@ public class SessionPoolExample {
 
   private static AtomicInteger totalRowNumber = new AtomicInteger();
 
-  private static AtomicInteger finishedThreadCount = new AtomicInteger();
-
   private static AtomicLong insertTime = new AtomicLong();
 
   private static Random r;
@@ -79,6 +78,59 @@ public class SessionPoolExample {
     sessionPool.setFetchSize(10000);
   }
 
+  private static class SyncWriteSignal {
+    protected volatile boolean latchInitialized = false;
+    protected CountDownLatch latch;
+    protected long currentTimestamp;
+
+
+    protected void syncCountDownBeforeInsert() {
+      if (!latchInitialized) {
+        synchronized (this) {
+          if (!latchInitialized) {
+            latch = new CountDownLatch(THREAD_NUMBER);
+            latchInitialized = true;
+            currentTimestamp = System.currentTimeMillis();
+          }
+        }
+      }
+    }
+
+    protected void finishInsert() {
+      latch.countDown();
+      if (latch.getCount() == 0) {
+        LOGGER.info("one loop finished. cost: {}ms. total rows: {}", (System.currentTimeMillis() - currentTimestamp), totalRowNumber.get());
+      }
+    }
+
+    protected void waitCurrentLoopFinished() throws InterruptedException {
+      latch.await();
+    }
+  }
+
+  private static class InsertWorker implements Runnable {
+    private SyncWriteSignal signal;
+    private int index;
+
+    protected InsertWorker(SyncWriteSignal signal, int index) {
+      this.signal = signal;
+      this.index = index;
+    }
+
+    @Override
+    public void run() {
+      for (int j = 0; j < TOTAL_BATCH_COUNT_PER_DEVICE; j++) {
+        signal.syncCountDownBeforeInsert();
+        try {
+          insertRecords(index);
+        } catch (Exception e) {
+          LOGGER.error("insert error. Thread: {}. Error:", index, e);
+        }
+        totalRowNumber.addAndGet(DEVICE_NUMBER / THREAD_NUMBER);
+      }
+    }
+  }
+
   public static void main(String[] args) throws InterruptedException {
     // Choose the SessionPool you going to use
     constructRedirectSessionPool();
@@ -90,51 +142,35 @@ public class SessionPoolExample {
       types.add(TSDataType.FLOAT);
     }
 
-    r = new Random();
-
     Thread[] threads = new Thread[THREAD_NUMBER];
-    for (int i = 0; i < THREAD_NUMBER; i++) {
-      int index = i;
-      threads[i] =
-          new Thread(
-              () -> {
-                for (int j = 0; j < WRITE_NUMBER; j++) {
-                  long start = System.currentTimeMillis();
-                  try {
-                    insertRecords(index);
-                  } catch (Exception e) {
-                    LOGGER.error("insert error:", e);
-                  }
-                  LOGGER.info(
-                      "insert {} rows cost {} ms", (DEVICE_NUMBER / THREAD_NUMBER), System.currentTimeMillis() - start);
-                  LOGGER.info("Total rows number: {}", totalRowNumber.addAndGet(DEVICE_NUMBER / THREAD_NUMBER));
-                  finishedThreadCount.addAndGet(1);
-                  while (finishedThreadCount.get() <= THREAD_NUMBER) {
-                    try {
-                      TimeUnit.MILLISECONDS.sleep(1);
-                    } catch (InterruptedException e) {
-                      throw new RuntimeException(e);
-                    }
-                  }
-                  finishedThreadCount.set(0);
-                  insertTime.set(System.currentTimeMillis());
-                }
-              });
+
+    SyncWriteSignal signal = new SyncWriteSignal();
+    for (int i = 0 ; i < THREAD_NUMBER; i ++) {
+      threads[i] = new Thread(new InsertWorker(signal, i));
     }
-    Thread.sleep(1000);
+
+    // count total execution time
     long startTime = System.currentTimeMillis();
-    // start write
-    insertTime.set(System.currentTimeMillis());
+    Runtime.getRuntime()
+        .addShutdownHook(
+            new Thread(
+                () -> {
+                  sessionPool.close();
+                  System.out.println(System.currentTimeMillis() - startTime);
+                }));
+
+    // start write thread
     for (Thread thread : threads) {
       thread.start();
     }
 
+    r = new Random();
     Thread[] readThreads = new Thread[THREAD_NUMBER];
     for (int i = 0; i < THREAD_NUMBER; i++) {
       readThreads[i] =
           new Thread(
               () -> {
-                for (int j = 0; j < WRITE_NUMBER; j++) {
+                for (int j = 0; j < TOTAL_BATCH_COUNT_PER_DEVICE; j++) {
                   try {
                     queryByIterator();
                   } catch (Exception e) {
@@ -149,13 +185,6 @@ public class SessionPoolExample {
       thread.start();
     }
 
-    Runtime.getRuntime()
-        .addShutdownHook(
-            new Thread(
-                () -> {
-                  sessionPool.close();
-                  System.out.println(System.currentTimeMillis() - startTime);
-                }));
 
     long startTime1 = System.nanoTime();
     new Thread(

Reply via email to