Improved session cleanup on client close.

While not a perfect implementation, a long run job blocking a close request 
from the client will now at least get an attempt at interruption rather thant 
consuming the thread indefinitely. TINKERPOP-1442


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/75baf01e
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/75baf01e
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/75baf01e

Branch: refs/heads/TINKERPOP-1442
Commit: 75baf01e83e7db2cfd60850e9facf535cf10d887
Parents: e7e7481
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue Sep 13 18:10:09 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri Sep 16 07:29:39 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../tinkerpop/gremlin/driver/Connection.java    |  2 +-
 .../gremlin/groovy/engine/GremlinExecutor.java  |  2 +-
 .../gremlin/server/op/session/Session.java      | 12 +++++++
 .../server/op/session/SessionOpProcessor.java   |  9 ++++++
 .../server/GremlinDriverIntegrateTest.java      |  2 +-
 .../server/GremlinServerIntegrateTest.java      |  4 +--
 .../GremlinServerSessionIntegrateTest.java      | 33 ++++++++++++++++++++
 8 files changed, 60 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 4d990ee..a9dae9d 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 TinkerPop 3.1.5 (Release Date: NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Improved session cleanup when a close is triggered by the client.
 * Removed the `appveyor.yml` file as the AppVeyor build is no longer enabled 
by Apache Infrastructure.
 * Fixed a bug in `RangeByIsCountStrategy` which didn't use the `NotStep` 
properly.
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 22e48fe..220ad42 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -92,7 +92,7 @@ final class Connection {
 
         connectionLabel = String.format("Connection{host=%s}", pool.host);
 
-        if (cluster.isClosing()) throw new IllegalStateException("Cannot open 
a connection while the cluster after close() is called");
+        if (cluster.isClosing()) throw new IllegalStateException("Cannot open 
a connection with the cluster after close() is called");
 
         final Bootstrap b = this.cluster.getFactory().createBootstrap();
         try {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
----------------------------------------------------------------------
diff --git 
a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
 
b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
index da12e1e..785442a 100644
--- 
a/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
+++ 
b/gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java
@@ -312,7 +312,7 @@ public class GremlinExecutor implements AutoCloseable {
                 if (root instanceof InterruptedException) {
                     
lifeCycle.getAfterTimeout().orElse(afterTimeout).accept(bindings);
                     evaluationFuture.completeExceptionally(new 
TimeoutException(
-                            String.format("Script evaluation exceeded the 
configured 'scriptEvaluationTimeout' threshold of %s ms for request [%s]: %s", 
scriptEvalTimeOut, script, root.getMessage())));
+                            String.format("Script evaluation exceeded the 
configured 'scriptEvaluationTimeout' threshold of %s ms or evaluation was 
otherwise cancelled directly for request [%s]: %s", scriptEvalTimeOut, script, 
root.getMessage())));
                 } else {
                     
lifeCycle.getAfterFailure().orElse(afterFailure).accept(bindings, root);
                     evaluationFuture.completeExceptionally(root);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
index 33b2752..c9bc7c1 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/Session.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -52,6 +53,7 @@ public class Session {
     private final ScheduledExecutorService scheduledExecutorService;
     private final long configuredSessionTimeout;
 
+    private AtomicBoolean killing = new AtomicBoolean(false);
     private AtomicReference<ScheduledFuture> kill = new AtomicReference<>();
 
     /**
@@ -104,6 +106,10 @@ public class Session {
         return session;
     }
 
+    public boolean acceptingRequests() {
+        return !killing.get();
+    }
+
     public void touch() {
         // if the task of killing is cancelled successfully then reset the 
session monitor. otherwise this session
         // has already been killed and there's nothing left to do with this 
session.
@@ -134,6 +140,8 @@ public class Session {
      * Kills the session and rollback any uncommitted changes on transactional 
graphs.
      */
     public synchronized void kill() {
+        killing.set(true);
+
         // if the session has already been removed then there's no need to do 
this process again.  it's possible that
         // the manuallKill and the kill future could have both called kill at 
roughly the same time. this prevents
         // kill() from being called more than once
@@ -157,6 +165,10 @@ public class Session {
                 }
             }
         });
+
+        // prevent any additional requests from processing now that the mass 
rollback has been completed
+        executor.shutdownNow();
+
         sessions.remove(session);
         logger.info("Session {} closed", session);
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
index 3497169..bec0c55 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/session/SessionOpProcessor.java
@@ -147,6 +147,15 @@ public class SessionOpProcessor extends 
AbstractEvalOpProcessor {
         final RequestMessage msg = context.getRequestMessage();
         final Session session = getSession(context, msg);
 
+        // check if the session is still accepting requests - if not block 
further requests
+        if (!session.acceptingRequests()) {
+            final String sessionClosedMessage = String.format("Session %s is 
no longer accepting requests as it has been closed",
+                    session.getSessionId());
+            final ResponseMessage response = 
ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                    .statusMessage(sessionClosedMessage).create();
+            throw new OpProcessorException(sessionClosedMessage, response);
+        }
+
         // place the session on the channel context so that it can be used 
during serialization.  in this way
         // the serialization can occur on the same thread used to execute the 
gremlin within the session.  this
         // is important given the threadlocal nature of Graph implementation 
transactions.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 7314243..1a04b6b 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -1258,7 +1258,7 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
         {
             final Throwable root = ExceptionUtils.getRootCause(ex);
             assertThat(root, instanceOf(ResponseException.class));
-            assertThat(root.getMessage(), startsWith("Script evaluation 
exceeded the configured 'scriptEvaluationTimeout' threshold of 250 ms for 
request"));
+            assertThat(root.getMessage(), startsWith("Script evaluation 
exceeded the configured 'scriptEvaluationTimeout' threshold of 250 ms"));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 2f091d9..0f0cdae 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -543,7 +543,7 @@ public class GremlinServerIntegrateTest extends 
AbstractGremlinServerIntegration
     public void shouldReceiveFailureTimeOutOnScriptEval() throws Exception {
         try (SimpleClient client = new WebSocketClient()){
             final List<ResponseMessage> responses = 
client.submit("Thread.sleep(3000);'some-stuff-that-should not return'");
-            assertThat(responses.get(0).getStatus().getMessage(), 
startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' 
threshold of 200 ms for request"));
+            assertThat(responses.get(0).getStatus().getMessage(), 
startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' 
threshold of 200 ms"));
 
             // validate that we can still send messages to the server
             assertEquals(2, ((List<Integer>) 
client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
@@ -559,7 +559,7 @@ public class GremlinServerIntegrateTest extends 
AbstractGremlinServerIntegration
                     .addArg(Tokens.ARGS_GREMLIN, 
"Thread.sleep(3000);'some-stuff-that-should not return'")
                     .create();
             final List<ResponseMessage> responses = client.submit(msg);
-            assertThat(responses.get(0).getStatus().getMessage(), 
startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' 
threshold of 100 ms for request"));
+            assertThat(responses.get(0).getStatus().getMessage(), 
startsWith("Script evaluation exceeded the configured 'scriptEvaluationTimeout' 
threshold of 100 ms"));
 
             // validate that we can still send messages to the server
             assertEquals(2, ((List<Integer>) 
client.submit("1+1").get(0).getResult().getData()).get(0).intValue());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/75baf01e/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index 8b34038..99b3a1b 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -50,6 +50,7 @@ import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.core.StringStartsWith.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -109,6 +110,38 @@ public class GremlinServerSessionIntegrateTest  extends 
AbstractGremlinServerInt
     }
 
     @Test
+    public void shouldBlockAdditionalRequestsDuringClose() throws Exception {
+        // this is sorta cobbled together a bit given limits/rules about how 
you can use Cluster/Client instances.
+        // basically, we need one to submit the long run job and one to do the 
close operation that will cancel the
+        // long run job. it is probably possible to do this with some 
low-level message manipulation but that's
+        // probably not necessary
+        final Cluster cluster1 = Cluster.build().create();
+        final Client client1 = cluster1.connect(name.getMethodName());
+        client1.submit("1+1").all().join();
+        final Cluster cluster2 = Cluster.build().create();
+        final Client client2 = cluster2.connect(name.getMethodName());
+        client2.submit("1+1").all().join();
+
+        final ResultSet rs = client1.submit("Thread.sleep(10000);1+1");
+
+        client2.close();
+
+        try {
+            rs.all().join();
+            fail("The close of the session on client2 should have interrupted 
the script sent on client1");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root.getMessage(), startsWith("Script evaluation 
exceeded the configured 'scriptEvaluationTimeout' threshold of 30000 ms or 
evaluation was otherwise cancelled directly for request"));
+        }
+
+        client1.close();
+
+        cluster1.close();
+        cluster2.close();
+    }
+
+
+    @Test
     public void shouldRollbackOnEvalExceptionForManagedTransaction() throws 
Exception {
         assumeNeo4jIsPresent();
 

Reply via email to