This is an automated email from the ASF dual-hosted git repository. kenhuuu pushed a commit to branch master-http in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 82bfa9e7e4e67e13d90c3dba314684085c721c75 Author: Ken Hu <[email protected]> AuthorDate: Sat Jun 8 10:53:04 2024 -0700 Re-enable and fix shouldBlowTheWorkQueueSize test CTR. Server error handling was incorrect in some cases because the initial HttpResponse was never sent if the failure happened before the task was submitted. The driver didn't properly handle errors that were serialized in GraphBinary. --- .../handler/HttpGremlinResponseStreamDecoder.java | 15 +++-- .../server/handler/HttpGremlinEndpointHandler.java | 2 + .../gremlin/server/handler/HttpHandlerUtil.java | 15 +++++ .../tinkerpop/gremlin/server/handler/StateKey.java | 5 ++ .../gremlin/server/GremlinServerIntegrateTest.java | 69 +++++++++++----------- 5 files changed, 65 insertions(+), 41 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java index d38220a4ca..d271e0dc0d 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java @@ -24,6 +24,7 @@ import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.http.DefaultHttpObject; import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; @@ -34,7 +35,9 @@ import io.netty.util.AttributeMap; import io.netty.util.CharsetUtil; import org.apache.tinkerpop.gremlin.util.MessageSerializerV4; import org.apache.tinkerpop.gremlin.util.message.ResponseMessageV4; +import org.apache.tinkerpop.gremlin.util.ser.SerTokensV4; import 
org.apache.tinkerpop.gremlin.util.ser.SerializationException; +import org.apache.tinkerpop.gremlin.util.ser.SerializersV4; import org.apache.tinkerpop.shaded.jackson.databind.JsonNode; import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper; @@ -45,6 +48,7 @@ public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<De private static final AttributeKey<Boolean> IS_FIRST_CHUNK = AttributeKey.valueOf("isFirstChunk"); private static final AttributeKey<HttpResponseStatus> RESPONSE_STATUS = AttributeKey.valueOf("responseStatus"); + private static final AttributeKey<String> RESPONSE_ENCODING = AttributeKey.valueOf("responseSerializer"); private static final AttributeKey<Integer> BYTES_READ = AttributeKey.valueOf("bytesRead"); private final MessageSerializerV4<?> serializer; @@ -60,14 +64,14 @@ public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<De protected void decode(ChannelHandlerContext ctx, DefaultHttpObject msg, List<Object> out) throws Exception { final Attribute<Boolean> isFirstChunk = ((AttributeMap) ctx).attr(IS_FIRST_CHUNK); final Attribute<HttpResponseStatus> responseStatus = ((AttributeMap) ctx).attr(RESPONSE_STATUS); + final Attribute<String> responseEncoding = ((AttributeMap) ctx).attr(RESPONSE_ENCODING); if (msg instanceof HttpResponse) { ctx.channel().attr(BYTES_READ).set(0); - responseStatus.set(((HttpResponse) msg).status()); - if (isError(((HttpResponse) msg).status())) { - return; - } + final HttpResponse resp = (HttpResponse) msg; + responseStatus.set(resp.status()); + responseEncoding.set(resp.headers().get(HttpHeaderNames.CONTENT_TYPE)); isFirstChunk.set(true); } @@ -88,9 +92,8 @@ public class HttpGremlinResponseStreamDecoder extends MessageToMessageDecoder<De } try { - // with error status we can get json in response // no more chunks expected - if (isError(responseStatus.get())) { + if (isError(responseStatus.get()) && !SerTokensV4.MIME_GRAPHBINARY_V4.equals(responseEncoding.get())) { final 
JsonNode node = mapper.readTree(content.toString(CharsetUtil.UTF_8)); final String message = node.get("message").asText(); final ResponseMessageV4 response = ResponseMessageV4.build() diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index 3a808aed05..68c00ec1c8 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -164,6 +164,7 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ @Override public void channelRead0(final ChannelHandlerContext ctx, final RequestMessageV4 requestMessage) { + ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(false); final Pair<String, MessageSerializerV4<?>> serializer = ctx.channel().attr(StateKey.SERIALIZER).get(); final Context requestCtx = new Context(requestMessage, ctx, settings, graphManager, gremlinExecutor, @@ -200,6 +201,7 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED); responseHeader.headers().set(HttpHeaderNames.CONTENT_TYPE, serializer.getValue0()); ctx.writeAndFlush(responseHeader); + ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true); switch (requestMessage.getGremlinType()) { case "": diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java index de7e0871db..8d729def98 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java @@ -24,8 +24,11 @@ import 
io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.util.CharsetUtil; @@ -44,6 +47,9 @@ import org.slf4j.LoggerFactory; import static com.codahale.metrics.MetricRegistry.name; import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING; +import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; /** @@ -99,6 +105,15 @@ public class HttpHandlerUtil { : serializer.serializeResponseAsBinary(responseMessage, ctx.alloc()); context.setRequestState(HttpGremlinEndpointHandler.RequestState.ERROR); + + if (!ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).get()) { + final HttpResponse responseHeader = new DefaultHttpResponse(HTTP_1_1, responseMessage.getStatus().getCode()); + responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED); // Set this to make it "keep alive" eligible. 
+ responseHeader.headers().set(HttpHeaderNames.CONTENT_TYPE, ctx.channel().attr(StateKey.SERIALIZER).get().getValue0()); + ctx.writeAndFlush(responseHeader); + ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true); + } + ctx.writeAndFlush(new DefaultHttpContent(ByteBuf)); sendTrailingHeaders(ctx, responseMessage.getStatus().getCode(), responseMessage.getStatus().getException()); diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java index 26dda9b5a7..fdb34c5eda 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java @@ -44,6 +44,11 @@ public final class StateKey { */ public static final AttributeKey<UUID> REQUEST_ID = AttributeKey.valueOf("requestId"); + /** + * The key for whether a {@link io.netty.handler.codec.http.HttpResponse} has been sent for the current response. + */ + public static final AttributeKey<Boolean> HTTP_RESPONSE_SENT = AttributeKey.valueOf("responseSent"); + /** * The key for the current {@link AuthenticatedUser}. */ diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java index 75dcf666c7..ac153b4dc0 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java @@ -261,41 +261,40 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration } } -// TODO: re-enable after pooling implemented in Java driver for HTTP/1.1. 
-// @Test -// public void shouldBlowTheWorkQueueSize() throws Exception { -// final Cluster cluster = TestClientFactory.open(); -// final Client client = cluster.connect(); -// -// // maxWorkQueueSize=1 && gremlinPool=1 -// // we should be able to do one request at a time serially -// assertEquals("test1", client.submit("'test1'").all().get().get(0).getString()); -// assertEquals("test2", client.submit("'test2'").all().get().get(0).getString()); -// assertEquals("test3", client.submit("'test3'").all().get().get(0).getString()); -// -// final AtomicBoolean errorTriggered = new AtomicBoolean(); -// final ResultSet r1 = client.submitAsync("Thread.sleep(1000);'test4'").get(); -// -// final List<CompletableFuture<List<Result>>> blockers = new ArrayList<>(); -// for (int ix = 0; ix < 512 && !errorTriggered.get(); ix++) { -// blockers.add(client.submit("'test'").all().exceptionally(t -> { -// final ResponseException re = (ResponseException) t.getCause(); -// errorTriggered.compareAndSet(false, HttpResponseStatus.TOO_MANY_REQUESTS == re.getResponseStatusCode()); -// return null; -// })); -// } -// -// assertThat(errorTriggered.get(), is(true)); -// -// // wait for the blockage to clear for sure -// assertEquals("test4", r1.all().get().get(0).getString()); -// blockers.forEach(CompletableFuture::join); -// -// // should be accepting test6 now -// assertEquals("test6", client.submit("'test6'").all().get().get(0).getString()); -// -// cluster.close(); -// } + @Test + public void shouldBlowTheWorkQueueSize() throws Exception { + final Cluster cluster = TestClientFactory.open(); + final Client client = cluster.connect(); + + // maxWorkQueueSize=1 && gremlinPool=1 + // we should be able to do one request at a time serially + assertEquals("test1", client.submit("'test1'").all().get().get(0).getString()); + assertEquals("test2", client.submit("'test2'").all().get().get(0).getString()); + assertEquals("test3", client.submit("'test3'").all().get().get(0).getString()); + + final 
AtomicBoolean errorTriggered = new AtomicBoolean(); + final ResultSet r1 = client.submitAsync("Thread.sleep(1000);'test4'").get(); + + final List<CompletableFuture<List<Result>>> blockers = new ArrayList<>(); + for (int ix = 0; ix < 512 && !errorTriggered.get(); ix++) { + blockers.add(client.submit("'test'").all().exceptionally(t -> { + final ResponseException re = (ResponseException) t.getCause(); + errorTriggered.compareAndSet(false, HttpResponseStatus.TOO_MANY_REQUESTS == re.getResponseStatusCode()); + return null; + })); + } + + assertThat(errorTriggered.get(), is(true)); + + // wait for the blockage to clear for sure + assertEquals("test4", r1.all().get().get(0).getString()); + blockers.forEach(CompletableFuture::join); + + // should be accepting test6 now + assertEquals("test6", client.submit("'test6'").all().get().get(0).getString()); + + cluster.close(); + } @Test public void shouldScriptEvaluationErrorForRemoteTraversal() throws Exception {
