This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch master-http
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 82bfa9e7e4e67e13d90c3dba314684085c721c75
Author: Ken Hu <[email protected]>
AuthorDate: Sat Jun 8 10:53:04 2024 -0700

    Re-enable and fix shouldBlowTheWorkQueueSize test CTR.
    
    Server error handling was incorrect in some cases as the initial
    HttpResponse is never sent if the failure happened before the task is
    submitted. The driver didn't properly handle errors that were
    serialized in GraphBinary.
---
 .../handler/HttpGremlinResponseStreamDecoder.java  | 15 +++--
 .../server/handler/HttpGremlinEndpointHandler.java |  2 +
 .../gremlin/server/handler/HttpHandlerUtil.java    | 15 +++++
 .../tinkerpop/gremlin/server/handler/StateKey.java |  5 ++
 .../gremlin/server/GremlinServerIntegrateTest.java | 69 +++++++++++-----------
 5 files changed, 65 insertions(+), 41 deletions(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
index d38220a4ca..d271e0dc0d 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.handler.codec.http.DefaultHttpObject;
 import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
@@ -34,7 +35,9 @@ import io.netty.util.AttributeMap;
 import io.netty.util.CharsetUtil;
 import org.apache.tinkerpop.gremlin.util.MessageSerializerV4;
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessageV4;
+import org.apache.tinkerpop.gremlin.util.ser.SerTokensV4;
 import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
+import org.apache.tinkerpop.gremlin.util.ser.SerializersV4;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
 
@@ -45,6 +48,7 @@ public class HttpGremlinResponseStreamDecoder extends 
MessageToMessageDecoder<De
 
     private static final AttributeKey<Boolean> IS_FIRST_CHUNK = 
AttributeKey.valueOf("isFirstChunk");
     private static final AttributeKey<HttpResponseStatus> RESPONSE_STATUS = 
AttributeKey.valueOf("responseStatus");
+    private static final AttributeKey<String> RESPONSE_ENCODING = 
AttributeKey.valueOf("responseSerializer");
     private static final AttributeKey<Integer> BYTES_READ = 
AttributeKey.valueOf("bytesRead");
 
     private final MessageSerializerV4<?> serializer;
@@ -60,14 +64,14 @@ public class HttpGremlinResponseStreamDecoder extends 
MessageToMessageDecoder<De
     protected void decode(ChannelHandlerContext ctx, DefaultHttpObject msg, 
List<Object> out) throws Exception {
         final Attribute<Boolean> isFirstChunk = ((AttributeMap) 
ctx).attr(IS_FIRST_CHUNK);
         final Attribute<HttpResponseStatus> responseStatus = ((AttributeMap) 
ctx).attr(RESPONSE_STATUS);
+        final Attribute<String> responseEncoding = ((AttributeMap) 
ctx).attr(RESPONSE_ENCODING);
 
         if (msg instanceof HttpResponse) {
             ctx.channel().attr(BYTES_READ).set(0);
-            responseStatus.set(((HttpResponse) msg).status());
 
-            if (isError(((HttpResponse) msg).status())) {
-                return;
-            }
+            final HttpResponse resp = (HttpResponse) msg;
+            responseStatus.set(resp.status());
+            
responseEncoding.set(resp.headers().get(HttpHeaderNames.CONTENT_TYPE));
 
             isFirstChunk.set(true);
         }
@@ -88,9 +92,8 @@ public class HttpGremlinResponseStreamDecoder extends 
MessageToMessageDecoder<De
             }
 
             try {
-                // with error status we can get json in response
                 // no more chunks expected
-                if (isError(responseStatus.get())) {
+                if (isError(responseStatus.get()) && 
!SerTokensV4.MIME_GRAPHBINARY_V4.equals(responseEncoding.get())) {
                     final JsonNode node = 
mapper.readTree(content.toString(CharsetUtil.UTF_8));
                     final String message = node.get("message").asText();
                     final ResponseMessageV4 response = 
ResponseMessageV4.build()
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index 3a808aed05..68c00ec1c8 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -164,6 +164,7 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
 
     @Override
     public void channelRead0(final ChannelHandlerContext ctx, final 
RequestMessageV4 requestMessage) {
+        ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(false);
         final Pair<String, MessageSerializerV4<?>> serializer = 
ctx.channel().attr(StateKey.SERIALIZER).get();
 
         final Context requestCtx = new Context(requestMessage, ctx, settings, 
graphManager, gremlinExecutor,
@@ -200,6 +201,7 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
                 responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED);
                 responseHeader.headers().set(HttpHeaderNames.CONTENT_TYPE, 
serializer.getValue0());
                 ctx.writeAndFlush(responseHeader);
+                ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true);
 
                 switch (requestMessage.getGremlinType()) {
                     case "":
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
index de7e0871db..8d729def98 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
@@ -24,8 +24,11 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.DefaultLastHttpContent;
 import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpUtil;
 import io.netty.util.CharsetUtil;
@@ -44,6 +47,9 @@ import org.slf4j.LoggerFactory;
 
 import static com.codahale.metrics.MetricRegistry.name;
 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING;
+import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
 /**
@@ -99,6 +105,15 @@ public class HttpHandlerUtil {
                     : serializer.serializeResponseAsBinary(responseMessage, 
ctx.alloc());
 
             
context.setRequestState(HttpGremlinEndpointHandler.RequestState.ERROR);
+
+            if (!ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).get()) {
+                final HttpResponse responseHeader = new 
DefaultHttpResponse(HTTP_1_1, responseMessage.getStatus().getCode());
+                responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED); // 
Set this to make it "keep alive" eligible.
+                responseHeader.headers().set(HttpHeaderNames.CONTENT_TYPE, 
ctx.channel().attr(StateKey.SERIALIZER).get().getValue0());
+                ctx.writeAndFlush(responseHeader);
+                ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true);
+            }
+
             ctx.writeAndFlush(new DefaultHttpContent(ByteBuf));
 
             sendTrailingHeaders(ctx, responseMessage.getStatus().getCode(), 
responseMessage.getStatus().getException());
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
index 26dda9b5a7..fdb34c5eda 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
@@ -44,6 +44,11 @@ public final class StateKey {
      */
     public static final AttributeKey<UUID> REQUEST_ID = 
AttributeKey.valueOf("requestId");
 
+    /**
+     * The key for whether a {@link io.netty.handler.codec.http.HttpResponse} 
has been sent for the current response.
+     */
+    public static final AttributeKey<Boolean> HTTP_RESPONSE_SENT = 
AttributeKey.valueOf("responseSent");
+
     /**
      * The key for the current {@link AuthenticatedUser}.
      */
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 75dcf666c7..ac153b4dc0 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -261,41 +261,40 @@ public class GremlinServerIntegrateTest extends 
AbstractGremlinServerIntegration
         }
     }
 
-//    TODO: re-enable after pooling implemented in Java driver for HTTP/1.1.
-//    @Test
-//    public void shouldBlowTheWorkQueueSize() throws Exception {
-//        final Cluster cluster = TestClientFactory.open();
-//        final Client client = cluster.connect();
-//
-//        // maxWorkQueueSize=1 && gremlinPool=1
-//        // we should be able to do one request at a time serially
-//        assertEquals("test1", 
client.submit("'test1'").all().get().get(0).getString());
-//        assertEquals("test2", 
client.submit("'test2'").all().get().get(0).getString());
-//        assertEquals("test3", 
client.submit("'test3'").all().get().get(0).getString());
-//
-//        final AtomicBoolean errorTriggered = new AtomicBoolean();
-//        final ResultSet r1 = 
client.submitAsync("Thread.sleep(1000);'test4'").get();
-//
-//        final List<CompletableFuture<List<Result>>> blockers = new 
ArrayList<>();
-//        for (int ix = 0; ix < 512 && !errorTriggered.get(); ix++) {
-//            blockers.add(client.submit("'test'").all().exceptionally(t -> {
-//                final ResponseException re = (ResponseException) 
t.getCause();
-//                errorTriggered.compareAndSet(false, 
HttpResponseStatus.TOO_MANY_REQUESTS == re.getResponseStatusCode());
-//                return null;
-//            }));
-//        }
-//
-//        assertThat(errorTriggered.get(), is(true));
-//
-//        // wait for the blockage to clear for sure
-//        assertEquals("test4", r1.all().get().get(0).getString());
-//        blockers.forEach(CompletableFuture::join);
-//
-//        // should be accepting test6 now
-//        assertEquals("test6", 
client.submit("'test6'").all().get().get(0).getString());
-//
-//        cluster.close();
-//    }
+    @Test
+    public void shouldBlowTheWorkQueueSize() throws Exception {
+        final Cluster cluster = TestClientFactory.open();
+        final Client client = cluster.connect();
+
+        // maxWorkQueueSize=1 && gremlinPool=1
+        // we should be able to do one request at a time serially
+        assertEquals("test1", 
client.submit("'test1'").all().get().get(0).getString());
+        assertEquals("test2", 
client.submit("'test2'").all().get().get(0).getString());
+        assertEquals("test3", 
client.submit("'test3'").all().get().get(0).getString());
+
+        final AtomicBoolean errorTriggered = new AtomicBoolean();
+        final ResultSet r1 = 
client.submitAsync("Thread.sleep(1000);'test4'").get();
+
+        final List<CompletableFuture<List<Result>>> blockers = new 
ArrayList<>();
+        for (int ix = 0; ix < 512 && !errorTriggered.get(); ix++) {
+            blockers.add(client.submit("'test'").all().exceptionally(t -> {
+                final ResponseException re = (ResponseException) t.getCause();
+                errorTriggered.compareAndSet(false, 
HttpResponseStatus.TOO_MANY_REQUESTS == re.getResponseStatusCode());
+                return null;
+            }));
+        }
+
+        assertThat(errorTriggered.get(), is(true));
+
+        // wait for the blockage to clear for sure
+        assertEquals("test4", r1.all().get().get(0).getString());
+        blockers.forEach(CompletableFuture::join);
+
+        // should be accepting test6 now
+        assertEquals("test6", 
client.submit("'test6'").all().get().get(0).getString());
+
+        cluster.close();
+    }
 
     @Test
     public void shouldScriptEvaluationErrorForRemoteTraversal() throws 
Exception {

Reply via email to