Corrected a number of problems in close() operations for the driver.

This was more of a commit than I wanted for tp31, but close() was really messed 
up. Fixed a number of race conditions and other logic that would allow the 
driver to hang on close. Also made it so that the Cluster makes an attempt to 
clean up any Client instances that it spawns.
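
The close() behaviour described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the driver code itself: SketchClient, SketchCluster, submit() returning a String, and the closeCount counter are hypothetical stand-ins. Only the shape (an AtomicReference guarding a single close future, an IllegalStateException on use after close, and WeakReference tracking in the cluster) follows the patch below.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins for the driver's Client and Cluster classes.
final class CloseSketch {

    static final class SketchClient {
        // null until the first closeAsync() call; afterwards holds the one close future
        private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null);
        int closeCount = 0; // counts how often the real close work ran (illustration only)

        boolean isClosing() {
            return closing.get() != null;
        }

        String submit(final String script) {
            // fail fast instead of touching connections on a closed client
            if (isClosing()) throw new IllegalStateException("Client has been closed");
            return "submitted: " + script;
        }

        synchronized CompletableFuture<Void> closeAsync() {
            // repeated calls return the same future rather than closing twice
            if (closing.get() != null) return closing.get();
            closeCount++;
            closing.set(CompletableFuture.completedFuture(null));
            return closing.get();
        }
    }

    static final class SketchCluster {
        // weak references so tracking does not keep abandoned clients alive
        private final List<WeakReference<SketchClient>> openedClients = new ArrayList<>();

        SketchClient connect() {
            final SketchClient client = new SketchClient();
            openedClients.add(new WeakReference<>(client));
            return client;
        }

        void close() {
            for (WeakReference<SketchClient> ref : openedClients) {
                final SketchClient client = ref.get();
                if (client != null && !client.isClosing()) client.closeAsync().join();
            }
        }
    }

    public static void main(final String[] args) {
        final SketchCluster cluster = new SketchCluster();
        final SketchClient first = cluster.connect();
        final SketchClient second = cluster.connect();
        second.closeAsync().join(); // closed by the caller before the cluster
        cluster.close();            // closes only the client that is still open
        try {
            first.submit("1+1");
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

Calling closeAsync() again after the cluster has closed the client is harmless in this sketch, since the stored future is simply returned.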


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/15d9aa28
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/15d9aa28
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/15d9aa28

Branch: refs/heads/TINKERPOP-1467-master
Commit: 15d9aa2826e7cda8e74acd372108edfe7fa912a4
Parents: 20c5f84
Author: Stephen Mallette <[email protected]>
Authored: Thu Sep 29 08:40:07 2016 -0400
Committer: Stephen Mallette <[email protected]>
Committed: Thu Sep 29 11:04:00 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../upgrade/release-3.1.x-incubating.asciidoc   | 17 +++++
 .../tinkerpop/gremlin/driver/Channelizer.java   |  2 +-
 .../apache/tinkerpop/gremlin/driver/Client.java | 42 +++++++++--
 .../tinkerpop/gremlin/driver/Cluster.java       | 24 +++++-
 .../tinkerpop/gremlin/driver/Connection.java    | 56 ++++++++++++--
 .../gremlin/driver/ConnectionPool.java          | 21 +++---
 .../tinkerpop/gremlin/driver/Handler.java       | 26 +++++--
 .../tinkerpop/gremlin/driver/ResultQueue.java   |  4 +
 .../driver/handler/WebSocketClientHandler.java  |  4 +-
 .../server/GremlinDriverIntegrateTest.java      | 79 +++++++++++++++++++-
 .../server/GremlinServerAuthIntegrateTest.java  |  5 +-
 .../GremlinServerAuthOldIntegrateTest.java      |  4 +-
 .../GremlinServerSessionIntegrateTest.java      |  6 +-
 14 files changed, 247 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 47d4bbf..33e0f5a 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -421,6 +421,7 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 TinkerPop 3.1.5 (Release Date: NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Improved handling of `Cluster.close()` and `Client.close()` to prevent the 
methods from hanging.
 * Fixed output redirection and potential memory leak in 
`GremlinGroovyScriptEngine`.
 * Corrected naming of `g_withPath_V_asXaX_out_out_mapXa_name_it_nameX` and 
`g_withPath_V_asXaX_out_mapXa_nameX` in `MapTest`.
 * Improved session cleanup when a close is triggered by the client.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/docs/src/upgrade/release-3.1.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade/release-3.1.x-incubating.asciidoc 
b/docs/src/upgrade/release-3.1.x-incubating.asciidoc
index 400ea10..6cf9fb2 100644
--- a/docs/src/upgrade/release-3.1.x-incubating.asciidoc
+++ b/docs/src/upgrade/release-3.1.x-incubating.asciidoc
@@ -27,6 +27,23 @@ TinkerPop 3.1.5
 
 *Release Date: NOT OFFICIALLY RELEASED YET*
 
+Please see the 
link:https://github.com/apache/tinkerpop/blob/3.1.4/CHANGELOG.asciidoc#tinkerpop-315-release-date-XXXXXXXXXXXX[changelog]
 for a complete list of all the modifications that are part of this release.
+
+Upgrading for Users
+~~~~~~~~~~~~~~~~~~~
+
+Java Driver and close()
+^^^^^^^^^^^^^^^^^^^^^^^
+
+There were a few problems noted around the `close()` of `Cluster` and `Client` 
instances, including issues that
+presented as system hangs. These issues have been resolved; however, it is 
worth noting that an unchecked exception
+that was thrown under a certain situation has changed as part of the bug 
fixes. When submitting an in-session request
+on a `Client` that was closed (or closing), an `IllegalStateException` is 
thrown. This replaces older functionality
+that threw a `ConnectionException` and relied on logic far deeper in the driver 
to produce that error and had the
+potential to open additional resources despite the intention of the user to 
"close".
+
+See: https://issues.apache.org/jira/browse/TINKERPOP-1467[TINKERPOP-1467]
+
 TinkerPop 3.1.4
 ---------------
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
----------------------------------------------------------------------
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 6ed8e0f..c5d1cc5 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -179,7 +179,7 @@ public interface Channelizer extends ChannelHandler {
          */
         @Override
         public void close(final Channel channel) {
-            channel.writeAndFlush(new CloseWebSocketFrame());
+            if (channel.isOpen()) channel.writeAndFlush(new 
CloseWebSocketFrame());
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
----------------------------------------------------------------------
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 7b9262e..bd397a1 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -291,6 +292,8 @@ public abstract class Client {
      * A low-level method that allows the submission of a manually constructed 
{@link RequestMessage}.
      */
     public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) {
+        if (isClosing()) throw new IllegalStateException("Client has been 
closed");
+
         if (!initialized)
             init();
 
@@ -315,6 +318,8 @@ public abstract class Client {
         }
     }
 
+    public abstract boolean isClosing();
+
     /**
      * Closes the client by making a synchronous call to {@link #closeAsync()}.
      */
@@ -350,11 +355,17 @@ public abstract class Client {
     public final static class ClusteredClient extends Client {
 
         private ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new 
ConcurrentHashMap<>();
+        private final AtomicReference<CompletableFuture<Void>> closing = new 
AtomicReference<>(null);
 
         ClusteredClient(final Cluster cluster, final Client.Settings settings) 
{
             super(cluster, settings);
         }
 
+        @Override
+        public boolean isClosing() {
+            return closing.get() != null;
+        }
+
         /**
          * Submits a Gremlin script to the server and returns a {@link 
ResultSet} once the write of the request is
          * complete.
@@ -515,10 +526,14 @@ public abstract class Client {
          * Closes all the connection pools on all hosts.
          */
         @Override
-        public CompletableFuture<Void> closeAsync() {
+        public synchronized CompletableFuture<Void> closeAsync() {
+            if (closing.get() != null)
+                return closing.get();
+
             final CompletableFuture[] poolCloseFutures = new 
CompletableFuture[hostConnectionPools.size()];
             
hostConnectionPools.values().stream().map(ConnectionPool::closeAsync).collect(Collectors.toList()).toArray(poolCloseFutures);
-            return CompletableFuture.allOf(poolCloseFutures);
+            closing.set(CompletableFuture.allOf(poolCloseFutures));
+            return closing.get();
         }
     }
 
@@ -615,11 +630,16 @@ public abstract class Client {
          * close on the {@code Client} that created it.
          */
         @Override
-        public CompletableFuture<Void> closeAsync() {
+        public synchronized CompletableFuture<Void> closeAsync() {
             close.complete(null);
             return close;
         }
 
+        @Override
+        public boolean isClosing() {
+            return close.isDone();
+        }
+
         /**
          * {@inheritDoc}
          */
@@ -650,6 +670,8 @@ public abstract class Client {
 
         private ConnectionPool connectionPool;
 
+        private final AtomicReference<CompletableFuture<Void>> closing = new 
AtomicReference<>(null);
+
         SessionedClient(final Cluster cluster, final Client.Settings settings) 
{
             super(cluster, settings);
             this.sessionId = settings.getSession().get().sessionId;
@@ -693,12 +715,22 @@ public abstract class Client {
             connectionPool = new ConnectionPool(host, this, Optional.of(1), 
Optional.of(1));
         }
 
+        @Override
+        public boolean isClosing() {
+            return closing.get() != null;
+        }
+
         /**
          * Close the bound {@link ConnectionPool}.
          */
         @Override
-        public CompletableFuture<Void> closeAsync() {
-            return connectionPool.closeAsync();
+        public synchronized CompletableFuture<Void> closeAsync() {
+            if (closing.get() != null)
+                return closing.get();
+
+            final CompletableFuture<Void> connectionPoolClose = 
connectionPool.closeAsync();
+            closing.set(connectionPoolClose);
+            return connectionPoolClose;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
----------------------------------------------------------------------
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index f79e719..e85471c 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -37,6 +37,7 @@ import javax.net.ssl.TrustManager;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.lang.ref.WeakReference;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -84,7 +85,9 @@ public final class Cluster {
      * submitted or can be directly initialized via {@link Client#init()}.
      */
     public <T extends Client> T connect() {
-        return (T) new Client.ClusteredClient(this, 
Client.Settings.build().create());
+        final Client client = new Client.ClusteredClient(this, 
Client.Settings.build().create());
+        manager.trackClient(client);
+        return (T) client;
     }
 
     /**
@@ -132,8 +135,10 @@ public final class Cluster {
      * Creates a new {@link Client} based on the settings provided.
      */
     public <T extends Client> T connect(final Client.Settings settings) {
-        return settings.getSession().isPresent() ? (T) new 
Client.SessionedClient(this, settings) :
-                (T) new Client.ClusteredClient(this, settings);
+        final Client client = settings.getSession().isPresent() ? new 
Client.SessionedClient(this, settings) :
+                new Client.ClusteredClient(this, settings);
+        manager.trackClient(client);
+        return (T) client;
     }
 
     @Override
@@ -868,6 +873,8 @@ public final class Cluster {
 
         private final AtomicReference<CompletableFuture<Void>> closeFuture = 
new AtomicReference<>();
 
+        private final List<WeakReference<Client>> openedClients = new 
ArrayList<>();
+
         private Manager(final Builder builder) {
             this.loadBalancingStrategy = builder.loadBalancingStrategy;
             this.authProps = builder.authProps;
@@ -919,6 +926,10 @@ public final class Cluster {
             });
         }
 
+        void trackClient(final Client client) {
+            openedClients.add(new WeakReference<>(client));
+        }
+
         public Host add(final InetSocketAddress address) {
             final Host newHost = new Host(address, Cluster.this);
             final Host previous = hosts.putIfAbsent(address, newHost);
@@ -934,6 +945,13 @@ public final class Cluster {
             if (closeFuture.get() != null)
                 return closeFuture.get();
 
+            for (WeakReference<Client> openedClient : openedClients) {
+                final Client client = openedClient.get();
+                if (client != null && !client.isClosing()) {
+                    client.close();
+                }
+            }
+
             final CompletableFuture<Void> closeIt = new CompletableFuture<>();
             closeFuture.set(closeIt);
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 1ef9b98..8576de5 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -153,10 +154,11 @@ final class Connection {
         return pending;
     }
 
-    public CompletableFuture<Void> closeAsync() {
+    public synchronized CompletableFuture<Void> closeAsync() {
+        if (isClosed()) return closeFuture.get();
+
         final CompletableFuture<Void> future = new CompletableFuture<>();
-        if (!closeFuture.compareAndSet(null, future))
-            return closeFuture.get();
+        closeFuture.set(future);
 
         // stop any pings being sent at the server for keep-alive
         final ScheduledFuture keepAlive = keepAliveFuture.get();
@@ -167,12 +169,15 @@ final class Connection {
         // operation to check if it can close.  in this way the connection no 
longer receives writes, but
         // can continue to read. If a request never comes back the future 
won't get fulfilled and the connection
         // will maintain a "pending" request, that won't quite ever go away.  
The build up of such dead requests
-        // on a connection in the connection pool will force the pool to 
replace the connection for a fresh one
+        // on a connection in the connection pool will force the pool to 
replace the connection for a fresh one.
         if (pending.isEmpty()) {
             if (null == channel)
                 future.complete(null);
             else
                 shutdown(future);
+        } else {
+            // there may be some pending requests. schedule a job to wait for 
those to complete and then shutdown
+            new CheckForPending(future).runUntilDone(cluster.executor(), 1000, 
TimeUnit.MILLISECONDS);
         }
 
         return future;
@@ -287,7 +292,7 @@ final class Connection {
             shutdown(closeFuture.get());
     }
 
-    private void shutdown(final CompletableFuture<Void> future) {
+    private synchronized void shutdown(final CompletableFuture<Void> future) {
         // shutdown can be called directly from closeAsync() or after write() 
and therefore this method should only
         // be called once. once shutdown is initiated, it shouldn't be 
executed a second time or else it sends more
         // messages at the server and leads to ugly log messages over there.
@@ -317,6 +322,7 @@ final class Connection {
             }
 
             channelizer.close(channel);
+
             final ChannelPromise promise = channel.newPromise();
             promise.addListener(f -> {
                 if (f.cause() != null)
@@ -338,4 +344,44 @@ final class Connection {
     public String toString() {
         return connectionLabel;
     }
+
+    /**
+     * Self-cancelling task that periodically checks for the pending queue to 
clear before shutting down the
+     * {@code Connection}. Once it does that, it self cancels the scheduled 
job in the executor.
+     */
+    private final class CheckForPending implements Runnable {
+        private volatile ScheduledFuture<?> self;
+        private final CompletableFuture<Void> future;
+
+        CheckForPending(final CompletableFuture<Void> future) {
+            this.future = future;
+        }
+
+        @Override
+        public void run() {
+            logger.info("Checking for pending messages to complete before 
close on {}", this);
+            if (pending.isEmpty()) {
+                shutdown(future);
+                boolean interrupted = false;
+                try {
+                    while(null == self) {
+                        try {
+                            Thread.sleep(1);
+                        } catch (InterruptedException e) {
+                            interrupted = true;
+                        }
+                    }
+                    self.cancel(false);
+                } finally {
+                    if(interrupted) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+
+        void runUntilDone(final ScheduledExecutorService executor, final long 
period, final TimeUnit unit) {
+            self = executor.scheduleAtFixedRate(this, period, period, unit);
+        }
+    }
 }
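
The self-cancelling check added to Connection above can be illustrated in isolation. The following is a rough sketch under stated assumptions, not the driver's implementation: PendingCheckSketch, the String queue standing in for pending requests, the 10 ms period, and the demo() helper are all hypothetical, and completing the future stands in for the real shutdown(future) call. It also yields while waiting for the task's own handle rather than sleeping as the patch does.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of a periodic task that cancels itself once its work is done.
final class PendingCheckSketch {

    static final class CheckForPending implements Runnable {
        private volatile ScheduledFuture<?> self; // handle to this task's own schedule
        private final Queue<String> pending;      // stand-in for the pending requests
        private final CompletableFuture<Void> future;

        CheckForPending(final Queue<String> pending, final CompletableFuture<Void> future) {
            this.pending = pending;
            this.future = future;
        }

        @Override
        public void run() {
            if (!pending.isEmpty()) return; // requests still outstanding, try again next period
            future.complete(null);          // stand-in for the real shutdown(future)
            // the handle is assigned by the scheduling thread, so wait until it is visible
            while (self == null) Thread.yield();
            self.cancel(false);             // stop any further periodic runs
        }

        void runUntilDone(final ScheduledExecutorService executor, final long period, final TimeUnit unit) {
            self = executor.scheduleAtFixedRate(this, period, period, unit);
        }
    }

    // Returns true if close waited for the pending request and then completed.
    static boolean demo() {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final Queue<String> pending = new ConcurrentLinkedQueue<>();
        pending.add("request-1");
        final CompletableFuture<Void> closed = new CompletableFuture<>();
        try {
            new CheckForPending(pending, closed).runUntilDone(executor, 10, TimeUnit.MILLISECONDS);
            final boolean waited = !closed.isDone(); // cannot complete while the queue is non-empty
            pending.poll();                          // the outstanding request finishes
            closed.get(5, TimeUnit.SECONDS);
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(final String[] args) {
        System.out.println("closed after pending drained: " + demo());
    }
}
```

Because the task is only scheduled when requests are outstanding, a close with an empty queue never pays for the periodic check.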

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index f0d9044..8d63e13 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -219,27 +219,26 @@ final class ConnectionPool {
     /**
      * Permanently kills the pool.
      */
-    public CompletableFuture<Void> closeAsync() {
-        logger.info("Signalled closing of connection pool on {} with core size 
of {}", host, minPoolSize);
+    public synchronized CompletableFuture<Void> closeAsync() {
+        if (closeFuture.get() != null) return closeFuture.get();
 
-        CompletableFuture<Void> future = closeFuture.get();
-        if (future != null)
-            return future;
+        logger.info("Signalled closing of connection pool on {} with core size 
of {}", host, minPoolSize);
 
         announceAllAvailableConnection();
-        future = CompletableFuture.allOf(killAvailableConnections());
-
-        return closeFuture.compareAndSet(null, future) ? future : 
closeFuture.get();
+        final CompletableFuture<Void> future = killAvailableConnections();
+        closeFuture.set(future);
+        return future;
     }
 
-    private CompletableFuture[] killAvailableConnections() {
+    private CompletableFuture<Void> killAvailableConnections() {
         final List<CompletableFuture<Void>> futures = new 
ArrayList<>(connections.size());
         for (Connection connection : connections) {
             final CompletableFuture<Void> future = connection.closeAsync();
-            future.thenRunAsync(open::decrementAndGet, cluster.executor());
+            future.thenRun(open::decrementAndGet);
             futures.add(future);
         }
-        return futures.toArray(new CompletableFuture[futures.size()]);
+
+        return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[futures.size()]));
     }
 
     void replaceConnection(final Connection connection) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
----------------------------------------------------------------------
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
index 656c993..681354c 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java
@@ -29,6 +29,7 @@ import io.netty.util.Attribute;
 import io.netty.util.AttributeKey;
 import io.netty.util.ReferenceCountUtil;
 import org.apache.tinkerpop.gremlin.driver.ser.SerializationException;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,11 +83,24 @@ final class Handler {
             if (response.getStatus().getCode() == 
ResponseStatusCode.AUTHENTICATE) {
                 final Attribute<SaslClient> saslClient = 
channelHandlerContext.attr(saslClientKey);
                 final Attribute<Subject> subject = 
channelHandlerContext.attr(subjectKey);
-                RequestMessage.Builder messageBuilder = 
RequestMessage.build(Tokens.OPS_AUTHENTICATION);
+                final RequestMessage.Builder messageBuilder = 
RequestMessage.build(Tokens.OPS_AUTHENTICATION);
                 // First time through we don't have a sasl client
                 if (saslClient.get() == null) {
                     subject.set(login());
-                    
saslClient.set(saslClient(getHostName(channelHandlerContext)));
+                    try {
+                        
saslClient.set(saslClient(getHostName(channelHandlerContext)));
+                    } catch (SaslException saslException) {
+                        // push the sasl error into a failure response from 
the server. this ensures that standard
+                        // processing for the ResultQueue is kept. without 
this SaslException trap and subsequent
+                        // conversion to an authentication failure, the 
close() of the connection might not
+                        // succeed as it will appear as though pending 
messages remain present in the queue on the
+                        // connection and the shutdown won't proceed
+                        final ResponseMessage clientSideError = 
ResponseMessage.build(response.getRequestId())
+                                
.code(ResponseStatusCode.FORBIDDEN).statusMessage(saslException.getMessage()).create();
+                        channelHandlerContext.fireChannelRead(clientSideError);
+                        return;
+                    }
+
                     messageBuilder.addArg(Tokens.ARGS_SASL_MECHANISM, 
getMechanism());
                     messageBuilder.addArg(Tokens.ARGS_SASL, 
saslClient.get().hasInitialResponse() ?
                                                                 
evaluateChallenge(subject, saslClient, NULL_CHALLENGE) : null);
@@ -231,12 +245,12 @@ final class Handler {
             // there are that many failures someone would take notice and 
hopefully stop the client.
             logger.error("Could not process the response", cause);
 
-            // the channel took an error because of something pretty bad so 
release all the completeable
-            // futures out there
-            pending.entrySet().stream().forEach(kv -> 
kv.getValue().markError(cause));
+            // the channel took an error because of something pretty bad so 
release all the futures out there
+            pending.values().forEach(val -> val.markError(cause));
+            pending.clear();
 
             // serialization exceptions should not close the channel - that's 
worth a retry
-            if (!ExceptionUtils.getThrowableList(cause).stream().anyMatch(t -> 
t instanceof SerializationException))
+            if 
(!IteratorUtils.anyMatch(ExceptionUtils.getThrowableList(cause).iterator(), t 
-> t instanceof SerializationException))
                 ctx.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
----------------------------------------------------------------------
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
index d47810c..5719f42 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java
@@ -154,6 +154,10 @@ final class ResultQueue {
         return this.size() == 0;
     }
 
+    public boolean isComplete() {
+        return readComplete.isDone();
+    }
+
     void drainTo(final Collection<Result> collection) {
         if (error.get() != null) throw new RuntimeException(error.get());
         resultLinkedBlockingQueue.drainTo(collection);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
----------------------------------------------------------------------
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 5ba0f1b..c63d790 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -96,9 +96,9 @@ public final class WebSocketClientHandler extends 
SimpleChannelInboundHandler<Ob
 
     @Override
     public void exceptionCaught(final ChannelHandlerContext ctx, final 
Throwable cause) throws Exception {
-        logger.warn("Exception caught during WebSocket processing - closing 
connection", cause);
         if (!handshakeFuture.isDone()) handshakeFuture.setFailure(cause);
-        ctx.close();
+
+        // let the GremlinResponseHandler take care of exception logging, 
channel closing, and cleanup
         ctx.fireExceptionCaught(cause);
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 96cde54..d8aff4a 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -26,7 +26,6 @@ import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
@@ -852,11 +851,13 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
         client.close();
 
         try {
-            client.submit("x[0]+1");
+            client.submit("x[0]+1").all().get();
             fail("Should have thrown an exception because the connection is 
closed");
         } catch (Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertThat(root, instanceOf(ConnectionException.class));
+            assertThat(root, instanceOf(IllegalStateException.class));
+        } finally {
+            cluster.close();
         }
     }
 
@@ -1354,6 +1355,78 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
         }
     }
 
+    @Test
+    public void shouldCloseAllClientsOnCloseOfCluster() throws Exception {
+        final Cluster cluster = Cluster.open();
+        final Client sessionlessOne = cluster.connect();
+        final Client session = cluster.connect("session");
+        final Client sessionlessTwo = cluster.connect();
+        final Client sessionlessThree = cluster.connect();
+        final Client sessionlessFour = cluster.connect();
+
+        assertEquals(2, 
sessionlessOne.submit("1+1").all().get().get(0).getInt());
+        assertEquals(2, session.submit("1+1").all().get().get(0).getInt());
+        assertEquals(2, 
sessionlessTwo.submit("1+1").all().get().get(0).getInt());
+        assertEquals(2, 
sessionlessThree.submit("1+1").all().get().get(0).getInt());
+        // don't send anything on the 4th client
+
+        // close one of these Clients before the Cluster
+        sessionlessThree.close();
+        cluster.close();
+
+        try {
+            sessionlessOne.submit("1+1").all().get();
+            fail("Should have tossed an exception because cluster was closed");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(IllegalStateException.class));
+            assertEquals("Client has been closed", root.getMessage());
+        }
+
+        try {
+            session.submit("1+1").all().get();
+            fail("Should have tossed an exception because cluster was closed");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(IllegalStateException.class));
+            assertEquals("Client has been closed", root.getMessage());
+        }
+
+        try {
+            sessionlessTwo.submit("1+1").all().get();
+            fail("Should have tossed an exception because cluster was closed");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(IllegalStateException.class));
+            assertEquals("Client has been closed", root.getMessage());
+        }
+
+        try {
+            sessionlessThree.submit("1+1").all().get();
+            fail("Should have tossed an exception because cluster was closed");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(IllegalStateException.class));
+            assertEquals("Client has been closed", root.getMessage());
+        }
+
+        try {
+            sessionlessFour.submit("1+1").all().get();
+            fail("Should have tossed an exception because cluster was closed");
+        } catch (Exception ex) {
+            final Throwable root = ExceptionUtils.getRootCause(ex);
+            assertThat(root, instanceOf(IllegalStateException.class));
+            assertEquals("Client has been closed", root.getMessage());
+        }
+
+        // allow call to close() even though closed through cluster
+        sessionlessOne.close();
+        session.close();
+        sessionlessTwo.close();
+
+        cluster.close();
+    }
+
     private void assertFutureTimeout(final CompletableFuture<List<Result>> futureFirst) {
         try
         {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
index 3e1b7e9..887d408 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
@@ -23,7 +23,6 @@ import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
-import org.ietf.jgss.GSSException;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -128,7 +127,7 @@ public class GremlinServerAuthIntegrateTest extends AbstractGremlinServerIntegra
             fail("This should not succeed as the client did not provide credentials");
         } catch(Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertEquals(GSSException.class, root.getClass());
+            assertEquals(ResponseException.class, root.getClass());
         } finally {
             cluster.close();
         }
@@ -157,7 +156,7 @@ public class GremlinServerAuthIntegrateTest extends AbstractGremlinServerIntegra
         final Client client = cluster.connect();
 
         try {
-            client.submit("1+1").all();
+            client.submit("1+1").all().get();
         } catch(Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
             assertEquals(ResponseException.class, root.getClass());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthOldIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthOldIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthOldIntegrateTest.java
index f2e5622..2f332be 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthOldIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthOldIntegrateTest.java
@@ -131,7 +131,7 @@ public class GremlinServerAuthOldIntegrateTest extends AbstractGremlinServerInte
             fail("This should not succeed as the client did not provide credentials");
         } catch(Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertEquals(GSSException.class, root.getClass());
+            assertEquals(ResponseException.class, root.getClass());
 
             // removed this assert as the text of the message changes based on kerberos config - stupid kerberos
             // assertThat(root.getMessage(), startsWith("Invalid name provided"));
@@ -163,7 +163,7 @@ public class GremlinServerAuthOldIntegrateTest extends AbstractGremlinServerInte
         final Client client = cluster.connect();
 
         try {
-            client.submit("1+1").all();
+            client.submit("1+1").all().get();
         } catch(Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
             assertEquals(ResponseException.class, root.getClass());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/15d9aa28/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
index 99b3a1b..3c1fef9 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSessionIntegrateTest.java
@@ -26,7 +26,6 @@ import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
@@ -189,7 +188,7 @@ public class GremlinServerSessionIntegrateTest  extends AbstractGremlinServerInt
             fail("Session should be dead");
         } catch (Exception ex) {
             final Throwable root = ExceptionUtils.getRootCause(ex);
-            assertThat(root, instanceOf(ConnectionException.class));
+            assertThat(root, instanceOf(IllegalStateException.class));
         } finally {
             cluster.close();
         }
@@ -229,7 +228,8 @@ public class GremlinServerSessionIntegrateTest  extends AbstractGremlinServerInt
             cluster.close();
         }
 
-        assertEquals(1, recordingAppender.getMessages().stream()
+        // there will be one for the timeout and a second for closing the cluster
+        assertEquals(2, recordingAppender.getMessages().stream()
                 .filter(msg -> msg.equals("INFO - Session shouldHaveTheSessionTimeout closed\n")).count());
     }
 
