This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch TINKERPOP-2982 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 675fc4ff8f71c7d37366c26d5da672fcea75fb91 Author: Stephen Mallette <[email protected]> AuthorDate: Tue Aug 15 10:48:32 2023 -0400 TINKERPOP-2982 Allowed gremlin-driver to work over HTTP. HTTP is just an option. Websockets is still intact. There shouldn't be any breaking changes here. While there were some server changes, compatibility should be retained there...just added some extra parameters that were formerly only needed with TraversalOpProcessor. g.tx() and sessions will not work with this configuration. --- CHANGELOG.asciidoc | 3 +- .../tinkerpop/gremlin/driver/Channelizer.java | 91 +++++++++- .../apache/tinkerpop/gremlin/driver/Cluster.java | 28 +++- .../gremlin/driver/HandshakeInterceptor.java | 2 + .../org/apache/tinkerpop/gremlin/driver/Host.java | 14 +- .../gremlin/driver/MessageSerializer.java | 3 +- ...akeInterceptor.java => RequestInterceptor.java} | 12 +- .../driver/handler/HttpGremlinRequestEncoder.java | 92 +++++++++++ .../driver/handler/HttpGremlinResponseDecoder.java | 51 ++++++ .../driver/handler/WebSocketClientHandler.java | 5 +- .../gremlin/driver/message/RequestMessage.java | 8 + .../driver/ser/GraphBinaryMessageSerializerV1.java | 10 +- .../binary/GraphBinaryMessageSerializerV1Test.java | 27 +++ .../handler/HttpBasicAuthorizationHandler.java | 16 +- .../server/handler/HttpGremlinEndpointHandler.java | 78 +++++---- .../gremlin/server/handler/HttpHandlerUtil.java | 46 ++++-- .../gremlin/server/GremlinDriverIntegrateTest.java | 33 +++- .../gremlin/server/HttpDriverIntegrateTest.java | 183 +++++++++++++++++++++ 18 files changed, 605 insertions(+), 97 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 41703a3990..4278e2ff2a 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -26,7 +26,8 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima This release also includes changes from <<release-3-5-8, 3.5.8>>. * Fix a javadoc comment in GraphTraversal.not method. - +* Allowed `gremlin-driver` to be used over HTTP. +* Deprecated the `HandshakeInterceptor` in favor of a more generic `RequestInterceptor`. [[release-3-6-5]] === TinkerPop 3.6.5 (Release Date: July 31, 2023) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index efe47c5977..4c87f7ec78 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -19,6 +19,8 @@ package org.apache.tinkerpop.gremlin.driver; import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; +import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder; +import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder; import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler; import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder; import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder; @@ -77,6 +79,13 @@ public interface Channelizer extends ChannelHandler { public default void connected() { } + /** + * Gets the scheme to use to construct the URL and by default uses HTTP. + */ + public default String getScheme(final boolean sslEnabled) { + return sslEnabled ? "https" : "http"; + } + /** * Base implementation of the client side {@link Channelizer}. */ @@ -126,7 +135,7 @@ public interface Channelizer extends ChannelHandler { } if (sslCtx.isPresent()) { - SslHandler sslHandler = sslCtx.get().newHandler(socketChannel.alloc(), connection.getUri().getHost(), connection.getUri().getPort()); + final SslHandler sslHandler = sslCtx.get().newHandler(socketChannel.alloc(), connection.getUri().getHost(), connection.getUri().getPort()); // TINKERPOP-2814. Remove the SSL handshake timeout so that handshakes that take longer than 10000ms // (Netty default) but less than connectionSetupTimeoutMillis can succeed. This means the SSL handshake // will instead be capped by connectionSetupTimeoutMillis. @@ -143,18 +152,18 @@ public interface Channelizer extends ChannelHandler { /** * WebSocket {@link Channelizer} implementation. */ - public final class WebSocketChannelizer extends AbstractChannelizer { + final class WebSocketChannelizer extends AbstractChannelizer { private static final Logger logger = LoggerFactory.getLogger(WebSocketChannelizer.class); private WebSocketClientHandler handler; - private WebSocketGremlinRequestEncoder webSocketGremlinRequestEncoder; - private WebSocketGremlinResponseDecoder webSocketGremlinResponseDecoder; + private WebSocketGremlinRequestEncoder gremlinRequestEncoder; + private WebSocketGremlinResponseDecoder gremlinResponseDecoder; @Override public void init(final Connection connection) { super.init(connection); - webSocketGremlinRequestEncoder = new WebSocketGremlinRequestEncoder(true, cluster.getSerializer()); - webSocketGremlinResponseDecoder = new WebSocketGremlinResponseDecoder(cluster.getSerializer()); + gremlinRequestEncoder = new WebSocketGremlinRequestEncoder(true, cluster.getSerializer()); + gremlinResponseDecoder = new WebSocketGremlinResponseDecoder(cluster.getSerializer()); } /** @@ -194,7 +203,7 @@ public interface Channelizer extends ChannelHandler { new WebSocketClientHandler.InterceptedWebSocketClientHandshaker13( connection.getUri(), WebSocketVersion.V13, null, true, httpHeaders, maxContentLength, true, false, -1, - cluster.getHandshakeInterceptor()), cluster.getConnectionSetupTimeout(), supportsSsl()); + cluster.getRequestInterceptor()), cluster.getConnectionSetupTimeout(), supportsSsl()); final int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert( cluster.connectionPoolSettings().keepAliveInterval, TimeUnit.MILLISECONDS)); @@ -205,8 +214,8 @@ public interface Channelizer extends ChannelHandler { pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE); pipeline.addLast("idle-state-Handler", new IdleStateHandler(0, keepAliveInterval, 0)); pipeline.addLast("ws-handler", handler); - pipeline.addLast("gremlin-encoder", webSocketGremlinRequestEncoder); - pipeline.addLast("gremlin-decoder", webSocketGremlinResponseDecoder); + pipeline.addLast("gremlin-encoder", gremlinRequestEncoder); + pipeline.addLast("gremlin-decoder", gremlinResponseDecoder); } @Override @@ -231,5 +240,69 @@ public interface Channelizer extends ChannelHandler { throw new ConnectionException(connection.getUri(), errMsg, ex); } } + + @Override + public String getScheme(boolean sslEnabled) { + return sslEnabled ? "wss" : "ws"; + } + } + + /** + * Sends requests over the HTTP endpoint. Client functionality is governed by the limitations of the HTTP endpoint, + * meaning that sessions are not available and as such {@code tx()} (i.e. transactions) are not available over this + * channelizer. Only sessionless requests are possible. + */ + final class HttpChannelizer extends AbstractChannelizer { + + private HttpClientCodec handler; + + private HttpGremlinRequestEncoder gremlinRequestEncoder; + private HttpGremlinResponseDecoder gremlinResponseDecoder; + + @Override + public void init(final Connection connection) { + super.init(connection); + + // server does not support sessions so this channerlizer can't support the SessionedClient + if (connection.getClient() instanceof Client.SessionedClient) + throw new IllegalStateException(String.format("Cannot use sessions or tx() with %s", HttpChannelizer.class.getSimpleName())); + + gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptor()); + gremlinResponseDecoder = new HttpGremlinResponseDecoder(cluster.getSerializer()); + } + + @Override + public void connected() { + super.connected(); + } + + @Override + public boolean supportsSsl() { + final String scheme = connection.getUri().getScheme(); + return "https".equalsIgnoreCase(scheme); + } + + @Override + public void configure(final ChannelPipeline pipeline) { + final String scheme = connection.getUri().getScheme(); + if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) + throw new IllegalStateException("Unsupported scheme (only http: or https: supported): " + scheme); + + if (!supportsSsl() && "https".equalsIgnoreCase(scheme)) + throw new IllegalStateException("To use https scheme ensure that enableSsl is set to true in configuration"); + + final int maxContentLength = cluster.connectionPoolSettings().maxContentLength; + final HttpHeaders httpHeaders = new DefaultHttpHeaders(); + if(connection.getCluster().isUserAgentOnConnectEnabled()) { + httpHeaders.set(UserAgent.USER_AGENT_HEADER_NAME, UserAgent.USER_AGENT); + } + + handler = new HttpClientCodec(); + + pipeline.addLast("http-codec", handler); + pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength)); + pipeline.addLast("gremlin-encoder", gremlinRequestEncoder); + pipeline.addLast("gremlin-decoder", gremlinResponseDecoder); + } } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index 3ed867a3ea..702ecb0ddb 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelOption; +import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslProvider; @@ -61,12 +62,12 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; /** @@ -470,7 +471,7 @@ public final class Cluster { return manager.serializer; } - HandshakeInterceptor getHandshakeInterceptor() { + UnaryOperator<FullHttpRequest> getRequestInterceptor() { return manager.interceptor; } @@ -608,7 +609,7 @@ public final class Cluster { private boolean sslSkipCertValidation = false; private SslContext sslContext = null; private LoadBalancingStrategy loadBalancingStrategy = new LoadBalancingStrategy.RoundRobin(); - private HandshakeInterceptor interceptor = HandshakeInterceptor.NO_OP; + private UnaryOperator<FullHttpRequest> interceptor = HandshakeInterceptor.NO_OP; private AuthProperties authProps = new AuthProperties(); private long connectionSetupTimeoutMillis = Connection.CONNECTION_SETUP_TIMEOUT_MILLIS; private boolean enableUserAgentOnConnect = true; @@ -891,7 +892,7 @@ public final class Cluster { * Specify the {@link Channelizer} implementation to use on the client when creating a {@link Connection}. */ public Builder channelizer(final Class channelizerClass) { - return channelizer(channelizerClass.getCanonicalName()); + return channelizer(channelizerClass.getName()); } /** @@ -922,10 +923,23 @@ public final class Cluster { } /** - * Specifies an {@link HandshakeInterceptor} that will allow manipulation of the - * {@code FullHttpRequest} prior to its being sent over the websocket. + * Specifies an {@link HandshakeInterceptor} that will allow manipulation of the {@code FullHttpRequest} prior + * to its being sent to the server. + * @deprecated As of release 3.6.6, replaced with {@link #requestInterceptor(RequestInterceptor)}. */ + @Deprecated public Builder handshakeInterceptor(final HandshakeInterceptor interceptor) { + // when this deprecated method is removed, the interceptor can have its type promoted from + // UnaryOperator<FullHttpRequest> to RequestInterceptor + this.interceptor = interceptor; + return this; + } + + /** + * Specifies an {@link HandshakeInterceptor} that will allow manipulation of the {@code FullHttpRequest} prior + * to its being sent to the server. For websockets the interceptor is only called on the handshake. + */ + public Builder requestInterceptor(final RequestInterceptor interceptor) { this.interceptor = interceptor; return this; } @@ -1062,7 +1076,7 @@ public final class Cluster { private final AuthProperties authProps; private final Optional<SslContext> sslContextOptional; private final Supplier<RequestMessage.Builder> validationRequest; - private final HandshakeInterceptor interceptor; + private final UnaryOperator<FullHttpRequest> interceptor; /** * Thread pool for requests. diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java index ea1ff87dbd..9b5132192e 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java @@ -26,7 +26,9 @@ import java.util.function.UnaryOperator; * This function is called when the websocket handshake is attempted and the first {@code FullHttpRequest} is sent to * the server. The interceptor allows this message to be modified as needed before it is sent to the server. * Implementations are supplied to {@link Cluster.Builder#handshakeInterceptor(HandshakeInterceptor)}. + * @deprecated As of release 3.6.6, replaced by {@link RequestInterceptor}. */ +@Deprecated public interface HandshakeInterceptor extends UnaryOperator<FullHttpRequest> { /** diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java index 3d492fa612..ef65e3afc3 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java @@ -49,7 +49,8 @@ public final class Host { Host(final InetSocketAddress address, final Cluster cluster) { this.cluster = cluster; this.address = address; - this.hostUri = makeUriFromAddress(address, cluster.getPath(), cluster.connectionPoolSettings().enableSsl); + this.hostUri = makeUriFromAddress(address, cluster.getPath(), cluster.connectionPoolSettings().enableSsl, + cluster.getChannelizer()); hostLabel = String.format("Host{address=%s, hostUri=%s}", address, hostUri); } @@ -112,9 +113,16 @@ public final class Host { makeAvailable(); } - private static URI makeUriFromAddress(final InetSocketAddress addy, final String path, final boolean ssl) { + private static URI makeUriFromAddress(final InetSocketAddress addy, final String path, final boolean ssl, final String channelizerClass) { + final Channelizer channelizer; try { - final String scheme = ssl ? "wss" : "ws"; + channelizer = (Channelizer) Class.forName(channelizerClass).newInstance(); + } catch (Exception ex) { + throw new RuntimeException(String.format("Invalid Channelizer instance: %s", channelizerClass)); + } + + try { + final String scheme = channelizer.getScheme(ssl); return new URI(scheme, null, addy.getHostName(), addy.getPort(), path, null, null); } catch (URISyntaxException use) { throw new RuntimeException(String.format("URI for host could not be constructed from: %s", addy), use); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/MessageSerializer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/MessageSerializer.java index 615077b2d6..bb8423bc95 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/MessageSerializer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/MessageSerializer.java @@ -79,7 +79,8 @@ public interface MessageSerializer<M> { public ResponseMessage deserializeResponse(final ByteBuf msg) throws SerializationException; /** - * The list of mime types that the serializer supports. + * The list of mime types that the serializer supports. They should be ordered in preferred ordered where the + * greatest fidelity match is first. */ public String[] mimeTypesSupported(); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java similarity index 63% copy from gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java copy to gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java index ea1ff87dbd..0aae506e66 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java @@ -23,14 +23,14 @@ import io.netty.handler.codec.http.FullHttpRequest; import java.util.function.UnaryOperator; /** - * This function is called when the websocket handshake is attempted and the first {@code FullHttpRequest} is sent to - * the server. The interceptor allows this message to be modified as needed before it is sent to the server. - * Implementations are supplied to {@link Cluster.Builder#handshakeInterceptor(HandshakeInterceptor)}. + * This function is called a {@code FullHttpRequest} constructed and allow it to be modified as needed before it is + * sent to the server. Implementations are supplied to {@link Cluster.Builder#requestInterceptor(RequestInterceptor)}. + * When this method is called is dependent on the {@link Channelizer} implementation. */ -public interface HandshakeInterceptor extends UnaryOperator<FullHttpRequest> { +public interface RequestInterceptor extends UnaryOperator<FullHttpRequest> { /** - * The default implementation of a {@link HandshakeInterceptor} and behaves as a no-op. + * The default implementation of a {@link RequestInterceptor} and behaves as a no-op. */ - public static final HandshakeInterceptor NO_OP = o -> o; + public static final RequestInterceptor NO_OP = o -> o; } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java new file mode 100644 index 0000000000..f3a85e0f9f --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; +import org.apache.tinkerpop.gremlin.driver.MessageSerializer; +import org.apache.tinkerpop.gremlin.driver.Tokens; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; +import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; +import org.apache.tinkerpop.gremlin.process.traversal.Bytecode; +import org.apache.tinkerpop.gremlin.process.traversal.translator.GroovyTranslator; +import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper; + +import java.util.HashMap; +import java.util.List; +import java.util.function.UnaryOperator; + +/** + * Converts {@link RequestMessage} to a {@code HttpRequest}. + */ [email protected] +public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<RequestMessage> { + private final MessageSerializer<?> serializer; + + private final ObjectMapper mapper = new ObjectMapper(); + + private final UnaryOperator<FullHttpRequest> interceptor; + + public HttpGremlinRequestEncoder(final MessageSerializer<?> serializer, final UnaryOperator<FullHttpRequest> interceptor) { + this.serializer = serializer; + this.interceptor = interceptor; + } + + @Override + protected void encode(final ChannelHandlerContext channelHandlerContext, final RequestMessage requestMessage, final List<Object> objects) throws Exception { + try { + final String gremlin; + final Object gremlinStringOrBytecode = requestMessage.getArg(Tokens.ARGS_GREMLIN); + + // the gremlin key can contain a Gremlin script or bytecode. if it's bytecode we can't submit it over + // http as such. it has to be converted to a script and we can do that with the Groovy translator. + final boolean usesBytecode = gremlinStringOrBytecode instanceof Bytecode; + if (usesBytecode) { + gremlin = GroovyTranslator.of("g").translate((Bytecode) gremlinStringOrBytecode).getScript(); + } else { + gremlin = gremlinStringOrBytecode.toString(); + } + final byte[] payload = mapper.writeValueAsBytes(new HashMap<String,Object>() {{ + put(Tokens.ARGS_GREMLIN, gremlin); + put(Tokens.REQUEST_ID, requestMessage.getRequestId()); + if (usesBytecode) put("op", Tokens.OPS_BYTECODE); + }}); + final ByteBuf bb = channelHandlerContext.alloc().buffer(payload.length); + bb.writeBytes(payload); + final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/", bb); + request.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/json"); + request.headers().add(HttpHeaderNames.CONTENT_LENGTH, payload.length); + request.headers().add(HttpHeaderNames.ACCEPT, serializer.mimeTypesSupported()[0]); + + objects.add(interceptor.apply(request)); + } catch (Exception ex) { + throw new ResponseException(ResponseStatusCode.REQUEST_ERROR_SERIALIZATION, String.format( + "An error occurred during serialization of this request [%s] - it could not be sent to the server - Reason: %s", + requestMessage, ex)); + } + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java new file mode 100644 index 0000000000..00c345828a --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.util.CharsetUtil; +import org.apache.tinkerpop.gremlin.driver.MessageSerializer; +import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer; + +import java.util.List; + +/** + * Converts {@code HttpResponse} to a {@link ResponseMessage}. + */ [email protected] +public final class HttpGremlinResponseDecoder extends MessageToMessageDecoder<FullHttpResponse> { + private final MessageSerializer<?> serializer; + + public HttpGremlinResponseDecoder(final MessageSerializer<?> serializer) { + this.serializer = serializer; + } + + @Override + protected void decode(final ChannelHandlerContext channelHandlerContext, final FullHttpResponse httpResponse, final List<Object> objects) throws Exception { + final String content = httpResponse.content().toString(CharsetUtil.UTF_8); + objects.add(((MessageTextSerializer) serializer).deserializeResponse(content)); + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java index 82ae2fedd2..55df38f989 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java @@ -37,6 +37,7 @@ import io.netty.util.concurrent.Promise; import java.net.URI; import java.util.concurrent.TimeoutException; +import java.util.function.UnaryOperator; import javax.net.ssl.SSLHandshakeException; @@ -140,13 +141,13 @@ public final class WebSocketClientHandler extends WebSocketClientProtocolHandler */ public static class InterceptedWebSocketClientHandshaker13 extends WebSocketClientHandshaker13 { - private final HandshakeInterceptor interceptor; + private final UnaryOperator<FullHttpRequest> interceptor; public InterceptedWebSocketClientHandshaker13(final URI webSocketURL, final WebSocketVersion version, final String subprotocol, final boolean allowExtensions, final HttpHeaders customHeaders, final int maxFramePayloadLength, final boolean performMasking, final boolean allowMaskMismatch, - final long forceCloseTimeoutMillis, final HandshakeInterceptor interceptor) { + final long forceCloseTimeoutMillis, final UnaryOperator<FullHttpRequest> interceptor) { super(webSocketURL, version, subprotocol, allowExtensions, customHeaders, maxFramePayloadLength, performMasking, allowMaskMismatch, forceCloseTimeoutMillis); this.interceptor = interceptor; diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java index aeebf35d0d..21fa12c18b 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java @@ -96,6 +96,14 @@ public final class RequestMessage { return o == null ? Optional.empty() : Optional.of((T) o); } + public <T> T getArg(final String key) { + return (T) args.get(key); + } + + public <T> T getArgOrDefault(final String key, final T def) { + return (T) optionalArgs(key).orElse(def); + } + public static Builder from(final RequestMessage msg) { final Builder builder = build(msg.op) .overrideRequestId(msg.requestId) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/GraphBinaryMessageSerializerV1.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/GraphBinaryMessageSerializerV1.java index 1c7a69a238..c983a89543 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/GraphBinaryMessageSerializerV1.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/GraphBinaryMessageSerializerV1.java @@ -198,7 +198,15 @@ public class GraphBinaryMessageSerializerV1 extends AbstractMessageSerializer<Gr @Override public String serializeRequestAsString(final RequestMessage requestMessage, final ByteBufAllocator allocator) throws SerializationException { - final ByteBuf bb = serializeRequestAsBinary(requestMessage, allocator); + final ByteBuf bb = allocator.buffer(); + + try { + requestSerializer.writeValue(requestMessage, bb, writer); + } catch (Exception ex) { + bb.release(); + throw ex; + } + return base64Encoder.encodeToString(convertToBytes(bb)); } diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/GraphBinaryMessageSerializerV1Test.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/GraphBinaryMessageSerializerV1Test.java index 6cd37b87ce..052bfce456 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/GraphBinaryMessageSerializerV1Test.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/GraphBinaryMessageSerializerV1Test.java @@ -57,6 +57,19 @@ public class GraphBinaryMessageSerializerV1Test { assertThat(request, reflectionEquals(deserialized)); } + @Test + public void shouldSerializeAndDeserializeRequestOverText() throws SerializationException { + final RequestMessage request = RequestMessage.build("op1") + .processor("proc1") + .overrideRequestId(UUID.randomUUID()) + .addArg("arg1", "value1") + .create(); + + final String base64 = serializer.serializeRequestAsString(request, allocator); + final RequestMessage deserialized = serializer.deserializeRequest(base64); + assertThat(request, reflectionEquals(deserialized)); + } + @Test public void shouldSerializeAndDeserializeRequestWithoutArgs() throws SerializationException { final RequestMessage request = RequestMessage.build("op1") @@ -99,6 +112,20 @@ public class GraphBinaryMessageSerializerV1Test { assertResponseEquals(response, deserialized); } + @Test + public void shouldSerializeAndDeserializeResponseOverText() throws SerializationException { + final ResponseMessage response = ResponseMessage.build(UUID.randomUUID()) + .code(ResponseStatusCode.SUCCESS) + .statusMessage("Found") + .statusAttribute("k1", 1) + .result("This is a fine message with a string") + .create(); + + final String base64 = serializer.serializeResponseAsString(response, allocator); + final ResponseMessage deserialized = serializer.deserializeResponse(base64); + assertResponseEquals(response, deserialized); + } + @Test public void shouldSerializeAndDeserializeResponseWithoutStatusMessage() throws SerializationException { final ResponseMessage response = ResponseMessage.build(UUID.randomUUID()) diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthorizationHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthorizationHandler.java index 0951a5d561..e58a2b3a46 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthorizationHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthorizationHandler.java @@ -31,12 +31,9 @@ import org.apache.tinkerpop.gremlin.server.GremlinServer; import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser; import org.apache.tinkerpop.gremlin.server.authz.AuthorizationException; import org.apache.tinkerpop.gremlin.server.authz.Authorizer; -import org.javatuples.Quartet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED; @@ -64,20 +61,13 @@ public class HttpBasicAuthorizationHandler extends ChannelInboundHandlerAdapter if (msg instanceof FullHttpMessage){ final FullHttpMessage request = (FullHttpMessage) msg; final boolean keepAlive = HttpUtil.isKeepAlive(request); + try { user = ctx.channel().attr(StateKey.AUTHENTICATED_USER).get(); if (null == user) { // This is expected when using the AllowAllAuthenticator user = AuthenticatedUser.ANONYMOUS_USER; } - final Quartet<String, Map<String, Object>, String, Map<String, String>> requestArguments = - HttpHandlerUtil.getRequestArguments((FullHttpRequest) request); - final RequestMessage requestMessage = RequestMessage.build(Tokens.OPS_EVAL). - processor(""). - addArg(Tokens.ARGS_GREMLIN, requestArguments.getValue0()). - addArg(Tokens.ARGS_BINDINGS, requestArguments.getValue1()). - addArg(Tokens.ARGS_LANGUAGE, requestArguments.getValue2()). - addArg(Tokens.ARGS_ALIASES, requestArguments.getValue3()). - create(); + final RequestMessage requestMessage = HttpHandlerUtil.getRequestMessageFromHttpRequest((FullHttpRequest) request); authorizer.authorize(user, requestMessage); ctx.fireChannelRead(request); } catch (AuthorizationException ex) { // Expected: users can alternate between allowed and disallowed requests @@ -85,7 +75,7 @@ public class HttpBasicAuthorizationHandler extends ChannelInboundHandlerAdapter if (address.startsWith("/") && address.length() > 1) address = address.substring(1); final String script; try { - script = HttpHandlerUtil.getRequestArguments((FullHttpRequest) request).getValue0(); + script = HttpHandlerUtil.getRequestMessageFromHttpRequest((FullHttpRequest) request).getArgOrDefault(Tokens.ARGS_GREMLIN, ""); } catch (IllegalArgumentException iae) { HttpHandlerUtil.sendError(ctx, BAD_REQUEST, iae.getMessage(), keepAlive); return; diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index 6995b21583..47d1e68186 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -19,12 +19,13 @@ package org.apache.tinkerpop.gremlin.server.handler; import com.codahale.metrics.Timer; -import io.netty.buffer.ByteBufAllocator; +import io.netty.handler.codec.http.*; +import org.apache.tinkerpop.gremlin.driver.Tokens; import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; import org.apache.tinkerpop.gremlin.driver.ser.SerializationException; +import org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser; import org.apache.tinkerpop.gremlin.server.util.TextPlainMessageSerializer; import org.javatuples.Pair; -import org.javatuples.Quartet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tinkerpop.gremlin.util.ExceptionHelper; @@ -49,33 +50,21 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpUtil; import io.netty.util.ReferenceCountUtil; import javax.script.Bindings; import javax.script.SimpleBindings; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.UUID; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; import static com.codahale.metrics.MetricRegistry.name; -import static io.netty.handler.codec.http.HttpHeaderNames.*; import static io.netty.handler.codec.http.HttpMethod.GET; import static io.netty.handler.codec.http.HttpMethod.POST; import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; @@ -147,32 +136,34 @@ public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter { return; } - final Quartet<String, Map<String, Object>, String, Map<String, String>> requestArguments; + final RequestMessage requestMessage; try { - requestArguments = HttpHandlerUtil.getRequestArguments(req); + requestMessage = HttpHandlerUtil.getRequestMessageFromHttpRequest(req); } catch (IllegalArgumentException iae) { HttpHandlerUtil.sendError(ctx, BAD_REQUEST, iae.getMessage(), keepAlive); ReferenceCountUtil.release(msg); return; } - final String acceptString = Optional.ofNullable(req.headers().get("Accept")).orElse("application/json"); - final Pair<String, MessageTextSerializer<?>> serializer = chooseSerializer(acceptString); + final String acceptMime = Optional.ofNullable(req.headers().get(HttpHeaderNames.ACCEPT)).orElse("application/json"); + final Pair<String, MessageTextSerializer<?>> serializer = chooseSerializer(acceptMime); if (null == serializer) { - HttpHandlerUtil.sendError(ctx, BAD_REQUEST, String.format("no serializer for requested Accept header: %s", acceptString), + HttpHandlerUtil.sendError(ctx, BAD_REQUEST, String.format("no serializer for requested Accept header: %s", acceptMime), keepAlive); ReferenceCountUtil.release(msg); return; } - final String origin = req.headers().get(ORIGIN); + final String origin = req.headers().get(HttpHeaderNames.ORIGIN); - // not using the req any where below here - assume it is safe to release at this point. + // not using the req anywhere below here - assume it is safe to release at this point. ReferenceCountUtil.release(msg); try { logger.debug("Processing request containing script [{}] and bindings of [{}] on {}", - requestArguments.getValue0(), requestArguments.getValue1(), Thread.currentThread().getName()); + requestMessage.getArgOrDefault(Tokens.ARGS_GREMLIN, ""), + requestMessage.getArgOrDefault(Tokens.ARGS_BINDINGS, Collections.emptyMap()), + Thread.currentThread().getName()); if (settings.enableAuditLog) { AuthenticatedUser user = ctx.channel().attr(StateKey.AUTHENTICATED_USER).get(); if (null == user) { // This is expected when using the AllowAllAuthenticator @@ -180,7 +171,8 @@ public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter { } String address = ctx.channel().remoteAddress().toString(); if (address.startsWith("/") && address.length() > 1) address = address.substring(1); - auditLogger.info("User {} with address {} requested: {}", user.getName(), address, requestArguments.getValue0()); + auditLogger.info("User {} with address {} requested: {}", user.getName(), address, + requestMessage.getArgOrDefault(Tokens.ARGS_GREMLIN, "")); } final ChannelPromise promise = ctx.channel().newPromise(); final AtomicReference<Object> resultHolder = new AtomicReference<>(); @@ -189,12 +181,14 @@ public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter { // processing of the exception if (future.isSuccess()) { logger.debug("Preparing HTTP response for request with script [{}] and bindings of [{}] with result of [{}] on [{}]", - requestArguments.getValue0(), requestArguments.getValue1(), resultHolder.get(), Thread.currentThread().getName()); + requestMessage.getArgOrDefault(Tokens.ARGS_GREMLIN, ""), + requestMessage.getArgOrDefault(Tokens.ARGS_BINDINGS, Collections.emptyMap()), + resultHolder.get(), Thread.currentThread().getName()); final FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, (ByteBuf) resultHolder.get()); - response.headers().set(CONTENT_TYPE, serializer.getValue0()); + response.headers().set(HttpHeaderNames.CONTENT_TYPE, serializer.getValue0()); // handle cors business - if (origin != null) response.headers().set(ACCESS_CONTROL_ALLOW_ORIGIN, origin); + if (origin != null) response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, origin); HttpHandlerUtil.sendAndCleanupConnection(ctx, keepAlive, response); } @@ -204,7 +198,8 @@ public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter { final Bindings bindings; try { - bindings = createBindings(requestArguments.getValue1(), requestArguments.getValue3()); + bindings = createBindings(requestMessage.getArgOrDefault(Tokens.ARGS_BINDINGS, Collections.emptyMap()), + requestMessage.getArgOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap())); } catch (IllegalStateException iae) { HttpHandlerUtil.sendError(ctx, BAD_REQUEST, iae.getMessage(), keepAlive); ReferenceCountUtil.release(msg); @@ -214,23 +209,33 @@ public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter { // provide a transform function to serialize to message - this will force serialization to occur // in the same thread as the eval. after the CompletableFuture is returned from the eval the result // is ready to be written as a ByteBuf directly to the response. nothing should be blocking here. - final CompletableFuture<Object> evalFuture = gremlinExecutor.eval(requestArguments.getValue0(), requestArguments.getValue2(), bindings, + final CompletableFuture<Object> evalFuture = gremlinExecutor.eval( + requestMessage.getArg(Tokens.ARGS_GREMLIN), requestMessage.getArg(Tokens.ARGS_LANGUAGE), bindings, FunctionUtils.wrapFunction(o -> { // stopping the timer here is roughly equivalent to where the timer would have been stopped for // this metric in other contexts. we just want to measure eval time not serialization time. timerContext.stop(); logger.debug("Transforming result of request with script [{}] and bindings of [{}] with result of [{}] on [{}]", - requestArguments.getValue0(), requestArguments.getValue1(), o, Thread.currentThread().getName()); - final ResponseMessage responseMessage = ResponseMessage.build(UUID.randomUUID()) + requestMessage.getArg(Tokens.ARGS_GREMLIN), + requestMessage.getArg(Tokens.ARGS_BINDINGS), o, Thread.currentThread().getName()); + + // need to replicate what TraversalOpProcessor does with the bytecode op. it converts + // results to Traverser so that GLVs can handle the results. don't quite get the same + // benefit here because the bulk has to be 1 since we've already resolved the result, + // but at least http is compatible + final List<Object> results = requestMessage.getProcessor().equals(Tokens.OPS_BYTECODE) ? + (List<Object>) IteratorUtils.asList(o).stream().map(r -> new DefaultRemoteTraverser<Object>(r, 1)).collect(Collectors.toList()) : + IteratorUtils.asList(o); + final ResponseMessage responseMessage = ResponseMessage.build(requestMessage.getRequestId()) .code(ResponseStatusCode.SUCCESS) - .result(IteratorUtils.asList(o)).create(); + .result(results).create(); // http server is sessionless and must handle commit on transactions. the commit occurs // before serialization to be consistent with how things work for websocket based // communication. this means that failed serialization does not mean that you won't get // a commit to the database - attemptCommit(requestArguments.getValue3(), graphManager, settings.strictTransactionManagement); + attemptCommit(requestMessage.getArg(Tokens.ARGS_ALIASES), graphManager, settings.strictTransactionManagement); try { return Unpooled.wrappedBuffer(serializer.getValue1().serializeResponseAsString(responseMessage, ctx.alloc()).getBytes(UTF8)); @@ -256,7 +261,8 @@ public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter { if (t.getMessage() != null) HttpHandlerUtil.sendError(ctx, INTERNAL_SERVER_ERROR, t.getMessage(), Optional.of(t), keepAlive); else - HttpHandlerUtil.sendError(ctx, INTERNAL_SERVER_ERROR, String.format("Error encountered evaluating script: %s", requestArguments.getValue0()) + HttpHandlerUtil.sendError(ctx, INTERNAL_SERVER_ERROR, String.format("Error encountered evaluating script: %s", + requestMessage.getArg(Tokens.ARGS_GREMLIN)) , Optional.of(t), keepAlive); promise.setFailure(t); return null; @@ -326,8 +332,8 @@ public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter { return bindings; } - private Pair<String, MessageTextSerializer<?>> chooseSerializer(final String acceptString) { - final List<Pair<String, Double>> ordered = Stream.of(acceptString.split(",")).map(mediaType -> { + private Pair<String, MessageTextSerializer<?>> chooseSerializer(final String mimeType) { + final List<Pair<String, Double>> ordered = Stream.of(mimeType.split(",")).map(mediaType -> { // parse out each mediaType with its params - keeping it simple and just looking for "quality". if // that value isn't there, default it to 1.0. not really validating here so users better get their // accept headers straight diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java index c1ae1e54ea..fc82820e8c 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java @@ -23,32 +23,23 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.QueryStringDecoder; +import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.tinkerpop.gremlin.driver.Tokens; +import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; import org.apache.tinkerpop.gremlin.server.GremlinServer; +import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor; import org.apache.tinkerpop.gremlin.server.util.MetricManager; import org.apache.tinkerpop.shaded.jackson.databind.JsonNode; import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper; import org.apache.tinkerpop.shaded.jackson.databind.node.ArrayNode; import org.apache.tinkerpop.shaded.jackson.databind.node.ObjectNode; -import org.javatuples.Quartet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import static com.codahale.metrics.MetricRegistry.name; import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; @@ -71,7 +62,16 @@ public class HttpHandlerUtil { */ private static final ObjectMapper mapper = new ObjectMapper(); - static Quartet<String, Map<String, Object>, String, Map<String, String>> getRequestArguments(final FullHttpRequest request) { + /** + * Convert a http request into a {@link RequestMessage}. + */ + public static RequestMessage getRequestMessageFromHttpRequest(final FullHttpRequest request) { + final String contentType = Optional.ofNullable(request.headers().get(HttpHeaderNames.CONTENT_TYPE)).orElse("application/json"); + + // default is just the StandardOpProcessor which maintains compatibility with older versions which only + // processed scripts. + final RequestMessage.Builder msgBuilder = RequestMessage.build(StandardOpProcessor.OP_PROCESSOR_NAME); + if (request.method() == GET) { final QueryStringDecoder decoder = new QueryStringDecoder(request.uri()); final List<String> gremlinParms = decoder.parameters().get(Tokens.ARGS_GREMLIN); @@ -81,6 +81,11 @@ public class HttpHandlerUtil { final String script = gremlinParms.get(0); if (script.isEmpty()) throw new IllegalArgumentException("no gremlin script supplied"); + final List<String> requestIdParms = decoder.parameters().get(Tokens.REQUEST_ID); + if (requestIdParms != null && requestIdParms.size() > 0) { + msgBuilder.overrideRequestId(UUID.fromString(requestIdParms.get(0))); + } + // query string parameters - take the first instance of a key only - ignore the rest final Map<String, Object> bindings = new HashMap<>(); decoder.parameters().entrySet().stream().filter(kv -> kv.getKey().startsWith(ARGS_BINDINGS_DOT)) @@ -93,7 +98,8 @@ public class HttpHandlerUtil { final List<String> languageParms = decoder.parameters().get(Tokens.ARGS_LANGUAGE); final String language = (null == languageParms || languageParms.size() == 0) ? null : languageParms.get(0); - return Quartet.with(script, bindings, language, aliases); + return msgBuilder.addArg(Tokens.ARGS_GREMLIN, script).addArg(Tokens.ARGS_LANGUAGE, language) + .addArg(Tokens.ARGS_BINDINGS, bindings).addArg(Tokens.ARGS_ALIASES, aliases).create(); } else { final JsonNode body; try { @@ -124,7 +130,15 @@ public class HttpHandlerUtil { final JsonNode languageNode = body.get(Tokens.ARGS_LANGUAGE); final String language = null == languageNode ? null : languageNode.asText(); - return Quartet.with(scriptNode.asText(), bindings, language, aliases); + final JsonNode requestIdNode = body.get(Tokens.REQUEST_ID); + final UUID requestId = null == requestIdNode ? UUID.randomUUID() : UUID.fromString(requestIdNode.asText()); + + final JsonNode opNode = body.get("op"); + final String op = null == opNode ? "" : opNode.asText(); + + return msgBuilder.overrideRequestId(requestId).processor(op) + .addArg(Tokens.ARGS_GREMLIN, scriptNode.asText()).addArg(Tokens.ARGS_LANGUAGE, language) + .addArg(Tokens.ARGS_BINDINGS, bindings).addArg(Tokens.ARGS_ALIASES, aliases).create(); } } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java index 72a529e646..caca8609be 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java @@ -21,6 +21,8 @@ package org.apache.tinkerpop.gremlin.server; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import nl.altindag.log.LogCaptor; +import org.apache.tinkerpop.gremlin.driver.Channelizer; +import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer; import org.apache.tinkerpop.gremlin.util.ExceptionHelper; import org.apache.tinkerpop.gremlin.TestHelper; import org.apache.tinkerpop.gremlin.driver.Client; @@ -54,7 +56,6 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -159,6 +160,9 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration final String nameOfTest = name.getMethodName(); switch (nameOfTest) { + case "shouldInterceptRequests": + settings.channelizer = HttpChannelizer.class.getName(); + break; case "shouldAliasTraversalSourceVariables": case "shouldAliasTraversalSourceVariablesInSession": try { @@ -208,11 +212,36 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration @Test public void shouldInterceptRequests() throws Exception { + final int requestsToMake = 32; + final AtomicInteger httpRequests = new AtomicInteger(0); + + final Cluster cluster = TestClientFactory.build(). + channelizer(Channelizer.HttpChannelizer.class). + requestInterceptor(r -> { + httpRequests.incrementAndGet(); + return r; + }).create(); + + try { + final Client client = cluster.connect(); + for (int ix = 0; ix < requestsToMake; ix++) { + assertEquals(ix + 1, client.submit(ix + "+1").all().get().get(0).getInt()); + } + } finally { + cluster.close(); + } + + assertEquals(requestsToMake, httpRequests.get()); + } + + @Test + public void shouldInterceptRequestsWithHandshake() throws Exception { final int requestsToMake = 32; final AtomicInteger websocketHandshakeRequests = new AtomicInteger(0); final Cluster cluster = TestClientFactory.build(). - minConnectionPoolSize(1).maxConnectionPoolSize(1).handshakeInterceptor(r -> { + minConnectionPoolSize(1).maxConnectionPoolSize(1). + handshakeInterceptor(r -> { websocketHandshakeRequests.incrementAndGet(); return r; }).create(); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java new file mode 100644 index 0000000000..1d0fbc6fb5 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.tinkerpop.gremlin.driver.Channelizer; +import org.apache.tinkerpop.gremlin.driver.Client; +import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; +import org.apache.tinkerpop.gremlin.driver.ser.Serializers; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.junit.Test; + +import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTest { + + @Override + public Settings overrideSettings(final Settings settings) { + settings.channelizer = HttpChannelizer.class.getName(); + return settings; + } + + @Test + public void shouldSubmitScriptWithGraphSON() throws Exception { + final Cluster cluster = TestClientFactory.build() + .channelizer(Channelizer.HttpChannelizer.class) + .serializer(Serializers.GRAPHSON_V3D0) + .create(); + try { + final Client client = cluster.connect(); + assertEquals(2, client.submit("1+1").all().get().get(0).getInt()); + } catch (Exception ex) { + throw ex; + } finally { + cluster.close(); + } + } + + @Test + public void shouldSubmitScriptWithGraphBinary() throws Exception { + final Cluster cluster = TestClientFactory.build() + .channelizer(Channelizer.HttpChannelizer.class) + .serializer(Serializers.GRAPHBINARY_V1D0) + .create(); + try { + final Client client = cluster.connect(); + assertEquals(2, client.submit("1+1").all().get().get(0).getInt()); + } catch (Exception ex) { + throw ex; + } finally { + cluster.close(); + } + } + + @Test + public void shouldSubmitBytecodeWithGraphSON() throws Exception { + final Cluster cluster = TestClientFactory.build() + .channelizer(Channelizer.HttpChannelizer.class) + .serializer(Serializers.GRAPHSON_V3D0) + .create(); + try { + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + assertEquals("2", g.inject("2").toList().get(0)); + } catch (Exception ex) { + throw ex; + } finally { + cluster.close(); + } + } + + @Test + public void shouldSubmitBytecodeWithGraphBinary() throws Exception { + final Cluster cluster = TestClientFactory.build() + .channelizer(Channelizer.HttpChannelizer.class) + .serializer(Serializers.GRAPHBINARY_V1D0) + .create(); + try { + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + assertEquals("2", g.inject("2").toList().get(0)); + } catch (Exception ex) { + throw ex; + } finally { + cluster.close(); + } + } + + @Test + public void shouldSubmitMultipleRequestsOverSingleConnection() throws Exception { + final Cluster cluster = TestClientFactory.build() + .channelizer(Channelizer.HttpChannelizer.class) + .minConnectionPoolSize(1).maxConnectionPoolSize(1) + .serializer(Serializers.GRAPHBINARY_V1D0) + .create(); + try { + for (int ix = 0; ix < 100; ix++) { + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + assertEquals(ix, g.inject(ix).toList().get(0).intValue()); + } + } catch (Exception ex) { + throw ex; + } finally { + cluster.close(); + } + } + + @Test + public void shouldSubmitMultipleRequestsOverMultiConnection() throws Exception { + final Cluster cluster = TestClientFactory.build() + .channelizer(Channelizer.HttpChannelizer.class) + .minConnectionPoolSize(1).maxConnectionPoolSize(8) + .serializer(Serializers.GRAPHBINARY_V1D0) + .create(); + try { + for (int ix = 0; ix < 100; ix++) { + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + assertEquals(ix, g.inject(ix).toList().get(0).intValue()); + } + } catch (Exception ex) { + throw ex; + } finally { + cluster.close(); + } + } + + @Test + public void shouldFailToUseSession() throws Exception { + final Cluster cluster = TestClientFactory.build() + .channelizer(Channelizer.HttpChannelizer.class) + .serializer(Serializers.GRAPHBINARY_V1D0) + .create(); + try { + final Client client = cluster.connect("shouldFailToUseSession"); + client.submit("1+1").all().get(); + fail("Can't use session with HTTP"); + } catch (Exception ex) { + final Throwable t = ExceptionUtils.getRootCause(ex); + assertEquals("Cannot use sessions or tx() with HttpChannelizer", t.getMessage()); + } finally { + cluster.close(); + } + } + + @Test + public void shouldFailToUseTx() throws Exception { + final Cluster cluster = TestClientFactory.build() + .channelizer(Channelizer.HttpChannelizer.class) + .serializer(Serializers.GRAPHBINARY_V1D0) + .create(); + try { + final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + final Transaction tx = g.tx(); + final GraphTraversalSource gtx = tx.begin(); + gtx.inject("1").toList(); + fail("Can't use tx() with HTTP"); + } catch (Exception ex) { + final Throwable t = ExceptionUtils.getRootCause(ex); + assertEquals("Cannot use sessions or tx() with HttpChannelizer", t.getMessage()); + } finally { + cluster.close(); + } + } +}
