This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 4535aa5ffc9c0c6c1d0172af58aaf37dd1808bdd
Author: stephen <[email protected]>
AuthorDate: Mon Dec 9 12:16:35 2019 -0500

    Limited some listener creation for handshakes
    
    The handshake should only happen once so calls to Channelizer.connect() 
might be redundant if the handshake already occurred.
---
 .../tinkerpop/gremlin/driver/Channelizer.java      | 22 +++++++++++++---------
 .../gremlin/driver/DefaultConnectionPool.java      |  2 --
 .../driver/handler/WebSocketClientHandler.java     | 19 +++++++++++--------
 3 files changed, 24 insertions(+), 19 deletions(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 0ebfbfa..fc10371 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.gremlin.driver;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
@@ -165,15 +166,18 @@ public interface Channelizer extends ChannelHandler {
         @Override
         public void connected(final Channel ch) {
             try {
-                // block for a few seconds - if the handshake takes longer 
than there's gotta be issues with that
-                // server. more than likely, SSL is enabled on the server, but 
the client forgot to enable it or
-                // perhaps the server is not configured for websockets.
-                
((WebSocketClientHandler)(ch.pipeline().get("ws-client-handler"))).handshakeFuture().addListener(
 f -> {
-                    if (!f.isSuccess()) {
-                        throw new 
ConnectionException(connectionPool.getHost().getHostUri(),
-                                                                           
"Could not complete websocket handshake - ensure that client protocol matches 
server", f.cause());
-                    }
-                }).get(1500, TimeUnit.MILLISECONDS);
+                // be sure the handshake is done - if the handshake takes 
longer than the specified time there's
+                // gotta be issues with that  server. a common problem where 
this comes up: SSL is enabled on the
+                // server, but the client forgot to enable it or perhaps the 
server is not configured for websockets.
+                final ChannelFuture handshakeFuture = 
((WebSocketClientHandler)(ch.pipeline().get("ws-client-handler"))).handshakeFuture();
+                if (!handshakeFuture.isDone()) {
+                    handshakeFuture.addListener(f -> {
+                        if (!f.isSuccess()) {
+                            throw new 
ConnectionException(connectionPool.getHost().getHostUri(),
+                                    "Could not complete websocket handshake - 
ensure that client protocol matches server", f.cause());
+                        }
+                    }).get(3000, TimeUnit.MILLISECONDS);
+                }
             } catch (ExecutionException ex) {
                 throw new RuntimeException(ex.getCause());
             } catch (InterruptedException | TimeoutException ex) {
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
index 7bda3d5..8c399cb 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java
@@ -220,8 +220,6 @@ public class DefaultConnectionPool implements 
ConnectionPool {
 
         // Get a channel, verify handshake is done and then attach it to a 
connectionPool
         final Channel ch = 
this.channelPool.acquire().syncUninterruptibly().getNow();
-
-        // TODO: This call is un-necessary on every channel acquire, since 
handshake is done once.
         channelizer.connected(ch);
 
         return new SingleRequestConnection(ch, this);
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index aded787..1d0b73f 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -31,6 +31,7 @@ import 
io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.ReferenceCountUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,6 +44,7 @@ public final class WebSocketClientHandler extends 
SimpleChannelInboundHandler<Ob
     private final ChannelGroup activeChannels;
 
     public WebSocketClientHandler(final ChannelGroup activeChannels) {
+        super(false);
         this.activeChannels = activeChannels;
     }
 
@@ -58,18 +60,22 @@ public final class WebSocketClientHandler extends 
SimpleChannelInboundHandler<Ob
     @Override
     protected void channelRead0(final ChannelHandlerContext ctx, final Object 
msg) throws Exception {
         final WebSocketFrame frame = (WebSocketFrame) msg;
-        if (frame instanceof TextWebSocketFrame) {
-            ctx.fireChannelRead(frame.retain(2));
-        } else if (frame instanceof BinaryWebSocketFrame) {
-            ctx.fireChannelRead(frame.retain(2));
+        if (frame instanceof TextWebSocketFrame || frame instanceof 
BinaryWebSocketFrame) {
+            ctx.fireChannelRead(frame.retain());
         } else if (frame instanceof PongWebSocketFrame) {
             logger.debug("Received response from keep-alive request");
+            ReferenceCountUtil.release(frame);
+        } else {
+            throw new IllegalStateException("Unexpected message of " + 
msg.getClass().getSimpleName() + ": " + msg);
         }
     }
 
     @Override
     public void userEventTriggered(final ChannelHandlerContext ctx, final 
Object event) throws Exception {
-        if (event instanceof IdleStateEvent) {
+        if (event == 
WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
+            handshakeFuture.setSuccess();
+            activeChannels.add(ctx.channel());
+        } else if (event instanceof IdleStateEvent) {
             final IdleStateEvent e = (IdleStateEvent) event;
             if (e.state() == IdleState.READER_IDLE) {
                 logger.warn("Server " + ctx.channel() + " has been idle for 
too long.");
@@ -77,9 +83,6 @@ public final class WebSocketClientHandler extends 
SimpleChannelInboundHandler<Ob
                 logger.info("Sending ping frame to the server");
                 ctx.writeAndFlush(new PingWebSocketFrame());
             }
-        } else if (event == 
WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
-            handshakeFuture.setSuccess();
-            activeChannels.add(ctx.channel());
         }
     }
 

Reply via email to