This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch http-initial-error-fix
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 2e25a2d567037bcc2abb6fc7f27a60e1155e0b6b
Author: Ken Hu <[email protected]>
AuthorDate: Tue Jun 11 23:01:02 2024 -0700

    Prevent Java driver channel from being reused too early CTR.
    
    The connection was being returned too quickly before all HttpContent
    were read. This modifies the driver's handlers by ensuring that the
    LastHttpContent has been handled before returning the connection. The
    same was done for the SimpleHttpClient. As a consequence, some tests
    needed to be updated as they will now have up to two more empty
    ResponseMessageV4s added to the end.
---
 .../tinkerpop/gremlin/driver/Channelizer.java      |  8 ++++++
 .../driver/handler/GremlinResponseHandler.java     | 30 +++++++++++++++-------
 .../handler/HttpGremlinResponseStreamDecoder.java  | 22 +++++++---------
 .../gremlin/driver/simple/AbstractClient.java      | 23 -----------------
 .../gremlin/driver/simple/SimpleHttpClient.java    | 28 +++++++++++++++++++-
 .../gremlin/server/GremlinServerIntegrateTest.java | 14 +++++++---
 6 files changed, 75 insertions(+), 50 deletions(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index d2e393f100..5d8af18a3f 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler;
@@ -33,7 +34,9 @@ import 
org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
 import 
org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder;
 import org.apache.tinkerpop.gremlin.driver.handler.SslCheckHandler;
 import org.apache.tinkerpop.gremlin.util.MessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessageV4;
 
+import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -155,6 +158,11 @@ public interface Channelizer extends ChannelHandler {
      * channelizer. Only sessionless requests are possible.
      */
     final class HttpChannelizer extends AbstractChannelizer {
+        /**
+         * This response is used as a signal for determining if all content of 
the response has been read.
+         */
+        public static final ResponseMessageV4 LAST_CONTENT_READ_RESPONSE =
+                
ResponseMessageV4.build().code(HttpResponseStatus.NO_CONTENT).result(Collections.emptyList()).create();
 
         private HttpGremlinRequestEncoder gremlinRequestEncoder;
         private HttpGremlinResponseStreamDecoder gremlinResponseDecoder;
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
index 88e8a740e6..83203f964c 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.driver.handler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.util.AttributeKey;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultQueue;
@@ -34,12 +35,15 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
+
 /**
  * Takes a map of requests pending responses and writes responses to the 
{@link ResultQueue} of a request
  * as the {@link ResponseMessageV4} objects are deserialized.
  */
 public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<ResponseMessageV4> {
     private static final Logger logger = 
LoggerFactory.getLogger(GremlinResponseHandler.class);
+    private static final AttributeKey<ResponseException> CAUGHT_EXCEPTION = 
AttributeKey.valueOf("caughtException");
     private final AtomicReference<ResultQueue> pending;
 
     public GremlinResponseHandler(final AtomicReference<ResultQueue> pending) {
@@ -60,26 +64,34 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
 
     @Override
     protected void channelRead0(final ChannelHandlerContext 
channelHandlerContext, final ResponseMessageV4 response) {
-        final HttpResponseStatus statusCode = response.getStatus() == null ? 
HttpResponseStatus.PARTIAL_CONTENT : response.getStatus().getCode();
+        final HttpResponseStatus statusCode = response.getStatus() == null ? 
null : response.getStatus().getCode();
         final ResultQueue queue = pending.get();
 
-        if (statusCode == HttpResponseStatus.OK || statusCode == 
HttpResponseStatus.PARTIAL_CONTENT) {
+        if ((null == statusCode) || (statusCode == HttpResponseStatus.OK)) {
             final List<Object> data = response.getResult().getData();
             // unrolls the collection into individual results to be handled by 
the queue.
             data.forEach(item -> queue.add(new Result(item)));
         } else {
             // this is a "success" but represents no results otherwise it is 
an error
             if (statusCode != HttpResponseStatus.NO_CONTENT) {
-                queue.markError(new 
ResponseException(response.getStatus().getCode(), 
response.getStatus().getMessage(),
-                        response.getStatus().getException()));
+                // Save the error because there could be a subsequent 
HttpContent coming (probably just trailers). All
+                // content should be read first before marking the queue or 
else this channel might get reused too early.
+                channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).set(
+                        new ResponseException(response.getStatus().getCode(), 
response.getStatus().getMessage(),
+                                response.getStatus().getException())
+                );
             }
         }
 
-        // as this is a non-PARTIAL_CONTENT code - the stream is done.
-        if (statusCode != HttpResponseStatus.PARTIAL_CONTENT) {
-            final ResultQueue current = pending.getAndSet(null);
-            if (current != null) {
-                current.markComplete(response.getStatus().getAttributes());
+        // Stream is done when the last content signaling response message is 
read.
+        if (LAST_CONTENT_READ_RESPONSE == response) {
+            final ResultQueue resultQueue = pending.getAndSet(null);
+            if (resultQueue != null) {
+                if (null == 
channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).get()) {
+                    
resultQueue.markComplete(response.getStatus().getAttributes());
+                } else {
+                    
resultQueue.markError(channelHandlerContext.channel().attr(CAUGHT_EXCEPTION).getAndSet(null));
+                }
             }
         }
     }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
index d271e0dc0d..799dcc8f1a 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseStreamDecoder.java
@@ -44,6 +44,8 @@ import 
org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
 import java.util.List;
 import java.util.Objects;
 
+import static 
org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE;
+
 public class HttpGremlinResponseStreamDecoder extends 
MessageToMessageDecoder<DefaultHttpObject> {
 
     private static final AttributeKey<Boolean> IS_FIRST_CHUNK = 
AttributeKey.valueOf("isFirstChunk");
@@ -84,13 +86,6 @@ public class HttpGremlinResponseStreamDecoder extends 
MessageToMessageDecoder<De
                 throw new TooLongFrameException("Response exceeded " + 
maxContentLength + " bytes.");
             }
 
-            if (msg instanceof LastHttpContent && content.readableBytes() == 0 
&& bytesRead.get() != 0) {
-                // If this last content contains no bytes and there were bytes 
read previously, it means that this is the
-                // trailing headers. Trailing headers aren't used in the 
driver and shouldn't be passed on.
-                content.release();
-                return;
-            }
-
             try {
                 // no more chunks expected
                 if (isError(responseStatus.get()) && 
!SerTokensV4.MIME_GRAPHBINARY_V4.equals(responseEncoding.get())) {
@@ -101,14 +96,15 @@ public class HttpGremlinResponseStreamDecoder extends 
MessageToMessageDecoder<De
                             .create();
 
                     out.add(response);
-                    return;
+                } else {
+                    final ResponseMessageV4 chunk = 
serializer.readChunk(content, isFirstChunk.get());
+                    isFirstChunk.set(false);
+                    out.add(chunk);
                 }
 
-                final ResponseMessageV4 chunk = serializer.readChunk(content, 
isFirstChunk.get());
-
-                isFirstChunk.set(false);
-
-                out.add(chunk);
+                if (msg instanceof LastHttpContent) {
+                    out.add(LAST_CONTENT_READ_RESPONSE);
+                }
             } catch (SerializationException e) {
                 throw new RuntimeException(e);
             }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
index a95bcbc8f5..27dc8a0f96 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
@@ -61,29 +61,6 @@ public abstract class AbstractClient implements SimpleClient 
{
         return submitAsync(requestMessage).get(180, TimeUnit.SECONDS);
     }
 
-    @Override
-    public CompletableFuture<List<ResponseMessageV4>> submitAsync(final 
RequestMessageV4 requestMessage) throws Exception {
-        final List<ResponseMessageV4> results = new ArrayList<>();
-        final CompletableFuture<List<ResponseMessageV4>> f = new 
CompletableFuture<>();
-        callbackResponseHandler.callback = response -> {
-            // message with trailers
-            if (f.isDone())
-                throw new RuntimeException("A terminating message was already 
encountered - no more messages should have been received");
-
-            results.add(response);
-
-            // check if the current message is terminating - if it is then we 
can mark complete
-            if (response.getStatus() != null && response.getStatus().getCode() 
!= HttpResponseStatus.PARTIAL_CONTENT
-                    && response.getStatus().getCode() != 
HttpResponseStatus.PROXY_AUTHENTICATION_REQUIRED) {
-                f.complete(results);
-            }
-        };
-
-        writeAndFlush(requestMessage);
-
-        return f;
-    }
-
     static class CallbackResponseHandler extends 
SimpleChannelInboundHandler<ResponseMessageV4> {
         public Consumer<ResponseMessageV4> callback;
 
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
index 7d4b31213f..328535d4d1 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
@@ -20,9 +20,11 @@ package org.apache.tinkerpop.gremlin.driver.simple;
 
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelOption;
+import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import org.apache.tinkerpop.gremlin.driver.Channelizer;
 import org.apache.tinkerpop.gremlin.driver.RequestInterceptor;
 import 
org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder;
 import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
@@ -35,6 +37,7 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.http.HttpClientCodec;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessageV4;
 import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
 import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryMapper;
 import org.slf4j.Logger;
@@ -43,6 +46,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -101,7 +106,6 @@ public class SimpleHttpClient extends AbstractClient {
                                     new HttpClientCodec(),
                                     new 
HttpGremlinResponseStreamDecoder(serializer, Integer.MAX_VALUE),
                                     new HttpGremlinRequestEncoder(serializer, 
new ArrayList<>(), false),
-
                                     callbackResponseHandler);
                         }
                     });
@@ -112,6 +116,28 @@ public class SimpleHttpClient extends AbstractClient {
         }
     }
 
+    @Override
+    public CompletableFuture<List<ResponseMessageV4>> submitAsync(final 
RequestMessageV4 requestMessage) throws Exception {
+        final List<ResponseMessageV4> results = new ArrayList<>();
+        final CompletableFuture<List<ResponseMessageV4>> f = new 
CompletableFuture<>();
+        callbackResponseHandler.callback = response -> {
+            // message with trailers
+            if (f.isDone())
+                throw new RuntimeException("A terminating message was already 
encountered - no more messages should have been received");
+
+            results.add(response);
+
+            // check if the current message is terminating - if it is then we 
can mark complete
+            if (Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE == 
response) {
+                f.complete(results);
+            }
+        };
+
+        writeAndFlush(requestMessage);
+
+        return f;
+    }
+
     @Override
     public void writeAndFlush(final RequestMessageV4 requestMessage) throws 
Exception {
         channel.writeAndFlush(requestMessage);
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index ac153b4dc0..04b267c443 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -611,7 +611,6 @@ public class GremlinServerIntegrateTest extends 
AbstractGremlinServerIntegration
             final RequestMessageV4 request = 
RequestMessageV4.build("[0,1,2,3,4,5,6,7,8,9]").create();
 
             final List<ResponseMessageV4> msgs = client.submit(request);
-            assertEquals(5, client.submit(request).size());
             assertEquals(0, (int) msgs.get(0).getResult().getData().get(0));
             assertEquals(1, (int) msgs.get(0).getResult().getData().get(1));
             assertEquals(2, (int) msgs.get(1).getResult().getData().get(0));
@@ -622,6 +621,9 @@ public class GremlinServerIntegrateTest extends 
AbstractGremlinServerIntegration
             assertEquals(7, (int) msgs.get(3).getResult().getData().get(1));
             assertEquals(8, (int) msgs.get(4).getResult().getData().get(0));
             assertEquals(9, (int) msgs.get(4).getResult().getData().get(1));
+            for (ResponseMessageV4 resp : msgs.subList(5, msgs.size())) {
+                assertEquals(0, resp.getResult().getData().size());
+            }
         }
     }
 
@@ -806,8 +808,10 @@ public class GremlinServerIntegrateTest extends 
AbstractGremlinServerIntegration
         try (SimpleClient client = TestClientFactory.createSimpleHttpClient()) 
{
             final RequestMessageV4 request = 
RequestMessageV4.build("10").create();
             final List<ResponseMessageV4> responses = client.submit(request);
-            assertEquals(1, responses.size());
-            assertEquals(HttpResponseStatus.OK, 
responses.get(0).getStatus().getCode());
+            assertEquals(10, responses.get(0).getResult().getData().get(0));
+            for (ResponseMessageV4 resp : responses.subList(1, 
responses.size())) {
+                assertEquals(0, resp.getResult().getData().size());
+            }
         }
     }
 
@@ -831,7 +835,9 @@ public class GremlinServerIntegrateTest extends 
AbstractGremlinServerIntegration
             final RequestMessageV4 request = RequestMessageV4.build("new 
String().doNothingAtAllBecauseThis is a syntax error").create();
             final List<ResponseMessageV4> responses = client.submit(request);
             assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, 
responses.get(0).getStatus().getCode());
-            assertEquals(1, responses.size());
+            for (ResponseMessageV4 resp : responses.subList(1, 
responses.size())) {
+                assertEquals(0, resp.getResult().getData().size());
+            }
         }
     }
 

Reply via email to