This is an automated email from the ASF dual-hosted git repository.

valentyn pushed a commit to branch valentyn/connection-pooling
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 1c726292d88147e30c853c6e48af07a4deb59872
Author: Valentyn Kahamlyk <[email protected]>
AuthorDate: Tue Apr 30 13:29:12 2024 -0700

    connection pooling clean up
---
 .../tinkerpop/gremlin/driver/Channelizer.java      |   9 +-
 .../apache/tinkerpop/gremlin/driver/Client.java    | 292 +------
 .../apache/tinkerpop/gremlin/driver/Cluster.java   |  34 +-
 .../tinkerpop/gremlin/driver/Connection.java       |  49 +-
 .../tinkerpop/gremlin/driver/ConnectionPool.java   |  43 +-
 .../driver/handler/GremlinResponseHandler.java     |  31 +-
 .../driver/handler/HttpGremlinRequestEncoder.java  |  10 -
 .../driver/remote/DriverRemoteConnection.java      |  17 +-
 .../gremlin/driver/ConnectionPoolTest.java         |  99 +++
 .../apache/tinkerpop/gremlin/driver/HostTest.java  |   4 +-
 .../WebSocketClientBehaviorIntegrateTest.java      | 956 ++++++++++-----------
 .../driver/remote/DriverRemoteConnectionTest.java  |   2 -
 .../driver/ClientConnectionIntegrateTest.java      |   1 -
 .../gremlin/server/GremlinDriverIntegrateTest.java | 300 +------
 .../gremlin/server/HttpDriverIntegrateTest.java    | 126 ++-
 15 files changed, 705 insertions(+), 1268 deletions(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 31c34d6a6e..a8cb6d69be 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -32,8 +32,7 @@ import 
org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDeco
 import org.apache.tinkerpop.gremlin.util.ser.MessageTextSerializerV4;
 
 import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Client-side channel initializer interface.  It is responsible for 
constructing the Netty {@code ChannelPipeline}
@@ -76,7 +75,7 @@ public interface Channelizer extends ChannelHandler {
     abstract class AbstractChannelizer extends 
ChannelInitializer<SocketChannel> implements Channelizer {
         protected Connection connection;
         protected Cluster cluster;
-        private ConcurrentMap<UUID, ResultQueue> pending;
+        private AtomicReference<ResultQueue> pending;
 
         protected static final String PIPELINE_GREMLIN_HANDLER = 
"gremlin-handler";
         public static final String PIPELINE_SSL_HANDLER = 
"gremlin-ssl-handler";
@@ -149,10 +148,6 @@ public interface Channelizer extends ChannelHandler {
         public void init(final Connection connection) {
             super.init(connection);
 
-            // server does not support sessions so this channerlizer can't 
support the SessionedClient
-            if (connection.getClient() instanceof Client.SessionedClient)
-                throw new IllegalStateException(String.format("Cannot use 
sessions or tx() with %s", HttpChannelizer.class.getSimpleName()));
-
             gremlinRequestEncoder = new 
HttpGremlinRequestEncoder(cluster.getSerializer(), 
cluster.getRequestInterceptor(), cluster.isUserAgentOnConnectEnabled());
             gremlinResponseDecoder = new 
HttpGremlinResponseStreamDecoder((MessageTextSerializerV4<?>) 
cluster.getSerializer());
         }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index f47d529d18..93e1c9d70e 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -19,7 +19,6 @@
 package org.apache.tinkerpop.gremlin.driver;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.tinkerpop.gremlin.util.Tokens;
 import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,19 +29,15 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import javax.net.ssl.SSLException;
 import java.net.ConnectException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Random;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -52,7 +47,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * A {@code Client} is constructed from a {@link Cluster} and represents a way 
to send messages to Gremlin Server.
@@ -71,13 +65,11 @@ public abstract class Client {
 
     protected final Cluster cluster;
     protected volatile boolean initialized;
-    protected final Client.Settings settings;
 
     private static final Random random = new Random();
 
-    Client(final Cluster cluster, final Client.Settings settings) {
+    Client(final Cluster cluster) {
         this.cluster = cluster;
-        this.settings = settings;
     }
 
     /**
@@ -120,7 +112,7 @@ public abstract class Client {
      * the created {@code Client}.
      */
     public Client alias(final Map<String, String> aliases) {
-        return new AliasClusteredClient(this, aliases, settings);
+        return new AliasClusteredClient(this, aliases);
     }
 
     /**
@@ -402,13 +394,6 @@ public abstract class Client {
         closeAsync().join();
     }
 
-    /**
-     * Gets the {@link Client.Settings}.
-     */
-    public Settings getSettings() {
-        return settings;
-    }
-
     /**
      * Gets the {@link Cluster} that spawned this {@code Client}.
      */
@@ -433,8 +418,8 @@ public abstract class Client {
         private final AtomicReference<CompletableFuture<Void>> closing = new 
AtomicReference<>(null);
         private Throwable initializationFailure = null;
 
-        ClusteredClient(final Cluster cluster, final Client.Settings settings) 
{
-            super(cluster, settings);
+        ClusteredClient(final Cluster cluster) {
+            super(cluster);
         }
 
         @Override
@@ -487,7 +472,7 @@ public abstract class Client {
          */
         @Override
         public Client alias(final Map<String, String> aliases) {
-            return new AliasClusteredClient(this, aliases, settings);
+            return new AliasClusteredClient(this, aliases);
         }
 
         /**
@@ -496,19 +481,7 @@ public abstract class Client {
          */
         @Override
         protected Connection chooseConnection(final RequestMessageV4 msg) 
throws TimeoutException, ConnectionException {
-            final Iterator<Host> possibleHosts;
-//            if (msg.optionalArgs(Tokens.ARGS_HOST).isPresent()) {
-//                // looking at this code about putting the Host on the 
RequestMessage in light of 3.5.4, not sure
-//                // this is being used as intended here. server side usage is 
to place the channel.remoteAddress
-//                // in this token in the status metadata for the response. 
can't remember why it is being used this
-//                // way here exactly. created TINKERPOP-2821 to examine this 
more carefully to clean this up in a
-//                // future version.
-//                final Host host = (Host) msg.getArgs().get(Tokens.ARGS_HOST);
-//                msg.getArgs().remove(Tokens.ARGS_HOST);
-//                possibleHosts = IteratorUtils.of(host);
-//            } else {
-                possibleHosts = 
this.cluster.loadBalancingStrategy().select(msg);
-//            }
+            final Iterator<Host> possibleHosts = 
this.cluster.loadBalancingStrategy().select(msg);
 
             // try a random host if none are marked available. maybe it will 
reconnect in the meantime. better than
             // going straight to a fast NoHostAvailableException as was the 
case in versions 3.5.4 and earlier
@@ -634,8 +607,8 @@ public abstract class Client {
         private final Map<String, String> aliases = new HashMap<>();
         final CompletableFuture<Void> close = new CompletableFuture<>();
 
-        AliasClusteredClient(final Client client, final Map<String, String> 
aliases, final Client.Settings settings) {
-            super(client.cluster, settings);
+        AliasClusteredClient(final Client client, final Map<String, String> 
aliases) {
+            super(client.cluster);
             this.client = client;
             this.aliases.putAll(aliases);
         }
@@ -655,7 +628,6 @@ public abstract class Client {
                 // apply settings if they were made available
                 options.getBatchSize().ifPresent(batchSize -> 
request.addChunkSize(batchSize));
                 options.getTimeout().ifPresent(timeout -> 
request.addTimeoutMillis(timeout));
-//                
options.getOverrideRequestId().ifPresent(request::overrideRequestId);
 //                options.getUserAgent().ifPresent(userAgent -> 
request.add(Tokens.ARGS_USER_AGENT, userAgent));
                 options.getMaterializeProperties().ifPresent(mp -> 
request.addMaterializeProperties(mp));
 
@@ -751,253 +723,7 @@ public abstract class Client {
         @Override
         public Client alias(final Map<String, String> aliases) {
             if (close.isDone()) throw new IllegalStateException("Client is 
closed");
-            return new AliasClusteredClient(client, aliases, settings);
-        }
-    }
-
-    /**
-     * A {@code Client} implementation that operates in the context of a 
session.  Requests are sent to a single
-     * server, where each request is bound to the same thread with the same 
set of bindings across requests.
-     * Transaction are not automatically committed. It is up the client to 
issue commit/rollback commands.
-     */
-    public final static class SessionedClient extends Client {
-        private final String sessionId;
-        private final boolean manageTransactions;
-
-        private ConnectionPool connectionPool;
-
-        private final AtomicReference<CompletableFuture<Void>> closing = new 
AtomicReference<>(null);
-
-        SessionedClient(final Cluster cluster, final Client.Settings settings) 
{
-            super(cluster, settings);
-            this.sessionId = settings.getSession().get().sessionId;
-            this.manageTransactions = 
settings.getSession().get().manageTransactions;
-        }
-
-        /**
-         * Returns the session identifier bound to this {@code Client}.
-         */
-        public String getSessionId() {
-            return sessionId;
-        }
-
-        /**
-         * todo.
-         */
-        @Override
-        public RequestMessageV4.Builder buildMessage(final 
RequestMessageV4.Builder builder) {
-//            TODO: replace this with new Transaction API later.
-//            builder.processor("session");
-//            builder.addArg(Tokens.ARGS_SESSION, sessionId);
-//            builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, 
manageTransactions);
-            return builder;
-        }
-
-        /**
-         * Since the session is bound to a single host, simply borrow a 
connection from that pool.
-         */
-        @Override
-        protected Connection chooseConnection(final RequestMessageV4 msg) 
throws TimeoutException, ConnectionException {
-            return 
connectionPool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection,
 TimeUnit.MILLISECONDS);
-        }
-
-        /**
-         * Randomly choose an available {@link Host} to bind the session too 
and initialize the {@link ConnectionPool}.
-         */
-        @Override
-        protected void initializeImplementation() {
-            // chooses a host at random from all hosts
-            if (cluster.allHosts().isEmpty()) {
-                throw new IllegalStateException("No available host in the 
cluster");
-            }
-
-            final List<Host> hosts = new ArrayList<>(cluster.allHosts());
-            Collections.shuffle(hosts);
-            // if a host has been marked as available, use it instead
-            Optional<Host> host = 
hosts.stream().filter(Host::isAvailable).findFirst();
-            final Host selectedHost = host.orElse(hosts.get(0));
-
-            // only mark host as available if we can initialize the connection 
pool successfully
-            try {
-                connectionPool = new ConnectionPool(selectedHost, this, 
Optional.of(1), Optional.of(1));
-                selectedHost.makeAvailable();
-            } catch (RuntimeException ex) {
-                logger.error("Could not initialize client for {}", host, ex);
-                throw new NoHostAvailableException(ex);
-            }
-        }
-
-        @Override
-        public boolean isClosing() {
-            return closing.get() != null;
-        }
-
-        /**
-         * Close the bound {@link ConnectionPool}.
-         */
-        @Override
-        public synchronized CompletableFuture<Void> closeAsync() {
-            if (closing.get() != null)
-                return closing.get();
-
-            // the connection pool may not have been initialized if requests 
weren't sent across it. in those cases
-            // we just need to return a pre-completed future
-            final CompletableFuture<Void> connectionPoolClose = null == 
connectionPool ?
-                    CompletableFuture.completedFuture(null) : 
connectionPool.closeAsync();
-            closing.set(connectionPoolClose);
-            return connectionPoolClose;
-        }
-    }
-
-    /**
-     * Settings given to {@link Cluster#connect(Client.Settings)} that 
configures how a {@link Client} will behave.
-     */
-    public static class Settings {
-        private final Optional<SessionSettings> session;
-
-        private Settings(final Builder builder) {
-            this.session = builder.session;
-        }
-
-        public static Builder build() {
-            return new Builder();
-        }
-
-        /**
-         * Determines if the {@link Client} is to be constructed with a 
session. If the value is present, then a
-         * session is expected.
-         */
-        public Optional<SessionSettings> getSession() {
-            return session;
-        }
-
-        public static class Builder {
-            private Optional<SessionSettings> session = Optional.empty();
-
-            private Builder() {
-            }
-
-            /**
-             * Enables a session. By default this will create a random session 
name and configure transactions to be
-             * unmanaged. This method will override settings provided by calls 
to the other overloads of
-             * {@code useSession}.
-             */
-            public Builder useSession(final boolean enabled) {
-                session = enabled ? 
Optional.of(SessionSettings.build().create()) : Optional.empty();
-                return this;
-            }
-
-            /**
-             * Enables a session. By default this will create a session with 
the provided name and configure
-             * transactions to be unmanaged. This method will override 
settings provided by calls to the other
-             * overloads of {@code useSession}.
-             */
-            public Builder useSession(final String sessionId) {
-                session = sessionId != null && !sessionId.isEmpty() ?
-                        
Optional.of(SessionSettings.build().sessionId(sessionId).create()) : 
Optional.empty();
-                return this;
-            }
-
-            /**
-             * Enables a session. This method will override settings provided 
by calls to the other overloads of
-             * {@code useSession}.
-             */
-            public Builder useSession(final SessionSettings settings) {
-                session = Optional.ofNullable(settings);
-                return this;
-            }
-
-            public Settings create() {
-                return new Settings(this);
-            }
-
-        }
-    }
-
-    /**
-     * Settings for a {@link Client} that involve a session.
-     */
-    public static class SessionSettings {
-        private final boolean manageTransactions;
-        private final String sessionId;
-        private final boolean forceClosed;
-
-        private SessionSettings(final Builder builder) {
-            manageTransactions = builder.manageTransactions;
-            sessionId = builder.sessionId;
-            forceClosed = builder.forceClosed;
-        }
-
-        /**
-         * If enabled, transactions will be "managed" such that each request 
will represent a complete transaction.
-         */
-        public boolean manageTransactions() {
-            return manageTransactions;
-        }
-
-        /**
-         * Provides the identifier of the session.
-         */
-        public String getSessionId() {
-            return sessionId;
-        }
-
-        /**
-         * Determines if the session will be force closed. See {@link 
Builder#forceClosed(boolean)} for more details
-         * on what that means.
-         */
-        public boolean isForceClosed() {
-            return forceClosed;
-        }
-
-        public static SessionSettings.Builder build() {
-            return new SessionSettings.Builder();
-        }
-
-        public static class Builder {
-            private boolean manageTransactions = false;
-            private String sessionId = UUID.randomUUID().toString();
-            private boolean forceClosed = false;
-
-            private Builder() {
-            }
-
-            /**
-             * If enabled, transactions will be "managed" such that each 
request will represent a complete transaction.
-             * By default this value is {@code false}.
-             */
-            public Builder manageTransactions(final boolean manage) {
-                manageTransactions = manage;
-                return this;
-            }
-
-            /**
-             * Provides the identifier of the session. This value cannot be 
null or empty. By default it is set to
-             * a random {@code UUID}.
-             */
-            public Builder sessionId(final String sessionId) {
-                if (null == sessionId || sessionId.isEmpty())
-                    throw new IllegalArgumentException("sessionId cannot be 
null or empty");
-                this.sessionId = sessionId;
-                return this;
-            }
-
-            /**
-             * Determines if the session should be force closed when the 
client is closed. Force closing will not
-             * attempt to close open transactions from existing running jobs 
and leave it to the underlying graph to
-             * decided how to proceed with those orphaned transactions. 
Setting this to {@code true} tends to lead to
-             * faster close operation which can be desirable if Gremlin Server 
has a long session timeout and a long
-             * script evaluation timeout as attempts to close long run jobs 
can occur more rapidly. By default, this
-             * value is {@code false}.
-             */
-            public Builder forceClosed(final boolean forced) {
-                this.forceClosed = forced;
-                return this;
-            }
-
-            public SessionSettings create() {
-                return new SessionSettings(this);
-            }
+            return new AliasClusteredClient(client, aliases);
         }
     }
 }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 407b4cefe5..aacacc6042 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -28,7 +28,6 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 import io.netty.util.concurrent.Future;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.tinkerpop.gremlin.util.MessageSerializer;
-import org.apache.tinkerpop.gremlin.util.Tokens;
 import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
 import org.apache.tinkerpop.gremlin.util.ser.Serializers;
 import io.netty.bootstrap.Bootstrap;
@@ -92,23 +91,7 @@ public final class Cluster {
     }
 
     /**
-     * Creates a {@link Client.ClusteredClient} instance to this {@code 
Cluster}, meaning requests will be routed to
-     * one or more servers (depending on the cluster configuration), where 
each request represents the entirety of a
-     * transaction.  A commit or rollback (in case of error) is automatically 
executed at the end of the request.
-     * <p/>
-     * Note that calling this method does not imply that a connection is made 
to the server itself at this point.
-     * Therefore, if there is only one server specified in the {@code Cluster} 
and that server is not available an
-     * error will not be raised at this point.  Connections get initialized in 
the {@link Client} when a request is
-     * submitted or can be directly initialized via {@link Client#init()}.
-     */
-    public <T extends Client> T connect() {
-        final Client client = new Client.ClusteredClient(this, 
Client.Settings.build().create());
-        manager.trackClient(client);
-        return (T) client;
-    }
-
-    /**
-     * Creates a {@link Client.SessionedClient} instance to this {@code 
Cluster}, meaning requests will be routed to
+     * Creates a SessionedClient instance to this {@code Cluster}, meaning 
requests will be routed to
      * a single server (randomly selected from the cluster), where the same 
bindings will be available on each request.
      * Requests are bound to the same thread on the server and thus 
transactions may extend beyond the bounds of a
      * single request.  The transactions are managed by the user and must be 
committed or rolled-back manually.
@@ -121,11 +104,11 @@ public final class Cluster {
      * @param sessionId user supplied id for the session which should be 
unique (a UUID is ideal).
      */
     public <T extends Client> T connect(final String sessionId) {
-        return connect(sessionId, false);
+        throw new UnsupportedOperationException("not implemented");
     }
 
     /**
-     * Creates a {@link Client.SessionedClient} instance to this {@code 
Cluster}, meaning requests will be routed to
+     * Creates a SessionedClient instance to this {@code Cluster}, meaning 
requests will be routed to
      * a single server (randomly selected from the cluster), where the same 
bindings will be available on each request.
      * Requests are bound to the same thread on the server and thus 
transactions may extend beyond the bounds of a
      * single request.  If {@code manageTransactions} is set to {@code false} 
then transactions are managed by the
@@ -141,19 +124,14 @@ public final class Cluster {
      * @param manageTransactions enables auto-transactions when set to true
      */
     public <T extends Client> T connect(final String sessionId, final boolean 
manageTransactions) {
-        final Client.SessionSettings sessionSettings = 
Client.SessionSettings.build()
-                .manageTransactions(manageTransactions)
-                .sessionId(sessionId).create();
-        final Client.Settings settings = 
Client.Settings.build().useSession(sessionSettings).create();
-        return connect(settings);
+        throw new UnsupportedOperationException("not implemented");
     }
 
     /**
      * Creates a new {@link Client} based on the settings provided.
      */
-    public <T extends Client> T connect(final Client.Settings settings) {
-        final Client client = settings.getSession().isPresent() ? new 
Client.SessionedClient(this, settings) :
-                new Client.ClusteredClient(this, settings);
+    public <T extends Client> T connect() {
+        final Client client = new Client.ClusteredClient(this);
         manager.trackClient(client);
         return (T) client;
     }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index c8e5d2a6cb..8e0ae0f4cf 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -18,7 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.driver;
 
-import org.apache.tinkerpop.gremlin.util.Tokens;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
@@ -31,17 +30,12 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.net.URI;
 import java.time.Instant;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -50,32 +44,28 @@ import java.util.concurrent.atomic.AtomicReference;
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 final class Connection {
+    public static final int MAX_WAIT_FOR_CONNECTION = 16000;
+    public static final int MAX_WAIT_FOR_CLOSE = 3000;
+    public static final int MAX_CONTENT_LENGTH = 10 * 1024 * 1024;
+    public static final int RECONNECT_INTERVAL = 1000;
+    public static final int RESULT_ITERATION_BATCH_SIZE = 64;
+    public static final long KEEP_ALIVE_INTERVAL = 180000;
+    public static final long CONNECTION_SETUP_TIMEOUT_MILLIS = 15000;
     private static final Logger logger = 
LoggerFactory.getLogger(Connection.class);
 
     private final Channel channel;
     private final URI uri;
-    private final ConcurrentMap<UUID, ResultQueue> pending = new 
ConcurrentHashMap<>();
+    private final AtomicReference<ResultQueue> pending = new 
AtomicReference<>();
     private final Cluster cluster;
     private final Client client;
     private final ConnectionPool pool;
     private final String creatingThread;
     private final String createdTimestamp;
 
-    public static final int MAX_WAIT_FOR_CONNECTION = 16000;
-    public static final int MAX_WAIT_FOR_CLOSE = 3000;
-    public static final int MAX_CONTENT_LENGTH = 10 * 1024 * 1024;
-
-    public static final int RECONNECT_INTERVAL = 1000;
-    public static final int RESULT_ITERATION_BATCH_SIZE = 64;
-    public static final long KEEP_ALIVE_INTERVAL = 180000;
-    public final static long CONNECTION_SETUP_TIMEOUT_MILLIS = 15000;
-
     /**
-     * When a {@code Connection} is borrowed from the pool, this number is 
incremented to indicate the number of
-     * times it has been taken and is decremented when it is returned.  This 
number is one indication as to how
-     * busy a particular {@code Connection} is.
+     * Is a {@code Connection} borrowed from the pool.
      */
-    public final AtomicInteger borrowed = new AtomicInteger(0);
+    private final AtomicBoolean isBorrowed = new AtomicBoolean(false);
     /**
      * This boolean guards the replace of the connection and ensures that it 
only occurs once.
      */
@@ -149,6 +139,10 @@ final class Connection {
         return (channel != null && !channel.isActive());
     }
 
+    public AtomicBoolean isBorrowed() {
+        return isBorrowed;
+    }
+
     boolean isClosing() {
         return closeFuture.get() != null;
     }
@@ -165,7 +159,7 @@ final class Connection {
         return client;
     }
 
-    ConcurrentMap<UUID, ResultQueue> getPending() {
+    AtomicReference<ResultQueue> getPending() {
         return pending;
     }
 
@@ -195,10 +189,6 @@ final class Connection {
     }
 
     public ChannelPromise write(final RequestMessageV4 requestMessage, final 
CompletableFuture<ResultSet> resultQueueSetup) {
-        // dont allow the same request id to be used as one that is already in 
the queue
-        if (pending.containsKey(requestMessage.getRequestId()))
-            throw new IllegalStateException(String.format("There is already a 
request pending with an id of: %s", requestMessage.getRequestId()));
-
         // once there is a completed write, then create a traverser for the 
result set and complete
         // the promise so that the client knows that that it can start 
checking for results.
         final Connection thisConnection = this;
@@ -238,7 +228,8 @@ final class Connection {
                         }, cluster.executor());
 
                         final ResultQueue handler = new 
ResultQueue(resultLinkedBlockingQueue, readCompleted);
-                        pending.put(requestMessage.getRequestId(), handler);
+                        // pending.put(requestMessage.getRequestId(), handler);
+                        pending.set(handler);
 
                         // resultQueueSetup should only be completed by a 
worker since the application code might have sync
                         // completion stages attached to it which and we do 
not want the event loop threads to process those
@@ -270,7 +261,7 @@ final class Connection {
     }
 
     private boolean isOkToClose() {
-        return pending.isEmpty() || (channel != null && !channel.isOpen()) || 
!pool.host.isAvailable();
+        return pending.get() == null || (channel != null && !channel.isOpen()) 
|| !pool.host.isAvailable();
     }
 
     /**
@@ -379,9 +370,9 @@ final class Connection {
     public String getConnectionInfo(final boolean showHost) {
         return showHost ?
                 String.format("Connection{channel=%s host=%s isDead=%s 
borrowed=%s pending=%s markedReplaced=%s closing=%s created=%s thread=%s}",
-                        getChannelId(), pool.host.toString(), isDead(), 
this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing(), 
createdTimestamp, creatingThread) :
+                        getChannelId(), pool.host.toString(), isDead(), 
this.isBorrowed().get(), getPending().get() == null ? 0 : 1, 
this.isBeingReplaced, isClosing(), createdTimestamp, creatingThread) :
                 String.format("Connection{channel=%s isDead=%s borrowed=%s 
pending=%s markedReplaced=%s closing=%s created=%s thread=%s}",
-                        getChannelId(), isDead(), this.borrowed.get(), 
getPending().size(), this.isBeingReplaced, isClosing(), createdTimestamp, 
creatingThread);
+                        getChannelId(), isDead(), this.isBorrowed().get(), 
getPending().get() == null ? 0 : 1, this.isBeingReplaced, isClosing(), 
createdTimestamp, creatingThread);
     }
 
     /**
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 37ad1ac936..07e2d01dfa 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -183,7 +183,7 @@ final class ConnectionPool {
         }
 
         // Get the least used valid connection
-        final Connection leastUsedConn = getLeastUsedValidConnection();
+        final Connection leastUsedConn = getAvailableConnection();
 
         if (null == leastUsedConn) {
             if (isClosed())
@@ -201,13 +201,13 @@ final class ConnectionPool {
         logger.debug("Attempting to return {} on {}", connection, host);
         if (isClosed()) throw new ConnectionException(host.getHostUri(), 
host.getAddress(), "Pool is shutdown");
 
-        final int borrowed = connection.borrowed.decrementAndGet();
+        connection.isBorrowed().set(false);
 
         if (connection.isDead()) {
             logger.debug("Marking {} as dead", this.host);
             this.replaceConnection(connection);
         } else {
-            if (bin.contains(connection) && borrowed == 0) {
+            if (bin.contains(connection)) {
                 logger.debug("{} is already in the bin and it has no inflight 
requests so it is safe to close", connection);
                 if (bin.remove(connection))
                     connection.closeAsync();
@@ -215,9 +215,9 @@ final class ConnectionPool {
             }
 
             final int poolSize = connections.size();
-            if (poolSize > minPoolSize ) {
+            if (poolSize > minPoolSize) {
                 if (logger.isDebugEnabled())
-                    logger.debug("destroy {}",connection.getConnectionInfo());
+                    logger.debug("destroy {}", connection.getConnectionInfo());
                 destroyConnection(connection);
             } else if (maxPoolSize > 1) {
                 if (logger.isDebugEnabled())
@@ -397,7 +397,7 @@ final class ConnectionPool {
         }
 
         // only close the connection for good once it is done being borrowed 
or when it is dead
-        if (connection.isDead() || connection.borrowed.get() == 0) {
+        if (connection.isDead() || !connection.isBorrowed().get()) {
             if (bin.remove(connection)) {
                 final CompletableFuture<Void> closeFuture = 
connection.closeAsync();
                 closeFuture.whenComplete((v, t) -> {
@@ -422,7 +422,7 @@ final class ConnectionPool {
             if (isClosed())
                 throw new ConnectionException(host.getHostUri(), 
host.getAddress(), "Pool is shutdown");
 
-            final Connection leastUsed = getLeastUsedValidConnection();
+            final Connection leastUsed = getAvailableConnection();
 
             if (leastUsed != null) {
                 if (logger.isDebugEnabled())
@@ -547,30 +547,21 @@ final class ConnectionPool {
      * @return The least-used connection from the pool. Returns null if no 
valid connection could be retrieved from the
      * pool.
      */
-    private synchronized Connection getLeastUsedValidConnection() {
-        // todo: just take first unused
-        int minInFlight = Integer.MAX_VALUE;
+    private synchronized Connection getAvailableConnection() {
         Connection leastBusy = null;
+        int availableConnectionsCount = 0;
         for (Connection connection : connections) {
-            final int inFlight = connection.borrowed.get();
-            if (!connection.isDead() && inFlight < minInFlight && inFlight < 
1) {
-                minInFlight = inFlight;
-                leastBusy = connection;
+            if (!connection.isDead() && !connection.isBorrowed().get()) {
+                // try to borrow connection
+                if (leastBusy == null && 
connection.isBorrowed().compareAndSet(false, true)) {
+                    leastBusy = connection;
+                }
+                availableConnectionsCount++;
             }
         }
 
-        if (leastBusy != null) {
-            // Increment borrow count and consider making a new connection if 
least used connection hits usage maximum
-            if (leastBusy.borrowed.incrementAndGet() > 0
-                    && connections.size() < maxPoolSize) {
-                if (logger.isDebugEnabled())
-                    logger.debug("Least used {} on {} reached 
maxSimultaneousUsagePerConnection but pool size {} < maxPoolSize - consider new 
connection",
-                            leastBusy.getConnectionInfo(), host, 
connections.size());
-                considerNewConnection();
-            }
-        } else if (connections.size() < maxPoolSize) {
-            // A safeguard for scenarios where consideration of a new 
connection was somehow not triggered by an
-            // existing connection hitting the usage maximum
+        // todo: may be add new connection when only 10-20% of pool is 
available?
+        if (availableConnectionsCount == 0 && connections.size() < 
maxPoolSize) {
             considerNewConnection();
         }
 
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
index 74a29cfa67..7c516de775 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.driver.handler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.util.AttributeMap;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultQueue;
@@ -36,10 +35,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-
-import static 
org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder.REQUEST_ID;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Takes a map of requests pending responses and writes responses to the 
{@link ResultQueue} of a request
@@ -47,9 +43,9 @@ import static 
org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEnco
  */
 public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<ResponseMessage> {
     private static final Logger logger = 
LoggerFactory.getLogger(GremlinResponseHandler.class);
-    private final ConcurrentMap<UUID, ResultQueue> pending;
+    private final AtomicReference<ResultQueue> pending;
 
-    public GremlinResponseHandler(final ConcurrentMap<UUID, ResultQueue> 
pending) {
+    public GremlinResponseHandler(final AtomicReference<ResultQueue> pending) {
         this.pending = pending;
     }
 
@@ -59,18 +55,16 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
         // should fire off a close message which will properly release the 
driver.
         super.channelInactive(ctx);
 
-        // the channel isn't going to get anymore results as it is closed so 
release all pending requests
-        pending.values().forEach(val -> val.markError(new 
IllegalStateException("Connection to server is no longer active")));
-        pending.clear();
+        final ResultQueue current = pending.getAndSet(null);
+        if (current != null) {
+            current.markError(new IllegalStateException("Connection to server 
is no longer active"));
+        }
     }
 
     @Override
     protected void channelRead0(final ChannelHandlerContext 
channelHandlerContext, final ResponseMessage response) throws Exception {
-        final UUID requestId = ((AttributeMap) 
channelHandlerContext).attr(REQUEST_ID).get();
-
         final HttpResponseStatus statusCode = response.getStatus() == null ? 
HttpResponseStatus.PARTIAL_CONTENT : response.getStatus().getCode();
-        final ResultQueue queue = pending.get(requestId);
-        System.out.println("GremlinResponseHandler get requestId: " + 
requestId);
+        final ResultQueue queue = pending.get();
         if (response.getResult().getData() != null) {
             System.out.println("GremlinResponseHandler payload size: " + 
((List) response.getResult().getData()).size());
         }
@@ -104,7 +98,10 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
         // todo:
         // as this is a non-PARTIAL_CONTENT code - the stream is done.
         if (statusCode != HttpResponseStatus.PARTIAL_CONTENT) {
-            
pending.remove(requestId).markComplete(response.getStatus().getAttributes());
+            final ResultQueue current = pending.getAndSet(null);
+            if (current != null) {
+                current.markComplete(response.getStatus().getAttributes());
+            }
         }
 
         System.out.println("----------------------------");
@@ -117,9 +114,7 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
         // there are that many failures someone would take notice and 
hopefully stop the client.
         logger.error("Could not process the response", cause);
 
-        // the channel took an error because of something pretty bad so 
release all the futures out there
-        pending.values().forEach(val -> val.markError(cause));
-        pending.clear();
+        pending.getAndSet(null).markError(cause);
 
         // serialization exceptions should not close the channel - that's 
worth a retry
         if 
(!IteratorUtils.anyMatch(ExceptionUtils.getThrowableList(cause).iterator(), t 
-> t instanceof SerializationException))
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
index 5e0467a368..4ff552ca65 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
@@ -28,9 +28,6 @@ import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
-import io.netty.util.Attribute;
-import io.netty.util.AttributeKey;
-import io.netty.util.AttributeMap;
 import org.apache.tinkerpop.gremlin.driver.UserAgent;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
@@ -40,7 +37,6 @@ import 
org.apache.tinkerpop.gremlin.util.ser.MessageTextSerializerV4;
 import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
 
 import java.util.List;
-import java.util.UUID;
 import java.util.function.UnaryOperator;
 
 /**
@@ -49,8 +45,6 @@ import java.util.function.UnaryOperator;
 @ChannelHandler.Sharable
 public final class HttpGremlinRequestEncoder extends 
MessageToMessageEncoder<RequestMessageV4> {
 
-    //todo: move
-    public static final AttributeKey<UUID> REQUEST_ID = 
AttributeKey.valueOf("requestId");
     private final MessageSerializer<?> serializer;
     private final boolean userAgentEnabled;
     private final UnaryOperator<FullHttpRequest> interceptor;
@@ -70,10 +64,6 @@ public final class HttpGremlinRequestEncoder extends 
MessageToMessageEncoder<Req
 
     @Override
     protected void encode(final ChannelHandlerContext channelHandlerContext, 
final RequestMessageV4 requestMessage, final List<Object> objects) throws 
Exception {
-        final Attribute<UUID> requestIdAttribute = ((AttributeMap) 
channelHandlerContext).attr(REQUEST_ID);
-        requestIdAttribute.set(requestMessage.getRequestId());
-        System.out.println("HttpGremlinRequestEncoder set requestId: " + 
requestIdAttribute.get());
-
         final String mimeType = serializer.mimeTypesSupported()[0];
         // only GraphSON3 and GraphBinary recommended for serialization of 
Bytecode requests
         if (requestMessage.getField("gremlin") instanceof Bytecode &&
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
index 99d4205005..7d249c8bc6 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java
@@ -35,13 +35,12 @@ import 
org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_BATCH_SIZE;
 import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_EVAL_TIMEOUT;
-import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_USER_AGENT;
 import static 
org.apache.tinkerpop.gremlin.util.Tokens.ARGS_MATERIALIZE_PROPERTIES;
+import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_USER_AGENT;
 
 
 /**
@@ -80,7 +79,7 @@ public class DriverRemoteConnection implements 
RemoteConnection {
                 cluster = conf.containsKey(GREMLIN_REMOTE_DRIVER_CLUSTERFILE) ?
                         
Cluster.open(conf.getString(GREMLIN_REMOTE_DRIVER_CLUSTERFILE)) : 
Cluster.open(conf.subset("clusterConfiguration"));
 
-            client = 
cluster.connect(Client.Settings.build().create()).alias(remoteTraversalSourceName);
+            client = cluster.connect().alias(remoteTraversalSourceName);
         } catch (Exception ex) {
             throw new IllegalStateException(ex);
         }
@@ -93,7 +92,7 @@ public class DriverRemoteConnection implements 
RemoteConnection {
     }
 
     private DriverRemoteConnection(final Cluster cluster, final boolean 
tryCloseCluster, final String remoteTraversalSourceName) {
-        client = 
cluster.connect(Client.Settings.build().create()).alias(remoteTraversalSourceName);
+        client = cluster.connect().alias(remoteTraversalSourceName);
         this.remoteTraversalSourceName = remoteTraversalSourceName;
         this.tryCloseCluster = tryCloseCluster;
         attachElements = false;
@@ -108,7 +107,7 @@ public class DriverRemoteConnection implements 
RemoteConnection {
 
         attachElements = conf.containsKey(GREMLIN_REMOTE + "attachment");
 
-        client = 
cluster.connect(Client.Settings.build().create()).alias(remoteTraversalSourceName);
+        client = cluster.connect().alias(remoteTraversalSourceName);
         tryCloseCluster = false;
         tryCloseClient = true;
         this.conf = Optional.of(conf);
@@ -237,10 +236,7 @@ public class DriverRemoteConnection implements 
RemoteConnection {
      * If the connection is bound to a session, then get the session 
identifier from it.
      */
     Optional<String> getSessionId() {
-        if (client instanceof Client.SessionedClient) {
-            Client.SessionedClient c = (Client.SessionedClient) client;
-            return Optional.of(c.getSessionId());
-        }
+        // todo: not implemented
 
         return Optional.empty();
     }
@@ -281,8 +277,9 @@ public class DriverRemoteConnection implements 
RemoteConnection {
      */
     @Override
     public Transaction tx() {
+        // todo: not implemented
         final DriverRemoteConnection session = new DriverRemoteConnection(
-                client.getCluster().connect(UUID.randomUUID().toString()), 
remoteTraversalSourceName, true);
+                client.getCluster().connect(), remoteTraversalSourceName, 
true);
         return new DriverRemoteTransaction(session);
     }
 
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java
new file mode 100644
index 0000000000..01aa1f695e
--- /dev/null
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConnectionPoolTest {
+
+    @Test
+    public void shouldCreateAndScalePool() throws TimeoutException {
+        final AtomicInteger connectionsCreated = new AtomicInteger(0);
+
+        final Connection mockConn0 = mock(Connection.class);
+        when(mockConn0.isBorrowed()).thenReturn(new AtomicBoolean(false));
+        final Connection mockConn1 = mock(Connection.class);
+        when(mockConn1.isBorrowed()).thenReturn(new AtomicBoolean(false));
+        final List<Connection> mockConns = Arrays.asList(mockConn0, mockConn1);
+
+        final ConnectionFactory connectionFactory = 
mock(ConnectionFactory.class);
+        when(connectionFactory.create(any(ConnectionPool.class)))
+                .thenAnswer(i -> 
mockConns.get(connectionsCreated.getAndIncrement()));
+
+        final Cluster cluster = mock(Cluster.class);
+        when(cluster.connectionPoolSettings()).thenReturn(new 
Settings.ConnectionPoolSettings());
+        final ScheduledThreadPoolExecutor connectionScheduler = new 
ScheduledThreadPoolExecutor(2,
+                new 
BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build());
+        when(cluster.connectionScheduler()).thenReturn(connectionScheduler);
+
+        final Host host = mock(Host.class);
+
+        final Client client = new Client.ClusteredClient(cluster);
+
+        // create pool with 1 connection
+        final ConnectionPool connectionPool = new ConnectionPool(host, client,
+                Optional.of(1), Optional.of(2), connectionFactory);
+        // try to borrow this connection.
+        final Connection conn0 = connectionPool.borrowConnection(100, 
TimeUnit.MILLISECONDS);
+
+        assertNotNull(connectionPool);
+        assertNotNull(conn0);
+        assertEquals(1, connectionsCreated.get());
+
+        // try to borrow connection. conn0 is mocked as borrowed, so should 
create new one
+        when(mockConn0.isBorrowed()).thenReturn(new AtomicBoolean(true));
+        final Connection conn1 = connectionPool.borrowConnection(100, 
TimeUnit.MILLISECONDS);
+
+        assertNotNull(conn1);
+        assertEquals(2, connectionsCreated.get());
+
+        // mark conn1 as borrowed and try to get one more connection
+        when(mockConn1.isBorrowed()).thenReturn(new AtomicBoolean(true));
+        try {
+            connectionPool.borrowConnection(1000, TimeUnit.MILLISECONDS);
+            fail("Pool already at fool capacity, connection can't be added");
+        } catch (TimeoutException te) {
+            assertEquals(2, connectionsCreated.get());
+        }
+
+        // return conn0 to pool, can be borrowed again
+        when(mockConn0.isBorrowed()).thenReturn(new AtomicBoolean(false));
+        final Connection conn00 = connectionPool.borrowConnection(100, 
TimeUnit.MILLISECONDS);
+
+        assertNotNull(conn00);
+        assertEquals(2, connectionsCreated.get());
+    }
+}
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/HostTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/HostTest.java
index 2c20261335..066a77568d 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/HostTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/HostTest.java
@@ -35,7 +35,7 @@ public class HostTest {
         final InetSocketAddress addy = new InetSocketAddress("localhost", 
8182);
         final Host host = new Host(addy, Cluster.open());
         final URI webSocketUri = host.getHostUri();
-        assertEquals("ws://localhost:8182/gremlin", webSocketUri.toString());
+        assertEquals("http://localhost:8182/gremlin";, webSocketUri.toString());
     }
 
     @Test
@@ -43,7 +43,7 @@ public class HostTest {
         final InetSocketAddress addy = new InetSocketAddress("localhost", 
8183);
         final Host host = new Host(addy, 
Cluster.build().port(8183).path("/argh").create());
         final URI webSocketUri = host.getHostUri();
-        assertEquals("ws://localhost:8183/argh", webSocketUri.toString());
+        assertEquals("http://localhost:8183/argh";, webSocketUri.toString());
     }
 
 }
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java
index f70e60c5e4..cd0ce0b97d 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java
@@ -56,481 +56,481 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-public class WebSocketClientBehaviorIntegrateTest {
-    private static final Logger logger = 
LoggerFactory.getLogger(WebSocketClientBehaviorIntegrateTest.class);
-
-    @Rule
-    public TestName name = new TestName();
-
-    private static LogCaptor logCaptor;
-
-    private final SocketServerSettings settings;
-
-    private SimpleSocketServer server;
-
-    public WebSocketClientBehaviorIntegrateTest() throws IOException {
-        settings = 
SocketServerSettings.read(FileSystems.getDefault().getPath("..","gremlin-tools",
 "gremlin-socket-server", "conf", "test-ws-gremlin.yaml"));
-        settings.SERIALIZER = "GraphSONV2";
-    }
-
-    @BeforeClass
-    public static void setupLogCaptor() {
-        logCaptor = LogCaptor.forRoot();
-    }
-
-    @AfterClass
-    public static void tearDown() {
-        logCaptor.close();
-    }
-
-    @Before
-    public void setUp() throws InterruptedException {
-        logCaptor.clearLogs();
-
-        server = new SimpleSocketServer(settings);
-        if 
(name.getMethodName().equals("shouldAttemptHandshakeForLongerThanDefaultNettySslHandshakeTimeout")
 ||
-                
name.getMethodName().equals("shouldPrintCorrectErrorForRegularWebSocketHandshakeTimeout"))
 {
-            server.start(new TestChannelizers.TestWSNoOpInitializer());
-        } else if 
(name.getMethodName().equals("shouldContinueRunningRemainingConnectionsIfServerThrottlesNewConnections")
 ||
-                
name.getMethodName().equals("shouldReturnCorrectExceptionIfServerThrottlesNewConnectionsAndMaxWaitExceeded"))
 {
-            server.start(new 
TestChannelizers.TestConnectionThrottlingInitializer(settings));
-        } else {
-            server.start(new TestWSGremlinInitializer(settings));
-        }
-    }
-
-    @After
-    public void shutdown() {
-        server.stop();
-    }
-
-    /**
-     * Tests that client is correctly sending user agent during web socket 
handshake by having the server return
-     * the captured user agent.
-     */
-    @Test
-    public void shouldIncludeUserAgentInHandshakeRequest() {
-        final Cluster cluster = Cluster.build("localhost").port(settings.PORT)
-                .minConnectionPoolSize(1)
-                .maxConnectionPoolSize(1)
-                .serializer(Serializers.GRAPHSON_V2)
-                .create();
-        final Client.ClusteredClient client = cluster.connect();
-
-        // trigger the testing server to return captured user agent
-        String returnedUserAgent = client.submit("1", RequestOptions.build()
-                        
.overrideRequestId(settings.USER_AGENT_REQUEST_ID).create()).one().getString();
-        assertEquals(UserAgent.USER_AGENT, returnedUserAgent);
-    }
-
-    /**
-     * Tests that no user agent is sent to server when that behaviour is 
disabled.
-     */
-    @Test
-    public void shouldNotIncludeUserAgentInHandshakeRequestIfDisabled() {
-        final Cluster cluster = Cluster.build("localhost").port(settings.PORT)
-                .minConnectionPoolSize(1)
-                .maxConnectionPoolSize(1)
-                .serializer(Serializers.GRAPHSON_V2)
-                .enableUserAgentOnConnect(false)
-                .create();
-        final Client.ClusteredClient client = cluster.connect();
-
-        // trigger the testing server to return captured user agent
-        String returnedUserAgent = client.submit("1", RequestOptions.build()
-                
.overrideRequestId(settings.USER_AGENT_REQUEST_ID).create()).one().getString();
-        assertEquals("", returnedUserAgent);
-    }
-
-    /**
-     * Constructs a deadlock situation when initializing a {@link Client} 
object in sessionless form that leads to
-     * hanging behavior in low resource environments (TINKERPOP-2504) and for 
certain configurations of the
-     * {@link Cluster} object where there are simply not enough threads to 
properly allow the {@link Host} and its
-     * related {@link ConnectionPool} objects to spin up properly - see 
TINKERPOP-2550.
-     */
-    @Test
-    public void shouldNotDeadlockOnInitialization() throws Exception {
-        // it seems you can add the same host more than once so while kinda 
weird it is helpful in faithfully
-        // recreating the deadlock situation, though it can/will happen with 
just one host. workerPoolSize at
-        // "1" also helps faithfully reproduce the problem though it can 
happen at larger pool sizes depending
-        // on the timing/interleaving of tasks. the larger connection pool 
sizes may not be required given the
-        // other settings at play but again, just trying to make sure the 
deadlock state is consistently produced
-        // and a larger pool size will mean more time to elapse scheduling 
connection creation tasks which may
-        // further improve chances of scheduling conflicts that produce the 
deadlock.
-        //
-        // to force this test to a fail state, change 
ClusteredClient.initializeImplementation() to use the
-        // standard Cluster.executor rather than the hostExecutor (which is a 
single threaded independent thread
-        // pool used just for the purpose of initializing the hosts).
-        final Cluster cluster = Cluster.build("localhost").
-                addContactPoint("localhost").
-                addContactPoint("localhost").port(settings.PORT).
-                workerPoolSize(1).
-                minConnectionPoolSize(32).maxConnectionPoolSize(32).create();
-
-        final AtomicBoolean failed = new AtomicBoolean(false);
-        final ExecutorService executor = Executors.newSingleThreadExecutor();
-        executor.submit(() -> {
-            try {
-                final Client client = cluster.connect();
-
-                // test will hang in init() where the Host and ConnectionPool 
are started up
-                client.init();
-            } catch (Exception ex) {
-                // should not "fail" - just hang and then timeout during the 
executor shutdown as there is
-                // a deadlock state, but we have this here just in case. a 
failed assertion of this value
-                // below could be interesting
-                logger.error("Client initialization failed with exception 
which was unexpected", ex);
-                failed.set(true);
-            } finally {
-                cluster.close();
-            }
-        });
-
-        executor.shutdown();
-
-        // 30 seconds should be ample time, even for travis. the deadlock 
state happens quite immediately in
-        // testing and in most situations this test should zip by in subsecond 
pace
-        assertThat(executor.awaitTermination(30, TimeUnit.SECONDS), is(true));
-        assertThat(failed.get(), is(false));
-    }
-
-    /**
-     * Test a scenario when server closes a connection which does not have any 
active requests. Such connection
-     * should be destroyed and replaced by another connection on next request.
-     */
-    @Test
-    public void 
shouldRemoveConnectionFromPoolWhenServerClose_WithNoPendingRequests() throws 
InterruptedException {
-        final Cluster cluster = Cluster.build("localhost").port(settings.PORT)
-                .minConnectionPoolSize(1)
-                .maxConnectionPoolSize(1)
-                .serializer(Serializers.GRAPHSON_V2)
-                .create();
-        final Client.ClusteredClient client = cluster.connect();
-
-        // Initialize the client preemptively
-        client.init();
-
-        // assert number of connections opened
-        final ConnectionPool channelPool = 
client.hostConnectionPools.values().stream().findFirst().get();
-        assertEquals(1, channelPool.getConnectionIDs().size());
-
-        final String originalConnectionID = 
channelPool.getConnectionIDs().iterator().next();
-        logger.info("On client init ConnectionIDs: " + 
channelPool.getConnectionIDs());
-
-        // trigger the testing server to send a WS close frame
-        Vertex v = client.submit("1", RequestOptions.build()
-                
.overrideRequestId(settings.SINGLE_VERTEX_DELAYED_CLOSE_CONNECTION_REQUEST_ID).create())
-                .one().getVertex();
-
-        assertNotNull(v);
-
-        // assert connection is not closed yet
-        assertEquals(1, channelPool.getConnectionIDs().size());
-
-        // wait for server to send the close WS frame
-        Thread.sleep(6000);
-
-        // assert that original connection is not part of the connection pool 
any more
-        assertThat("The original connection should have been closed by the 
server.",
-                channelPool.getConnectionIDs().contains(originalConnectionID), 
is(false));
-
-        // assert sanity after connection replacement
-        v = client.submit("1",
-                
RequestOptions.build().overrideRequestId(settings.SINGLE_VERTEX_REQUEST_ID).create())
-                .one().getVertex();
-        assertNotNull(v);
-    }
-
-    /**
-     * Tests a scenario when the connection a faulty connection replaced by a 
new connection.
-     * Ensures that the creation of a new replacement channel only happens 
once.
-     */
-    @Test
-    public void 
shouldRemoveConnectionFromPoolWhenServerClose_WithPendingRequests() throws 
InterruptedException, ExecutionException {
-        final Cluster cluster = Cluster.build("localhost").port(settings.PORT)
-                .minConnectionPoolSize(1)
-                .maxConnectionPoolSize(1)
-                .serializer(Serializers.GRAPHSON_V2)
-                .create();
-
-        final Client.ClusteredClient client = cluster.connect();
-
-        // Initialize the client preemptively
-        client.init();
-
-        // assert number of connections opened
-        final ConnectionPool channelPool = 
client.hostConnectionPools.values().stream().findFirst().get();
-        assertEquals(1, channelPool.getConnectionIDs().size());
-
-        // Send two requests in flight. Both should error out.
-        final CompletableFuture<ResultSet> req1 = client.submitAsync("1", 
RequestOptions.build()
-                
.overrideRequestId(settings.CLOSE_CONNECTION_REQUEST_ID).create());
-        final CompletableFuture<ResultSet> req2 = client.submitAsync("1", 
RequestOptions.build()
-                
.overrideRequestId(settings.CLOSE_CONNECTION_REQUEST_ID_2).create());
-
-
-        // assert both are sent on same connection
-        assertEquals(1, channelPool.getConnectionIDs().size());
-
-        // trigger write for both requests
-        req1.get();
-        req2.get();
-
-        // wait for close message to arrive from server
-        Thread.sleep(2000);
-
-        // Assert that we should consider creating a connection only once, 
since only one connection is being closed.
-        assertEquals(1, logCaptor.getLogs().stream().filter(str -> 
str.contains("Considering new connection on")).count());
-
-        // assert sanity after connection replacement
-        final Vertex v = client.submit("1",
-                
RequestOptions.build().overrideRequestId(settings.SINGLE_VERTEX_REQUEST_ID).create())
-                .one().getVertex();
-        assertNotNull(v);
-    }
-
-    /**
-     * Tests the scenario when client intentionally closes the connection. In 
this case, the
-     * connection should not be recycled.
-     */
-    @Test
-    public void 
shouldNotCreateReplacementConnectionWhenClientClosesConnection() throws 
ExecutionException, InterruptedException {
-        final Cluster cluster = Cluster.build("localhost").port(settings.PORT)
-                .minConnectionPoolSize(1)
-                .maxConnectionPoolSize(1)
-                .serializer(Serializers.GRAPHSON_V2)
-                .create();
-        final Client.ClusteredClient client = cluster.connect();
-
-        // Initialize the client preemptively
-        client.init();
-
-        // Clearing logCaptor before attempting to close the connection is in 
response to an issue where this test can
-        // be polluted by logs from a previous test when running on slow 
hardware.
-        logCaptor.clearLogs();
-
-        // assert number of connections opened
-        final ConnectionPool channelPool = 
client.hostConnectionPools.values().stream().findFirst().get();
-        assertEquals(1, channelPool.getConnectionIDs().size());
-
-        // close the connection pool in an authentic manner
-        channelPool.closeAsync().get();
-
-        // wait for channel closure callback to trigger
-        Thread.sleep(2000);
-
-        assertEquals("OnClose callback should be called but only once", 1,
-                logCaptor.getLogs().stream().filter(str -> 
str.contains("OnChannelClose callback called for channel")).count());
-
-        assertEquals("No new connection creation should be started", 0,
-                logCaptor.getLogs().stream().filter(str -> 
str.contains("Considering new connection on")).count());
-    }
-
-    /**
-     * (TINKERPOP-2814) Tests to make sure that the SSL handshake is now 
capped by connectionSetupTimeoutMillis and not
-     * the default Netty SSL handshake timeout of 10,000ms.
-     */
-    @Test
-    public void 
shouldAttemptHandshakeForLongerThanDefaultNettySslHandshakeTimeout() {
-        final Cluster cluster = Cluster.build("localhost").port(settings.PORT)
-                .minConnectionPoolSize(1)
-                .maxConnectionPoolSize(1)
-                .connectionSetupTimeoutMillis(20000) // needs to be larger 
than 10000ms.
-                .enableSsl(true)
-                .create();
-
-        final Client.ClusteredClient client = cluster.connect();
-        final long start = System.currentTimeMillis();
-
-        Exception caught = null;
-        try {
-            client.submit("1");
-        } catch (Exception e) {
-            caught = e;
-        } finally {
-            // Test against 15000ms which should give a big enough buffer to 
avoid timing issues.
-            assertTrue(System.currentTimeMillis() - start > 15000);
-            assertTrue(caught != null);
-            assertTrue(caught instanceof NoHostAvailableException);
-            assertTrue(logCaptor.getLogs().stream().anyMatch(str -> 
str.contains("SSL handshake not completed")));
-        }
-
-        cluster.close();
-    }
-
-    /**
-     * Tests to make sure that the correct error message is logged when a 
non-SSL connection attempt times out.
-     */
-    @Test
-    public void shouldPrintCorrectErrorForRegularWebSocketHandshakeTimeout() 
throws InterruptedException {
-        final Cluster cluster = Cluster.build("localhost").port(settings.PORT)
-                .minConnectionPoolSize(1)
-                .maxConnectionPoolSize(1)
-                .connectionSetupTimeoutMillis(120)
-                .create();
-
-        final Client.ClusteredClient client = cluster.connect();
-
-        Exception caught = null;
-        try {
-            client.submit("1");
-        } catch (Exception e) {
-            caught = e;
-        } finally {
-            assertTrue(caught != null);
-            assertTrue(caught instanceof NoHostAvailableException);
-            Thread.sleep(150);
-            assertTrue(logCaptor.getLogs().stream().anyMatch(str -> 
str.contains("WebSocket handshake not completed")));
-        }
-
-        cluster.close();
-    }
-
-    /**
-     * Tests that if a server throttles new connections (doesn't allow new 
connections to be made) then all requests
-     * will run and complete on the connections that are already open.
-     */
-    @Test
-    public void 
shouldContinueRunningRemainingConnectionsIfServerThrottlesNewConnections() 
throws ExecutionException, InterruptedException, TimeoutException {
-        final Cluster cluster = Cluster.build("localhost").port(settings.PORT)
-                .minConnectionPoolSize(1)
-                .maxConnectionPoolSize(5)
-                .maxWaitForConnection(15000) // large value ensures that 
request will eventually find a connection.
-                .connectionSetupTimeoutMillis(1000)
-                .serializer(Serializers.GRAPHSON_V2)
-                .create();
-
-        final Client.ClusteredClient client = cluster.connect();
-
-        final List<CompletableFuture<ResultSet>> results = new 
ArrayList<CompletableFuture<ResultSet>>();
-        for (int i = 0; i < 5; i++) {
-            results.add(client.submitAsync("500"));
-        }
-
-        for (CompletableFuture<ResultSet> result : results) {
-            assertNotNull(result.get(60000, 
TimeUnit.MILLISECONDS).one().getVertex());
-        }
-
-        cluster.close();
-    }
-
-    /**
-     * Tests that if a server throttles new connections (doesn't allow new 
connections to be made) then any request
-     * that can't find a connection within its maxWaitForConnection will 
return an informative exception regarding
-     * the inability to open new connections.
-     */
-    @Test
-    public void 
shouldReturnCorrectExceptionIfServerThrottlesNewConnectionsAndMaxWaitExceeded() 
{
-        final Cluster cluster = Cluster.build("localhost").port(settings.PORT)
-                .minConnectionPoolSize(1)
-                .maxConnectionPoolSize(5)
-                .maxWaitForConnection(250) // small value ensures that 
requests will return TimeoutException.
-                .connectionSetupTimeoutMillis(100)
-                .serializer(Serializers.GRAPHSON_V2)
-                .create();
-
-        final Client.ClusteredClient client = cluster.connect();
-
-        for (int i = 0; i < 5; i++) {
-            try {
-                client.submitAsync("3000");
-            } catch (Exception e) {
-                final Throwable rootCause = ExceptionHelper.getRootCause(e);
-                assertTrue(rootCause instanceof TimeoutException);
-                assertTrue(rootCause.getMessage().contains("WebSocket 
handshake not completed"));
-            }
-        }
-
-        cluster.close();
-    }
-
-    /**
-     * Tests that the client continues to work if the server temporarily goes 
down between two requests.
-     */
-    @Test
-    public void shouldContinueRunningIfServerGoesDownTemporarily() throws 
InterruptedException {
-        final Cluster cluster = Cluster.build("localhost").port(settings.PORT)
-                .minConnectionPoolSize(1)
-                .serializer(Serializers.GRAPHSON_V2)
-                .create();
-
-        final Client.ClusteredClient client = cluster.connect();
-        final Object lock = new Object();
-
-        final ScheduledExecutorService scheduledPool = 
Executors.newScheduledThreadPool(1);
-        scheduledPool.schedule(() -> {
-            try {
-                server.stopSync();
-                server = new SimpleSocketServer(settings);
-                server.start(new TestWSGremlinInitializer(settings));
-                synchronized (lock) {
-                    lock.notify();
-                }
-            } catch (InterruptedException ignored) {
-                // Ignored.
-            }
-        }, 1000, TimeUnit.MILLISECONDS);
-
-        synchronized (lock) {
-            assertNotNull(client.submit("1").one().getVertex());
-            lock.wait(30000);
-        }
-
-        assertNotNull(client.submit("1").one().getVertex());
-
-        cluster.close();
-    }
-
-    /**
-     * Tests that if the host is unavailable then the client will return an 
exception that contains information about
-     * the status of the host.
-     */
-    @Test
-    public void shouldReturnCorrectExceptionIfServerGoesDown() throws 
InterruptedException {
-        final Cluster cluster = Cluster.build("localhost").port(settings.PORT)
-                .minConnectionPoolSize(1)
-                .maxWaitForConnection(500)
-                .connectionSetupTimeoutMillis(100)
-                .serializer(Serializers.GRAPHSON_V2)
-                .create();
-
-        final Client.ClusteredClient client = cluster.connect();
-        client.submit("1");
-
-        server.stopSync();
-
-        try {
-            client.submit("1");
-        } catch (Exception e) {
-            final Throwable rootCause = ExceptionHelper.getRootCause(e);
-            assertTrue(rootCause instanceof TimeoutException);
-            assertTrue(rootCause.getMessage().contains("Connection refused"));
-        }
-
-        cluster.close();
-    }
-
-    /**
-     * Tests that client is correctly sending all overridable per request 
settings (requestId, batchSize,
-     * evaluationTimeout, and userAgent) to the server.
-     */
-    @Test
-    public void shouldSendPerRequestSettingsToServer() {
-        final Cluster cluster = Cluster.build("localhost").port(settings.PORT)
-                .minConnectionPoolSize(1)
-                .maxConnectionPoolSize(1)
-                .serializer(Serializers.GRAPHSON_V2)
-                .create();
-        final Client.ClusteredClient client = cluster.connect();
-
-        // trigger the testing server to return captured request settings
-        String response = client.submit("1", RequestOptions.build()
-                .overrideRequestId(settings.PER_REQUEST_SETTINGS_REQUEST_ID)
-                .timeout(1234).userAgent("helloWorld").batchSize(12)
-                .materializeProperties("tokens").create()).one().getString();
-
-        String expectedResponse = String.format("requestId=%s 
evaluationTimeout=%d, batchSize=%d, userAgent=%s, materializeProperties=%s",
-                settings.PER_REQUEST_SETTINGS_REQUEST_ID, 1234, 12, 
"helloWorld", "tokens");
-        assertEquals(expectedResponse, response);
-    }
-}
+//public class WebSocketClientBehaviorIntegrateTest {
+//    private static final Logger logger = 
LoggerFactory.getLogger(WebSocketClientBehaviorIntegrateTest.class);
+//
+//    @Rule
+//    public TestName name = new TestName();
+//
+//    private static LogCaptor logCaptor;
+//
+//    private final SocketServerSettings settings;
+//
+//    private SimpleSocketServer server;
+//
+//    public WebSocketClientBehaviorIntegrateTest() throws IOException {
+//        settings = 
SocketServerSettings.read(FileSystems.getDefault().getPath("..","gremlin-tools",
 "gremlin-socket-server", "conf", "test-ws-gremlin.yaml"));
+//        settings.SERIALIZER = "GraphSONV2";
+//    }
+//
+//    @BeforeClass
+//    public static void setupLogCaptor() {
+//        logCaptor = LogCaptor.forRoot();
+//    }
+//
+//    @AfterClass
+//    public static void tearDown() {
+//        logCaptor.close();
+//    }
+//
+//    @Before
+//    public void setUp() throws InterruptedException {
+//        logCaptor.clearLogs();
+//
+//        server = new SimpleSocketServer(settings);
+//        if 
(name.getMethodName().equals("shouldAttemptHandshakeForLongerThanDefaultNettySslHandshakeTimeout")
 ||
+//                
name.getMethodName().equals("shouldPrintCorrectErrorForRegularWebSocketHandshakeTimeout"))
 {
+//            server.start(new TestChannelizers.TestWSNoOpInitializer());
+//        } else if 
(name.getMethodName().equals("shouldContinueRunningRemainingConnectionsIfServerThrottlesNewConnections")
 ||
+//                
name.getMethodName().equals("shouldReturnCorrectExceptionIfServerThrottlesNewConnectionsAndMaxWaitExceeded"))
 {
+//            server.start(new 
TestChannelizers.TestConnectionThrottlingInitializer(settings));
+//        } else {
+//            server.start(new TestWSGremlinInitializer(settings));
+//        }
+//    }
+//
+//    @After
+//    public void shutdown() {
+//        server.stop();
+//    }
+//
+//    /**
+//     * Tests that client is correctly sending user agent during web socket 
handshake by having the server return
+//     * the captured user agent.
+//     */
+//    @Test
+//    public void shouldIncludeUserAgentInHandshakeRequest() {
+//        final Cluster cluster = 
Cluster.build("localhost").port(settings.PORT)
+//                .minConnectionPoolSize(1)
+//                .maxConnectionPoolSize(1)
+//                .serializer(Serializers.GRAPHSON_V2)
+//                .create();
+//        final Client.ClusteredClient client = cluster.connect();
+//
+//        // trigger the testing server to return captured user agent
+//        String returnedUserAgent = client.submit("1", RequestOptions.build()
+//                        
.overrideRequestId(settings.USER_AGENT_REQUEST_ID).create()).one().getString();
+//        assertEquals(UserAgent.USER_AGENT, returnedUserAgent);
+//    }
+//
+//    /**
+//     * Tests that no user agent is sent to server when that behaviour is 
disabled.
+//     */
+//    @Test
+//    public void shouldNotIncludeUserAgentInHandshakeRequestIfDisabled() {
+//        final Cluster cluster = 
Cluster.build("localhost").port(settings.PORT)
+//                .minConnectionPoolSize(1)
+//                .maxConnectionPoolSize(1)
+//                .serializer(Serializers.GRAPHSON_V2)
+//                .enableUserAgentOnConnect(false)
+//                .create();
+//        final Client.ClusteredClient client = cluster.connect();
+//
+//        // trigger the testing server to return captured user agent
+//        String returnedUserAgent = client.submit("1", RequestOptions.build()
+//                
.overrideRequestId(settings.USER_AGENT_REQUEST_ID).create()).one().getString();
+//        assertEquals("", returnedUserAgent);
+//    }
+//
+//    /**
+//     * Constructs a deadlock situation when initializing a {@link Client} 
object in sessionless form that leads to
+//     * hanging behavior in low resource environments (TINKERPOP-2504) and 
for certain configurations of the
+//     * {@link Cluster} object where there are simply not enough threads to 
properly allow the {@link Host} and its
+//     * related {@link ConnectionPool} objects to spin up properly - see 
TINKERPOP-2550.
+//     */
+//    @Test
+//    public void shouldNotDeadlockOnInitialization() throws Exception {
+//        // it seems you can add the same host more than once so while kinda 
weird it is helpful in faithfully
+//        // recreating the deadlock situation, though it can/will happen with 
just one host. workerPoolSize at
+//        // "1" also helps faithfully reproduce the problem though it can 
happen at larger pool sizes depending
+//        // on the timing/interleaving of tasks. the larger connection pool 
sizes may not be required given the
+//        // other settings at play but again, just trying to make sure the 
deadlock state is consistently produced
+//        // and a larger pool size will mean more time to elapse scheduling 
connection creation tasks which may
+//        // further improve chances of scheduling conflicts that produce the 
deadlock.
+//        //
+//        // to force this test to a fail state, change 
ClusteredClient.initializeImplementation() to use the
+//        // standard Cluster.executor rather than the hostExecutor (which is 
a single threaded independent thread
+//        // pool used just for the purpose of initializing the hosts).
+//        final Cluster cluster = Cluster.build("localhost").
+//                addContactPoint("localhost").
+//                addContactPoint("localhost").port(settings.PORT).
+//                workerPoolSize(1).
+//                minConnectionPoolSize(32).maxConnectionPoolSize(32).create();
+//
+//        final AtomicBoolean failed = new AtomicBoolean(false);
+//        final ExecutorService executor = Executors.newSingleThreadExecutor();
+//        executor.submit(() -> {
+//            try {
+//                final Client client = cluster.connect();
+//
+//                // test will hang in init() where the Host and 
ConnectionPool are started up
+//                client.init();
+//            } catch (Exception ex) {
+//                // should not "fail" - just hang and then timeout during the 
executor shutdown as there is
+//                // a deadlock state, but we have this here just in case. a 
failed assertion of this value
+//                // below could be interesting
+//                logger.error("Client initialization failed with exception 
which was unexpected", ex);
+//                failed.set(true);
+//            } finally {
+//                cluster.close();
+//            }
+//        });
+//
+//        executor.shutdown();
+//
+//        // 30 seconds should be ample time, even for travis. the deadlock 
state happens quite immediately in
+//        // testing and in most situations this test should zip by in 
subsecond pace
+//        assertThat(executor.awaitTermination(30, TimeUnit.SECONDS), 
is(true));
+//        assertThat(failed.get(), is(false));
+//    }
+//
+//    /**
+//     * Test a scenario when server closes a connection which does not have 
any active requests. Such connection
+//     * should be destroyed and replaced by another connection on next 
request.
+//     */
+//    @Test
+//    public void 
shouldRemoveConnectionFromPoolWhenServerClose_WithNoPendingRequests() throws 
InterruptedException {
+//        final Cluster cluster = 
Cluster.build("localhost").port(settings.PORT)
+//                .minConnectionPoolSize(1)
+//                .maxConnectionPoolSize(1)
+//                .serializer(Serializers.GRAPHSON_V2)
+//                .create();
+//        final Client.ClusteredClient client = cluster.connect();
+//
+//        // Initialize the client preemptively
+//        client.init();
+//
+//        // assert number of connections opened
+//        final ConnectionPool channelPool = 
client.hostConnectionPools.values().stream().findFirst().get();
+//        assertEquals(1, channelPool.getConnectionIDs().size());
+//
+//        final String originalConnectionID = 
channelPool.getConnectionIDs().iterator().next();
+//        logger.info("On client init ConnectionIDs: " + 
channelPool.getConnectionIDs());
+//
+//        // trigger the testing server to send a WS close frame
+//        Vertex v = client.submit("1", RequestOptions.build()
+//                
.overrideRequestId(settings.SINGLE_VERTEX_DELAYED_CLOSE_CONNECTION_REQUEST_ID).create())
+//                .one().getVertex();
+//
+//        assertNotNull(v);
+//
+//        // assert connection is not closed yet
+//        assertEquals(1, channelPool.getConnectionIDs().size());
+//
+//        // wait for server to send the close WS frame
+//        Thread.sleep(6000);
+//
+//        // assert that original connection is not part of the connection 
pool any more
+//        assertThat("The original connection should have been closed by the 
server.",
+//                
channelPool.getConnectionIDs().contains(originalConnectionID), is(false));
+//
+//        // assert sanity after connection replacement
+//        v = client.submit("1",
+//                
RequestOptions.build().overrideRequestId(settings.SINGLE_VERTEX_REQUEST_ID).create())
+//                .one().getVertex();
+//        assertNotNull(v);
+//    }
+//
+//    /**
+//     * Tests a scenario when the connection a faulty connection replaced by 
a new connection.
+//     * Ensures that the creation of a new replacement channel only happens 
once.
+//     */
+//    @Test
+//    public void 
shouldRemoveConnectionFromPoolWhenServerClose_WithPendingRequests() throws 
InterruptedException, ExecutionException {
+//        final Cluster cluster = 
Cluster.build("localhost").port(settings.PORT)
+//                .minConnectionPoolSize(1)
+//                .maxConnectionPoolSize(1)
+//                .serializer(Serializers.GRAPHSON_V2)
+//                .create();
+//
+//        final Client.ClusteredClient client = cluster.connect();
+//
+//        // Initialize the client preemptively
+//        client.init();
+//
+//        // assert number of connections opened
+//        final ConnectionPool channelPool = 
client.hostConnectionPools.values().stream().findFirst().get();
+//        assertEquals(1, channelPool.getConnectionIDs().size());
+//
+//        // Send two requests in flight. Both should error out.
+//        final CompletableFuture<ResultSet> req1 = client.submitAsync("1", 
RequestOptions.build()
+//                
.overrideRequestId(settings.CLOSE_CONNECTION_REQUEST_ID).create());
+//        final CompletableFuture<ResultSet> req2 = client.submitAsync("1", 
RequestOptions.build()
+//                
.overrideRequestId(settings.CLOSE_CONNECTION_REQUEST_ID_2).create());
+//
+//
+//        // assert both are sent on same connection
+//        assertEquals(1, channelPool.getConnectionIDs().size());
+//
+//        // trigger write for both requests
+//        req1.get();
+//        req2.get();
+//
+//        // wait for close message to arrive from server
+//        Thread.sleep(2000);
+//
+//        // Assert that we should consider creating a connection only once, 
since only one connection is being closed.
+//        assertEquals(1, logCaptor.getLogs().stream().filter(str -> 
str.contains("Considering new connection on")).count());
+//
+//        // assert sanity after connection replacement
+//        final Vertex v = client.submit("1",
+//                
RequestOptions.build().overrideRequestId(settings.SINGLE_VERTEX_REQUEST_ID).create())
+//                .one().getVertex();
+//        assertNotNull(v);
+//    }
+//
+//    /**
+//     * Tests the scenario when client intentionally closes the connection. 
In this case, the
+//     * connection should not be recycled.
+//     */
+//    @Test
+//    public void 
shouldNotCreateReplacementConnectionWhenClientClosesConnection() throws 
ExecutionException, InterruptedException {
+//        final Cluster cluster = 
Cluster.build("localhost").port(settings.PORT)
+//                .minConnectionPoolSize(1)
+//                .maxConnectionPoolSize(1)
+//                .serializer(Serializers.GRAPHSON_V2)
+//                .create();
+//        final Client.ClusteredClient client = cluster.connect();
+//
+//        // Initialize the client preemptively
+//        client.init();
+//
+//        // Clearing logCaptor before attempting to close the connection is 
in response to an issue where this test can
+//        // be polluted by logs from a previous test when running on slow 
hardware.
+//        logCaptor.clearLogs();
+//
+//        // assert number of connections opened
+//        final ConnectionPool channelPool = 
client.hostConnectionPools.values().stream().findFirst().get();
+//        assertEquals(1, channelPool.getConnectionIDs().size());
+//
+//        // close the connection pool in an authentic manner
+//        channelPool.closeAsync().get();
+//
+//        // wait for channel closure callback to trigger
+//        Thread.sleep(2000);
+//
+//        assertEquals("OnClose callback should be called but only once", 1,
+//                logCaptor.getLogs().stream().filter(str -> 
str.contains("OnChannelClose callback called for channel")).count());
+//
+//        assertEquals("No new connection creation should be started", 0,
+//                logCaptor.getLogs().stream().filter(str -> 
str.contains("Considering new connection on")).count());
+//    }
+//
+//    /**
+//     * (TINKERPOP-2814) Tests to make sure that the SSL handshake is now 
capped by connectionSetupTimeoutMillis and not
+//     * the default Netty SSL handshake timeout of 10,000ms.
+//     */
+//    @Test
+//    public void 
shouldAttemptHandshakeForLongerThanDefaultNettySslHandshakeTimeout() {
+//        final Cluster cluster = 
Cluster.build("localhost").port(settings.PORT)
+//                .minConnectionPoolSize(1)
+//                .maxConnectionPoolSize(1)
+//                .connectionSetupTimeoutMillis(20000) // needs to be larger 
than 10000ms.
+//                .enableSsl(true)
+//                .create();
+//
+//        final Client.ClusteredClient client = cluster.connect();
+//        final long start = System.currentTimeMillis();
+//
+//        Exception caught = null;
+//        try {
+//            client.submit("1");
+//        } catch (Exception e) {
+//            caught = e;
+//        } finally {
+//            // Test against 15000ms which should give a big enough buffer to 
avoid timing issues.
+//            assertTrue(System.currentTimeMillis() - start > 15000);
+//            assertTrue(caught != null);
+//            assertTrue(caught instanceof NoHostAvailableException);
+//            assertTrue(logCaptor.getLogs().stream().anyMatch(str -> 
str.contains("SSL handshake not completed")));
+//        }
+//
+//        cluster.close();
+//    }
+//
+//    /**
+//     * Tests to make sure that the correct error message is logged when a 
non-SSL connection attempt times out.
+//     */
+//    @Test
+//    public void shouldPrintCorrectErrorForRegularWebSocketHandshakeTimeout() 
throws InterruptedException {
+//        final Cluster cluster = 
Cluster.build("localhost").port(settings.PORT)
+//                .minConnectionPoolSize(1)
+//                .maxConnectionPoolSize(1)
+//                .connectionSetupTimeoutMillis(120)
+//                .create();
+//
+//        final Client.ClusteredClient client = cluster.connect();
+//
+//        Exception caught = null;
+//        try {
+//            client.submit("1");
+//        } catch (Exception e) {
+//            caught = e;
+//        } finally {
+//            assertTrue(caught != null);
+//            assertTrue(caught instanceof NoHostAvailableException);
+//            Thread.sleep(150);
+//            assertTrue(logCaptor.getLogs().stream().anyMatch(str -> 
str.contains("WebSocket handshake not completed")));
+//        }
+//
+//        cluster.close();
+//    }
+//
+//    /**
+//     * Tests that if a server throttles new connections (doesn't allow new 
connections to be made) then all requests
+//     * will run and complete on the connections that are already open.
+//     */
+//    @Test
+//    public void 
shouldContinueRunningRemainingConnectionsIfServerThrottlesNewConnections() 
throws ExecutionException, InterruptedException, TimeoutException {
+//        final Cluster cluster = 
Cluster.build("localhost").port(settings.PORT)
+//                .minConnectionPoolSize(1)
+//                .maxConnectionPoolSize(5)
+//                .maxWaitForConnection(15000) // large value ensures that 
request will eventually find a connection.
+//                .connectionSetupTimeoutMillis(1000)
+//                .serializer(Serializers.GRAPHSON_V2)
+//                .create();
+//
+//        final Client.ClusteredClient client = cluster.connect();
+//
+//        final List<CompletableFuture<ResultSet>> results = new 
ArrayList<CompletableFuture<ResultSet>>();
+//        for (int i = 0; i < 5; i++) {
+//            results.add(client.submitAsync("500"));
+//        }
+//
+//        for (CompletableFuture<ResultSet> result : results) {
+//            assertNotNull(result.get(60000, 
TimeUnit.MILLISECONDS).one().getVertex());
+//        }
+//
+//        cluster.close();
+//    }
+//
+//    /**
+//     * Tests that if a server throttles new connections (doesn't allow new 
connections to be made) then any request
+//     * that can't find a connection within its maxWaitForConnection will 
return an informative exception regarding
+//     * the inability to open new connections.
+//     */
+//    @Test
+//    public void 
shouldReturnCorrectExceptionIfServerThrottlesNewConnectionsAndMaxWaitExceeded() 
{
+//        final Cluster cluster = 
Cluster.build("localhost").port(settings.PORT)
+//                .minConnectionPoolSize(1)
+//                .maxConnectionPoolSize(5)
+//                .maxWaitForConnection(250) // small value ensures that 
requests will return TimeoutException.
+//                .connectionSetupTimeoutMillis(100)
+//                .serializer(Serializers.GRAPHSON_V2)
+//                .create();
+//
+//        final Client.ClusteredClient client = cluster.connect();
+//
+//        for (int i = 0; i < 5; i++) {
+//            try {
+//                client.submitAsync("3000");
+//            } catch (Exception e) {
+//                final Throwable rootCause = ExceptionHelper.getRootCause(e);
+//                assertTrue(rootCause instanceof TimeoutException);
+//                assertTrue(rootCause.getMessage().contains("WebSocket 
handshake not completed"));
+//            }
+//        }
+//
+//        cluster.close();
+//    }
+//
+//    /**
+//     * Tests that the client continues to work if the server temporarily 
goes down between two requests.
+//     */
+//    @Test
+//    public void shouldContinueRunningIfServerGoesDownTemporarily() throws 
InterruptedException {
+//        final Cluster cluster = 
Cluster.build("localhost").port(settings.PORT)
+//                .minConnectionPoolSize(1)
+//                .serializer(Serializers.GRAPHSON_V2)
+//                .create();
+//
+//        final Client.ClusteredClient client = cluster.connect();
+//        final Object lock = new Object();
+//
+//        final ScheduledExecutorService scheduledPool = 
Executors.newScheduledThreadPool(1);
+//        scheduledPool.schedule(() -> {
+//            try {
+//                server.stopSync();
+//                server = new SimpleSocketServer(settings);
+//                server.start(new TestWSGremlinInitializer(settings));
+//                synchronized (lock) {
+//                    lock.notify();
+//                }
+//            } catch (InterruptedException ignored) {
+//                // Ignored.
+//            }
+//        }, 1000, TimeUnit.MILLISECONDS);
+//
+//        synchronized (lock) {
+//            assertNotNull(client.submit("1").one().getVertex());
+//            lock.wait(30000);
+//        }
+//
+//        assertNotNull(client.submit("1").one().getVertex());
+//
+//        cluster.close();
+//    }
+//
+//    /**
+//     * Tests that if the host is unavailable then the client will return an 
exception that contains information about
+//     * the status of the host.
+//     */
+//    @Test
+//    public void shouldReturnCorrectExceptionIfServerGoesDown() throws 
InterruptedException {
+//        final Cluster cluster = 
Cluster.build("localhost").port(settings.PORT)
+//                .minConnectionPoolSize(1)
+//                .maxWaitForConnection(500)
+//                .connectionSetupTimeoutMillis(100)
+//                .serializer(Serializers.GRAPHSON_V2)
+//                .create();
+//
+//        final Client.ClusteredClient client = cluster.connect();
+//        client.submit("1");
+//
+//        server.stopSync();
+//
+//        try {
+//            client.submit("1");
+//        } catch (Exception e) {
+//            final Throwable rootCause = ExceptionHelper.getRootCause(e);
+//            assertTrue(rootCause instanceof TimeoutException);
+//            assertTrue(rootCause.getMessage().contains("Connection 
refused"));
+//        }
+//
+//        cluster.close();
+//    }
+//
+//    /**
+//     * Tests that client is correctly sending all overridable per request 
settings (requestId, batchSize,
+//     * evaluationTimeout, and userAgent) to the server.
+//     */
+//    @Test
+//    public void shouldSendPerRequestSettingsToServer() {
+//        final Cluster cluster = 
Cluster.build("localhost").port(settings.PORT)
+//                .minConnectionPoolSize(1)
+//                .maxConnectionPoolSize(1)
+//                .serializer(Serializers.GRAPHSON_V2)
+//                .create();
+//        final Client.ClusteredClient client = cluster.connect();
+//
+//        // trigger the testing server to return captured request settings
+//        String response = client.submit("1", RequestOptions.build()
+//                .overrideRequestId(settings.PER_REQUEST_SETTINGS_REQUEST_ID)
+//                .timeout(1234).userAgent("helloWorld").batchSize(12)
+//                .materializeProperties("tokens").create()).one().getString();
+//
+//        String expectedResponse = String.format("requestId=%s 
evaluationTimeout=%d, batchSize=%d, userAgent=%s, materializeProperties=%s",
+//                settings.PER_REQUEST_SETTINGS_REQUEST_ID, 1234, 12, 
"helloWorld", "tokens");
+//        assertEquals(expectedResponse, response);
+//    }
+//}
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnectionTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnectionTest.java
index 1b616a9cdd..17fcb0438c 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnectionTest.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnectionTest.java
@@ -36,7 +36,6 @@ public class DriverRemoteConnectionTest {
 
     @Test
     public void shouldBuildRequestOptions() {
-        final UUID requestId = 
UUID.fromString("34a9f45f-8854-4d33-8b40-92a8171ee495");
         final RequestOptions options = 
DriverRemoteConnection.getRequestOptions(
                 g.with("x").
                         with("y", 100).
@@ -44,7 +43,6 @@ public class DriverRemoteConnectionTest {
                         with(Tokens.ARGS_EVAL_TIMEOUT, 100000L).
                         with(Tokens.ARGS_USER_AGENT, "test").
                         V().asAdmin().getBytecode());
-        assertEquals(requestId, options.getOverrideRequestId().get());
         assertEquals(1000, options.getBatchSize().get().intValue());
         assertEquals(100000L, options.getTimeout().get().longValue());
         assertEquals("test", options.getUserAgent().get());
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
index f51bd29445..f6aaf77b11 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
@@ -22,7 +22,6 @@ import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.Logger;
 import io.netty.handler.codec.CorruptedFrameException;
 import nl.altindag.log.LogCaptor;
-import org.apache.tinkerpop.gremlin.util.Tokens;
 import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 9210f00a34..9c4d59b0e1 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -29,25 +29,18 @@ import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.driver.RequestOptions;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
-import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
 import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler;
-import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
 import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
-import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
-import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
 import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
 import org.apache.tinkerpop.gremlin.util.TimeUtil;
-import org.apache.tinkerpop.gremlin.util.Tokens;
 import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
 import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1;
 import org.apache.tinkerpop.gremlin.util.ser.Serializers;
 import org.junit.After;
@@ -55,8 +48,6 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
 import org.slf4j.LoggerFactory;
 
 import java.awt.*;
@@ -72,8 +63,6 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -82,7 +71,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static 
org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.AllOf.allOf;
@@ -98,7 +86,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.verify;
 
 /**
  * Integration tests for gremlin-driver configurations and settings.
@@ -594,49 +581,6 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
         }
     }
 
-    /**
-     * This test validates that the session requests are processed in-order on 
the server. The order of results
-     * returned to the client might be different though since each result is 
handled by a different executor thread.
-     */
-    @Test
-    public void shouldProcessSessionRequestsInOrder() throws Exception {
-        final Cluster cluster = TestClientFactory.open();
-        try {
-            final Client client = cluster.connect(name.getMethodName());
-
-            final ResultSet first = 
client.submit("Thread.sleep(5000);g.V().fold().coalesce(unfold(), 
__.addV('person'))");
-            final ResultSet second = client.submit("g.V().count()");
-
-            final CompletableFuture<List<Result>> futureFirst = first.all();
-            final CompletableFuture<List<Result>> futureSecond = second.all();
-
-            final CountDownLatch latch = new CountDownLatch(2);
-            final List<Object> results = new ArrayList<>();
-            final ExecutorService executor = 
Executors.newSingleThreadExecutor();
-
-            futureFirst.thenAcceptAsync(r -> {
-                results.add(r.get(0).getVertex().label());
-                latch.countDown();
-            }, executor);
-
-            futureSecond.thenAcceptAsync(r -> {
-                results.add(r.get(0).getLong());
-                latch.countDown();
-            }, executor);
-
-            // wait for both results
-            latch.await(30000, TimeUnit.MILLISECONDS);
-
-            assertThat("Should contain 2 results", results.size() == 2);
-            assertThat("The numeric result should be 1", results.contains(1L));
-            assertThat("The string result contain label person", 
results.contains("person"));
-
-            executor.shutdown();
-        } finally {
-            cluster.close();
-        }
-    }
-
     @Test
     public void shouldWaitForAllResultsToArrive() throws Exception {
         final Cluster cluster = TestClientFactory.open();
@@ -1062,23 +1006,6 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
         }
     }
 
-    @Test
-    public void shouldExecuteScriptInSession() throws Exception {
-        final Cluster cluster = TestClientFactory.build().create();
-        final Client client = cluster.connect(name.getMethodName());
-
-        final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]");
-        assertEquals(9, results1.all().get().size());
-
-        final ResultSet results2 = client.submit("x[0]+1");
-        assertEquals(2, results2.all().get().get(0).getInt());
-
-        final ResultSet results3 = client.submit("x[1]+2");
-        assertEquals(4, results3.all().get().get(0).getInt());
-
-        cluster.close();
-    }
-
     @Test
     public void shouldNotThrowNoSuchElementException() throws Exception {
         final Cluster cluster = TestClientFactory.open();
@@ -1124,127 +1051,6 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
         }
     }
 
-    @Test
-    public void shouldCloseSession() throws Exception {
-        final Cluster cluster = TestClientFactory.build().create();
-        final Client client = cluster.connect(name.getMethodName());
-
-        final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]");
-        assertEquals(9, results1.all().get().size());
-        final ResultSet results2 = client.submit("x[0]+1");
-        assertEquals(2, results2.all().get().get(0).getInt());
-
-        client.close();
-
-        try {
-            client.submit("x[0]+1").all().get();
-            fail("Should have thrown an exception because the connection is 
closed");
-        } catch (Exception ex) {
-            final Throwable root = ExceptionHelper.getRootCause(ex);
-            assertThat(root, instanceOf(IllegalStateException.class));
-        } finally {
-            cluster.close();
-        }
-    }
-
-    @Test
-    public void shouldExecuteScriptInSessionAssumingDefaultedImports() throws 
Exception {
-        final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect(name.getMethodName());
-
-        final ResultSet results1 = client.submit("TinkerFactory.class.name");
-        assertEquals(TinkerFactory.class.getName(), 
results1.all().get().get(0).getString());
-
-        cluster.close();
-    }
-
-    @Test
-    public void shouldExecuteScriptInSessionOnTransactionalGraph() throws 
Exception {
-
-        final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect(name.getMethodName());
-
-        final Vertex vertexBeforeTx = 
client.submit("v=g.addV(\"person\").property(\"name\",\"stephen\").next()").all().get().get(0).getVertex();
-        assertEquals("person", vertexBeforeTx.label());
-
-        final String nameValueFromV = 
client.submit("g.V().values('name').next()").all().get().get(0).getString();
-        assertEquals("stephen", nameValueFromV);
-
-        final Vertex vertexFromBinding = 
client.submit("v").all().get().get(0).getVertex();
-        assertEquals("person", vertexFromBinding.label());
-
-        final Map<String,Object> vertexAfterTx = 
client.submit("g.V(v).property(\"color\",\"blue\").iterate(); g.tx().commit(); 
g.V(v).valueMap().by(unfold())").all().get().get(0).get(Map.class);
-        assertEquals("stephen", vertexAfterTx.get("name"));
-        assertEquals("blue", vertexAfterTx.get("color"));
-
-        cluster.close();
-    }
-
-    @Test
-    public void 
shouldExecuteScriptInSessionOnTransactionalWithManualTransactionsGraph() throws 
Exception {
-
-        final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect(name.getMethodName());
-        final Client sessionlessClient = cluster.connect();
-        
client.submit("graph.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL);null").all().get();
-        client.submit("graph.tx().open()").all().get();
-
-        final Vertex vertexBeforeTx = 
client.submit("v=g.addV(\"person\").property(\"name\", 
\"stephen\").next()").all().get().get(0).getVertex();
-        assertEquals("person", vertexBeforeTx.label());
-
-        final String nameValueFromV = 
client.submit("g.V().values(\"name\").next()").all().get().get(0).getString();
-        assertEquals("stephen", nameValueFromV);
-
-        final Vertex vertexFromBinding = 
client.submit("v").all().get().get(0).getVertex();
-        assertEquals("person", vertexFromBinding.label());
-
-        client.submit("g.V(v).property(\"color\",\"blue\")").all().get();
-        client.submit("g.tx().commit()").all().get();
-
-        // Run a sessionless request to change transaction.readWriteConsumer 
back to AUTO
-        // The will make the next in session request fail if consumers aren't 
ThreadLocal
-        sessionlessClient.submit("g.V().next()").all().get();
-
-        client.submit("g.tx().open()").all().get();
-
-        final Map<String,Object> vertexAfterTx = 
client.submit("g.V().valueMap().by(unfold())").all().get().get(0).get(Map.class);
-        assertEquals("stephen", vertexAfterTx.get("name"));
-        assertEquals("blue", vertexAfterTx.get("color"));
-
-        client.submit("g.tx().rollback()").all().get();
-
-        cluster.close();
-    }
-
-    @Test
-    public void 
shouldExecuteInSessionAndSessionlessWithoutOpeningTransaction() throws 
Exception {
-
-        final Cluster cluster = TestClientFactory.open();
-        try {
-            final Client sessionClient = cluster.connect(name.getMethodName());
-            final Client sessionlessClient = cluster.connect();
-
-            //open transaction in session, then add vertex and commit
-            sessionClient.submit("g.tx().open()").all().get();
-            final Vertex vertexBeforeTx = 
sessionClient.submit("v=g.addV(\"person\").property(\"name\",\"stephen\").next()").all().get().get(0).getVertex();
-            assertEquals("person", vertexBeforeTx.label());
-            sessionClient.submit("g.tx().commit()").all().get();
-
-            // check that session transaction is closed
-            final boolean isOpen = 
sessionClient.submit("g.tx().isOpen()").all().get().get(0).getBoolean();
-            assertTrue("Transaction should be closed", !isOpen);
-
-            //run a sessionless read
-            sessionlessClient.submit("g.V()").all().get();
-
-            // check that session transaction is still closed
-            final boolean isOpenAfterSessionless = 
sessionClient.submit("g.tx().isOpen()").all().get().get(0).getBoolean();
-            assertTrue("Transaction should stil be closed", 
!isOpenAfterSessionless);
-        } finally {
-            cluster.close();
-        }
-    }
-
     @Test
     public void shouldExecuteSessionlessScriptOnTransactionalGraph() throws 
Exception {
 
@@ -1531,105 +1337,6 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
         cluster.close();
     }
 
-    @Test
-    public void shouldManageTransactionsInSession() throws Exception {
-
-        final Cluster cluster = TestClientFactory.open();
-        final Client client = cluster.connect();
-        final Client sessionWithManagedTx = 
cluster.connect(name.getMethodName() + "-managed", true);
-        final Client sessionWithoutManagedTx = 
cluster.connect(name.getMethodName() + "-not-managed");
-
-        // this should auto-commit
-        sessionWithManagedTx.submit("v = 
g.addV().property('name','stephen').next()").all().get().get(0).getVertex();
-
-        // the other clients should see that change because of auto-commit
-        
assertThat(client.submit("g.V().has('name','stephen').hasNext()").all().get().get(0).getBoolean(),
 is(true));
-        
assertThat(sessionWithoutManagedTx.submit("g.V().has('name','stephen').hasNext()").all().get().get(0).getBoolean(),
 is(true));
-
-        // this should NOT auto-commit
-        final Vertex vDaniel = sessionWithoutManagedTx.submit("v = 
g.addV().property('name','daniel').next()").all().get().get(0).getVertex();
-
-        // the other clients should NOT see that change because of auto-commit
-        
assertThat(client.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(),
 is(false));
-        
assertThat(sessionWithManagedTx.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(),
 is(false));
-
-        // but "v" should still be there
-        final Vertex vDanielAgain = 
sessionWithoutManagedTx.submit("v").all().get().get(0).getVertex();
-        assertEquals(vDaniel.id(), vDanielAgain.id());
-
-        // now commit manually
-        sessionWithoutManagedTx.submit("g.tx().commit()").all().get();
-
-        // should be there for all now
-        
assertThat(client.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(),
 is(true));
-        
assertThat(sessionWithManagedTx.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(),
 is(true));
-        
assertThat(sessionWithoutManagedTx.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(),
 is(true));
-
-        cluster.close();
-    }
-
-    @Test
-    public void shouldProcessSessionRequestsInOrderAfterTimeout() throws 
Exception {
-        final Cluster cluster = TestClientFactory.open();
-
-        try {
-            // this configures the client to behave like OpProcessor for 
UnifiedChannelizer
-            final Client.SessionSettings settings = 
Client.SessionSettings.build().
-                    sessionId(name.getMethodName()).create();
-            final Client client = 
cluster.connect(Client.Settings.build().useSession(settings).create());
-
-            for (int index = 0; index < 50; index++) {
-                final CompletableFuture<ResultSet> first = client.submitAsync(
-                        "Object mon1 = 'mon1';\n" +
-                                "synchronized (mon1) {\n" +
-                                "    mon1.wait();\n" +
-                                "} ");
-
-                final CompletableFuture<ResultSet> second = client.submitAsync(
-                        "Object mon2 = 'mon2';\n" +
-                                "synchronized (mon2) {\n" +
-                                "    mon2.wait();\n" +
-                                "}");
-
-                final CompletableFuture<ResultSet> third = client.submitAsync(
-                        "Object mon3 = 'mon3';\n" +
-                                "synchronized (mon3) {\n" +
-                                "    mon3.wait();\n" +
-                                "}");
-
-                final CompletableFuture<ResultSet> fourth = client.submitAsync(
-                        "Object mon4 = 'mon4';\n" +
-                                "synchronized (mon4) {\n" +
-                                "    mon4.wait();\n" +
-                                "}");
-
-                final CompletableFuture<List<Result>> futureFirst = 
first.get().all();
-                final CompletableFuture<List<Result>> futureSecond = 
second.get().all();
-                final CompletableFuture<List<Result>> futureThird = 
third.get().all();
-                final CompletableFuture<List<Result>> futureFourth = 
fourth.get().all();
-
-                assertFutureTimeout(futureFirst);
-                assertFutureTimeout(futureSecond);
-                assertFutureTimeout(futureThird);
-                assertFutureTimeout(futureFourth);
-            }
-        } finally {
-            cluster.close();
-        }
-    }
-
-    private void assertFutureTimeout(final CompletableFuture<List<Result>> f) {
-        try {
-            f.get();
-            fail("Should have timed out");
-        } catch (Exception ex) {
-            final Throwable root = ExceptionHelper.getRootCause(ex);
-            assertThat(root, instanceOf(ResponseException.class));
-            assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, 
((ResponseException) root).getResponseStatusCode());
-            assertThat(root.getMessage(), allOf(startsWith("Evaluation 
exceeded"), containsString("250 ms")));
-        }
-    }
-
     @Test
     public void shouldCloseAllClientsOnCloseOfCluster() throws Exception {
         final Cluster cluster = TestClientFactory.open();
@@ -1822,11 +1529,8 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
                 client.submit("1+1").all().get(3000, TimeUnit.MILLISECONDS);
                 fail("Should throw exception on the retry");
             } catch (RuntimeException re2) {
-                if (client instanceof Client.SessionedClient) {
-                    assertThat(re2.getCause().getCause(), 
instanceOf(ConnectionException.class));
-                } else {
-                    assertThat(re2.getCause().getCause().getCause(), 
instanceOf(ConnectException.class));
-                }
+                assertThat(re2.getCause().getCause().getCause(), 
instanceOf(ConnectException.class));
+
             }
 
             //
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
index b70d8c18f7..82aba1cef3 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
@@ -40,8 +40,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
 import static org.hamcrest.CoreMatchers.is;
@@ -63,19 +66,6 @@ public class HttpDriverIntegrateTest extends 
AbstractGremlinServerIntegrationTes
         return settings;
     }
 
-//    @Test
-//    public void shouldSubmitScriptWithGraphSON() throws Exception {
-//        final Cluster cluster = TestClientFactory.build().create();
-//        try {
-//            final Client client = cluster.connect();
-//            assertEquals(2, 
client.submit("1+1").all().get().get(0).getInt());
-//        } catch (Exception ex) {
-//            throw ex;
-//        } finally {
-//            cluster.close();
-//        }
-//    }
-
     @Test
     public void shouldSubmitScriptWithGraphBinary() throws Exception {
         final Cluster cluster = TestClientFactory.build().create();
@@ -90,43 +80,11 @@ public class HttpDriverIntegrateTest extends 
AbstractGremlinServerIntegrationTes
         }
     }
 
-//    @Test
-//    public void shouldSubmitBytecodeWithGraphSON() throws Exception {
-//        final Cluster cluster = TestClientFactory.build()
-//                .channelizer(Channelizer.HttpChannelizer.class)
-//                .serializer(Serializers.GRAPHSON_V4)
-//                .create();
-//        try {
-//            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
-//            final String result = g.inject("2").toList().get(0);
-//            assertEquals("2", result);
-//        } catch (Exception ex) {
-//            throw ex;
-//        } finally {
-//            cluster.close();
-//        }
-//    }
-
-//    @Test
-//    public void shouldGetErrorForBytecodeWithUntypedGraphSON() throws 
Exception {
-//        final Cluster cluster = TestClientFactory.build().create();
-//        try {
-//            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
-//            g.inject("2").toList();
-//            fail("Exception expected");
-//        } catch (EncoderException ex) {
-//            assertThat(ex.getMessage(), allOf(containsString("An error 
occurred during serialization of this request"),
-//                    containsString("it could not be sent to the server - 
Reason: only GraphSON3 and GraphBinary recommended for serialization of 
Bytecode requests, but used org.apache.tinkerpop.gremlin.")));
-//        } finally {
-//            cluster.close();
-//        }
-//    }
-
     @Test
-    public void shouldSubmitBytecodeWithGraphBinary() throws Exception {
+    public void shouldSubmitBytecodeWithGraphBinary() {
         final Cluster cluster = TestClientFactory.build().create();
         try {
-            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+            final GraphTraversalSource g = 
traversal().with(DriverRemoteConnection.using(cluster));
             final String result = g.inject("2").toList().get(0);
             assertEquals("2", result);
         } catch (Exception ex) {
@@ -137,42 +95,72 @@ public class HttpDriverIntegrateTest extends 
AbstractGremlinServerIntegrationTes
     }
 
     @Test
-    public void shouldFailToUseSession() {
+    public void shouldSubmitMultipleQueriesWithSameConnection() throws 
InterruptedException, ExecutionException {
         final Cluster cluster = TestClientFactory.build().create();
+        final Client client = cluster.connect();
+
         try {
-            final Client client = cluster.connect("shouldFailToUseSession");
-            client.submit("1+1").all().get();
-            fail("Can't use session with HTTP");
+            final int result = 
client.submit("Thread.sleep(1000);1").all().get().get(0).getInt();
+            assertEquals(1, result);
+
+            final AtomicInteger result2 = new AtomicInteger(-1);
+            final Thread thread = new Thread(() -> {
+                try {
+                    
result2.set(client.submit("2").all().get().get(0).getInt());
+                } catch (InterruptedException | ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            thread.start();
+            thread.join();
+
+            assertEquals(2, result2.get());
         } catch (Exception ex) {
-            final Throwable t = ExceptionUtils.getRootCause(ex);
-            assertEquals("Cannot use sessions or tx() with HttpChannelizer", 
t.getMessage());
+            throw ex;
         } finally {
             cluster.close();
         }
     }
 
     @Test
-    public void shouldFailToUseTx() {
+    public void shouldFailToUseSession() {
         final Cluster cluster = TestClientFactory.build().create();
         try {
-            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
-            final Transaction tx = g.tx();
-            final GraphTraversalSource gtx = tx.begin();
-            gtx.inject("1").toList();
-            fail("Can't use tx() with HTTP");
+            final Client client = cluster.connect("shouldFailToUseSession");
+            client.submit("1+1").all().get();
+            fail("Can't use session with HTTP");
         } catch (Exception ex) {
             final Throwable t = ExceptionUtils.getRootCause(ex);
-            assertEquals("Cannot use sessions or tx() with HttpChannelizer", 
t.getMessage());
+            // assertEquals("Cannot use sessions or tx() with 
HttpChannelizer", t.getMessage());
+            assertEquals("not implemented", t.getMessage());
         } finally {
             cluster.close();
         }
     }
 
+//    @Test
+//    public void shouldFailToUseTx() {
+//        final Cluster cluster = TestClientFactory.build().create();
+//        try {
+//            final GraphTraversalSource g = 
traversal().with(DriverRemoteConnection.using(cluster));
+//            final Transaction tx = g.tx();
+//            final GraphTraversalSource gtx = tx.begin();
+//            gtx.inject("1").toList();
+//            fail("Can't use tx() with HTTP");
+//        } catch (Exception ex) {
+//            final Throwable t = ExceptionUtils.getRootCause(ex);
+//            // assertEquals("Cannot use sessions or tx() with 
HttpChannelizer", t.getMessage());
+//            assertEquals("not implemented", t.getMessage());
+//        } finally {
+//            cluster.close();
+//        }
+//    }
+
     @Test
-    public void shouldDeserializeErrorWithGraphBinary() throws Exception {
+    public void shouldDeserializeErrorWithGraphBinary() {
         final Cluster cluster = TestClientFactory.build().create();
         try {
-            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster, "doesNotExist"));
+            final GraphTraversalSource g = 
traversal().with(DriverRemoteConnection.using(cluster, "doesNotExist"));
             g.V().next();
             fail("Expected exception to be thrown.");
         } catch (Exception ex) {
@@ -182,20 +170,6 @@ public class HttpDriverIntegrateTest extends 
AbstractGremlinServerIntegrationTes
         }
     }
 
-//    @Test
-//    public void shouldDeserializeErrorWithGraphSON() throws Exception {
-//        final Cluster cluster = TestClientFactory.build().create();
-//        try {
-//            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster, "doesNotExist"));
-//            g.V().next();
-//            fail("Expected exception to be thrown.");
-//        } catch (Exception ex) {
-//            assert ex.getMessage().contains("Could not rebind");
-//        } finally {
-//            cluster.close();
-//        }
-//    }
-
     @Ignore("driver side error")
     @Test
     public void shouldReportErrorWhenRequestCantBeSerialized() throws 
Exception {

Reply via email to