This is an automated email from the ASF dual-hosted git repository. valentyn pushed a commit to branch valentyn/http-java-driver in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit af665f98178a4ad05835edfa08a0600bf092d129 Author: Valentyn Kahamlyk <[email protected]> AuthorDate: Mon Apr 8 14:04:12 2024 -0700 PoC --- .../tinkerpop/gremlin/driver/Channelizer.java | 32 +-- .../apache/tinkerpop/gremlin/driver/Handler.java | 282 --------------------- .../tinkerpop/gremlin/driver/ResultQueue.java | 9 +- .../driver/handler/GremlinResponseHandler.java | 135 ++++++++++ .../driver/handler/HttpGremlinRequestEncoder.java | 11 + .../HttpGremlinResponseDebugStreamDecoder.java | 40 +++ .../driver/handler/HttpGremlinResponseDecoder.java | 1 + .../handler/HttpGremlinResponseStreamDecoder.java | 92 +++++++ .../gremlin/driver/simple/SimpleHttpClient.java | 22 +- .../gremlin/server/HttpDriverIntegrateTest.java | 61 +---- gremlin-server/src/test/resources/logback-test.xml | 2 +- 11 files changed, 325 insertions(+), 362 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index a626e0160b..fae760178d 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -18,28 +18,29 @@ */ package org.apache.tinkerpop.gremlin.driver; -import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; -import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder; -import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder; -import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler; -import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder; -import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketVersion; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.IdleStateHandler; - +import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; +import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler; +import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder; +import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler; +import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder; +import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder; +import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder; +import org.apache.tinkerpop.gremlin.util.ser.MessageChunkSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,8 +145,7 @@ public interface Channelizer extends ChannelHandler { } configure(pipeline); - pipeline.addLast(PIPELINE_GREMLIN_SASL_HANDLER, new Handler.GremlinSaslAuthenticationHandler(cluster.authProperties())); - pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new Handler.GremlinResponseHandler(pending)); + pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new GremlinResponseHandler(pending)); } } @@ -250,15 +250,15 @@ public interface Channelizer extends ChannelHandler { /** * Sends requests over the HTTP endpoint. Client functionality is governed by the limitations of the HTTP endpoint, * meaning that sessions are not available and as such {@code tx()} (i.e. transactions) are not available over this - * channelizer. Only sessionless requests are possible. Some driver configuration options may not be relevant when - * using HTTP, such as {@link Tokens#ARGS_BATCH_SIZE} since HTTP does not stream results back in that fashion. + * channelizer. Only sessionless requests are possible. */ public final class HttpChannelizer extends AbstractChannelizer { private HttpClientCodec handler; private HttpGremlinRequestEncoder gremlinRequestEncoder; - private HttpGremlinResponseDecoder gremlinResponseDecoder; + // private HttpGremlinResponseDecoder gremlinResponseDecoder; + private HttpGremlinResponseStreamDecoder gremlinResponseDecoder; @Override public void init(final Connection connection) { @@ -269,7 +269,7 @@ public interface Channelizer extends ChannelHandler { throw new IllegalStateException(String.format("Cannot use sessions or tx() with %s", HttpChannelizer.class.getSimpleName())); gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptor(), cluster.isUserAgentOnConnectEnabled()); - gremlinResponseDecoder = new HttpGremlinResponseDecoder(cluster.getSerializer()); + gremlinResponseDecoder = new HttpGremlinResponseStreamDecoder((MessageChunkSerializer<?>) cluster.getSerializer()); } @Override @@ -296,7 +296,7 @@ public interface Channelizer extends ChannelHandler { handler = new HttpClientCodec(); pipeline.addLast("http-codec", handler); - pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength)); + // pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength)); pipeline.addLast("gremlin-encoder", gremlinRequestEncoder); pipeline.addLast("gremlin-decoder", gremlinResponseDecoder); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java deleted file mode 100644 index c76a20e6a7..0000000000 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Handler.java +++ /dev/null @@ -1,282 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.tinkerpop.gremlin.driver; - -import io.netty.util.AttributeMap; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; -import org.apache.tinkerpop.gremlin.util.Tokens; -import org.apache.tinkerpop.gremlin.util.message.RequestMessage; -import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; -import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.util.Attribute; -import io.netty.util.AttributeKey; -import io.netty.util.ReferenceCountUtil; -import org.apache.tinkerpop.gremlin.util.ser.SerializationException; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; -import java.security.PrivilegedActionException; -import java.util.Base64; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; - -import javax.security.auth.Subject; -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.login.LoginContext; -import javax.security.auth.login.LoginException; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - -/** - * Holder for internal handler classes used in constructing the channel pipeline. - * - * @author Stephen Mallette (http://stephen.genoprime.com) - */ -final class Handler { - - /** - * Generic SASL handler that will authenticate against the gremlin server. - */ - static class GremlinSaslAuthenticationHandler extends SimpleChannelInboundHandler<ResponseMessage> implements CallbackHandler { - private static final Logger logger = LoggerFactory.getLogger(GremlinSaslAuthenticationHandler.class); - private static final AttributeKey<Subject> subjectKey = AttributeKey.valueOf("subject"); - private static final AttributeKey<SaslClient> saslClientKey = AttributeKey.valueOf("saslclient"); - private static final Map<String, String> SASL_PROPERTIES = new HashMap<String, String>() {{ put(Sasl.SERVER_AUTH, "true"); }}; - private static final byte[] NULL_CHALLENGE = new byte[0]; - - private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder(); - private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder(); - - private final AuthProperties authProps; - - public GremlinSaslAuthenticationHandler(final AuthProperties authProps) { - this.authProps = authProps; - } - - @Override - protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception { - // We are only interested in AUTHENTICATE responses here. Everything else can - // get passed down the pipeline - if (response.getStatus().getCode() == ResponseStatusCode.AUTHENTICATE) { - final Attribute<SaslClient> saslClient = ((AttributeMap) channelHandlerContext).attr(saslClientKey); - final Attribute<Subject> subject = ((AttributeMap) channelHandlerContext).attr(subjectKey); - final RequestMessage.Builder messageBuilder = RequestMessage.build(Tokens.OPS_AUTHENTICATION); - // First time through we don't have a sasl client - if (saslClient.get() == null) { - subject.set(login()); - try { - saslClient.set(saslClient(getHostName(channelHandlerContext))); - } catch (SaslException saslException) { - // push the sasl error into a failure response from the server. this ensures that standard - // processing for the ResultQueue is kept. without this SaslException trap and subsequent - // conversion to an authentication failure, the close() of the connection might not - // succeed as it will appear as though pending messages remain present in the queue on the - // connection and the shutdown won't proceed - final ResponseMessage clientSideError = ResponseMessage.build(response.getRequestId()) - .code(ResponseStatusCode.FORBIDDEN).statusMessage(saslException.getMessage()).create(); - channelHandlerContext.fireChannelRead(clientSideError); - return; - } - - messageBuilder.addArg(Tokens.ARGS_SASL_MECHANISM, getMechanism()); - messageBuilder.addArg(Tokens.ARGS_SASL, saslClient.get().hasInitialResponse() ? - BASE64_ENCODER.encodeToString(evaluateChallenge(subject, saslClient, NULL_CHALLENGE)) : null); - } else { - // the server sends base64 encoded sasl as well as the byte array. the byte array will eventually be - // phased out, but is present now for backward compatibility in 3.2.x - final String base64sasl = response.getStatus().getAttributes().containsKey(Tokens.ARGS_SASL) ? - response.getStatus().getAttributes().get(Tokens.ARGS_SASL).toString() : - BASE64_ENCODER.encodeToString((byte[]) response.getResult().getData()); - - messageBuilder.addArg(Tokens.ARGS_SASL, BASE64_ENCODER.encodeToString( - evaluateChallenge(subject, saslClient, BASE64_DECODER.decode(base64sasl)))); - } - channelHandlerContext.writeAndFlush(messageBuilder.create()); - } else { - // SimpleChannelInboundHandler will release the frame if we don't retain it explicitly. - ReferenceCountUtil.retain(response); - channelHandlerContext.fireChannelRead(response); - } - } - - public void handle(final Callback[] callbacks) { - for (Callback callback : callbacks) { - if (callback instanceof NameCallback) { - if (authProps.get(AuthProperties.Property.USERNAME) != null) { - ((NameCallback)callback).setName(authProps.get(AuthProperties.Property.USERNAME)); - } - } else if (callback instanceof PasswordCallback) { - if (authProps.get(AuthProperties.Property.PASSWORD) != null) { - ((PasswordCallback)callback).setPassword(authProps.get(AuthProperties.Property.PASSWORD).toCharArray()); - } - } else { - logger.warn("SASL handler got a callback of type " + callback.getClass().getCanonicalName()); - } - } - } - - private byte[] evaluateChallenge(final Attribute<Subject> subject, final Attribute<SaslClient> saslClient, - final byte[] challenge) throws SaslException { - - if (subject.get() == null) { - return saslClient.get().evaluateChallenge(challenge); - } else { - // If we have a subject then run this as a privileged action using the subject - try { - return Subject.doAs(subject.get(), (PrivilegedExceptionAction<byte[]>) () -> saslClient.get().evaluateChallenge(challenge)); - } catch (PrivilegedActionException e) { - throw (SaslException)e.getException(); - } - } - } - - private Subject login() throws LoginException { - // Login if the user provided us with an entry into the JAAS config file - if (authProps.get(AuthProperties.Property.JAAS_ENTRY) != null) { - final LoginContext login = new LoginContext(authProps.get(AuthProperties.Property.JAAS_ENTRY)); - login.login(); - return login.getSubject(); - } - return null; - } - - private SaslClient saslClient(final String hostname) throws SaslException { - return Sasl.createSaslClient(new String[] { getMechanism() }, null, authProps.get(AuthProperties.Property.PROTOCOL), - hostname, SASL_PROPERTIES, this); - } - - private String getHostName(final ChannelHandlerContext channelHandlerContext) { - return ((InetSocketAddress)channelHandlerContext.channel().remoteAddress()).getAddress().getHostName(); - } - - /** - * Work out the Sasl mechanism based on the user supplied parameters. - * If we have a username and password use PLAIN otherwise GSSAPI - * ToDo: have gremlin-server provide the mechanism(s) it is configured with, so that additional mechanisms can - * be supported in the driver and confusing GSSException messages from the driver are avoided - */ - private String getMechanism() { - if ((authProps.get(AuthProperties.Property.USERNAME) != null) && - (authProps.get(AuthProperties.Property.PASSWORD) != null)) { - return "PLAIN"; - } else { - return "GSSAPI"; - } - } - } - - /** - * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request - * as the {@link ResponseMessage} objects are deserialized. - */ - static class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> { - private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class); - private final ConcurrentMap<UUID, ResultQueue> pending; - - public GremlinResponseHandler(final ConcurrentMap<UUID, ResultQueue> pending) { - this.pending = pending; - } - - @Override - public void channelInactive(final ChannelHandlerContext ctx) throws Exception { - // occurs when the server shuts down in a disorderly fashion, otherwise in an orderly shutdown the server - // should fire off a close message which will properly release the driver. - super.channelInactive(ctx); - - // the channel isn't going to get anymore results as it is closed so release all pending requests - pending.values().forEach(val -> val.markError(new IllegalStateException("Connection to server is no longer active"))); - pending.clear(); - } - - @Override - protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception { - final ResponseStatusCode statusCode = response.getStatus().getCode(); - final ResultQueue queue = pending.get(response.getRequestId()); - if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) { - final Object data = response.getResult().getData(); - - // this is a "result" from the server which is either the result of a script or a - // serialized traversal - if (data instanceof List) { - // unrolls the collection into individual results to be handled by the queue. - final List<Object> listToUnroll = (List<Object>) data; - listToUnroll.forEach(item -> queue.add(new Result(item))); - } else { - // since this is not a list it can just be added to the queue - queue.add(new Result(response.getResult().getData())); - } - } else { - // this is a "success" but represents no results otherwise it is an error - if (statusCode != ResponseStatusCode.NO_CONTENT) { - final Map<String,Object> attributes = response.getStatus().getAttributes(); - final String stackTrace = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) ? - (String) attributes.get(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) : null; - final List<String> exceptions = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) ? - (List<String>) attributes.get(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) : null; - queue.markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(), - exceptions, stackTrace, cleanStatusAttributes(attributes))); - } - } - - // as this is a non-PARTIAL_CONTENT code - the stream is done. - if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) { - pending.remove(response.getRequestId()).markComplete(response.getStatus().getAttributes()); - } - } - - @Override - public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { - // if this happens enough times (like the client is unable to deserialize a response) the pending - // messages queue will not clear. wonder if there is some way to cope with that. of course, if - // there are that many failures someone would take notice and hopefully stop the client. - logger.error("Could not process the response", cause); - - // the channel took an error because of something pretty bad so release all the futures out there - pending.values().forEach(val -> val.markError(cause)); - pending.clear(); - - // serialization exceptions should not close the channel - that's worth a retry - if (!IteratorUtils.anyMatch(ExceptionUtils.getThrowableList(cause).iterator(), t -> t instanceof SerializationException)) - if (ctx.channel().isActive()) ctx.close(); - } - - private Map<String,Object> cleanStatusAttributes(final Map<String,Object> statusAttributes) { - final Map<String,Object> m = new HashMap<>(); - statusAttributes.forEach((k,v) -> { - if (!k.equals(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) && !k.equals(Tokens.STATUS_ATTRIBUTE_STACK_TRACE)) - m.put(k,v); - }); - return m; - } - } - -} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java index 8ce4fba21c..c4e827503d 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultQueue.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.gremlin.driver; +import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.javatuples.Pair; @@ -33,13 +34,13 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; /** - * A queue of incoming {@link Result} objects. The queue is updated by the {@link Handler.GremlinResponseHandler} + * A queue of incoming {@link Result} objects. The queue is updated by the {@link GremlinResponseHandler} * until a response terminator is identified. * * @author Stephen Mallette (http://stephen.genoprime.com) */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") -final class ResultQueue { +public final class ResultQueue { private final LinkedBlockingQueue<Result> resultLinkedBlockingQueue; @@ -104,7 +105,7 @@ final class ResultQueue { resultLinkedBlockingQueue.drainTo(collection); } - void markComplete(final Map<String,Object> statusAttributes) { + public void markComplete(final Map<String, Object> statusAttributes) { // if there was some aggregation performed in the queue then the full object is hanging out waiting to be // added to the ResultSet if (aggregatedResult != null) @@ -117,7 +118,7 @@ final class ResultQueue { this.drainAllWaiting(); } - void markError(final Throwable throwable) { + public void markError(final Throwable throwable) { error.set(throwable); this.readComplete.completeExceptionally(throwable); this.drainAllWaiting(); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java new file mode 100644 index 0000000000..daa9384b1c --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.AttributeMap; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.tinkerpop.gremlin.driver.Result; +import org.apache.tinkerpop.gremlin.driver.ResultQueue; +import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; +import org.apache.tinkerpop.gremlin.util.Tokens; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode; +import org.apache.tinkerpop.gremlin.util.ser.SerializationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; + +import static org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder.REQUEST_ID; + +/** + * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request + * as the {@link ResponseMessage} objects are deserialized. + */ +public class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> { + private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class); + private final ConcurrentMap<UUID, ResultQueue> pending; + + public GremlinResponseHandler(final ConcurrentMap<UUID, ResultQueue> pending) { + this.pending = pending; + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + // occurs when the server shuts down in a disorderly fashion, otherwise in an orderly shutdown the server + // should fire off a close message which will properly release the driver. + super.channelInactive(ctx); + + // the channel isn't going to get anymore results as it is closed so release all pending requests + pending.values().forEach(val -> val.markError(new IllegalStateException("Connection to server is no longer active"))); + pending.clear(); + } + + @Override + protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception { + final UUID requestId = ((AttributeMap) channelHandlerContext).attr(REQUEST_ID).get(); + + final ResponseStatusCode statusCode = response.getStatus() == null ? ResponseStatusCode.PARTIAL_CONTENT : response.getStatus().getCode(); + final ResultQueue queue = pending.get(requestId); + System.out.println("Handler.GremlinResponseHandler get requestId: " + requestId); + if (response.getResult().getData() != null) { + System.out.println("Handler.GremlinResponseHandler payload size: " + ((List) response.getResult().getData()).size()); + } + + if (statusCode == ResponseStatusCode.SUCCESS || statusCode == ResponseStatusCode.PARTIAL_CONTENT) { + final Object data = response.getResult().getData(); + + // this is a "result" from the server which is either the result of a script or a + // serialized traversal + if (data instanceof List) { + // unrolls the collection into individual results to be handled by the queue. + final List<Object> listToUnroll = (List<Object>) data; + listToUnroll.forEach(item -> queue.add(new Result(item))); + } else { + // since this is not a list it can just be added to the queue + queue.add(new Result(response.getResult().getData())); + } + } else { + // this is a "success" but represents no results otherwise it is an error + if (statusCode != ResponseStatusCode.NO_CONTENT) { + final Map<String, Object> attributes = response.getStatus().getAttributes(); + final String stackTrace = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) ? + (String) attributes.get(Tokens.STATUS_ATTRIBUTE_STACK_TRACE) : null; + final List<String> exceptions = attributes.containsKey(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) ? + (List<String>) attributes.get(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) : null; + queue.markError(new ResponseException(response.getStatus().getCode(), response.getStatus().getMessage(), + exceptions, stackTrace, cleanStatusAttributes(attributes))); + } + } + + // todo: + // as this is a non-PARTIAL_CONTENT code - the stream is done. + if (statusCode != ResponseStatusCode.PARTIAL_CONTENT) { + pending.remove(requestId).markComplete(response.getStatus().getAttributes()); + } + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { + // if this happens enough times (like the client is unable to deserialize a response) the pending + // messages queue will not clear. wonder if there is some way to cope with that. of course, if + // there are that many failures someone would take notice and hopefully stop the client. + logger.error("Could not process the response", cause); + + // the channel took an error because of something pretty bad so release all the futures out there + pending.values().forEach(val -> val.markError(cause)); + pending.clear(); + + // serialization exceptions should not close the channel - that's worth a retry + if (!IteratorUtils.anyMatch(ExceptionUtils.getThrowableList(cause).iterator(), t -> t instanceof SerializationException)) + if (ctx.channel().isActive()) ctx.close(); + } + + private Map<String, Object> cleanStatusAttributes(final Map<String, Object> statusAttributes) { + final Map<String, Object> m = new HashMap<>(); + statusAttributes.forEach((k, v) -> { + if (!k.equals(Tokens.STATUS_ATTRIBUTE_EXCEPTIONS) && !k.equals(Tokens.STATUS_ATTRIBUTE_STACK_TRACE)) + m.put(k, v); + }); + return m; + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java index 06377e31be..fc220c6819 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java @@ -27,6 +27,9 @@ import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import io.netty.util.AttributeMap; import org.apache.tinkerpop.gremlin.driver.UserAgent; import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; import org.apache.tinkerpop.gremlin.process.traversal.Bytecode; @@ -37,6 +40,7 @@ import org.apache.tinkerpop.gremlin.util.ser.MessageTextSerializerV4; import org.apache.tinkerpop.gremlin.util.ser.SerTokens; import java.util.List; +import java.util.UUID; import java.util.function.UnaryOperator; /** @@ -44,6 +48,9 @@ import java.util.function.UnaryOperator; */ @ChannelHandler.Sharable public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<RequestMessageV4> { + + //todo: move + public static final AttributeKey<UUID> REQUEST_ID = AttributeKey.valueOf("requestId"); private final MessageSerializer<?> serializer; private final boolean userAgentEnabled; private final UnaryOperator<FullHttpRequest> interceptor; @@ -63,6 +70,10 @@ public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<Req @Override protected void encode(final ChannelHandlerContext channelHandlerContext, final RequestMessageV4 requestMessage, final List<Object> objects) throws Exception { + final Attribute<UUID> requestIdAttribute = ((AttributeMap) channelHandlerContext).attr(REQUEST_ID); + requestIdAttribute.set(requestMessage.getRequestId()); + System.out.println("HttpGremlinRequestEncoder set requestId: " + requestIdAttribute.get()); + final String mimeType = serializer.mimeTypesSupported()[0]; // only GraphSON3 and GraphBinary recommended for serialization of Bytecode requests if (requestMessage.getArg("gremlin") instanceof Bytecode && diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDebugStreamDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDebugStreamDecoder.java new file mode 100644 index 0000000000..35dc49ef30 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDebugStreamDecoder.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; + +import java.util.List; + +/** + * Converts {@code HttpResponse} to a {@link ResponseMessage}. + */ [email protected] +public final class HttpGremlinResponseDebugStreamDecoder extends MessageToMessageDecoder<ResponseMessage> { + public HttpGremlinResponseDebugStreamDecoder() {} + + @Override + protected void decode(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response, final List<Object> objects) throws Exception { + System.out.println("HttpGremlinResponseStreamDecoder: "); + System.out.println(response.getResult()); + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java index 9b46db27ee..4d7d452239 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java @@ -37,6 +37,7 @@ import java.util.UUID; /** * Converts {@code HttpResponse} to a {@link ResponseMessage}. + * Can be used for GraphSON */ @ChannelHandler.Sharable public final class HttpGremlinResponseDecoder extends MessageToMessageDecoder<FullHttpResponse> { diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java new file mode 100644 index 0000000000..1306f7fff0 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpObject; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import io.netty.util.AttributeMap; +import org.apache.tinkerpop.gremlin.util.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.util.ser.MessageChunkSerializer; +import org.apache.tinkerpop.gremlin.util.ser.SerializationException; + +import java.util.List; +import java.util.Objects; + +public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<DefaultHttpObject> { + + // todo: move out + public static final AttributeKey<Boolean> IS_FIRST_CHUNK = AttributeKey.valueOf("isFirstChunk"); + + private final MessageChunkSerializer<?> serializer; + + public HttpGremlinResponseStreamDecoder(MessageChunkSerializer<?> serializer) { + + this.serializer = serializer; + } + + @Override + protected void decode(ChannelHandlerContext ctx, DefaultHttpObject msg, List<Object> out) throws Exception { + final Attribute<Boolean> isFirstChunk = ((AttributeMap) ctx).attr(IS_FIRST_CHUNK); + + if (msg instanceof DefaultHttpResponse) { + isFirstChunk.set(true); + } + + if (msg instanceof DefaultHttpContent) { + try { + if (ctx.attr(IS_FIRST_CHUNK).get()) { + System.out.println("first chunk"); + } else { + System.out.println("not first chunk"); + } + + final ResponseMessage chunk = serializer.readChunk(((DefaultHttpContent) msg).content(), isFirstChunk.get()); + + if (chunk.getResult().getData() != null) { + System.out.println("payload size: " + ((List) chunk.getResult().getData()).size()); + } + + if (msg instanceof DefaultLastHttpContent) { + final HttpHeaders trailingHeaders = ((DefaultLastHttpContent) msg).trailingHeaders(); + + System.out.println("final chunk, trailing headers:"); + System.out.println(trailingHeaders); + + if (!Objects.equals(trailingHeaders.get("code"), "200")) { + throw new Exception(trailingHeaders.get("message")); + } + } + + isFirstChunk.set(false); + + out.add(chunk); + } catch (SerializationException e) { + System.out.println("Ex: " + e.getMessage()); + throw new RuntimeException(e); + } + } + } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java index fd97fa3d6e..8a37dfacb5 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java @@ -20,17 +20,13 @@ package org.apache.tinkerpop.gremlin.driver.simple; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelOption; -import io.netty.handler.codec.http.EmptyHttpHeaders; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import org.apache.tinkerpop.gremlin.driver.HandshakeInterceptor; +import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder; import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder; -import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder; -import org.apache.tinkerpop.gremlin.util.MessageSerializer; -import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler; -import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder; -import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder; +import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDebugStreamDecoder; import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -39,11 +35,9 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.HttpClientCodec; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; -import io.netty.handler.codec.http.websocketx.WebSocketVersion; import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4; import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryMapper; +import org.apache.tinkerpop.gremlin.util.ser.MessageChunkSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,7 +92,7 @@ public class SimpleHttpClient extends AbstractClient { // final WebSocketClientHandler wsHandler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker( // uri, WebSocketVersion.V13, null, true, EmptyHttpHeaders.INSTANCE, 65536), 10000, false); - final MessageSerializer<GraphBinaryMapper> serializer = new GraphBinaryMessageSerializerV4(); + final MessageChunkSerializer<GraphBinaryMapper> serializer = new GraphBinaryMessageSerializerV4(); b.channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override @@ -109,12 +103,16 @@ public class SimpleHttpClient extends AbstractClient { } p.addLast( new HttpClientCodec(), - new HttpObjectAggregator(65536), + new HttpGremlinResponseStreamDecoder(serializer), +// new HttpObjectAggregator(65536), + // wsHandler, // new WebSocketGremlinRequestEncoder(true, serializer), // new WebSocketGremlinResponseDecoder(serializer), new HttpGremlinRequestEncoder(serializer, HandshakeInterceptor.NO_OP, false), - new HttpGremlinResponseDecoder(serializer), + // new HttpGremlinResponseDecoder(serializer), + new HttpGremlinResponseDebugStreamDecoder(), + callbackResponseHandler); } }); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java index 2d08b7722f..6c036da7e5 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java @@ -30,6 +30,8 @@ import org.apache.tinkerpop.gremlin.structure.Transaction; import org.apache.tinkerpop.gremlin.util.ser.Serializers; import org.junit.Test; +import java.util.List; + import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.AllOf.allOf; @@ -49,7 +51,7 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes public void shouldSubmitScriptWithGraphSON() throws Exception { final Cluster cluster = TestClientFactory.build() .channelizer(Channelizer.HttpChannelizer.class) - .serializer(Serializers.GRAPHSON_V3) + .serializer(Serializers.GRAPHSON_V4) .create(); try { final Client client = cluster.connect(); @@ -65,11 +67,12 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes public void shouldSubmitScriptWithGraphBinary() throws Exception { final Cluster cluster = TestClientFactory.build() .channelizer(Channelizer.HttpChannelizer.class) - .serializer(Serializers.GRAPHBINARY_V1) + .serializer(Serializers.GRAPHBINARY_V4) .create(); try { final Client client = cluster.connect(); - assertEquals(2, client.submit("1+1").all().get().get(0).getInt()); + // default chunk size is 64 + assertEquals(100, client.submit("new int[100]").all().get().size()); } catch (Exception ex) { throw ex; } finally { @@ -81,7 +84,7 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes public void shouldSubmitBytecodeWithGraphSON() throws Exception { final Cluster cluster = TestClientFactory.build() .channelizer(Channelizer.HttpChannelizer.class) - .serializer(Serializers.GRAPHSON_V3) + .serializer(Serializers.GRAPHSON_V4) .create(); try { final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); @@ -98,7 +101,7 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes public void shouldGetErrorForBytecodeWithUntypedGraphSON() throws Exception { final Cluster cluster = TestClientFactory.build() .channelizer(Channelizer.HttpChannelizer.class) - .serializer(Serializers.GRAPHSON_V2_UNTYPED) + .serializer(Serializers.GRAPHSON_V4_UNTYPED) .create(); try { final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); @@ -116,7 +119,7 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes public void shouldSubmitBytecodeWithGraphBinary() throws Exception { final Cluster cluster = TestClientFactory.build() .channelizer(Channelizer.HttpChannelizer.class) - .serializer(Serializers.GRAPHBINARY_V1) + .serializer(Serializers.GRAPHBINARY_V4) .create(); try { final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); @@ -129,44 +132,6 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes } } - @Test - public void shouldSubmitMultipleRequestsOverSingleConnection() throws Exception { - final Cluster cluster = TestClientFactory.build() - .channelizer(Channelizer.HttpChannelizer.class) - .minConnectionPoolSize(1).maxConnectionPoolSize(1) - .serializer(Serializers.GRAPHBINARY_V1) - .create(); - try { - for (int ix = 0; ix < 100; ix++) { - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); - assertEquals(ix, g.inject(ix).toList().get(0).intValue()); - } - } catch (Exception ex) { - throw ex; - } finally { - cluster.close(); - } - } - - @Test - public void shouldSubmitMultipleRequestsOverMultiConnection() throws Exception { - final Cluster cluster = TestClientFactory.build() - .channelizer(Channelizer.HttpChannelizer.class) - .minConnectionPoolSize(1).maxConnectionPoolSize(8) - .serializer(Serializers.GRAPHBINARY_V1) - .create(); - try { - for (int ix = 0; ix < 100; ix++) { - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); - assertEquals(ix, g.inject(ix).toList().get(0).intValue()); - } - } catch (Exception ex) { - throw ex; - } finally { - cluster.close(); - } - } - @Test public void shouldFailToUseSession() throws Exception { final Cluster cluster = TestClientFactory.build() @@ -205,28 +170,30 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes } } + // !!! @Test public void shouldDeserializeErrorWithGraphBinary() throws Exception { final Cluster cluster = TestClientFactory.build() .channelizer(Channelizer.HttpChannelizer.class) - .serializer(Serializers.GRAPHBINARY_V1) + .serializer(Serializers.GRAPHBINARY_V4) .create(); try { final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster, "doesNotExist")); g.V().next(); fail("Expected exception to be thrown."); } catch (Exception ex) { - assert ex.getMessage().contains("Could not rebind"); + assert ex.getMessage().contains("The traversal source [doesNotExist] for alias [g] is not configured on the server."); } finally { cluster.close(); } } + // !!! @Test public void shouldDeserializeErrorWithGraphSON() throws Exception { final Cluster cluster = TestClientFactory.build() .channelizer(Channelizer.HttpChannelizer.class) - .serializer(Serializers.GRAPHSON_V3) + .serializer(Serializers.GRAPHSON_V4) .create(); try { final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster, "doesNotExist")); diff --git a/gremlin-server/src/test/resources/logback-test.xml b/gremlin-server/src/test/resources/logback-test.xml index 354e97cb98..647f9674e9 100644 --- a/gremlin-server/src/test/resources/logback-test.xml +++ b/gremlin-server/src/test/resources/logback-test.xml @@ -25,7 +25,7 @@ limitations under the License. <logger name="org.apache.tinkerpop.gremlin.server.AbstractChannelizer" level="ERROR"/> <!-- this logger is noisy and we don't assert anything and the error is already tracked on the server so we can trim the logs a bit with this. --> - <logger name="org.apache.tinkerpop.gremlin.driver.Handler$GremlinResponseHandler" level="OFF"/> + <logger name="org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler" level="OFF"/> <root level="WARN"> <appender-ref ref="stdout"/> </root>
