This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch driver-35 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 4535aa5ffc9c0c6c1d0172af58aaf37dd1808bdd Author: stephen <[email protected]> AuthorDate: Mon Dec 9 12:16:35 2019 -0500 Limited some listener creation for handshakes The handshake should only happen once so calls to Channelizer.connect() might be redundant if the handshake already occurred. --- .../tinkerpop/gremlin/driver/Channelizer.java | 22 +++++++++++++--------- .../gremlin/driver/DefaultConnectionPool.java | 2 -- .../driver/handler/WebSocketClientHandler.java | 19 +++++++++++-------- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 0ebfbfa..fc10371 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -19,6 +19,7 @@ package org.apache.tinkerpop.gremlin.driver; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; @@ -165,15 +166,18 @@ public interface Channelizer extends ChannelHandler { @Override public void connected(final Channel ch) { try { - // block for a few seconds - if the handshake takes longer than there's gotta be issues with that - // server. more than likely, SSL is enabled on the server, but the client forgot to enable it or - // perhaps the server is not configured for websockets. - ((WebSocketClientHandler)(ch.pipeline().get("ws-client-handler"))).handshakeFuture().addListener( f -> { - if (!f.isSuccess()) { - throw new ConnectionException(connectionPool.getHost().getHostUri(), - "Could not complete websocket handshake - ensure that client protocol matches server", f.cause()); - } - }).get(1500, TimeUnit.MILLISECONDS); + // be sure the handshake is done - if the handshake takes longer than the specified time there's + // gotta be issues with that server. a common problem where this comes up: SSL is enabled on the + // server, but the client forgot to enable it or perhaps the server is not configured for websockets. + final ChannelFuture handshakeFuture = ((WebSocketClientHandler)(ch.pipeline().get("ws-client-handler"))).handshakeFuture(); + if (!handshakeFuture.isDone()) { + handshakeFuture.addListener(f -> { + if (!f.isSuccess()) { + throw new ConnectionException(connectionPool.getHost().getHostUri(), + "Could not complete websocket handshake - ensure that client protocol matches server", f.cause()); + } + }).get(3000, TimeUnit.MILLISECONDS); + } } catch (ExecutionException ex) { throw new RuntimeException(ex.getCause()); } catch (InterruptedException | TimeoutException ex) { diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java index 7bda3d5..8c399cb 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java @@ -220,8 +220,6 @@ public class DefaultConnectionPool implements ConnectionPool { // Get a channel, verify handshake is done and then attach it to a connectionPool final Channel ch = this.channelPool.acquire().syncUninterruptibly().getNow(); - - // TODO: This call is un-necessary on every channel acquire, since handshake is done once. channelizer.connected(ch); return new SingleRequestConnection(ch, this); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java index aded787..1d0b73f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java @@ -31,6 +31,7 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.ReferenceCountUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +44,7 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob private final ChannelGroup activeChannels; public WebSocketClientHandler(final ChannelGroup activeChannels) { + super(false); this.activeChannels = activeChannels; } @@ -58,18 +60,22 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob @Override protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception { final WebSocketFrame frame = (WebSocketFrame) msg; - if (frame instanceof TextWebSocketFrame) { - ctx.fireChannelRead(frame.retain(2)); - } else if (frame instanceof BinaryWebSocketFrame) { - ctx.fireChannelRead(frame.retain(2)); + if (frame instanceof TextWebSocketFrame || frame instanceof BinaryWebSocketFrame) { + ctx.fireChannelRead(frame.retain()); } else if (frame instanceof PongWebSocketFrame) { logger.debug("Received response from keep-alive request"); + ReferenceCountUtil.release(frame); + } else { + throw new IllegalStateException("Unexpected message of " + msg.getClass().getSimpleName() + ": " + msg); } } @Override public void userEventTriggered(final ChannelHandlerContext ctx, final Object event) throws Exception { - if (event instanceof IdleStateEvent) { + if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { + handshakeFuture.setSuccess(); + activeChannels.add(ctx.channel()); + } else if (event instanceof IdleStateEvent) { final IdleStateEvent e = (IdleStateEvent) event; if (e.state() == IdleState.READER_IDLE) { logger.warn("Server " + ctx.channel() + " has been idle for too long."); @@ -77,9 +83,6 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob logger.info("Sending ping frame to the server"); ctx.writeAndFlush(new PingWebSocketFrame()); } - } else if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { - handshakeFuture.setSuccess(); - activeChannels.add(ctx.channel()); } }
