Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1249 [created] d881484a4


Add keep-alive functionality to Java Driver.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/d881484a
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/d881484a
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/d881484a

Branch: refs/heads/TINKERPOP-1249
Commit: d881484a40ef7c5924e97a1adce7d0a7bf6654ea
Parents: 01d035e
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Thu Sep 22 07:48:59 2016 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Thu Sep 22 07:49:45 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../src/reference/gremlin-applications.asciidoc |  1 +
 .../upgrade/release-3.2.x-incubating.asciidoc   | 10 +++
 .../tinkerpop/gremlin/driver/Channelizer.java   | 39 ++++++++---
 .../tinkerpop/gremlin/driver/Cluster.java       | 21 ++++++
 .../tinkerpop/gremlin/driver/Connection.java    | 36 +++++++++-
 .../gremlin/driver/ConnectionPool.java          |  2 +-
 .../tinkerpop/gremlin/driver/Settings.java      | 12 +++-
 .../driver/handler/WebSocketClientHandler.java  |  1 +
 .../WebSocketGremlinResponseDecoder.java        |  5 +-
 .../server/GremlinDriverIntegrateTest.java      | 69 ++++++++++++++++++++
 11 files changed, 182 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 394cc33..9b323af 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -33,6 +33,7 @@ TinkerPop 3.2.3 (Release Date: NOT OFFICIALLY RELEASED YET)
 * `TraversalRing` returns a `null` if it does not contain traversals (previously `IdentityTraversal`).
 * Fixed a `JavaTranslator` bug where `Bytecode` instructions were being mutated during translation.
 * Added `Path` to Gremlin-Python with respective GraphSON 2.0 deserializer.
+* Added "keep-alive" functionality to the Java driver, which will send a heartbeat to the server when normal request activity on a connection stops for a period of time.
 * Renamed the `empty.result.indicator` preference to `result.indicator.null` in Gremlin Console
 * If `result.indicator.null` is set to an empty string, then no "result line" is printed in Gremlin Console.
 * VertexPrograms can now declare traverser requirements, e.g. to have access to the path when used with `.program()`.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/docs/src/reference/gremlin-applications.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/reference/gremlin-applications.asciidoc b/docs/src/reference/gremlin-applications.asciidoc
index bff9f0f..ce15d66 100644
--- a/docs/src/reference/gremlin-applications.asciidoc
+++ b/docs/src/reference/gremlin-applications.asciidoc
@@ -687,6 +687,7 @@ The following table describes the various configuration options for the Gremlin
 |Key |Description |Default
 |connectionPool.channelizer |The fully qualified classname of the client `Channelizer` that defines how to connect to the server. |`Channelizer.WebSocketChannelizer`
 |connectionPool.enableSsl |Determines if SSL should be enabled or not. If enabled on the server then it must be enabled on the client. |false
+|connectionPool.keepAliveInterval |Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. Set to zero to disable this feature. |1800000
 |connectionPool.keyCertChainFile |The X.509 certificate chain file in PEM format. |_none_
 |connectionPool.keyFile |The `PKCS#8` private key file in PEM format. |_none_
 |connectionPool.keyPassword |The password of the `keyFile` if it's not password-protected |_none_

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/docs/src/upgrade/release-3.2.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade/release-3.2.x-incubating.asciidoc b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
index f9c62e2..e85aeb9 100644
--- a/docs/src/upgrade/release-3.2.x-incubating.asciidoc
+++ b/docs/src/upgrade/release-3.2.x-incubating.asciidoc
@@ -59,6 +59,16 @@ gremlin>
 
 See: link:https://issues.apache.org/jira/browse/TINKERPOP-1409[TINKERPOP-1409]
 
+Java Driver Keep-Alive
+^^^^^^^^^^^^^^^^^^^^^^
+
+The Java Driver now has a `keepAliveInterval` setting, which controls the amount of time in milliseconds it should wait
+on an inactive connection before it sends a message to the server to keep the connection maintained. This should help
+environments that use a load balancer in front of Gremlin Server by ensuring connections are actively maintained even
+during periods of inactivity.
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-1249[TINKERPOP-1249]
+
 Where Step Supports By-Modulation
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 40be11c..6ed8e0f 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -20,9 +20,7 @@ package org.apache.tinkerpop.gremlin.driver;
 
 import io.netty.channel.Channel;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
-import io.netty.handler.ssl.SslContextBuilder;
-import io.netty.handler.ssl.SslProvider;
-import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinRequestEncoder;
 import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder;
@@ -39,10 +37,7 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
 import io.netty.handler.codec.http.websocketx.WebSocketVersion;
 import io.netty.handler.ssl.SslContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -70,6 +65,21 @@ public interface Channelizer extends ChannelHandler {
     public void close(final Channel channel);
 
     /**
+     * Create a message for the driver to use as a "keep-alive" for the connection. This method will only be used if
+     * {@link #supportsKeepAlive()} is {@code true}.
+     */
+    public default Object createKeepAliveMessage() {
+        return null;
+    }
+
+    /**
+     * Determines if the channelizer supports a method for keeping the connection to the server alive.
+     */
+    public default boolean supportsKeepAlive() {
+        return false;
+    }
+
+    /**
      * Called after the channel connects. The {@code Channelizer} may need to perform some functions, such as a
      * handshake.
      */
@@ -80,8 +90,6 @@ public interface Channelizer extends ChannelHandler {
      * Base implementation of the client side {@link Channelizer}.
      */
     abstract class AbstractChannelizer extends ChannelInitializer<SocketChannel> implements Channelizer {
-        private static final Logger logger = LoggerFactory.getLogger(AbstractChannelizer.class);
-
         protected Connection connection;
         protected Cluster cluster;
         private ConcurrentMap<UUID, ResultQueue> pending;
@@ -152,6 +160,21 @@ public interface Channelizer extends ChannelHandler {
         }
 
         /**
+         * Keep-alive is supported through the ping/pong websocket protocol.
+         *
+         * @see <a href=https://tools.ietf.org/html/rfc6455#section-5.5.2>IETF RFC 6455</a>
+         */
+        @Override
+        public boolean supportsKeepAlive() {
+            return true;
+        }
+
+        @Override
+        public Object createKeepAliveMessage() {
+            return new PingWebSocketFrame();
+        }
+
+        /**
          * Sends a {@code CloseWebSocketFrame} to the server for the specified channel.
          */
         @Override
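The two default methods added to `Channelizer` in this file make keep-alive opt-in: a channelizer that says nothing gets no keep-alive, and only an implementation that overrides both methods (here, the WebSocket one) ever produces a ping. The following is a minimal, self-contained sketch of that pattern, not the driver's code. `KeepAliveGate`, `ChannelizerSketch`, `PlainSketch`, `WebSocketSketch` and the `"PING"` string are hypothetical stand-ins; the real WebSocket channelizer returns a Netty `PingWebSocketFrame`.

```java
import java.util.Optional;

// Hypothetical helper mirroring the check in Connection.write():
// a keep-alive is only produced when the channelizer supports it and the interval is positive.
final class KeepAliveGate {
    static Optional<Object> nextKeepAlive(final ChannelizerSketch channelizer, final long keepAliveInterval) {
        if (channelizer.supportsKeepAlive() && keepAliveInterval > 0)
            return Optional.ofNullable(channelizer.createKeepAliveMessage());
        return Optional.empty();
    }

    public static void main(final String[] args) {
        System.out.println(nextKeepAlive(new PlainSketch(), 1000L).isPresent());
        System.out.println(nextKeepAlive(new WebSocketSketch(), 1000L).isPresent());
        System.out.println(nextKeepAlive(new WebSocketSketch(), 0L).isPresent());
    }
}

// Hypothetical stand-in for Channelizer; only the two keep-alive hooks are modelled.
interface ChannelizerSketch {
    default Object createKeepAliveMessage() {
        return null;
    }

    default boolean supportsKeepAlive() {
        return false;
    }
}

// Inherits the defaults, so it never takes part in keep-alive.
final class PlainSketch implements ChannelizerSketch {
}

// Opts in by overriding both hooks; "PING" is a placeholder for a real ping frame.
final class WebSocketSketch implements ChannelizerSketch {
    @Override
    public boolean supportsKeepAlive() {
        return true;
    }

    @Override
    public Object createKeepAliveMessage() {
        return "PING";
    }
}
```

Using defaults on the interface means existing third-party `Channelizer` implementations should keep compiling and simply never send a keep-alive.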

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 9c08c3c..f79e719 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -163,6 +163,7 @@ public final class Cluster {
                 .port(settings.port)
                 .enableSsl(settings.connectionPool.enableSsl)
                 .trustCertificateChainFile(settings.connectionPool.trustCertChainFile)
+                .keepAliveInterval(settings.connectionPool.keepAliveInterval)
                 .keyCertChainFile(settings.connectionPool.keyCertChainFile)
                 .keyFile(settings.connectionPool.keyFile)
                 .keyPassword(settings.connectionPool.keyPassword)
@@ -388,6 +389,14 @@ public final class Cluster {
     }
 
     /**
+     * Gets time in milliseconds to wait after the last message is sent over a connection before sending a keep-alive
+     * message to the server.
+     */
+    public long getKeepAliveInterval() {
+        return manager.connectionPoolSettings.keepAliveInterval;
+    }
+
+    /**
      * Specifies the load balancing strategy to use on the client side.
      */
     public Class<? extends LoadBalancingStrategy> getLoadBalancingStrategy() {
@@ -478,6 +487,7 @@ public final class Cluster {
         private int reconnectInitialDelay = Connection.RECONNECT_INITIAL_DELAY;
         private int reconnectInterval = Connection.RECONNECT_INTERVAL;
         private int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE;
+        private long keepAliveInterval = Connection.KEEP_ALIVE_INTERVAL;
         private String channelizer = Channelizer.WebSocketChannelizer.class.getName();
         private boolean enableSsl = false;
         private String trustCertChainFile = null;
@@ -573,6 +583,16 @@ public final class Cluster {
         }
 
         /**
+         * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. This
+         * setting is only relevant to {@link Channelizer} implementations that return {@code true} for
+         * {@link Channelizer#supportsKeepAlive()}.  Set to zero to disable this feature.
+         */
+        public Builder keepAliveInterval(final long keepAliveInterval) {
+            this.keepAliveInterval = keepAliveInterval;
+            return this;
+        }
+
+        /**
          * The X.509 certificate chain file in PEM format.
          */
         public Builder keyCertChainFile(final String keyCertChainFile) {
@@ -871,6 +891,7 @@ public final class Cluster {
             connectionPoolSettings.keyCertChainFile = builder.keyCertChainFile;
             connectionPoolSettings.keyFile = builder.keyFile;
             connectionPoolSettings.keyPassword = builder.keyPassword;
+            connectionPoolSettings.keepAliveInterval = builder.keepAliveInterval;
             connectionPoolSettings.channelizer = builder.channelizer;
 
             sslContextOptional = Optional.ofNullable(builder.sslContext);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index 9dc93a7..1ef9b98 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -55,6 +56,7 @@ final class Connection {
     private final Cluster cluster;
     private final Client client;
     private final ConnectionPool pool;
+    private final long keepAliveInterval;
 
     public static final int MAX_IN_PROCESS = 4;
     public static final int MIN_IN_PROCESS = 1;
@@ -64,6 +66,7 @@ final class Connection {
     public static final int RECONNECT_INITIAL_DELAY = 1000;
     public static final int RECONNECT_INTERVAL = 1000;
     public static final int RESULT_ITERATION_BATCH_SIZE = 64;
+    public static final long KEEP_ALIVE_INTERVAL = 1800000;
 
     /**
      * When a {@code Connection} is borrowed from the pool, this number is incremented to indicate the number of
@@ -82,6 +85,7 @@ final class Connection {
 
     private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
     private final AtomicBoolean shutdownInitiated = new AtomicBoolean(false);
+    private final AtomicReference<ScheduledFuture> keepAliveFuture = new AtomicReference<>();
 
     public Connection(final URI uri, final ConnectionPool pool, final int maxInProcess) throws ConnectionException {
         this.uri = uri;
@@ -89,6 +93,7 @@ final class Connection {
         this.client = pool.getClient();
         this.pool = pool;
         this.maxInProcess = maxInProcess;
+        this.keepAliveInterval = pool.settings().keepAliveInterval;
 
         connectionLabel = String.format("Connection{host=%s}", pool.host);
 
@@ -153,6 +158,10 @@ final class Connection {
         if (!closeFuture.compareAndSet(null, future))
             return closeFuture.get();
 
+        // stop any pings being sent at the server for keep-alive
+        final ScheduledFuture keepAlive = keepAliveFuture.get();
+        if (keepAlive != null) keepAlive.cancel(true);
+
         // make sure all requests in the queue are fully processed before killing.  if they are then shutdown
         // can be immediate.  if not this method will signal the readCompleted future defined in the write()
         // operation to check if it can close.  in this way the connection no longer receives writes, but
@@ -181,7 +190,8 @@ final class Connection {
         // once there is a completed write, then create a traverser for the result set and complete
         // the promise so that the client knows that that it can start checking for results.
         final Connection thisConnection = this;
-        final ChannelPromise promise = channel.newPromise()
+
+        final ChannelPromise requestPromise = channel.newPromise()
                 .addListener(f -> {
                     if (!f.isSuccess()) {
                         if (logger.isDebugEnabled())
@@ -234,9 +244,29 @@ final class Connection {
                                 requestMessage, pool.host));
                     }
                 });
-        channel.writeAndFlush(requestMessage, promise);
+        channel.writeAndFlush(requestMessage, requestPromise);
+
+        // try to keep the connection alive if the channel allows such things - websockets will
+        if (channelizer.supportsKeepAlive() && keepAliveInterval > 0) {
+
+            final ScheduledFuture oldKeepAliveFuture = keepAliveFuture.getAndSet(cluster.executor().scheduleAtFixedRate(() -> {
+                logger.debug("Request sent to server to keep {} alive", thisConnection);
+                try {
+                    channel.writeAndFlush(channelizer.createKeepAliveMessage());
+                } catch (Exception ex) {
+                    // will just log this for now - a future real request can be responsible for the failure that
+                    // marks the host as dead. this also may not mean the host is actually dead. more robust handling
+                    // is in play for real requests, not this simple ping
+                    logger.warn(String.format("Keep-alive did not succeed on %s", thisConnection), ex);
+                }
+            }, keepAliveInterval, keepAliveInterval, TimeUnit.MILLISECONDS));
+
+            // try to cancel the old future if it's still un-executed - no need to ping since a new write has come
+            // through on the connection
+            if (oldKeepAliveFuture != null) oldKeepAliveFuture.cancel(true);
+        }
 
-        return promise;
+        return requestPromise;
     }
 
     public void returnToPool() {
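The `write()` change in this file schedules a repeating ping every time a request is written and cancels the ping scheduled by the previous write, so pings only fire once the connection has been idle for a full interval. The sketch below reproduces that reschedule-on-write idea with the JDK alone. `KeepAliveSketch`, `onWrite()` and `close()` are hypothetical names, and the `Runnable` stands in for `channel.writeAndFlush(channelizer.createKeepAliveMessage())`; it is an illustration under those assumptions, not the driver's implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the keep-alive bookkeeping inside Connection.
final class KeepAliveSketch {
    private final ScheduledExecutorService executor;
    private final long keepAliveInterval;
    private final Runnable ping;
    private final AtomicReference<ScheduledFuture<?>> keepAliveFuture = new AtomicReference<>();

    KeepAliveSketch(final ScheduledExecutorService executor, final long keepAliveInterval, final Runnable ping) {
        this.executor = executor;
        this.keepAliveInterval = keepAliveInterval;
        this.ping = ping;
    }

    // Call after every real request: schedule a fresh ping task and cancel the previous one.
    // Returns the newly scheduled task, or null when keep-alive is disabled (interval <= 0).
    ScheduledFuture<?> onWrite() {
        if (keepAliveInterval <= 0) return null;
        final ScheduledFuture<?> next = executor.scheduleAtFixedRate(() -> {
            try {
                ping.run();
            } catch (Exception ex) {
                // only report the failure; a later real request can decide whether the host is dead
                System.err.println("Keep-alive did not succeed: " + ex);
            }
        }, keepAliveInterval, keepAliveInterval, TimeUnit.MILLISECONDS);
        final ScheduledFuture<?> old = keepAliveFuture.getAndSet(next);
        if (old != null) old.cancel(true);
        return next;
    }

    // Stop pinging when the connection closes.
    void close() {
        final ScheduledFuture<?> current = keepAliveFuture.get();
        if (current != null) current.cancel(true);
    }

    public static void main(final String[] args) throws InterruptedException {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final AtomicInteger pings = new AtomicInteger();
        final KeepAliveSketch connection = new KeepAliveSketch(executor, 50, pings::incrementAndGet);
        connection.onWrite();
        connection.onWrite();   // supersedes the task scheduled by the first write
        Thread.sleep(300);      // stay idle so the surviving task can fire
        connection.close();
        System.out.println("pings sent while idle: " + pings.get());
        executor.shutdownNow();
    }
}
```

Because the task runs at a fixed rate, an idle connection keeps pinging once per interval until the next write or `close()`, which is consistent with the integration test in this commit allowing up to three pings during a three-second pause at a 1000 ms interval.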

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index 9955e82..f0d9044 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -170,7 +170,7 @@ final class ConnectionPool {
         logger.debug("Attempting to return {} on {}", connection, host);
         if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown");
 
-        int borrowed = connection.borrowed.decrementAndGet();
+        final int borrowed = connection.borrowed.decrementAndGet();
         if (connection.isDead()) {
             logger.debug("Marking {} as dead", this.host);
             considerUnavailable();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 43014ce..41a697c 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.driver;
 
 import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
 import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV1d0;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.yaml.snakeyaml.TypeDescription;
 import org.yaml.snakeyaml.Yaml;
@@ -33,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.stream.Collectors;
 
 /**
@@ -207,6 +205,9 @@ final class Settings {
             if (connectionPoolConf.containsKey("resultIterationBatchSize"))
                 cpSettings.resultIterationBatchSize = connectionPoolConf.getInt("resultIterationBatchSize");
 
+            if (connectionPoolConf.containsKey("keepAliveInterval"))
+                cpSettings.keepAliveInterval = connectionPoolConf.getLong("keepAliveInterval");
+
 
             settings.connectionPool = cpSettings;
         }
@@ -251,6 +252,13 @@ final class Settings {
         public int maxSize = ConnectionPool.MAX_POOL_SIZE;
 
         /**
+         * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. This
+         * setting is only relevant to {@link Channelizer} implementations that return {@code true} for
+         * {@link Channelizer#supportsKeepAlive()}. Set to zero to disable this feature.
+         */
+        public long keepAliveInterval = Connection.KEEP_ALIVE_INTERVAL;
+
+        /**
          * A connection under low use can be destroyed. This setting determines the threshold for determining when
          * that connection can be released and is defaulted to 8.
          */

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 922775e..5ba0f1b 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -86,6 +86,7 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob
         if (frame instanceof TextWebSocketFrame) {
             ctx.fireChannelRead(frame.retain(2));
         } else if (frame instanceof PongWebSocketFrame) {
+            logger.debug("Received response from keep-alive request");
         } else if (frame instanceof BinaryWebSocketFrame) {
             ctx.fireChannelRead(frame.retain(2));
         } else if (frame instanceof CloseWebSocketFrame)

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
index 0f24a9a..383e5a5 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.driver.handler;
 
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
 import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
 import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
 import io.netty.channel.ChannelHandler;
@@ -47,10 +48,12 @@ public final class WebSocketGremlinResponseDecoder extends MessageToMessageDecod
             if (webSocketFrame instanceof BinaryWebSocketFrame) {
                 final BinaryWebSocketFrame tf = (BinaryWebSocketFrame) webSocketFrame;
                 objects.add(serializer.deserializeResponse(tf.content()));
-            } else {
+            } else if (webSocketFrame instanceof TextWebSocketFrame){
                 final TextWebSocketFrame tf = (TextWebSocketFrame) webSocketFrame;
                 final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer;
                 objects.add(textSerializer.deserializeResponse(tf.text()));
+            } else {
+                throw new RuntimeException(String.format("WebSocket channel does not handle this type of message: %s", webSocketFrame.getClass().getName()));
             }
             }
         } finally {
             ReferenceCountUtil.release(webSocketFrame);
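The decoder hunk above replaces a catch-all `else`, which cast every non-binary frame to a text frame, with an explicit text branch and a final branch that fails with a descriptive message. With the old shape, any other frame type reaching the decoder would have failed the cast with a `ClassCastException`. A minimal sketch of the resulting dispatch follows; `FrameDispatchSketch` and the `*SketchFrame` classes are hypothetical placeholders for the Netty frame types, and the returned strings stand in for deserialized responses.

```java
// Hypothetical stand-in for the decoder's frame dispatch.
final class FrameDispatchSketch {
    static String decode(final SketchFrame frame) {
        if (frame instanceof BinarySketchFrame) {
            return "binary:" + ((BinarySketchFrame) frame).bytes.length;
        } else if (frame instanceof TextSketchFrame) {
            return "text:" + ((TextSketchFrame) frame).text;
        } else {
            // fail with a clear message instead of an opaque cast failure
            throw new RuntimeException(String.format(
                    "WebSocket channel does not handle this type of message: %s", frame.getClass().getName()));
        }
    }

    public static void main(final String[] args) {
        System.out.println(decode(new TextSketchFrame("hi")));
        System.out.println(decode(new BinarySketchFrame(new byte[] {1, 2, 3})));
        try {
            decode(new PongSketchFrame());
        } catch (RuntimeException ex) {
            System.out.println(ex.getMessage());
        }
    }
}

// Placeholder frame hierarchy; the driver uses Netty's WebSocketFrame subclasses instead.
abstract class SketchFrame {
}

final class TextSketchFrame extends SketchFrame {
    final String text;

    TextSketchFrame(final String text) {
        this.text = text;
    }
}

final class BinarySketchFrame extends SketchFrame {
    final byte[] bytes;

    BinarySketchFrame(final byte[] bytes) {
        this.bytes = bytes;
    }
}

final class PongSketchFrame extends SketchFrame {
}
```

In the commit itself, pong frames are logged and dropped by `WebSocketClientHandler`, so the failing branch appears to be a safeguard rather than the expected path for keep-alive responses.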

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
----------------------------------------------------------------------
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 04faa29..96cde54 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.gremlin.server;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.log4j.Level;
 import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.driver.Channelizer;
 import org.apache.tinkerpop.gremlin.driver.Client;
@@ -27,6 +28,7 @@ import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultSet;
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
 import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
 import org.apache.tinkerpop.gremlin.driver.ser.JsonBuilderGryoSerializer;
 import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
@@ -35,11 +37,14 @@ import org.apache.tinkerpop.gremlin.server.channel.NioChannelizer;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;
+import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
 import org.apache.tinkerpop.gremlin.util.TimeUtil;
 import groovy.json.JsonBuilder;
 import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.hamcrest.core.IsInstanceOf;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,6 +73,9 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.endsWith;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.AllOf.allOf;
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -83,6 +91,37 @@ import static org.hamcrest.core.StringStartsWith.startsWith;
  */
 public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegrationTest {
     private static final Logger logger = LoggerFactory.getLogger(GremlinDriverIntegrateTest.class);
+
+    private Log4jRecordingAppender recordingAppender = null;
+    private Level previousLogLevel;
+
+    @Before
+    public void setupForEachTest() {
+        recordingAppender = new Log4jRecordingAppender();
+        final org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
+
+        if (name.getMethodName().equals("shouldKeepAliveForWebSockets")) {
+            final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(WebSocketClientHandler.class);
+            previousLogLevel = webSocketClientHandlerLogger.getLevel();
+            webSocketClientHandlerLogger.setLevel(Level.DEBUG);
+        }
+
+        rootLogger.addAppender(recordingAppender);
+    }
+
+    @After
+    public void teardownForEachTest() {
+        final org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
+
+        if (name.getMethodName().equals("shouldKeepAliveForWebSockets")) {
+            final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(WebSocketClientHandler.class);
+            // restore the level captured in setupForEachTest() rather than re-reading the current (DEBUG) level
+            webSocketClientHandlerLogger.setLevel(previousLogLevel);
+        }
+
+        rootLogger.removeAppender(recordingAppender);
+    }
+
     /**
      * Configure specific Gremlin Server settings for specific tests.
      */
@@ -137,6 +176,36 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
     }
 
     @Test
+    public void shouldKeepAliveForWebSockets() throws Exception {
+        // keep the connection pool size at 1 to remove the possibility of lots of connections trying to ping which will
+        // complicate the assertion logic
+        final Cluster cluster = Cluster.build().
+                minConnectionPoolSize(1).
+                maxConnectionPoolSize(1).
+                keepAliveInterval(1000).create();
+        final Client client = cluster.connect();
+
+        // fire up lots of requests so as to schedule/deschedule lots of ping jobs
+        for (int ix = 0; ix < 500; ix++) {
+            assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+        }
+
+        // don't send any messages for a bit so that the driver pings in the background
+        Thread.sleep(3000);
+
+        // make sure no bonus messages sorta fire off once we get back to sending requests
+        for (int ix = 0; ix < 500; ix++) {
+            assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+        }
+
+        // there really shouldn't be more than 3 of these sent. should definitely be at least one though
+        final long messages = recordingAppender.getMessages().stream().filter(m -> m.contains("Received response from keep-alive request")).count();
+        assertThat(messages, allOf(greaterThan(0L), lessThanOrEqualTo(3L)));
+
+        cluster.close();
+    }
+
+    @Test
     public void shouldEventuallySucceedAfterChannelLevelError() throws Exception {
         final Cluster cluster = Cluster.build().addContactPoint("localhost")
                 .reconnectIntialDelay(500)
