This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch driver-35 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit e2dfcc27b416b173329b5577f673041386104e19 Author: stephen <[email protected]> AuthorDate: Wed Dec 11 15:13:11 2019 -0500 Forced SingleRequestConnection to wait for handshake Not sure it was doing such a good job of that before this change which is why I think the travis tests were failing --- .../org/apache/tinkerpop/gremlin/driver/Channelizer.java | 16 +++++++++------- .../tinkerpop/gremlin/driver/DefaultConnectionPool.java | 7 +++++-- .../org/apache/tinkerpop/gremlin/driver/Settings.java | 5 ++--- .../gremlin/driver/SingleRequestConnection.java | 10 ++++------ .../gremlin/driver/exception/ConnectionException.java | 6 ++++++ .../gremlin/driver/handler/WebSocketClientHandler.java | 10 +++++++++- .../tinkerpop/gremlin/driver/simple/WebSocketClient.java | 4 ++-- 7 files changed, 37 insertions(+), 21 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 7d4a41e..10cfacc 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -140,13 +140,18 @@ public interface Channelizer extends ChannelHandler { throw new IllegalStateException("To use wss scheme ensure that enableSsl is set to true in configuration"); final int maxContentLength = cluster.connectionPoolSettings().maxContentLength; + final long maxWaitForConnection = cluster.connectionPoolSettings().maxWaitForConnection; + // seems ok to use the maxWaitForConnection as the top end for the handshake because the wait for the + // handshake is going just get interrupted by the wait for the overall connection. no point to adding + // another setting specific to the handshake to complicate things. final WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker( connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, maxContentLength); final WebSocketClientProtocolHandler nettyWsHandler = new WebSocketClientProtocolHandler( - handshaker, true, false, 9000); - final WebSocketClientHandler handler = new WebSocketClientHandler(connectionPool.getActiveChannels()); + handshaker, true, false, maxWaitForConnection); + final WebSocketClientHandler handler = new WebSocketClientHandler( + connectionPool.getHost().getHostUri(), connectionPool.getActiveChannels()); final int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS)); pipeline.addLast("http-codec", new HttpClientCodec()); @@ -159,7 +164,6 @@ public interface Channelizer extends ChannelHandler { pipeline.addLast("gremlin-decoder", webSocketGremlinResponseDecoder); } - @Override public void connected(final Channel ch) { try { @@ -173,11 +177,9 @@ public interface Channelizer extends ChannelHandler { throw new ConnectionException(connectionPool.getHost().getHostUri(), "Could not complete websocket handshake - ensure that client protocol matches server", f.cause()); } - }).get(3000, TimeUnit.MILLISECONDS); + }).sync(); } - } catch (ExecutionException ex) { - throw new RuntimeException(ex.getCause()); - } catch (InterruptedException | TimeoutException ex) { + } catch (InterruptedException ex) { // catching the InterruptedException will reset the interrupted flag. This is intentional. throw new RuntimeException(new ConnectionException(connectionPool.getHost().getHostUri(), "Timed out while performing websocket handshake - ensure that client protocol matches server", ex.getCause())); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java index 8c399cb..1c173d2 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/DefaultConnectionPool.java @@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.group.ChannelGroup; @@ -32,12 +33,15 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.Promise; +import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; +import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.ConnectException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -214,9 +218,8 @@ public class DefaultConnectionPool implements ConnectionPool { @Override public Connection prepareConnection() throws TimeoutException, ConnectException { - if (closeFuture.get() != null) { + if (closeFuture.get() != null) throw new RuntimeException(this + " is closing. Cannot borrow connection."); - } // Get a channel, verify handshake is done and then attach it to a connectionPool final Channel ch = this.channelPool.acquire().syncUninterruptibly().getNow(); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java index 5deba8b..a4e8c1a 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java @@ -288,9 +288,8 @@ final class Settings { public int maxSize = ConnectionPool.DEFAULT_MAX_POOL_SIZE; /** - * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. This - * setting is only relevant to {@link Channelizer} implementations that return {@code true} for - * {@link Channelizer#supportsKeepAlive()}. Set to zero to disable this feature. + * Length of time in milliseconds to wait on an idle connection before sending a keep-alive request. Set to + * zero to disable this feature. */ public long keepAliveInterval = Connection.DEFAULT_KEEP_ALIVE_INTERVAL; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java index 6bb745d..6283fff 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/SingleRequestConnection.java @@ -57,10 +57,9 @@ public class SingleRequestConnection implements Connection { static final AttributeKey<ResultQueue> RESULT_QUEUE_ATTRIBUTE_KEY = AttributeKey.newInstance("resultQueueFuture"); SingleRequestConnection(final Channel channel, final ConnectionPool pool) { - /* A channel is attached with a request only when the channel is active. This is the responsibility - * of channelpool to ensure that the channel attached to this connection is healthy. Something is fishy - * if this is not true, hence, IllegalState. - */ + // A channel is attached with a request only when the channel is active. This is the responsibility + // of channelpool to ensure that the channel attached to this connection is healthy. Something is fishy + // if this is not true, hence, IllegalState. if (!channel.isActive()) { throw new IllegalStateException("Channel " + channel + " is not active."); } @@ -150,9 +149,8 @@ public class SingleRequestConnection implements Connection { */ @Override public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> resultQueueSetup) { - if (this.resultFuture != null) { + if (this.resultFuture != null) throw new IllegalStateException("This " + this + " is already in use. Cannot reuse it for request " + requestMessage); - } this.resultFuture = resultQueueSetup; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java index 67101b3..5710cf9 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/ConnectionException.java @@ -29,6 +29,12 @@ public class ConnectionException extends Exception { private URI uri; private InetSocketAddress address; + public ConnectionException(final URI uri, final String message) { + super(message); + this.uri = uri; + this.address = null; + } + public ConnectionException(final URI uri, final InetSocketAddress addy, final String message) { super(message); this.address = addy; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java index 1d0b73f..95f6923 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java @@ -32,9 +32,12 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; +import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.URI; + /** * @author Stephen Mallette (http://stephen.genoprime.com) */ @@ -42,10 +45,12 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class); private ChannelPromise handshakeFuture; private final ChannelGroup activeChannels; + private final URI host; - public WebSocketClientHandler(final ChannelGroup activeChannels) { + public WebSocketClientHandler(final URI host, final ChannelGroup activeChannels) { super(false); this.activeChannels = activeChannels; + this.host = host; } public ChannelFuture handshakeFuture() { @@ -75,6 +80,9 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { handshakeFuture.setSuccess(); activeChannels.add(ctx.channel()); + } else if (event == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT) { + handshakeFuture.setFailure(new ConnectionException(host, + "Timed out while performing websocket handshake - ensure that client protocol matches server")); } else if (event instanceof IdleStateEvent) { final IdleStateEvent e = (IdleStateEvent) event; if (e.state() == IdleState.READER_IDLE) { diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java index 2d045d1..5f2603d 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java @@ -72,8 +72,8 @@ public class WebSocketClient extends AbstractClient { uri, WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, 65536); final WebSocketClientProtocolHandler nettyWsHandler = new WebSocketClientProtocolHandler( - handshaker, true, false, 9000); - final WebSocketClientHandler wsHandler = new WebSocketClientHandler(new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)); + handshaker, true, false, 30000); + final WebSocketClientHandler wsHandler = new WebSocketClientHandler(uri, new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)); final MessageSerializer serializer = new GraphBinaryMessageSerializerV1(); b.channel(NioSocketChannel.class)
