This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 8ef8f214b1d50e191a9b9a5e7540ca9501ef78ca
Author: stephen <[email protected]>
AuthorDate: Thu Dec 5 13:09:52 2019 -0500

    Control the number of requests in flight
    
    Bind it to the size of the max connections so it doesn't just time out right away
---
 .../gremlin/driver/util/ProfilingApplication.java       | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
index 5838ab4..bf675e5 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.tinkerpop.gremlin.driver.Channelizer;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.Connection;
 import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 
@@ -94,11 +95,24 @@ public class ProfilingApplication {
         final String executionId = "[" + executionName + "]";
         try {
             final CountDownLatch latch = new CountDownLatch(requests);
+            final AtomicInteger inFlight = new AtomicInteger(0);
             client.init();
 
             final long start = System.nanoTime();
             IntStream.range(0, requests).forEach(i -> {
                 final String s = exercise ? chooseScript() : script;
+
+                // control number of requests in flight for testing purposes 
or else we'll timeout without enough
+                // connections to service all this stuff.
+                while (inFlight.intValue() >= cluster.maxConnectionPoolSize()) 
{
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(10L);
+                    } catch (Exception ex) {
+                        // wait wait wait
+                    }
+                }
+
+                inFlight.incrementAndGet();
                 client.submitAsync(s).thenAcceptAsync(r -> {
                     try {
                         r.all().get(tooSlowThreshold, TimeUnit.MILLISECONDS);
@@ -107,6 +121,7 @@ public class ProfilingApplication {
                     } catch (Exception ex) {
                         ex.printStackTrace();
                     } finally {
+                        inFlight.decrementAndGet();
                         latch.countDown();
                     }
                 }, executor);
@@ -149,7 +164,7 @@ public class ProfilingApplication {
         final int nioPoolSize = 
Integer.parseInt(options.getOrDefault("nioPoolSize", "1").toString());
         final int requests = Integer.parseInt(options.getOrDefault("requests", 
"10000").toString());
         final int maxConnectionPoolSize = 
Integer.parseInt(options.getOrDefault("maxConnectionPoolSize", 
"256").toString());
-        final int maxWaitForConnection = 
Integer.parseInt(options.getOrDefault("maxWaitForConnection", 
"3000").toString());
+        final int maxWaitForConnection = 
Integer.parseInt(options.getOrDefault("maxWaitForConnection", 
Connection.DEFAULT_MAX_WAIT_FOR_CONNECTION).toString());
         final int workerPoolSize = 
Integer.parseInt(options.getOrDefault("workerPoolSize", "2").toString());
         final int tooSlowThreshold = 
Integer.parseInt(options.getOrDefault("tooSlowThreshold", "125").toString());
         final String channelizer = options.getOrDefault("channelizer", 
Channelizer.WebSocketChannelizer.class.getName()).toString();

Reply via email to