This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch guonengtest
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/guonengtest by this push:
new 1e0cc9d4d0 add countDownLatch sync write
1e0cc9d4d0 is described below
commit 1e0cc9d4d0429ba592bb34f380d1cc44c4f48fbd
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Apr 17 20:33:12 2023 +0800
add countDownLatch sync write
---
.../java/org/apache/iotdb/SessionPoolExample.java | 115 +++++++++++++--------
1 file changed, 72 insertions(+), 43 deletions(-)
diff --git
a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index ecc04e8064..e3a82c75ea 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CountDownLatch;
public class SessionPoolExample {
@@ -47,7 +48,7 @@ public class SessionPoolExample {
private static int SENSOR_NUMBER = 500;
- private static int WRITE_NUMBER = 100;
+ private static int TOTAL_BATCH_COUNT_PER_DEVICE = 100;
private static List<String> measurements;
@@ -55,8 +56,6 @@ public class SessionPoolExample {
private static AtomicInteger totalRowNumber = new AtomicInteger();
- private static AtomicInteger finishedThreadCount = new AtomicInteger();
-
private static AtomicLong insertTime = new AtomicLong();
private static Random r;
@@ -79,6 +78,59 @@ public class SessionPoolExample {
sessionPool.setFetchSize(10000);
}
+ private static class SyncWriteSignal {
+ protected volatile boolean latchInitialized = false;
+ protected CountDownLatch latch;
+ protected long currentTimestamp;
+
+
+ protected void syncCountDownBeforeInsert() {
+ if (!latchInitialized) {
+ synchronized (this) {
+ if (!latchInitialized) {
+ latch = new CountDownLatch(THREAD_NUMBER);
+ latchInitialized = true;
+ currentTimestamp = System.currentTimeMillis();
+ }
+ }
+ }
+ }
+
+ protected void finishInsert() {
+ latch.countDown();
+ if (latch.getCount() == 0) {
+ LOGGER.info("one loop finished. cost: {}ms. total rows: {}",
(System.currentTimeMillis() - currentTimestamp), totalRowNumber.get());
+ }
+ }
+
+ protected void waitCurrentLoopFinished() throws InterruptedException {
+ latch.await();
+ }
+ }
+
+ private static class InsertWorker implements Runnable {
+ private SyncWriteSignal signal;
+ private int index;
+
+ protected InsertWorker(SyncWriteSignal signal, int index) {
+ this.signal = signal;
+ this.index = index;
+ }
+
+ @Override
+ public void run() {
+ for (int j = 0; j < TOTAL_BATCH_COUNT_PER_DEVICE; j++) {
+ signal.syncCountDownBeforeInsert();
+ try {
+ insertRecords(index);
+ } catch (Exception e) {
+ LOGGER.error("insert error. Thread: {}. Error:", index, e);
+ }
+ totalRowNumber.addAndGet(DEVICE_NUMBER / THREAD_NUMBER);
+ }
+ }
+ }
+
public static void main(String[] args) throws InterruptedException {
// Choose the SessionPool you going to use
constructRedirectSessionPool();
@@ -90,51 +142,35 @@ public class SessionPoolExample {
types.add(TSDataType.FLOAT);
}
- r = new Random();
-
Thread[] threads = new Thread[THREAD_NUMBER];
- for (int i = 0; i < THREAD_NUMBER; i++) {
- int index = i;
- threads[i] =
- new Thread(
- () -> {
- for (int j = 0; j < WRITE_NUMBER; j++) {
- long start = System.currentTimeMillis();
- try {
- insertRecords(index);
- } catch (Exception e) {
- LOGGER.error("insert error:", e);
- }
- LOGGER.info(
- "insert {} rows cost {} ms", (DEVICE_NUMBER /
THREAD_NUMBER), System.currentTimeMillis() - start);
- LOGGER.info("Total rows number: {}",
totalRowNumber.addAndGet(DEVICE_NUMBER / THREAD_NUMBER));
- finishedThreadCount.addAndGet(1);
- while (finishedThreadCount.get() <= THREAD_NUMBER) {
- try {
- TimeUnit.MILLISECONDS.sleep(1);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- finishedThreadCount.set(0);
- insertTime.set(System.currentTimeMillis());
- }
- });
+
+ SyncWriteSignal signal = new SyncWriteSignal();
+ for (int i = 0 ; i < THREAD_NUMBER; i ++) {
+ threads[i] = new Thread(new InsertWorker(signal, i));
}
- Thread.sleep(1000);
+
+ // count total execution time
long startTime = System.currentTimeMillis();
- // start write
- insertTime.set(System.currentTimeMillis());
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ sessionPool.close();
+ System.out.println(System.currentTimeMillis() - startTime);
+ }));
+
+ // start write thread
for (Thread thread : threads) {
thread.start();
}
+ r = new Random();
Thread[] readThreads = new Thread[THREAD_NUMBER];
for (int i = 0; i < THREAD_NUMBER; i++) {
readThreads[i] =
new Thread(
() -> {
- for (int j = 0; j < WRITE_NUMBER; j++) {
+ for (int j = 0; j < TOTAL_BATCH_COUNT_PER_DEVICE; j++) {
try {
queryByIterator();
} catch (Exception e) {
@@ -149,13 +185,6 @@ public class SessionPoolExample {
thread.start();
}
- Runtime.getRuntime()
- .addShutdownHook(
- new Thread(
- () -> {
- sessionPool.close();
- System.out.println(System.currentTimeMillis() - startTime);
- }));
long startTime1 = System.nanoTime();
new Thread(