This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch driver-35 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit fcacd69f5341cb4ba79fdafd9fd50a0aa0e630b0 Author: stephen <[email protected]> AuthorDate: Fri Dec 6 14:08:22 2019 -0500 Refactored WebSocketIdleEventHandler into the WebSocketClientHandler Not sure why this was done as two separate things really in the first place. --- CHANGELOG.asciidoc | 1 + docs/src/upgrade/release-3.5.x.asciidoc | 5 +- .../tinkerpop/gremlin/driver/Channelizer.java | 7 +-- .../driver/handler/WebSocketClientHandler.java | 22 +++++++- .../driver/handler/WebSocketIdleEventHandler.java | 58 ---------------------- .../driver/handler/WebsocketCloseHandler.java | 2 - .../gremlin/driver/simple/WebSocketClient.java | 5 +- ...ClientSingleRequestConnectionIntegrateTest.java | 2 - 8 files changed, 31 insertions(+), 71 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 876e0be..15d5db4 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -32,6 +32,7 @@ This release also includes changes from <<release-3-4-3, 3.4.3>>. * Refactored `MapStep` to move its logic to `ScalarMapStep` so that the old behavior could be preserved while allow other implementations to have more flexibility. * Modified TinkerGraph to support `null` property values and can be configured to disable that feature. * Refactored the Java driver to use one connection per request. +* Refactored functionality of `WebSocketIdleEventHandler` into the `WebSocketClientHandler`. * Modified `null` handling in mutations to be consistent for a new `Vertex` as well as update to an existing one. * Removed support for Python 2.x in gremlinpython. * Upgraded to Apache Commons Configuration2. diff --git a/docs/src/upgrade/release-3.5.x.asciidoc b/docs/src/upgrade/release-3.5.x.asciidoc index 484eb34..548ed95 100644 --- a/docs/src/upgrade/release-3.5.x.asciidoc +++ b/docs/src/upgrade/release-3.5.x.asciidoc @@ -269,9 +269,10 @@ The following deprecated classes, methods or fields have been removed in this ve ** `org.apache.tinkerpop.gremlin.neo4j.structure.trait.NoMultiNoMetaNeo4jTrait` ** `org.apache.tinkerpop.gremlin.neo4j.structure.trait.Neo4jTrait` -Certain elements of the API were not or could not be deprecated in prior versions and were simply renamed for this -release: +Certain elements of the API were not or could not be deprecated in prior versions and were simply renamed or removed +for this release: +* `org.apache.tinkerpop.gremlin.driver.handler.WebSocketIdleEventHandler` * `org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode#SERVER_ERROR_SCRIPT_EVALUATION` became `SERVER_ERROR_EVALUATION` See: link:https://issues.apache.org/jira/browse/TINKERPOP-2080[TINKERPOP-2080], diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 723c1c5..4b4d426 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -37,7 +37,6 @@ import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder; import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler; import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder; import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder; -import org.apache.tinkerpop.gremlin.driver.handler.WebSocketIdleEventHandler; import org.apache.tinkerpop.gremlin.driver.handler.WebsocketCloseHandler; import java.util.Optional; @@ -173,7 +172,6 @@ public interface Channelizer extends ChannelHandler { private WebSocketGremlinRequestEncoder webSocketGremlinRequestEncoder; private WebSocketGremlinResponseDecoder webSocketGremlinResponseDecoder; - private WebSocketIdleEventHandler webSocketIdleEventHandler; @Override public void init(Connection connection) { @@ -185,7 +183,6 @@ public interface Channelizer extends ChannelHandler { super.init(connpool); webSocketGremlinRequestEncoder = new WebSocketGremlinRequestEncoder(true, cluster.getSerializer()); webSocketGremlinResponseDecoder = new WebSocketGremlinResponseDecoder(cluster.getSerializer()); - webSocketIdleEventHandler = new WebSocketIdleEventHandler(connpool.getActiveChannels()); } /** @@ -217,13 +214,13 @@ public interface Channelizer extends ChannelHandler { // TODO: Replace WebSocketClientHandler with Netty's WebSocketClientProtocolHandler final WebSocketClientHandler handler = new WebSocketClientHandler( WebSocketClientHandshakerFactory.newHandshaker( - connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, maxContentLength)); + connectionPool.getHost().getHostUri(), WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, maxContentLength), + connectionPool.getActiveChannels()); int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS)); pipeline.addLast("http-codec", new HttpClientCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength)); pipeline.addLast("netty-idle-state-Handler", new IdleStateHandler(0, keepAliveInterval, 0)); - pipeline.addLast("ws-idle-handler", webSocketIdleEventHandler); pipeline.addLast("ws-client-handler", handler); pipeline.addLast("ws-close-handler", new WebsocketCloseHandler()); pipeline.addLast("gremlin-encoder", webSocketGremlinRequestEncoder); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java index 1d6d92f..2cd0f95 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.group.ChannelGroup; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; @@ -31,6 +32,8 @@ import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,9 +45,11 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class); private final WebSocketClientHandshaker handshaker; private ChannelPromise handshakeFuture; + private final ChannelGroup activeChannels; - public WebSocketClientHandler(final WebSocketClientHandshaker handshaker) { + public WebSocketClientHandler(final WebSocketClientHandshaker handshaker, final ChannelGroup activeChannels) { this.handshaker = handshaker; + this.activeChannels = activeChannels; } public ChannelFuture handshakeFuture() { @@ -62,6 +67,8 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob if (!f.isSuccess()) { if (!handshakeFuture.isDone()) handshakeFuture.setFailure(f.cause()); ctx.fireExceptionCaught(f.cause()); + } else { + activeChannels.add(ctx.channel()); } }); } @@ -98,6 +105,19 @@ public final class WebSocketClientHandler extends SimpleChannelInboundHandler<Ob } @Override + public void userEventTriggered(final ChannelHandlerContext ctx, final Object event) throws Exception { + if (event instanceof IdleStateEvent) { + final IdleStateEvent e = (IdleStateEvent) event; + if (e.state() == IdleState.READER_IDLE) { + logger.warn("Server " + ctx.channel() + " has been idle for too long."); + } else if (e.state() == IdleState.WRITER_IDLE || e.state() == IdleState.ALL_IDLE) { + logger.info("Sending ping frame to the server"); + ctx.writeAndFlush(new PingWebSocketFrame()); + } + } + } + + @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { if (!handshakeFuture.isDone()) handshakeFuture.setFailure(cause); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketIdleEventHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketIdleEventHandler.java deleted file mode 100644 index 2fb6df3..0000000 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketIdleEventHandler.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.tinkerpop.gremlin.driver.handler; - -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.group.ChannelGroup; -import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - [email protected] -public class WebSocketIdleEventHandler extends ChannelDuplexHandler { - private static final Logger logger = LoggerFactory.getLogger(WebSocketIdleEventHandler.class); - private final ChannelGroup activeChannels; - - public WebSocketIdleEventHandler(final ChannelGroup activeChannels) { - this.activeChannels = activeChannels; - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - activeChannels.add(ctx.channel()); - super.channelActive(ctx); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception { - if (event instanceof IdleStateEvent) { - IdleStateEvent e = (IdleStateEvent) event; - if (e.state() == IdleState.READER_IDLE) { - logger.warn("Server " + ctx.channel() + " has been idle for too long."); - } else if (e.state() == IdleState.WRITER_IDLE || e.state() == IdleState.ALL_IDLE) { - logger.info("Sending ping frame to the server"); - ctx.writeAndFlush(new PingWebSocketFrame()); - } - } - } -} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java index eb16c60..f93ea93 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebsocketCloseHandler.java @@ -44,8 +44,6 @@ public class WebsocketCloseHandler extends ChannelOutboundHandlerAdapter { } } - - @Override public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { ctx.channel().attr(CLOSE_WS_SENT).setIfAbsent(Boolean.FALSE); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java index b1a47a6..651b1f3 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java @@ -20,7 +20,9 @@ package org.apache.tinkerpop.gremlin.driver.simple; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelOption; +import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.EmptyHttpHeaders; +import io.netty.util.concurrent.GlobalEventExecutor; import org.apache.tinkerpop.gremlin.driver.MessageSerializer; import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler; import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder; @@ -67,7 +69,8 @@ public class WebSocketClient extends AbstractClient { final WebSocketClientHandler wsHandler = new WebSocketClientHandler( WebSocketClientHandshakerFactory.newHandshaker( - uri, WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, 65536)); + uri, WebSocketVersion.V13, null, false, EmptyHttpHeaders.INSTANCE, 65536), + new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)); final MessageSerializer serializer = new GraphBinaryMessageSerializerV1(); b.channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientSingleRequestConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientSingleRequestConnectionIntegrateTest.java index 1dd62f5..72071ad 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientSingleRequestConnectionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientSingleRequestConnectionIntegrateTest.java @@ -407,8 +407,6 @@ public class ClientSingleRequestConnectionIntegrateTest extends AbstractGremlinS @Test public void testAbruptClose() throws InterruptedException { final Cluster cluster = this.createClusterWithXNumOfConnection(50); - - final Client.ClusteredClient client = cluster.connect(); final int requests = 50;
