This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch driver-35 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 1f0120daf290c40f69a697db07e9d31a21db7380 Author: stephen <[email protected]> AuthorDate: Mon Dec 9 08:50:28 2019 -0500 Used netty's WebSocketClientProtocolHandler Let netty do more of the heavy lifting so we could factor out our own custom code. --- .../tinkerpop/gremlin/driver/Channelizer.java | 21 +++++---- .../tinkerpop/gremlin/driver/ConnectionPool.java | 7 ++- .../driver/handler/WebSocketClientHandler.java | 50 ++++------------------ ...loseHandler.java => WebSocketCloseHandler.java} | 2 +- .../handler/WebSocketGremlinResponseDecoder.java | 1 - .../gremlin/driver/simple/WebSocketClient.java | 15 ++++--- 6 files changed, 37 insertions(+), 59 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 5d20ab1..0ebfbfa 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -26,7 +26,9 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.EmptyHttpHeaders; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; +import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; import io.netty.handler.codec.http.websocketx.WebSocketVersion; import io.netty.handler.ssl.SslContext; import io.netty.handler.timeout.IdleStateHandler; @@ -37,7 +39,7 @@ import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder; import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler; import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder; import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder; -import org.apache.tinkerpop.gremlin.driver.handler.WebsocketCloseHandler; +import org.apache.tinkerpop.gremlin.driver.handler.WebSocketCloseHandler; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -140,18 +142,21 @@ public interface Channelizer extends ChannelHandler { throw new IllegalStateException("To use wss scheme ensure that enableSsl is set to true in configuration"); final int maxContentLength = cluster.connectionPoolSettings().maxContentLength; - // TODO: Replace WebSocketClientHandler with Netty's WebSocketClientProtocolHandler - final WebSocketClientHandler handler = new WebSocketClientHandler( - WebSocketClientHandshakerFactory.newHandshaker( - connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, maxContentLength), - connectionPool.getActiveChannels()); - int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS)); + final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker( + connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false, + EmptyHttpHeaders.INSTANCE, maxContentLength); + final WebSocketClientProtocolHandler nettyWsHandler = new WebSocketClientProtocolHandler( + handshaker, true, false, 9000); + final WebSocketClientHandler handler = new WebSocketClientHandler(connectionPool.getActiveChannels()); + + final int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS)); pipeline.addLast("http-codec", new HttpClientCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength)); pipeline.addLast("netty-idle-state-Handler", new IdleStateHandler(0, keepAliveInterval, 0)); + pipeline.addLast("netty-ws-handler", nettyWsHandler); pipeline.addLast("ws-client-handler", handler); - pipeline.addLast("ws-close-handler", new WebsocketCloseHandler()); + pipeline.addLast("ws-close-handler", new WebSocketCloseHandler()); pipeline.addLast("gremlin-encoder", webSocketGremlinRequestEncoder); pipeline.addLast("gremlin-decoder", webSocketGremlinResponseDecoder); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java index 5f7ee3b..35cecee 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java @@ -31,7 +31,7 @@ import java.util.concurrent.TimeoutException; * requests to a specific server. It is also the gatekeeper for the number of simultaneous requests * to the server. * - * More specifically, it associates a Netty {@link Channel} with a {@link Connection}. + * More specifically, it associates a Netty {@code Channel} with a {@link Connection}. * * A typical workflow for the lifetime of a Gremlin request would be as follows: * 1. Connection pool is set up attached to a host on initialization. @@ -65,7 +65,8 @@ public interface ConnectionPool { /** * Release the connection and associated resources (like channel) so that the resources can be re-used. */ - CompletableFuture<Void> releaseConnection(Connection conn); + CompletableFuture<Void> releaseConnection(final Connection conn); + /** * Close the connection pool and all associated resources gracefully. * This method should be made idempotent and thread safe. @@ -73,10 +74,12 @@ public interface ConnectionPool { CompletableFuture<Void> closeAsync(); ScheduledExecutorService executor(); + /** * @return {@link Host} associated with the connection pool */ Host getHost(); + /** * @return {@link Cluster} containing the {@link Host} */ diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java index 2cd0f95..aded787 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java @@ -18,23 +18,19 @@ */ package org.apache.tinkerpop.gremlin.driver.handler; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; -import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; -import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; +import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; -import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,12 +39,10 @@ import org.slf4j.LoggerFactory; */ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> { private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class); - private final WebSocketClientHandshaker handshaker; private ChannelPromise handshakeFuture; private final ChannelGroup activeChannels; - public WebSocketClientHandler(final WebSocketClientHandshaker handshaker, final ChannelGroup activeChannels) { - this.handshaker = handshaker; + public WebSocketClientHandler(final ChannelGroup activeChannels) { this.activeChannels = activeChannels; } @@ -62,46 +56,15 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob } @Override - public void channelActive(final ChannelHandlerContext ctx) throws Exception { - handshaker.handshake(ctx.channel()).addListener(f -> { - if (!f.isSuccess()) { - if (!handshakeFuture.isDone()) handshakeFuture.setFailure(f.cause()); - ctx.fireExceptionCaught(f.cause()); - } else { - activeChannels.add(ctx.channel()); - } - }); - } - - @Override protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception { - final Channel ch = ctx.channel(); - if (!handshaker.isHandshakeComplete()) { - // web socket client connected - handshaker.finishHandshake(ch, (FullHttpResponse) msg); - handshakeFuture.setSuccess(); - return; - } - - if (msg instanceof FullHttpResponse) { - final FullHttpResponse response = (FullHttpResponse) msg; - throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" - + response.content().toString(CharsetUtil.UTF_8) + ')'); - } - - // a close frame doesn't mean much here. errors raised from closed channels will mark the host as dead final WebSocketFrame frame = (WebSocketFrame) msg; if (frame instanceof TextWebSocketFrame) { ctx.fireChannelRead(frame.retain(2)); - } else if (frame instanceof PingWebSocketFrame) { - ctx.writeAndFlush(new PongWebSocketFrame()); - }else if (frame instanceof PongWebSocketFrame) { - logger.debug("Received response from keep-alive request"); } else if (frame instanceof BinaryWebSocketFrame) { ctx.fireChannelRead(frame.retain(2)); - } else if (frame instanceof CloseWebSocketFrame) - ch.close(); - + } else if (frame instanceof PongWebSocketFrame) { + logger.debug("Received response from keep-alive request"); + } } @Override @@ -114,6 +77,9 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob logger.info("Sending ping frame to the server"); ctx.writeAndFlush(new PingWebSocketFrame()); } + } else if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { + handshakeFuture.setSuccess(); + activeChannels.add(ctx.channel()); } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketCloseHandler.java similarity index 97% rename from gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java rename to gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketCloseHandler.java index f93ea93..86c6a75 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketCloseHandler.java @@ -32,7 +32,7 @@ import io.netty.util.AttributeKey; * <p> * This handler is also idempotent and sends out the CloseFrame only once. */ -public class WebsocketCloseHandler extends ChannelOutboundHandlerAdapter { +public class WebSocketCloseHandler extends ChannelOutboundHandlerAdapter { private static final AttributeKey<Boolean> CLOSE_WS_SENT = AttributeKey.newInstance("closeWebSocketSent"); @Override diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java index 383e5a5..ec88bb9 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketGremlinResponseDecoder.java @@ -18,7 +18,6 @@ */ package org.apache.tinkerpop.gremlin.driver.handler; -import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import org.apache.tinkerpop.gremlin.driver.MessageSerializer; import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer; import io.netty.channel.ChannelHandler; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java index 651b1f3..2d045d1 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java @@ -22,6 +22,8 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelOption; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.EmptyHttpHeaders; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; +import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.tinkerpop.gremlin.driver.MessageSerializer; import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler; @@ -66,11 +68,13 @@ public class WebSocketClient extends AbstractClient { throw new IllegalArgumentException("Unsupported protocol: " + protocol); try { - final WebSocketClientHandler wsHandler = - new WebSocketClientHandler( - WebSocketClientHandshakerFactory.newHandshaker( - uri, WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, 65536), - new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)); + final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker( + uri, WebSocketVersion.V13, null, false, + EmptyHttpHeaders.INSTANCE, 65536); + final WebSocketClientProtocolHandler nettyWsHandler = new WebSocketClientProtocolHandler( + handshaker, true, false, 9000); + final WebSocketClientHandler wsHandler = new WebSocketClientHandler(new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)); + final MessageSerializer serializer = new GraphBinaryMessageSerializerV1(); b.channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @@ -80,6 +84,7 @@ public class WebSocketClient extends AbstractClient { p.addLast( new HttpClientCodec(), new HttpObjectAggregator(65536), + nettyWsHandler, wsHandler, new WebSocketGremlinRequestEncoder(true, serializer), new WebSocketGremlinResponseDecoder(serializer),
