Add keep-alive functionality to Java Driver.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/d881484a Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/d881484a Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/d881484a Branch: refs/heads/TINKERPOP-790 Commit: d881484a40ef7c5924e97a1adce7d0a7bf6654ea Parents: 01d035e Author: Stephen Mallette <[email protected]> Authored: Thu Sep 22 07:48:59 2016 -0400 Committer: Stephen Mallette <[email protected]> Committed: Thu Sep 22 07:49:45 2016 -0400 ---------------------------------------------------------------------- CHANGELOG.asciidoc | 1 + .../src/reference/gremlin-applications.asciidoc | 1 + .../upgrade/release-3.2.x-incubating.asciidoc | 10 +++ .../tinkerpop/gremlin/driver/Channelizer.java | 39 ++++++++--- .../tinkerpop/gremlin/driver/Cluster.java | 21 ++++++ .../tinkerpop/gremlin/driver/Connection.java | 36 +++++++++- .../gremlin/driver/ConnectionPool.java | 2 +- .../tinkerpop/gremlin/driver/Settings.java | 12 +++- .../driver/handler/WebSocketClientHandler.java | 1 + .../WebSocketGremlinResponseDecoder.java | 5 +- .../server/GremlinDriverIntegrateTest.java | 69 ++++++++++++++++++++ 11 files changed, 182 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/CHANGELOG.asciidoc ---------------------------------------------------------------------- diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 394cc33..9b323af 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -33,6 +33,7 @@ TinkerPop 3.2.3 (Release Date: NOT OFFICIALLY RELEASED YET) * `TraversalRing` returns a `null` if it does not contain traversals (previously `IdentityTraversal`). * Fixed a `JavaTranslator` bug where `Bytecode` instructions were being mutated during translation. * Added `Path` to Gremlin-Python with respective GraphSON 2.0 deserializer. +* Added "keep-alive" functionality to the Java driver, which will send a heartbeat to the server when normal request activity on a connection stops for a period of time. * Renamed the `empty.result.indicator` preference to `result.indicator.null` in Gremlin Console * If `result.indicator.null` is set to an empty string, then no "result line" is printed in Gremlin Console. * VertexPrograms can now declare traverser requirements, e.g. to have access to the path when used with `.program()`. http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/docs/src/reference/gremlin-applications.asciidoc ---------------------------------------------------------------------- diff --git a/docs/src/reference/gremlin-applications.asciidoc b/docs/src/reference/gremlin-applications.asciidoc index bff9f0f..ce15d66 100644 --- a/docs/src/reference/gremlin-applications.asciidoc +++ b/docs/src/reference/gremlin-applications.asciidoc @@ -687,6 +687,7 @@ The following table describes the various configuration options for the Gremlin |Key |Description |Default |connectionPool.channelizer |The fully qualified classname of the client `Channelizer` that defines how to connect to the server. |`Channelizer.WebSocketChannelizer` |connectionPool.enableSsl |Determines if SSL should be enabled or not. If enabled on the server then it must be enabled on the client. |false +|connectionPool.keepAliveInterval |Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. Set to zero to disable this feature. |1800000 |connectionPool.keyCertChainFile |The X.509 certificate chain file in PEM format. |_none_ |connectionPool.keyFile |The `PKCS#8` private key file in PEM format. |_none_ |connectionPool.keyPassword |The password of the `keyFile` if it's not password-protected |_none_ http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/docs/src/upgrade/release-3.2.x-incubating.asciidoc ---------------------------------------------------------------------- diff --git a/docs/src/upgrade/release-3.2.x-incubating.asciidoc b/docs/src/upgrade/release-3.2.x-incubating.asciidoc index f9c62e2..e85aeb9 100644 --- a/docs/src/upgrade/release-3.2.x-incubating.asciidoc +++ b/docs/src/upgrade/release-3.2.x-incubating.asciidoc @@ -59,6 +59,16 @@ gremlin> See: link:https://issues.apache.org/jira/browse/TINKERPOP-1409[TINKERPOP-1409] +Java Driver Keep-Alive +^^^^^^^^^^^^^^^^^^^^^^ + +The Java Driver now has a `keepAliveInterval` setting, which controls the amount of time in milliseconds it should wait +on an inactive connection before it sends a message to the server to keep the connection maintained. This should help +environments that use a load balancer in front of Gremlin Server by ensuring connections are actively maintained even +during periods of inactivity. + +See: link:https://issues.apache.org/jira/browse/TINKERPOP-1249[TINKERPOP-1249] + Where Step Supports By-Modulation ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 40be11c..6ed8e0f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -20,9 +20,7 @@ package org.apache.tinkerpop.gremlin.driver; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.SslProvider; -import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinRequestEncoder; import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder; @@ -39,10 +37,7 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketVersion; import io.netty.handler.ssl.SslContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.File; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentMap; @@ -70,6 +65,21 @@ public interface Channelizer extends ChannelHandler { public void close(final Channel channel); /** + * Create a message for the driver to use as a "keep-alive" for the connection. This method will only be used if + * {@link #supportsKeepAlive()} is {@code true}. + */ + public default Object createKeepAliveMessage() { + return null; + } + + /** + * Determines if the channelizer supports a method for keeping the connection to the server alive. + */ + public default boolean supportsKeepAlive() { + return false; + } + + /** * Called after the channel connects. The {@code Channelizer} may need to perform some functions, such as a * handshake. */ @@ -80,8 +90,6 @@ public interface Channelizer extends ChannelHandler { * Base implementation of the client side {@link Channelizer}. */ abstract class AbstractChannelizer extends ChannelInitializer<SocketChannel> implements Channelizer { - private static final Logger logger = LoggerFactory.getLogger(AbstractChannelizer.class); - protected Connection connection; protected Cluster cluster; private ConcurrentMap<UUID, ResultQueue> pending; @@ -152,6 +160,21 @@ public interface Channelizer extends ChannelHandler { } /** + * Keep-alive is supported through the ping/pong websocket protocol. + * + * @see <a href=https://tools.ietf.org/html/rfc6455#section-5.5.2>IETF RFC 6455</a> + */ + @Override + public boolean supportsKeepAlive() { + return true; + } + + @Override + public Object createKeepAliveMessage() { + return new PingWebSocketFrame(); + } + + /** * Sends a {@code CloseWebSocketFrame} to the server for the specified channel. */ @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index 9c08c3c..f79e719 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -163,6 +163,7 @@ public final class Cluster { .port(settings.port) .enableSsl(settings.connectionPool.enableSsl) .trustCertificateChainFile(settings.connectionPool.trustCertChainFile) + .keepAliveInterval(settings.connectionPool.keepAliveInterval) .keyCertChainFile(settings.connectionPool.keyCertChainFile) .keyFile(settings.connectionPool.keyFile) .keyPassword(settings.connectionPool.keyPassword) @@ -388,6 +389,14 @@ public final class Cluster { } /** + * Gets time in milliseconds to wait after the last message is sent over a connection before sending a keep-alive + * message to the server. + */ + public long getKeepAliveInterval() { + return manager.connectionPoolSettings.keepAliveInterval; + } + + /** * Specifies the load balancing strategy to use on the client side. */ public Class<? extends LoadBalancingStrategy> getLoadBalancingStrategy() { @@ -478,6 +487,7 @@ public final class Cluster { private int reconnectInitialDelay = Connection.RECONNECT_INITIAL_DELAY; private int reconnectInterval = Connection.RECONNECT_INTERVAL; private int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE; + private long keepAliveInterval = Connection.KEEP_ALIVE_INTERVAL; private String channelizer = Channelizer.WebSocketChannelizer.class.getName(); private boolean enableSsl = false; private String trustCertChainFile = null; @@ -573,6 +583,16 @@ public final class Cluster { } /** + * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. This + * setting is only relevant to {@link Channelizer} implementations that return {@code true} for + * {@link Channelizer#supportsKeepAlive()}. Set to zero to disable this feature. + */ + public Builder keepAliveInterval(final long keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + return this; + } + + /** * The X.509 certificate chain file in PEM format. */ public Builder keyCertChainFile(final String keyCertChainFile) { @@ -871,6 +891,7 @@ public final class Cluster { connectionPoolSettings.keyCertChainFile = builder.keyCertChainFile; connectionPoolSettings.keyFile = builder.keyFile; connectionPoolSettings.keyPassword = builder.keyPassword; + connectionPoolSettings.keepAliveInterval = builder.keepAliveInterval; connectionPoolSettings.channelizer = builder.channelizer; sslContextOptional = Optional.ofNullable(builder.sslContext); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 9dc93a7..1ef9b98 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -55,6 +56,7 @@ final class Connection { private final Cluster cluster; private final Client client; private final ConnectionPool pool; + private final long keepAliveInterval; public static final int MAX_IN_PROCESS = 4; public static final int MIN_IN_PROCESS = 1; @@ -64,6 +66,7 @@ final class Connection { public static final int RECONNECT_INITIAL_DELAY = 1000; public static final int RECONNECT_INTERVAL = 1000; public static final int RESULT_ITERATION_BATCH_SIZE = 64; + public static final long KEEP_ALIVE_INTERVAL = 1800000; /** * When a {@code Connection} is borrowed from the pool, this number is incremented to indicate the number of @@ -82,6 +85,7 @@ final class Connection { private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>(); private final AtomicBoolean shutdownInitiated = new AtomicBoolean(false); + private final AtomicReference<ScheduledFuture> keepAliveFuture = new AtomicReference<>(); public Connection(final URI uri, final ConnectionPool pool, final int maxInProcess) throws ConnectionException { this.uri = uri; @@ -89,6 +93,7 @@ final class Connection { this.client = pool.getClient(); this.pool = pool; this.maxInProcess = maxInProcess; + this.keepAliveInterval = pool.settings().keepAliveInterval; connectionLabel = String.format("Connection{host=%s}", pool.host); @@ -153,6 +158,10 @@ final class Connection { if (!closeFuture.compareAndSet(null, future)) return closeFuture.get(); + // stop any pings being sent at the server for keep-alive + final ScheduledFuture keepAlive = keepAliveFuture.get(); + if (keepAlive != null) keepAlive.cancel(true); + // make sure all requests in the queue are fully processed before killing. if they are then shutdown // can be immediate. if not this method will signal the readCompleted future defined in the write() // operation to check if it can close. in this way the connection no longer receives writes, but @@ -181,7 +190,8 @@ final class Connection { // once there is a completed write, then create a traverser for the result set and complete // the promise so that the client knows that that it can start checking for results. final Connection thisConnection = this; - final ChannelPromise promise = channel.newPromise() + + final ChannelPromise requestPromise = channel.newPromise() .addListener(f -> { if (!f.isSuccess()) { if (logger.isDebugEnabled()) @@ -234,9 +244,29 @@ final class Connection { requestMessage, pool.host)); } }); - channel.writeAndFlush(requestMessage, promise); + channel.writeAndFlush(requestMessage, requestPromise); + + // try to keep the connection alive if the channel allows such things - websockets will + if (channelizer.supportsKeepAlive() && keepAliveInterval > 0) { + + final ScheduledFuture oldKeepAliveFuture = keepAliveFuture.getAndSet(cluster.executor().scheduleAtFixedRate(() -> { + logger.debug("Request sent to server to keep {} alive", thisConnection); + try { + channel.writeAndFlush(channelizer.createKeepAliveMessage()); + } catch (Exception ex) { + // will just log this for now - a future real request can be responsible for the failure that + // marks the host as dead. this also may not mean the host is actually dead. more robust handling + // is in play for real requests, not this simple ping + logger.warn(String.format("Keep-alive did not succeed on %s", thisConnection), ex); + } + }, keepAliveInterval, keepAliveInterval, TimeUnit.MILLISECONDS)); + + // try to cancel the old future if it's still un-executed - no need to ping since a new write has come + // through on the connection + if (oldKeepAliveFuture != null) oldKeepAliveFuture.cancel(true); + } - return promise; + return requestPromise; } public void returnToPool() { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java index 9955e82..f0d9044 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java @@ -170,7 +170,7 @@ final class ConnectionPool { logger.debug("Attempting to return {} on {}", connection, host); if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown"); - int borrowed = connection.borrowed.decrementAndGet(); + final int borrowed = connection.borrowed.decrementAndGet(); if (connection.isDead()) { logger.debug("Marking {} as dead", this.host); considerUnavailable(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java index 43014ce..41a697c 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java @@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.driver; import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0; import org.apache.commons.configuration.Configuration; -import org.apache.tinkerpop.gremlin.driver.ser.GraphSONMessageSerializerV1d0; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import org.yaml.snakeyaml.TypeDescription; import org.yaml.snakeyaml.Yaml; @@ -33,7 +32,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Properties; import java.util.stream.Collectors; /** @@ -207,6 +205,9 @@ final class Settings { if (connectionPoolConf.containsKey("resultIterationBatchSize")) cpSettings.resultIterationBatchSize = connectionPoolConf.getInt("resultIterationBatchSize"); + if (connectionPoolConf.containsKey("keepAliveInterval")) + cpSettings.keepAliveInterval = connectionPoolConf.getLong("keepAliveInterval"); + settings.connectionPool = cpSettings; } @@ -251,6 +252,13 @@ final class Settings { public int maxSize = ConnectionPool.MAX_POOL_SIZE; /** + * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. This + * setting is only relevant to {@link Channelizer} implementations that return {@code true} for + * {@link Channelizer#supportsKeepAlive()}. Set to zero to disable this feature. + */ + public long keepAliveInterval = Connection.KEEP_ALIVE_INTERVAL; + + /** * A connection under low use can be destroyed. This setting determines the threshold for determining when * that connection can be released and is defaulted to 8. */ http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java index 922775e..5ba0f1b 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java @@ -86,6 +86,7 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob if (frame instanceof TextWebSocketFrame) { ctx.fireChannelRead(frame.retain(2)); } else if (frame instanceof PongWebSocketFrame) { + logger.debug("Received response from keep-alive request"); } else if (frame instanceof BinaryWebSocketFrame) { ctx.fireChannelRead(frame.retain(2)); } else if (frame instanceof CloseWebSocketFrame) http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java index 0f24a9a..383e5a5 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.gremlin.driver.handler; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import org.apache.tinkerpop.gremlin.driver.MessageSerializer; import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer; import io.netty.channel.ChannelHandler; @@ -47,10 +48,12 @@ public final class WebSocketGremlinResponseDecoder extends MessageToMessageDecod if (webSocketFrame instanceof BinaryWebSocketFrame) { final BinaryWebSocketFrame tf = (BinaryWebSocketFrame) webSocketFrame; objects.add(serializer.deserializeResponse(tf.content())); - } else { + } else if (webSocketFrame instanceof TextWebSocketFrame){ final TextWebSocketFrame tf = (TextWebSocketFrame) webSocketFrame; final MessageTextSerializer textSerializer = (MessageTextSerializer) serializer; objects.add(textSerializer.deserializeResponse(tf.text())); + } else { + throw new RuntimeException(String.format("WebSocket channel does not handle this type of message: %s", webSocketFrame.getClass().getName())); } } finally { ReferenceCountUtil.release(webSocketFrame); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d881484a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java index 04faa29..96cde54 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java @@ -19,6 +19,7 @@ package org.apache.tinkerpop.gremlin.server; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.log4j.Level; import org.apache.tinkerpop.gremlin.TestHelper; import org.apache.tinkerpop.gremlin.driver.Channelizer; import org.apache.tinkerpop.gremlin.driver.Client; @@ -27,6 +28,7 @@ import org.apache.tinkerpop.gremlin.driver.Result; import org.apache.tinkerpop.gremlin.driver.ResultSet; import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler; import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; import org.apache.tinkerpop.gremlin.driver.ser.JsonBuilderGryoSerializer; import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0; @@ -35,11 +37,14 @@ import org.apache.tinkerpop.gremlin.server.channel.NioChannelizer; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex; import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; +import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender; import org.apache.tinkerpop.gremlin.util.TimeUtil; import groovy.json.JsonBuilder; import org.apache.tinkerpop.gremlin.util.function.FunctionUtils; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import org.hamcrest.core.IsInstanceOf; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +73,9 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.AllOf.allOf; +import static org.hamcrest.number.OrderingComparison.greaterThan; +import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -83,6 +91,37 @@ import static org.hamcrest.core.StringStartsWith.startsWith; */ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegrationTest { private static final Logger logger = LoggerFactory.getLogger(GremlinDriverIntegrateTest.class); + + private Log4jRecordingAppender recordingAppender = null; + private Level previousLogLevel; + + @Before + public void setupForEachTest() { + recordingAppender = new Log4jRecordingAppender(); + final org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger(); + + if (name.getMethodName().equals("shouldKeepAliveForWebSockets")) { + final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(WebSocketClientHandler.class); + previousLogLevel = webSocketClientHandlerLogger.getLevel(); + webSocketClientHandlerLogger.setLevel(Level.DEBUG); + } + + rootLogger.addAppender(recordingAppender); + } + + @After + public void teardownForEachTest() { + final org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger(); + + if (name.getMethodName().equals("shouldKeepAliveForWebSockets")) { + final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(WebSocketClientHandler.class); + previousLogLevel = webSocketClientHandlerLogger.getLevel(); + webSocketClientHandlerLogger.setLevel(previousLogLevel); + } + + rootLogger.removeAppender(recordingAppender); + } + /** * Configure specific Gremlin Server settings for specific tests. */ @@ -137,6 +176,36 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration } @Test + public void shouldKeepAliveForWebSockets() throws Exception { + // keep the connection pool size at 1 to remove the possibility of lots of connections trying to ping which will + // complicate the assertion logic + final Cluster cluster = Cluster.build(). + minConnectionPoolSize(1). + maxConnectionPoolSize(1). + keepAliveInterval(1000).create(); + final Client client = cluster.connect(); + + // fire up lots of requests so as to schedule/deschedule lots of ping jobs + for (int ix = 0; ix < 500; ix++) { + assertEquals(2, client.submit("1+1").all().get().get(0).getInt()); + } + + // don't send any messages for a bit so that the driver pings in the background + Thread.sleep(3000); + + // make sure no bonus messages sorta fire off once we get back to sending requests + for (int ix = 0; ix < 500; ix++) { + assertEquals(2, client.submit("1+1").all().get().get(0).getInt()); + } + + // there really shouldn't be more than 3 of these sent. should definitely be at least one though + final long messages = recordingAppender.getMessages().stream().filter(m -> m.contains("Received response from keep-alive request")).count(); + assertThat(messages, allOf(greaterThan(0L), lessThanOrEqualTo(3L))); + + cluster.close(); + } + + @Test public void shouldEventuallySucceedAfterChannelLevelError() throws Exception { final Cluster cluster = Cluster.build().addContactPoint("localhost") .reconnectIntialDelay(500)
