This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch driver-35 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 8ef8f214b1d50e191a9b9a5e7540ca9501ef78ca Author: stephen <[email protected]> AuthorDate: Thu Dec 5 13:09:52 2019 -0500 Control the number of requests in flight. Bind it to the size of the max connections so it doesn't just time out right away. --- .../gremlin/driver/util/ProfilingApplication.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java index 5838ab4..bf675e5 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java @@ -23,6 +23,7 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.tinkerpop.gremlin.driver.Channelizer; import org.apache.tinkerpop.gremlin.driver.Client; import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.Connection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.structure.util.ElementHelper; @@ -94,11 +95,24 @@ public class ProfilingApplication { final String executionId = "[" + executionName + "]"; try { final CountDownLatch latch = new CountDownLatch(requests); + final AtomicInteger inFlight = new AtomicInteger(0); client.init(); final long start = System.nanoTime(); IntStream.range(0, requests).forEach(i -> { final String s = exercise ? chooseScript() : script; + + // control number of requests in flight for testing purposes or else we'll timeout without enough + // connections to service all this stuff. 
+ while (inFlight.intValue() >= cluster.maxConnectionPoolSize()) { + try { + TimeUnit.MILLISECONDS.sleep(10L); + } catch (Exception ex) { + // wait wait wait + } + } + + inFlight.incrementAndGet(); client.submitAsync(s).thenAcceptAsync(r -> { try { r.all().get(tooSlowThreshold, TimeUnit.MILLISECONDS); @@ -107,6 +121,7 @@ public class ProfilingApplication { } catch (Exception ex) { ex.printStackTrace(); } finally { + inFlight.decrementAndGet(); latch.countDown(); } }, executor); @@ -149,7 +164,7 @@ public class ProfilingApplication { final int nioPoolSize = Integer.parseInt(options.getOrDefault("nioPoolSize", "1").toString()); final int requests = Integer.parseInt(options.getOrDefault("requests", "10000").toString()); final int maxConnectionPoolSize = Integer.parseInt(options.getOrDefault("maxConnectionPoolSize", "256").toString()); - final int maxWaitForConnection = Integer.parseInt(options.getOrDefault("maxWaitForConnection", "3000").toString()); + final int maxWaitForConnection = Integer.parseInt(options.getOrDefault("maxWaitForConnection", Connection.DEFAULT_MAX_WAIT_FOR_CONNECTION).toString()); final int workerPoolSize = Integer.parseInt(options.getOrDefault("workerPoolSize", "2").toString()); final int tooSlowThreshold = Integer.parseInt(options.getOrDefault("tooSlowThreshold", "125").toString()); final String channelizer = options.getOrDefault("channelizer", Channelizer.WebSocketChannelizer.class.getName()).toString();
