This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch driver-35 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 9a037b4b14eb6da02eb6af9db15ff77d886333a2 Author: stephen <[email protected]> AuthorDate: Thu Dec 12 07:33:43 2019 -0500 Fixed some problems with closing sessions and waiting for handshakes --- .../apache/tinkerpop/gremlin/driver/Channelizer.java | 16 +++++++++++----- .../org/apache/tinkerpop/gremlin/driver/Client.java | 13 ++++++++----- .../org/apache/tinkerpop/gremlin/driver/Connection.java | 2 +- .../gremlin/driver/handler/WebSocketClientHandler.java | 2 ++ .../gremlin/server/handler/OpExecutorHandler.java | 10 ++++++++++ .../gremlin/server/GremlinServerSslIntegrateTest.java | 17 +++++++++++------ 6 files changed, 43 insertions(+), 17 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 10cfacc..19c35c0 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -168,21 +168,27 @@ public interface Channelizer extends ChannelHandler { public void connected(final Channel ch) { try { // be sure the handshake is done - if the handshake takes longer than the specified time there's - // gotta be issues with that server. a common problem where this comes up: SSL is enabled on the + // gotta be issues with that server. a common problem where this comes up: SSL is enabled on the // server, but the client forgot to enable it or perhaps the server is not configured for websockets. - final ChannelFuture handshakeFuture = ((WebSocketClientHandler)(ch.pipeline().get("ws-client-handler"))).handshakeFuture(); + // we see this because a normal websocket handshake is sent to the server and it can't be decoded by + // SSL and no response get sent back so the handshake has no idea what to do. used the + // maxWaitForConnection here in the same way it was used as the netty websocket handshaker timeout. 
+ // the wait for the overall connection should trigger before this. + final ChannelFuture handshakeFuture = ((WebSocketClientHandler) (ch.pipeline().get("ws-client-handler"))).handshakeFuture(); if (!handshakeFuture.isDone()) { handshakeFuture.addListener(f -> { if (!f.isSuccess()) { throw new ConnectionException(connectionPool.getHost().getHostUri(), "Could not complete websocket handshake - ensure that client protocol matches server", f.cause()); } - }).sync(); + }).get(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS); } - } catch (InterruptedException ex) { + } catch (ExecutionException ex) { + throw new RuntimeException(ex); + } catch (InterruptedException | TimeoutException ex) { // catching the InterruptedException will reset the interrupted flag. This is intentional. throw new RuntimeException(new ConnectionException(connectionPool.getHost().getHostUri(), - "Timed out while performing websocket handshake - ensure that client protocol matches server", ex.getCause())); + "Timed out while performing websocket handshake - ensure that client protocol matches server", ex.getCause())); } } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index ae48e4f..3c9a7b9 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -781,23 +781,26 @@ public abstract class Client { final RequestMessage closeMessage = buildMessage(RequestMessage.build(Tokens.OPS_CLOSE) .addArg(Tokens.ARGS_FORCE, forceClose)).create(); - closing.set(CompletableFuture.supplyAsync(() -> { + // need to submit from here because after the future is set to closing we can't send anymore messages. 
+ final CompletableFuture<ResultSet> closeRequestFuture = submitAsync(closeMessage); + + closing.set(CompletableFuture.runAsync(() -> { try { // block this up until we get a response from the server or an exception. it might not be accurate // to wait for maxWaitForSessionClose because we wait that long for this future in calls to close() // but in either case we don't want to wait longer than that so perhaps this is still a sensible // wait time - or at least better than something hardcoded. this wait will just expire a bit after // the close() call's expiration....at least i think that's right. - submitAsync(closeMessage).get( + closeRequestFuture.get( cluster.connectionPoolSettings().maxWaitForSessionClose, TimeUnit.MILLISECONDS).all().get(); - } catch (Exception ignored) { + } catch (Exception ex) { // ignored - if the close message doesn't get to the server it's not a real worry. the server will // eventually kill the session + logger.warn("Session close request failed for [" + this.sessionId + "]- the server will close the session once it has been determined to be idle or during shutdown", ex); } finally { connectionPool.closeAsync(); } - return null; - }, cluster.executor())); + })); return closing.get(); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index c1a0b5a..417a170 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -37,7 +37,7 @@ import java.util.concurrent.CompletableFuture; */ public interface Connection { - int DEFAULT_MAX_WAIT_FOR_CONNECTION = 3000; + int DEFAULT_MAX_WAIT_FOR_CONNECTION = 30000; int DEFAULT_MAX_WAIT_FOR_SESSION_CLOSE = 3000; int DEFAULT_MAX_CONTENT_LENGTH = 65536; int DEFAULT_RECONNECT_INTERVAL = 1000; diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java index 95f6923..e79944e 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java @@ -29,6 +29,8 @@ import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.ssl.SslCompletionEvent; +import io.netty.handler.ssl.SslHandshakeCompletionEvent; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java index 068c332..60e0177 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/OpExecutorHandler.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.gremlin.server.handler; +import io.netty.handler.ssl.NotSslRecordException; import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; @@ -82,4 +83,13 @@ public class OpExecutorHandler extends SimpleChannelInboundHandler<Pair<RequestM ReferenceCountUtil.release(objects); } } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws 
Exception { + // basic catch all for server exceptions + logger.error(cause.getMessage(), cause); + if (cause.getCause() instanceof NotSslRecordException) { + ctx.close(); + } + } } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java index ba562a1..7dfe566 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java @@ -38,6 +38,10 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +/** + * Tests we know will fail use a shortened wait-for-connection setting, as there is really no need to wait the full + * time just to get a failure. + */ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrationTest { /** @@ -122,7 +126,6 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat } } - @Test public void shouldEnableSsl() { final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_SERVER_KEY).keyStorePassword(KEY_PASS).sslSkipCertValidation(true).create(); @@ -156,7 +159,7 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat @Test public void shouldEnableSslButFailIfClientConnectsWithoutIt() { - final Cluster cluster = TestClientFactory.build().enableSsl(false).create(); + final Cluster cluster = TestClientFactory.build().enableSsl(false).maxWaitForConnection(3000).create(); final Client client = cluster.connect(); try { @@ -199,7 +202,7 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat @Test public void shouldEnableSslAndClientCertificateAuthAndFailWithoutCert() { final Cluster cluster = 
TestClientFactory.build().enableSsl(true).keyStore(JKS_SERVER_KEY).keyStorePassword(KEY_PASS) - .keyStoreType(KEYSTORE_TYPE_JKS).sslSkipCertValidation(true).create(); + .keyStoreType(KEYSTORE_TYPE_JKS).sslSkipCertValidation(true).maxWaitForConnection(3000).create(); final Client client = cluster.connect(); try { @@ -216,7 +219,8 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat @Test public void shouldEnableSslAndClientCertificateAuthAndFailWithoutTrustedClientCert() { final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_CLIENT_KEY).keyStorePassword(KEY_PASS) - .keyStoreType(KEYSTORE_TYPE_JKS).trustStore(JKS_CLIENT_TRUST).trustStorePassword(KEY_PASS).create(); + .keyStoreType(KEYSTORE_TYPE_JKS).trustStore(JKS_CLIENT_TRUST).trustStorePassword(KEY_PASS) + .maxWaitForConnection(3000).create(); final Client client = cluster.connect(); try { @@ -233,7 +237,7 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat @Test public void shouldEnableSslAndFailIfProtocolsDontMatch() { final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_SERVER_KEY).keyStorePassword(KEY_PASS) - .sslSkipCertValidation(true).sslEnabledProtocols(Arrays.asList("TLSv1.2")).create(); + .sslSkipCertValidation(true).sslEnabledProtocols(Arrays.asList("TLSv1.2")).maxWaitForConnection(3000).create(); final Client client = cluster.connect(); try { @@ -250,7 +254,8 @@ public class GremlinServerSslIntegrateTest extends AbstractGremlinServerIntegrat @Test public void shouldEnableSslAndFailIfCiphersDontMatch() { final Cluster cluster = TestClientFactory.build().enableSsl(true).keyStore(JKS_SERVER_KEY).keyStorePassword(KEY_PASS) - .sslSkipCertValidation(true).sslCipherSuites(Arrays.asList("SSL_RSA_WITH_RC4_128_SHA")).create(); + .sslSkipCertValidation(true).sslCipherSuites(Arrays.asList("SSL_RSA_WITH_RC4_128_SHA")) + .maxWaitForConnection(3000).create(); final Client client = cluster.connect(); try {
