Fixed a problem with how timeouts were scheduled in GremlinExecutor

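Timeouts are now scheduled from inside the evaluation task, so the timer starts
closer to when execution of a script actually begins rather than when the
request is submitted. Altogether, this change helps prevent sessions from
locking up when multiple parallel requests that have the potential to block are
submitted to them.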
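A minimal sketch of the scheduling pattern described above, written against plain `java.util.concurrent` rather than the `GremlinExecutor` API. The timeout is scheduled from inside the task, so time spent waiting in the executor queue does not count against it, and the timer interrupts the evaluating thread instead of cancelling a task. `InTaskTimeout` and `submit` are hypothetical names. The lock around the interrupt is an addition of this sketch (it keeps a late-firing timer from interrupting the next task on a pooled thread), not something taken from the commit.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class InTaskTimeout {

    // Hypothetical helper, not part of GremlinExecutor: runs work on the executor and
    // starts the timeout clock only when a worker thread actually picks the work up.
    public static <T> CompletableFuture<T> submit(final ExecutorService executor,
                                                  final ScheduledExecutorService scheduler,
                                                  final Callable<T> work,
                                                  final long timeoutMs) {
        final CompletableFuture<T> result = new CompletableFuture<>();
        executor.execute(() -> {
            final Thread worker = Thread.currentThread();
            final Object guard = new Object();
            final AtomicBoolean finished = new AtomicBoolean(false);

            // Scheduled here, on the worker thread, rather than at submission time,
            // so time spent waiting in the executor queue is not counted.
            final ScheduledFuture<?> timer = scheduler.schedule(() -> {
                synchronized (guard) {
                    if (!finished.get()) worker.interrupt();
                }
            }, timeoutMs, TimeUnit.MILLISECONDS);

            try {
                result.complete(work.call());
            } catch (InterruptedException e) {
                // The interrupt came from the timer: report it as a timeout.
                result.completeExceptionally(
                        new TimeoutException("evaluation exceeded " + timeoutMs + " ms"));
            } catch (Exception e) {
                result.completeExceptionally(e);
            } finally {
                synchronized (guard) {
                    finished.set(true);
                }
                timer.cancel(false);
                // Clear an interrupt that arrived just before 'finished' was set,
                // so it cannot leak into the next task run by this pooled thread.
                Thread.interrupted();
            }
        });
        return result;
    }

    public static void main(final String[] args) throws Exception {
        final ExecutorService single = Executors.newSingleThreadExecutor();
        final ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();

        // The second request waits about 300 ms in the queue but only runs about 100 ms,
        // so it stays within its 250 ms budget because its clock starts when it runs.
        final CompletableFuture<String> first = submit(single, timers, () -> {
            Thread.sleep(300);
            return "first";
        }, 1000);
        final CompletableFuture<String> second = submit(single, timers, () -> {
            Thread.sleep(100);
            return "second";
        }, 250);

        System.out.println(first.get() + " " + second.get());
        single.shutdown();
        timers.shutdown();
    }
}
```

Had the 250 ms timer for the second request started at submission, it would have expired while that request was still queued behind the first one.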


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/cd0e58bb
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/cd0e58bb
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/cd0e58bb

Branch: refs/heads/TINKERPOP-1254
Commit: cd0e58bb769faf06ddfee9473649c7b53ba1b50c
Parents: d699cb6
Author: Stephen Mallette <[email protected]>
Authored: Tue Jun 28 16:19:54 2016 -0400
Committer: Stephen Mallette <[email protected]>
Committed: Tue Jun 28 16:19:54 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  4 +-
 .../src/reference/gremlin-applications.asciidoc |  6 ++-
 .../upgrade/release-3.1.x-incubating.asciidoc   | 14 ++++-
 .../gremlin/groovy/engine/GremlinExecutor.java  | 51 +++++++++++-------
 .../server/GremlinDriverIntegrateTest.java      | 54 ++++++++++++++++++++
 5 files changed, 106 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cd0e58bb/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index a359eca..6fcd00a 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,7 +26,9 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.1.3 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-* Avoid hamcrest conflict by using mockito-core instead of mockito-all dependency in `gremlin-test`.
+* Avoided hamcrest conflict by using mockito-core instead of mockito-all dependency in `gremlin-test`.
+* Fixed bug in `GremlinExecutor` causing Gremlin Server to lock up when parallel requests were submitted on the same session if those parallel requests included a script that blocked indefinitely.
+* Changed `GremlinExecutor` timeout scheduling so that the timer would not start until a time closer to the actual start of script evaluation.
 * Fixed bug in `SubgraphStrategy` where step labels were not being propogated properly to new steps injected by the strategy.
 * Defaulted to `Edge.DEFAULT` if no edge label was supplied in GraphML.
 * Fixed bug in `IoGraphTest` causing IllegalArgumentException: URI is not hierarchical error for external graph implementations.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cd0e58bb/docs/src/reference/gremlin-applications.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/reference/gremlin-applications.asciidoc b/docs/src/reference/gremlin-applications.asciidoc
index 0cb875e..488663f 100644
--- a/docs/src/reference/gremlin-applications.asciidoc
+++ b/docs/src/reference/gremlin-applications.asciidoc
@@ -1261,7 +1261,7 @@ Tuning
 image:gremlin-handdrawn.png[width=120,float=right] Tuning Gremlin Server for a particular environment may require some simple trial-and-error, but the following represent some basic guidelines that might be useful:
 
 * Gremlin Server defaults to a very modest maximum heap size.  Consider increasing this value for non-trivial uses.  Maximum heap size (`-Xmx`) is defined with the `JAVA_OPTIONS` setting in `gremlin-server.sh`.
-* When configuring the size of `threadPoolWorker` start with the default of `1` and increment by one as needed to a maximum of `2*number of cores`.
+* When configuring the size of `threadPoolWorker` start with the default of `1` and increment by one as needed to a maximum of `2*number of cores`. Note that if using sessions that will accept parallel requests on the same session, then this value should be no less than `2`.
 * The "right" size of the `gremlinPool` setting is somewhat dependent on the type of scripts that will be processed
 by Gremlin Server.  As requests arrive to Gremlin Server they are decoded and queued to be processed by threads in
 this pool.  When this pool is exhausted of threads, Gremlin Server will continue to accept incoming requests, but
@@ -1407,6 +1407,10 @@ request.
 A session is a "heavier" approach to the simple "request/response" approach of sessionless requests, but is sometimes
 necessary for a given use case.
 
+IMPORTANT: If submitting requests in parallel to a single session in Gremlin Server, then the `threadPoolWorker`
+setting can be no less than `2` or else the session may be prone to becoming locked if scripts sent on that session
+tend to block for extended periods of time.
+
 [[considering-transactions]]
 Considering Transactions
 ^^^^^^^^^^^^^^^^^^^^^^^^

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cd0e58bb/docs/src/upgrade/release-3.1.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade/release-3.1.x-incubating.asciidoc b/docs/src/upgrade/release-3.1.x-incubating.asciidoc
index b1b99f8..d8c5de2 100644
--- a/docs/src/upgrade/release-3.1.x-incubating.asciidoc
+++ b/docs/src/upgrade/release-3.1.x-incubating.asciidoc
@@ -52,7 +52,17 @@ still possible to get a timeout on a request if the server timeout limits are re
 refers to how long the console will wait for a response from the server before giving up. By default, the timeout is
 set to `none`.
 
-See: https://issues.apache.org/jira/browse/TINKERPOP-1267[TINKERPOP-1267]
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-1267[TINKERPOP-1267]
+
+Gremlin Server Workers
+^^^^^^^^^^^^^^^^^^^^^^
+
+Past configuration recommendations for the `threadPoolWorker` setting on Gremlin Server stated this value could be
+safely set to `1` at the low end. A size of `1` is still valid for most cases, however, if Gremlin Server is being used
+with sessions that accept parallel requests, then this value should be no less than `2` or else certain scripts (i.e.
+those that block for an extended period of time) may cause Gremlin Server to lock up the session.
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-1350[TINKERPOP-1350]
 
 Upgrading for Providers
 ~~~~~~~~~~~~~~~~~~~~~~~
@@ -93,7 +103,7 @@ is an optional argument and does not represent a breaking change, but it does ma
 complete. While the default authentication implementations packaged with Gremlin Server don't utilize this argument
 other implementations might, so the drivers should be able to pass it as per the SASL specification.
 
-See: https://issues.apache.org/jira/browse/[TINKERPOP-1263]
+See: link:https://issues.apache.org/jira/browse/[TINKERPOP-1263]
 
 TinkerPop 3.1.2
 ---------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cd0e58bb/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
index 63721eb..d70ff3d 100644
--- a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
+++ b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -231,6 +232,8 @@ public class GremlinExecutor implements AutoCloseable {
         return eval(script, language, boundVars, lifeCycle);
     }
 
+    private static final AtomicInteger ugh = new AtomicInteger(0);
+
     /**
      * Evaluate a script and allow for the submission of alteration to the entire evaluation execution lifecycle.
      *
@@ -249,10 +252,35 @@ public class GremlinExecutor implements AutoCloseable {
         bindings.putAll(boundVars);
 
         // override the timeout if the lifecycle has a value assigned
-        final long seto = lifeCycle.getScriptEvaluationTimeoutOverride().orElse(scriptEvaluationTimeout);
+        final long scriptEvalTimeOut = lifeCycle.getScriptEvaluationTimeoutOverride().orElse(scriptEvaluationTimeout);
 
         final CompletableFuture<Object> evaluationFuture = new CompletableFuture<>();
-        final FutureTask<Void> f = new FutureTask<>(() -> {
+        final FutureTask<Void> evalFuture = new FutureTask<>(() -> {
+
+            if (scriptEvalTimeOut > 0) {
+                final Thread scriptEvalThread = Thread.currentThread();
+
+                logger.debug("Schedule timeout for script - {} - in thread [{}]", script, scriptEvalThread.getName());
+
+                // Schedule a timeout in the thread pool for future execution
+                final ScheduledFuture<?> sf = scheduledExecutorService.schedule(() -> {
+                    logger.warn("Timing out script - {} - in thread [{}]", script, Thread.currentThread().getName());
+                    if (!evaluationFuture.isDone()) scriptEvalThread.interrupt();
+                }, scriptEvalTimeOut, TimeUnit.MILLISECONDS);
+
+                // Cancel the scheduled timeout if the eval future is complete or the script evaluation failed
+                // with exception
+                evaluationFuture.handleAsync((v, t) -> {
+                    if (!sf.isDone()) {
+                        logger.debug("Killing scheduled timeout on script evaluation - {} - as the eval completed (possibly with exception).", script);
+                        sf.cancel(true);
+                    }
+
+                    // no return is necessary - nothing downstream is concerned with what happens in here
+                    return null;
+                }, scheduledExecutorService);
+            }
+
             try {
                 lifeCycle.getBeforeEval().orElse(beforeEval).accept(bindings);
 
@@ -286,7 +314,7 @@ public class GremlinExecutor implements AutoCloseable {
                 if (root instanceof InterruptedException) {
                     lifeCycle.getAfterTimeout().orElse(afterTimeout).accept(bindings);
                     evaluationFuture.completeExceptionally(new TimeoutException(
-                            String.format("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of %s ms for request [%s]: %s", seto, script, root.getMessage())));
+                            String.format("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of %s ms for request [%s]: %s", scriptEvalTimeOut, script, root.getMessage())));
                 } else {
                     lifeCycle.getAfterFailure().orElse(afterFailure).accept(bindings, root);
                     evaluationFuture.completeExceptionally(root);
@@ -296,22 +324,7 @@ public class GremlinExecutor implements AutoCloseable {
             return null;
         });
 
-        executorService.execute(f);
-
-        if (seto > 0) {
-            // Schedule a timeout in the thread pool for future execution
-            final ScheduledFuture<?> sf = scheduledExecutorService.schedule(() -> {
-                logger.warn("Timing out script - {} - in thread [{}]", script, Thread.currentThread().getName());
-                if (!f.isDone()) f.cancel(true);
-            }, seto, TimeUnit.MILLISECONDS);
-
-            // Cancel the scheduled timeout if the eval future is complete or the script evaluation failed
-            // with exception
-            evaluationFuture.handleAsync((v, t) -> {
-                logger.debug("Killing scheduled timeout on script evaluation as the eval completed (possibly with exception).");
-                return sf.cancel(true);
-            }, scheduledExecutorService);
-        }
+        executorService.execute(evalFuture);
 
         return evaluationFuture;
     }
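The sketch below is offered as one plausible illustration of how the removed scheduling could leave a request waiting forever, not as the commit's stated diagnosis. It relies only on documented `java.util.concurrent.FutureTask` behavior: a task cancelled before it starts never runs its body. In the pre-change `eval`, `evaluationFuture` is completed only inside that body, while the timer (started at submission) called `f.cancel(true)`. Assuming a session evaluates on a single worker thread, a request queued behind a blocking script could therefore be cancelled before it ever ran. `CancelledQueuedTask` and `outerCompletesAfterCancel` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class CancelledQueuedTask {

    // Mimics the removed scheduling: the outer future is completed only from inside
    // the FutureTask body, and the "timeout" cancels the task while it is still queued.
    public static boolean outerCompletesAfterCancel() throws InterruptedException {
        final ExecutorService single = Executors.newSingleThreadExecutor();
        final CountDownLatch release = new CountDownLatch(1);

        // Occupy the only worker thread, standing in for a script that blocks.
        single.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        final CompletableFuture<Object> outer = new CompletableFuture<>();
        final FutureTask<Void> queued = new FutureTask<>(() -> {
            outer.complete("evaluated");
            return null;
        });
        single.execute(queued);

        // A timer firing now cancels a task that has not started yet.
        queued.cancel(true);

        // Drain the queue: run() on a cancelled FutureTask returns without calling the body.
        release.countDown();
        single.shutdown();
        single.awaitTermination(5, TimeUnit.SECONDS);

        return outer.isDone();
    }

    public static void main(final String[] args) throws InterruptedException {
        // prints "outer completed: false"
        System.out.println("outer completed: " + outerCompletesAfterCancel());
    }
}
```

In the new code nothing cancels `evalFuture`; the timer only interrupts the thread that is already running the script, so the task body always gets a chance to complete `evaluationFuture`, either normally or exceptionally.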

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cd0e58bb/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 8515e8a..846ed7a 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -124,6 +124,10 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
                 deleteDirectory(new File("/tmp/neo4j"));
                 settings.graphs.put("graph", "conf/neo4j-empty.properties");
                 break;
+            case "shouldProcessSessionRequestsInOrderAfterTimeout":
+                settings.scriptEvaluationTimeout = 1000;
+                settings.threadPoolWorker = 2;
+                break;
         }
 
         return settings;
@@ -1193,4 +1197,54 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
 
         cluster.close();
     }
+
+    @Test
+    public void shouldProcessSessionRequestsInOrderAfterTimeout() throws Exception {
+        final Cluster cluster = Cluster.open();
+        final Client client = cluster.connect(name.getMethodName());
+
+        final ResultSet first = client.submit(
+                "Object mon1 = 'mon1';\n" +
+                        "synchronized (mon1) {\n" +
+                        "    mon1.wait();\n" +
+                        "} ");
+
+        final ResultSet second = client.submit(
+                "Object mon2 = 'mon2';\n" +
+                        "synchronized (mon2) {\n" +
+                        "    mon2.wait();\n" +
+                        "}");
+
+        final CompletableFuture<List<Result>> futureFirst = first.all();
+        final CompletableFuture<List<Result>> futureSecond = second.all();
+
+        final AtomicBoolean hit = new AtomicBoolean(false);
+        while (!futureFirst.isDone()) {
+            // futureSecond can't finish before futureFirst - racy business here?
+            assertThat(futureSecond.isDone(), is(false));
+            hit.set(true);
+        }
+
+        // should have entered the loop at least once and thus proven that futureSecond didn't return ahead of
+        // futureFirst
+        assertThat(hit.get(), is(true));
+
+        try {
+            futureFirst.get();
+            fail("Should have timed out");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(ResponseException.class));
+            assertThat(root.getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 1000 ms for request"));
+        }
+
+        try {
+            futureSecond.get();
+            fail("Should have timed out");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(ResponseException.class));
+            assertThat(root.getMessage(), startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' threshold of 1000 ms for request"));
+        }
+    }
 }
