This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch guonengtest
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7abac0a1b9ba50278f0555b15b24739459cfe6f2 Author: HTHou <[email protected]> AuthorDate: Mon Apr 17 18:09:49 2023 +0800 update SessionPoolExample --- .../java/org/apache/iotdb/SessionPoolExample.java | 52 +++++++++++++++------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java index 5263fb6faf..ecc04e8064 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb; +import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.isession.SessionDataSet.DataIterator; import org.apache.iotdb.isession.pool.SessionDataSetWrapper; import org.apache.iotdb.rpc.IoTDBConnectionException; @@ -40,20 +41,23 @@ public class SessionPoolExample { private static final Logger LOGGER = LoggerFactory.getLogger(SessionPoolExample.class); - private static int ROW_NUMBER = 100; - private static int THREAD_NUMBER = 100; + private static int THREAD_NUMBER = 300; private static int DEVICE_NUMBER = 20000; private static int SENSOR_NUMBER = 500; - private static int WRITE_NUMBER = 10000; + private static int WRITE_NUMBER = 100; private static List<String> measurements; private static List<TSDataType> types; - private static AtomicInteger total = new AtomicInteger(); + private static AtomicInteger totalRowNumber = new AtomicInteger(); + + private static AtomicInteger finishedThreadCount = new AtomicInteger(); + + private static AtomicLong insertTime = new AtomicLong(); private static Random r; @@ -90,23 +94,41 @@ public class SessionPoolExample { Thread[] threads = new Thread[THREAD_NUMBER]; for (int i = 0; i < THREAD_NUMBER; i++) { + int index = i; threads[i] = new Thread( () -> { for (int j = 0; j < WRITE_NUMBER; j++) { long start = System.currentTimeMillis(); try { - 
insertRecords(); + insertRecords(index); } catch (Exception e) { LOGGER.error("insert error:", e); } LOGGER.info( - "insert {} rows cost {} ms", ROW_NUMBER, System.currentTimeMillis() - start); - LOGGER.info("Total rows number: {}", total.addAndGet(ROW_NUMBER)); + "insert {} rows cost {} ms", (DEVICE_NUMBER / THREAD_NUMBER), System.currentTimeMillis() - start); + LOGGER.info("Total rows number: {}", totalRowNumber.addAndGet(DEVICE_NUMBER / THREAD_NUMBER)); + finishedThreadCount.addAndGet(1); + while (finishedThreadCount.get() <= THREAD_NUMBER) { + try { + TimeUnit.MILLISECONDS.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + finishedThreadCount.set(0); + insertTime.set(System.currentTimeMillis()); } }); } Thread.sleep(1000); + long startTime = System.currentTimeMillis(); + // start write + insertTime.set(System.currentTimeMillis()); + for (Thread thread : threads) { + thread.start(); + } + Thread[] readThreads = new Thread[THREAD_NUMBER]; for (int i = 0; i < THREAD_NUMBER; i++) { readThreads[i] = @@ -121,10 +143,8 @@ public class SessionPoolExample { } }); } - long startTime = System.currentTimeMillis(); - for (Thread thread : threads) { - thread.start(); - } + + // start read for (Thread thread : readThreads) { thread.start(); } @@ -149,22 +169,22 @@ public class SessionPoolExample { long currentTime = System.nanoTime(); LOGGER.info( "write rate: {} lines/minute", - total.get() / ((currentTime - startTime1) / 60_000_000_000L)); + totalRowNumber.get() / ((currentTime - startTime1) / 60_000_000_000L)); } }) .start(); } // more insert example, see SessionExample.java - private static void insertRecords() throws StatementExecutionException, IoTDBConnectionException { - long time = System.currentTimeMillis(); + private static void insertRecords(int threadIndex) throws StatementExecutionException, IoTDBConnectionException { + long time = insertTime.get(); List<String> deviceIds = new ArrayList<>(); List<Long> times = new 
ArrayList<>(); List<List<String>> measurementsList = new ArrayList<>(); List<List<TSDataType>> typesList = new ArrayList<>(); List<List<Object>> valuesList = new ArrayList<>(); - for (int j = 0; j < ROW_NUMBER; j++) { - String deviceId = "root.test.g_0.d_" + r.nextInt(DEVICE_NUMBER); + for (int j = 0; j < DEVICE_NUMBER / THREAD_NUMBER; j++) { + String deviceId = "root.test.g_0.d_" + (DEVICE_NUMBER / THREAD_NUMBER * threadIndex + j); deviceIds.add(deviceId); times.add(time); List<Object> values = new ArrayList<>();
