This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch http-chunked-poc
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit c1fb0c6cf94622569b264f25f43c5374e709dd5d
Author: Ken Hu <[email protected]>
AuthorDate: Mon Dec 11 23:07:09 2023 -0800

    trial for chunked encoding with HTTP
---
 .../structure/io/binary/GraphBinaryReader.java     |   7 +
 .../tinkerpop/gremlin/driver/Channelizer.java      |   2 +-
 .../driver/handler/HttpGremlinResponseDecoder.java |  12 +-
 .../server/handler/HttpGremlinEndpointHandler.java | 724 +++++++++++++++++----
 .../server/handler/PartialChunkedInput.java        |  77 +++
 .../handler/WsAndHttpChannelizerHandler.java       |   5 +-
 .../tinkerpop/gremlin/util/ser/NettyBuffer.java    |   2 +-
 .../util/ser/binary/ResponseMessageSerializer.java |  95 ++-
 8 files changed, 784 insertions(+), 140 deletions(-)

diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/GraphBinaryReader.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/GraphBinaryReader.java
index 35129f7839..f8539b9732 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/GraphBinaryReader.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/binary/GraphBinaryReader.java
@@ -48,6 +48,7 @@ import java.io.IOException;
  */
 public class GraphBinaryReader {
     private final TypeSerializerRegistry registry;
+    private Buffer savedBuffer;
 
     public GraphBinaryReader() {
         this(TypeSerializerRegistry.INSTANCE);
@@ -57,6 +58,7 @@ public class GraphBinaryReader {
         this.registry = registry;
     }
 
+
     /**
      * Reads a value for an specific type.
      *
@@ -84,6 +86,11 @@ public class GraphBinaryReader {
         // Fully-qualified format: {type_code}{type_info}{value_flag}{value}
         final DataType type = 
DataType.get(Byte.toUnsignedInt(buffer.readByte()));
 
+        if (type == DataType.DURATION) {
+            // for testing purposes, this shouldn't happen (it's the start of 
a new response message)
+            savedBuffer = buffer;
+        }
+
         if (type == DataType.UNSPECIFIED_NULL) {
             // There is no TypeSerializer for unspecified null object
             // Read the value_flag - (folding the buffer.readByte() into the 
assert does not advance the index so
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index a626e0160b..a8545a9a6a 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -296,7 +296,7 @@ public interface Channelizer extends ChannelHandler {
             handler = new HttpClientCodec();
 
             pipeline.addLast("http-codec", handler);
-            pipeline.addLast("aggregator", new 
HttpObjectAggregator(maxContentLength));
+//            pipeline.addLast("aggregator", new 
HttpObjectAggregator(maxContentLength));
             pipeline.addLast("gremlin-encoder", gremlinRequestEncoder);
             pipeline.addLast("gremlin-decoder", gremlinResponseDecoder);
         }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
index 79dc21422b..36e98fb161 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
@@ -18,10 +18,14 @@
  */
 package org.apache.tinkerpop.gremlin.driver.handler;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.EmptyByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpContent;
 import io.netty.util.CharsetUtil;
 import org.apache.tinkerpop.gremlin.util.MessageSerializer;
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
@@ -33,7 +37,7 @@ import java.util.List;
  * Converts {@code HttpResponse} to a {@link ResponseMessage}.
  */
 @ChannelHandler.Sharable
-public final class HttpGremlinResponseDecoder extends 
MessageToMessageDecoder<FullHttpResponse> {
+public final class HttpGremlinResponseDecoder extends 
MessageToMessageDecoder<HttpContent> {
     private final MessageSerializer<?> serializer;
 
     public HttpGremlinResponseDecoder(final MessageSerializer<?> serializer) {
@@ -41,7 +45,9 @@ public final class HttpGremlinResponseDecoder extends 
MessageToMessageDecoder<Fu
     }
 
     @Override
-    protected void decode(final ChannelHandlerContext channelHandlerContext, 
final FullHttpResponse httpResponse, final List<Object> objects) throws 
Exception {
-        objects.add(serializer.deserializeResponse(httpResponse.content()));
+    protected void decode(final ChannelHandlerContext channelHandlerContext, 
final HttpContent content, final List<Object> objects) throws Exception {
+        for (ResponseMessage msg = 
serializer.deserializeResponse(content.content()); msg != null; msg = 
serializer.deserializeResponse(Unpooled.buffer(0))) {
+            objects.add(msg);
+        }
     }
 }
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index ec19b28e6d..4243ba590d 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -20,32 +20,51 @@ package org.apache.tinkerpop.gremlin.server.handler;
 
 import com.codahale.metrics.Timer;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelException;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpChunkedInput;
 import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.stream.ChunkedInput;
+import io.netty.handler.stream.ChunkedStream;
 import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
 import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptChecker;
+import org.apache.tinkerpop.gremlin.jsr223.JavaTranslator;
 import 
org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
+import org.apache.tinkerpop.gremlin.process.traversal.Failure;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.AbstractTraverser;
+import org.apache.tinkerpop.gremlin.process.traversal.util.BytecodeHelper;
+import 
org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
+import org.apache.tinkerpop.gremlin.server.Context;
 import org.apache.tinkerpop.gremlin.server.GraphManager;
 import org.apache.tinkerpop.gremlin.server.GremlinServer;
 import org.apache.tinkerpop.gremlin.server.Settings;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
+import org.apache.tinkerpop.gremlin.server.op.OpProcessorException;
+import org.apache.tinkerpop.gremlin.server.op.traversal.TraversalOpProcessor;
 import org.apache.tinkerpop.gremlin.server.util.MetricManager;
 import org.apache.tinkerpop.gremlin.server.util.TextPlainMessageSerializer;
+import org.apache.tinkerpop.gremlin.server.util.TraverserIterator;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.TemporaryException;
 import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
 import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
 import org.apache.tinkerpop.gremlin.util.MessageSerializer;
@@ -62,15 +81,25 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.script.Bindings;
+import javax.script.ScriptException;
 import javax.script.SimpleBindings;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -95,6 +124,9 @@ import static 
io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
  */
 @ChannelHandler.Sharable
 public class HttpGremlinEndpointHandler extends ChannelInboundHandlerAdapter {
+    private static final Bindings EMPTY_BINDINGS = new SimpleBindings();
+    public static final Timer traversalOpTimer = 
MetricManager.INSTANCE.getTimer(name(GremlinServer.class, "op", "traversal"));
+
     private static final Logger logger = 
LoggerFactory.getLogger(HttpGremlinEndpointHandler.class);
     private static final Logger auditLogger = 
LoggerFactory.getLogger(GremlinServer.AUDIT_LOGGER_NAME);
 
@@ -116,6 +148,8 @@ public class HttpGremlinEndpointHandler extends 
ChannelInboundHandlerAdapter {
 
     private static final Pattern pattern = Pattern.compile("(.*);q=(.*)");
 
+    private Pair<String, MessageTextSerializer<?>> reqSerializer;
+
     public HttpGremlinEndpointHandler(final Map<String, MessageSerializer<?>> 
serializers,
                                       final GremlinExecutor gremlinExecutor,
                                       final GraphManager graphManager,
@@ -158,8 +192,8 @@ public class HttpGremlinEndpointHandler extends 
ChannelInboundHandlerAdapter {
             }
 
             final String acceptMime = 
Optional.ofNullable(req.headers().get(HttpHeaderNames.ACCEPT)).orElse("application/json");
-            final Pair<String, MessageTextSerializer<?>> serializer = 
chooseSerializer(acceptMime);
-            if (null == serializer) {
+            reqSerializer = chooseSerializer(acceptMime);
+            if (null == reqSerializer) {
                 HttpHandlerUtil.sendError(ctx, BAD_REQUEST, String.format("no 
serializer for requested Accept header: %s", acceptMime),
                         keepAlive);
                 ReferenceCountUtil.release(msg);
@@ -186,124 +220,125 @@ public class HttpGremlinEndpointHandler extends 
ChannelInboundHandlerAdapter {
                     auditLogger.info("User {} with address {} requested: {}", 
user.getName(), address,
                             
requestMessage.getArgOrDefault(Tokens.ARGS_GREMLIN, ""));
                 }
-                final ChannelPromise promise = ctx.channel().newPromise();
-                final AtomicReference<Object> resultHolder = new 
AtomicReference<>();
-                promise.addListener(future -> {
-                    // if failed then the error was already written back to 
the client as part of the eval future
-                    // processing of the exception
-                    if (future.isSuccess()) {
-                        logger.debug("Preparing HTTP response for request with 
script [{}] and bindings of [{}] with result of [{}] on [{}]",
-                                
requestMessage.getArgOrDefault(Tokens.ARGS_GREMLIN, ""),
-                                
requestMessage.getArgOrDefault(Tokens.ARGS_BINDINGS, Collections.emptyMap()),
-                                resultHolder.get(), 
Thread.currentThread().getName());
-                        final FullHttpResponse response = new 
DefaultFullHttpResponse(HTTP_1_1, OK, (ByteBuf) resultHolder.get());
-                        response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
serializer.getValue0());
-
-                        // handle cors business
-                        if (origin != null) 
response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, origin);
-
-                        HttpHandlerUtil.sendAndCleanupConnection(ctx, 
keepAlive, response);
-                    }
-                });
+//                final ChannelPromise promise = ctx.channel().newPromise();
+//                final AtomicReference<Object> resultHolder = new 
AtomicReference<>();
+//                promise.addListener(future -> {
+//                    // if failed then the error was already written back to 
the client as part of the eval future
+//                    // processing of the exception
+//                    if (future.isSuccess()) {
+//                        logger.debug("Preparing HTTP response for request 
with script [{}] and bindings of [{}] with result of [{}] on [{}]",
+//                                
requestMessage.getArgOrDefault(Tokens.ARGS_GREMLIN, ""),
+//                                
requestMessage.getArgOrDefault(Tokens.ARGS_BINDINGS, Collections.emptyMap()),
+//                                resultHolder.get(), 
Thread.currentThread().getName());
+//                        final FullHttpResponse response = new 
DefaultFullHttpResponse(HTTP_1_1, OK, (ByteBuf) resultHolder.get());
+//                        response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
serializer.getValue0());
+//
+//                        // handle cors business
+//                        if (origin != null) 
response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, origin);
+//
+//                        HttpHandlerUtil.sendAndCleanupConnection(ctx, 
keepAlive, response);
+//                    }
+//                });
 
                 final Timer.Context timerContext = evalOpTimer.time();
-
-                final Bindings bindings;
-                try {
-                    bindings = 
createBindings(requestMessage.getArgOrDefault(Tokens.ARGS_BINDINGS, 
Collections.emptyMap()),
-                            
requestMessage.getArgOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap()));
-                } catch (IllegalStateException iae) {
-                    HttpHandlerUtil.sendError(ctx, BAD_REQUEST, 
iae.getMessage(), keepAlive);
-                    ReferenceCountUtil.release(msg);
-                    return;
-                }
-
-                // provide a transform function to serialize to message - this 
will force serialization to occur
-                // in the same thread as the eval. after the CompletableFuture 
is returned from the eval the result
-                // is ready to be written as a ByteBuf directly to the 
response.  nothing should be blocking here.
-                final CompletableFuture<Object> evalFuture = 
gremlinExecutor.eval(
-                        requestMessage.getArg(Tokens.ARGS_GREMLIN), 
requestMessage.getArg(Tokens.ARGS_LANGUAGE), bindings,
-                        
requestMessage.getArgOrDefault(Tokens.ARGS_EVAL_TIMEOUT, null),
-                        FunctionUtils.wrapFunction(o -> {
-                            // stopping the timer here is roughly equivalent 
to where the timer would have been stopped for
-                            // this metric in other contexts.  we just want to 
measure eval time not serialization time.
-                            timerContext.stop();
-
-                            logger.debug("Transforming result of request with 
script [{}] and bindings of [{}] with result of [{}] on [{}]",
-                                    requestMessage.getArg(Tokens.ARGS_GREMLIN),
-                                    
requestMessage.getArg(Tokens.ARGS_BINDINGS), o, 
Thread.currentThread().getName());
-
-                            final Optional<String> mp = 
requestMessage.getArg(Tokens.ARGS_GREMLIN) instanceof String
-                                    ? 
GremlinScriptChecker.parse(requestMessage.getArg(Tokens.ARGS_GREMLIN)).getMaterializeProperties()
-                                    : Optional.empty();
-
-                            // need to replicate what TraversalOpProcessor 
does with the bytecode op. it converts
-                            // results to Traverser so that GLVs can handle 
the results. don't quite get the same
-                            // benefit here because the bulk has to be 1 since 
we've already resolved the result,
-                            // but at least http is compatible
-                            final List<Object> results = 
requestMessage.getOp().equals(Tokens.OPS_BYTECODE) ?
-                                    (List<Object>) 
IteratorUtils.asList(o).stream().map(r -> new DefaultRemoteTraverser<Object>(r, 
1)).collect(Collectors.toList()) :
-                                    IteratorUtils.asList(o);
-
-                            if (mp.isPresent() && 
mp.get().equals(Tokens.MATERIALIZE_PROPERTIES_TOKENS)) {
-                                final Object firstElement = results.get(0);
-
-                                if (firstElement instanceof Element) {
-                                    for (int i = 0; i < results.size(); i++)
-                                        results.set(i, 
ReferenceFactory.detach((Element) results.get(i)));
-                                } else if (firstElement instanceof 
AbstractTraverser) {
-                                    for (final Object item : results)
-                                        ((AbstractTraverser) item).detach();
-                                }
-                            }
-
-                            final ResponseMessage responseMessage = 
ResponseMessage.build(requestMessage.getRequestId())
-                                    .code(ResponseStatusCode.SUCCESS)
-                                    .result(results).create();
-
-                            // http server is sessionless and must handle 
commit on transactions. the commit occurs
-                            // before serialization to be consistent with how 
things work for websocket based
-                            // communication.  this means that failed 
serialization does not mean that you won't get
-                            // a commit to the database
-                            
attemptCommit(requestMessage.getArg(Tokens.ARGS_ALIASES), graphManager, 
settings.strictTransactionManagement);
-
-                            try {
-                                return 
Unpooled.wrappedBuffer(serializer.getValue1().serializeResponseAsBinary(responseMessage,
 ctx.alloc()));
-                            } catch (Exception ex) {
-                                logger.warn(String.format("Error during 
serialization for %s", responseMessage), ex);
-
-                                // creating a new SerializationException will 
clear the cause which will allow the
-                                // future to report a better error message. if 
the cause is present, then
-                                // GremlinExecutor will prefer the cause and 
we'll get a low level Jackson sort of
-                                // error in the response.
-                                if (ex instanceof SerializationException) {
-                                    throw new 
SerializationException(String.format(
-                                            "Could not serialize the result 
with %s - %s",
-                                            serializer.getValue0(),
-                                            ex.getMessage()));
-                                }
-
-                                throw ex;
-                            }
-                        }));
-
-                evalFuture.exceptionally(t -> {
-                    if (t.getMessage() != null)
-                        HttpHandlerUtil.sendError(ctx, INTERNAL_SERVER_ERROR, 
t.getMessage(), Optional.of(t), keepAlive);
-                    else
-                        HttpHandlerUtil.sendError(ctx, INTERNAL_SERVER_ERROR, 
String.format("Error encountered evaluating script: %s",
-                                        
requestMessage.getArg(Tokens.ARGS_GREMLIN))
-                                , Optional.of(t), keepAlive);
-                    promise.setFailure(t);
-                    return null;
-                });
-
-                evalFuture.thenAcceptAsync(r -> {
-                    // now that the eval/serialization is done in the same 
thread - complete the promise so we can
-                    // write back the HTTP response on the same thread as the 
original request
-                    resultHolder.set(r);
-                    promise.setSuccess();
-                }, gremlinExecutor.getExecutorService());
+                Context serverCtx = new Context(requestMessage, ctx, settings, 
graphManager, gremlinExecutor, gremlinExecutor.getScheduledExecutorService());
+                iterateBytecodeTraversal(serverCtx);
+//                final Bindings bindings;
+//                try {
+//                    bindings = 
createBindings(requestMessage.getArgOrDefault(Tokens.ARGS_BINDINGS, 
Collections.emptyMap()),
+//                            
requestMessage.getArgOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap()));
+//                } catch (IllegalStateException iae) {
+//                    HttpHandlerUtil.sendError(ctx, BAD_REQUEST, 
iae.getMessage(), keepAlive);
+//                    ReferenceCountUtil.release(msg);
+//                    return;
+//                }
+//
+//                // provide a transform function to serialize to message - 
this will force serialization to occur
+//                // in the same thread as the eval. after the 
CompletableFuture is returned from the eval the result
+//                // is ready to be written as a ByteBuf directly to the 
response.  nothing should be blocking here.
+//                final CompletableFuture<Object> evalFuture = 
gremlinExecutor.eval(
+//                        requestMessage.getArg(Tokens.ARGS_GREMLIN), 
requestMessage.getArg(Tokens.ARGS_LANGUAGE), bindings,
+//                        
requestMessage.getArgOrDefault(Tokens.ARGS_EVAL_TIMEOUT, null),
+//                        FunctionUtils.wrapFunction(o -> {
+//                            // stopping the timer here is roughly equivalent 
to where the timer would have been stopped for
+//                            // this metric in other contexts.  we just want 
to measure eval time not serialization time.
+//                            timerContext.stop();
+//
+//                            logger.debug("Transforming result of request 
with script [{}] and bindings of [{}] with result of [{}] on [{}]",
+//                                    
requestMessage.getArg(Tokens.ARGS_GREMLIN),
+//                                    
requestMessage.getArg(Tokens.ARGS_BINDINGS), o, 
Thread.currentThread().getName());
+//
+//                            final Optional<String> mp = 
requestMessage.getArg(Tokens.ARGS_GREMLIN) instanceof String
+//                                    ? 
GremlinScriptChecker.parse(requestMessage.getArg(Tokens.ARGS_GREMLIN)).getMaterializeProperties()
+//                                    : Optional.empty();
+//
+//                            // need to replicate what TraversalOpProcessor 
does with the bytecode op. it converts
+//                            // results to Traverser so that GLVs can handle 
the results. don't quite get the same
+//                            // benefit here because the bulk has to be 1 
since we've already resolved the result,
+//                            // but at least http is compatible
+//                            final List<Object> results = 
requestMessage.getOp().equals(Tokens.OPS_BYTECODE) ?
+//                                    (List<Object>) 
IteratorUtils.asList(o).stream().map(r -> new DefaultRemoteTraverser<Object>(r, 
1)).collect(Collectors.toList()) :
+//                                    IteratorUtils.asList(o);
+//
+//                            if (mp.isPresent() && 
mp.get().equals(Tokens.MATERIALIZE_PROPERTIES_TOKENS)) {
+//                                final Object firstElement = results.get(0);
+//
+//                                if (firstElement instanceof Element) {
+//                                    for (int i = 0; i < results.size(); i++)
+//                                        results.set(i, 
ReferenceFactory.detach((Element) results.get(i)));
+//                                } else if (firstElement instanceof 
AbstractTraverser) {
+//                                    for (final Object item : results)
+//                                        ((AbstractTraverser) item).detach();
+//                                }
+//                            }
+//
+//                            final ResponseMessage responseMessage = 
ResponseMessage.build(requestMessage.getRequestId())
+//                                    .code(ResponseStatusCode.SUCCESS)
+//                                    .result(results).create();
+//
+//                            // http server is sessionless and must handle 
commit on transactions. the commit occurs
+//                            // before serialization to be consistent with 
how things work for websocket based
+//                            // communication.  this means that failed 
serialization does not mean that you won't get
+//                            // a commit to the database
+//                            
attemptCommit(requestMessage.getArg(Tokens.ARGS_ALIASES), graphManager, 
settings.strictTransactionManagement);
+//
+//                            try {
+//                                return 
Unpooled.wrappedBuffer(serializer.getValue1().serializeResponseAsBinary(responseMessage,
 ctx.alloc()));
+//                            } catch (Exception ex) {
+//                                logger.warn(String.format("Error during 
serialization for %s", responseMessage), ex);
+//
+//                                // creating a new SerializationException 
will clear the cause which will allow the
+//                                // future to report a better error message. 
if the cause is present, then
+//                                // GremlinExecutor will prefer the cause and 
we'll get a low level Jackson sort of
+//                                // error in the response.
+//                                if (ex instanceof SerializationException) {
+//                                    throw new 
SerializationException(String.format(
+//                                            "Could not serialize the result 
with %s - %s",
+//                                            serializer.getValue0(),
+//                                            ex.getMessage()));
+//                                }
+//
+//                                throw ex;
+//                            }
+//                        }));
+//
+//                evalFuture.exceptionally(t -> {
+//                    if (t.getMessage() != null)
+//                        HttpHandlerUtil.sendError(ctx, 
INTERNAL_SERVER_ERROR, t.getMessage(), Optional.of(t), keepAlive);
+//                    else
+//                        HttpHandlerUtil.sendError(ctx, 
INTERNAL_SERVER_ERROR, String.format("Error encountered evaluating script: %s",
+//                                        
requestMessage.getArg(Tokens.ARGS_GREMLIN))
+//                                , Optional.of(t), keepAlive);
+//                    promise.setFailure(t);
+//                    return null;
+//                });
+//
+//                evalFuture.thenAcceptAsync(r -> {
+//                    // now that the eval/serialization is done in the same 
thread - complete the promise so we can
+//                    // write back the HTTP response on the same thread as 
the original request
+//                    resultHolder.set(r);
+//                    promise.setSuccess();
+//                }, gremlinExecutor.getExecutorService());
             } catch (Exception ex) {
                 // send the error response here and don't rely on exception 
caught because it might not have the
                 // context on whether to close the connection or not, based on 
keepalive.
@@ -391,4 +426,457 @@ public class HttpGremlinEndpointHandler extends 
ChannelInboundHandlerAdapter {
         else
             graphManager.commitAll();
     }
+
+    private static void validateTraversalSourceAlias(final Context ctx, final 
RequestMessage message, final Map<String, String> aliases) throws 
OpProcessorException {
+        final String traversalSourceBindingForAlias = 
aliases.values().iterator().next();
+        if 
(!ctx.getGraphManager().getTraversalSourceNames().contains(traversalSourceBindingForAlias))
 {
+            final String msg = String.format("The traversal source [%s] for 
alias [%s] is not configured on the server.", traversalSourceBindingForAlias, 
Tokens.VAL_TRAVERSAL_SOURCE_ALIAS);
+            throw new OpProcessorException(msg, 
ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+        }
+    }
+
+    private static Map<String, String> validateTraversalRequest(final 
RequestMessage message) throws OpProcessorException {
+        if (!message.optionalArgs(Tokens.ARGS_GREMLIN).isPresent()) {
+            final String msg = String.format("A message with [%s] op code 
requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_GREMLIN);
+            throw new OpProcessorException(msg, 
ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+        }
+
+        // matches functionality in the UnifiedHandler
+        if (!(message.optionalArgs(Tokens.ARGS_GREMLIN).get() instanceof 
Bytecode)) {
+            final String msg = String.format("A message with [%s] op code 
requires a [%s] argument that is of type %s.",
+                    Tokens.OPS_BYTECODE, Tokens.ARGS_GREMLIN, 
Bytecode.class.getSimpleName());
+            throw new OpProcessorException(msg, 
ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+        }
+
+        return validatedAliases(message).get();
+    }
+
+    private static Optional<Map<String, String>> validatedAliases(final 
RequestMessage message) throws OpProcessorException {
+        final Optional<Map<String, String>> aliases = 
message.optionalArgs(Tokens.ARGS_ALIASES);
+        if (!aliases.isPresent()) {
+            final String msg = String.format("A message with [%s] op code 
requires a [%s] argument.", Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES);
+            throw new OpProcessorException(msg, 
ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+        }
+
+        if (aliases.get().size() != 1 || 
!aliases.get().containsKey(Tokens.VAL_TRAVERSAL_SOURCE_ALIAS)) {
+            final String msg = String.format("A message with [%s] op code 
requires the [%s] argument to be a Map containing one alias assignment named 
'%s'.",
+                    Tokens.OPS_BYTECODE, Tokens.ARGS_ALIASES, 
Tokens.VAL_TRAVERSAL_SOURCE_ALIAS);
+            throw new OpProcessorException(msg, 
ResponseMessage.build(message).code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(msg).create());
+        }
+
+        return aliases;
+    }
+
+    private void iterateBytecodeTraversal(final Context context) throws 
Exception {
+        final RequestMessage msg = context.getRequestMessage();
+        final Settings settings = context.getSettings();
+        logger.debug("Traversal request {} for in thread {}", 
msg.getRequestId(), Thread.currentThread().getName());
+
+        // validateTraversalRequest() ensures that this is of type Bytecode
+        final Object bytecodeObj = msg.getArgs().get(Tokens.ARGS_GREMLIN);
+        final Bytecode bytecode = (Bytecode) bytecodeObj;
+
+        // earlier validation in selection of this op method should free us to 
cast this without worry
+        final Map<String, String> aliases = (Map<String, String>) 
msg.optionalArgs(Tokens.ARGS_ALIASES).get();
+
+        // timeout override - handle both deprecated and newly named 
configuration. earlier logic should prevent
+        // both configurations from being submitted at the same time
+        final Map<String, Object> args = msg.getArgs();
+        final long seto = args.containsKey(Tokens.ARGS_EVAL_TIMEOUT) ?
+                ((Number) args.get(Tokens.ARGS_EVAL_TIMEOUT)).longValue() : 
context.getSettings().getEvaluationTimeout();
+
+        final GraphManager graphManager = context.getGraphManager();
+        final String traversalSourceName = 
aliases.entrySet().iterator().next().getValue();
+        final TraversalSource g = 
graphManager.getTraversalSource(traversalSourceName);
+
+        final Traversal.Admin<?, ?> traversal;
+        try {
+            final Optional<String> lambdaLanguage = 
BytecodeHelper.getLambdaLanguage(bytecode);
+            if (!lambdaLanguage.isPresent())
+                traversal = JavaTranslator.of(g).translate(bytecode);
+            else
+                traversal = context.getGremlinExecutor().eval(bytecode, 
EMPTY_BINDINGS, lambdaLanguage.get(), traversalSourceName);
+        } catch (ScriptException ex) {
+            logger.error("Traversal contains a lambda that cannot be 
compiled", ex);
+            throw new OpProcessorException("Traversal contains a lambda that 
cannot be compiled",
+                    
ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
+                            .statusMessage(ex.getMessage())
+                            .statusAttributeException(ex).create());
+        } catch (Exception ex) {
+            logger.error("Could not deserialize the Traversal instance", ex);
+            throw new OpProcessorException("Could not deserialize the 
Traversal instance",
+                    
ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION)
+                            .statusMessage(ex.getMessage())
+                            .statusAttributeException(ex).create());
+        }
+        if (settings.enableAuditLog) {
+            AuthenticatedUser user = 
context.getChannelHandlerContext().channel().attr(StateKey.AUTHENTICATED_USER).get();
+            if (null == user) {    // This is expected when using the 
AllowAllAuthenticator
+                user = AuthenticatedUser.ANONYMOUS_USER;
+            }
+            String address = 
context.getChannelHandlerContext().channel().remoteAddress().toString();
+            if (address.startsWith("/") && address.length() > 1) address = 
address.substring(1);
+            auditLogger.info("User {} with address {} requested: {}", 
user.getName(), address, bytecode);
+        }
+
+        final Timer.Context timerContext = traversalOpTimer.time();
+        final FutureTask<Void> evalFuture = new FutureTask<>(() -> {
+            context.setStartedResponse();
+            final Graph graph = g.getGraph();
+
+            try {
+                beforeProcessing(graph, context);
+
+                try {
+                    // compile the traversal - without it getEndStep() has 
nothing in it
+                    traversal.applyStrategies();
+                    handleIterator(context, new TraverserIterator(traversal), 
graph);
+                } catch (Exception ex) {
+                    Throwable t = ex;
+                    if (ex instanceof UndeclaredThrowableException)
+                        t = t.getCause();
+
+                    // if any exception in the chain is TemporaryException or 
Failure then we should respond with the
+                    // right error code so that the client knows to retry
+                    final Optional<Throwable> possibleSpecialException = 
determineIfSpecialException(ex);
+                    if (possibleSpecialException.isPresent()) {
+                        final Throwable special = 
possibleSpecialException.get();
+                        final ResponseMessage.Builder specialResponseMsg = 
ResponseMessage.build(msg).
+                                statusMessage(special.getMessage()).
+                                statusAttributeException(special);
+                        if (special instanceof TemporaryException) {
+                            
specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_TEMPORARY);
+                        } else if (special instanceof Failure) {
+                            final Failure failure = (Failure) special;
+                            
specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_FAIL_STEP).
+                                    
statusAttribute(Tokens.STATUS_ATTRIBUTE_FAIL_STEP_MESSAGE, failure.format());
+                        }
+                        context.writeAndFlush(specialResponseMsg.create());
+                    } else if (t instanceof InterruptedException || t 
instanceof TraversalInterruptedException) {
+                        graphManager.onQueryError(msg, t);
+                        final String errorMessage = String.format("A timeout 
occurred during traversal evaluation of [%s] - consider increasing the limit 
given to evaluationTimeout", msg);
+                        logger.warn(errorMessage);
+                        
context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+                                .statusMessage(errorMessage)
+                                .statusAttributeException(ex).create());
+                    } else {
+                        logger.warn(String.format("Exception processing a 
Traversal on iteration for request [%s].", msg.getRequestId()), ex);
+                        
context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                                .statusMessage(ex.getMessage())
+                                .statusAttributeException(ex).create());
+                    }
+                    onError(graph, context, ex);
+                }
+            } catch (Throwable t) {
+                onError(graph, context, t);
+                // if any exception in the chain is TemporaryException or 
Failure then we should respond with the
+                // right error code so that the client knows to retry
+                final Optional<Throwable> possibleSpecialException = 
determineIfSpecialException(t);
+                if (possibleSpecialException.isPresent()) {
+                    final Throwable special = possibleSpecialException.get();
+                    final ResponseMessage.Builder specialResponseMsg = 
ResponseMessage.build(msg).
+                            statusMessage(special.getMessage()).
+                            statusAttributeException(special);
+                    if (special instanceof TemporaryException) {
+                        
specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_TEMPORARY);
+                    } else if (special instanceof Failure) {
+                        final Failure failure = (Failure) special;
+                        
specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_FAIL_STEP).
+                                
statusAttribute(Tokens.STATUS_ATTRIBUTE_FAIL_STEP_MESSAGE, failure.format());
+                    }
+                    context.writeAndFlush(specialResponseMsg.create());
+                } else {
+                    logger.warn(String.format("Exception processing a 
Traversal on request [%s].", msg.getRequestId()), t);
+                    
context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
+                            .statusMessage(t.getMessage())
+                            .statusAttributeException(t).create());
+                    if (t instanceof Error) {
+                        //Re-throw any errors to be handled by and set as the 
result of evalFuture
+                        throw t;
+                    }
+                }
+            } finally {
+                timerContext.stop();
+
+                // There is a race condition that this query may have finished 
before the timeoutFuture was created,
+                // though this is very unlikely. This is handled in the 
settor, if this has already been grabbed.
+                // If we passed this point and the setter hasn't been called, 
it will cancel the timeoutFuture inside
+                // the setter to compensate.
+                final ScheduledFuture<?> timeoutFuture = 
context.getTimeoutExecutor();
+                if (null != timeoutFuture)
+                    timeoutFuture.cancel(true);
+            }
+
+            return null;
+        });
+
+        try {
+            final Future<?> executionFuture = 
context.getGremlinExecutor().getExecutorService().submit(evalFuture);
+            if (seto > 0) {
+                // Schedule a timeout in the thread pool for future execution
+                
context.setTimeoutExecutor(context.getScheduledExecutorService().schedule(() -> 
{
+                    executionFuture.cancel(true);
+                    if (!context.getStartedResponse()) {
+                        context.sendTimeoutResponse();
+                    }
+                }, seto, TimeUnit.MILLISECONDS));
+            }
+        } catch (RejectedExecutionException ree) {
+            
context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.TOO_MANY_REQUESTS)
+                    .statusMessage("Rate limiting").create());
+        }
+    }
+
+    protected void beforeProcessing(final Graph graph, final Context ctx) {
+        final GraphManager graphManager = ctx.getGraphManager();
+        final RequestMessage msg = ctx.getRequestMessage();
+        graphManager.beforeQueryStart(msg);
+        if (graph.features().graph().supportsTransactions() && 
graph.tx().isOpen()) graph.tx().rollback();
+    }
+
+    protected void onError(final Graph graph, final Context ctx, Throwable 
error) {
+        final GraphManager graphManager = ctx.getGraphManager();
+        final RequestMessage msg = ctx.getRequestMessage();
+        graphManager.onQueryError(msg, error);
+        if (graph.features().graph().supportsTransactions() && 
graph.tx().isOpen()) graph.tx().rollback();
+    }
+
+    protected void onTraversalSuccess(final Graph graph, final Context ctx) {
+        final GraphManager graphManager = ctx.getGraphManager();
+        final RequestMessage msg = ctx.getRequestMessage();
+        graphManager.onQuerySuccess(msg);
+        if (graph.features().graph().supportsTransactions() && 
graph.tx().isOpen()) graph.tx().commit();
+    }
+
+    protected void handleIterator(final Context context, final Iterator itty, 
final Graph graph) throws InterruptedException {
+        final ChannelHandlerContext nettyContext = 
context.getChannelHandlerContext();
+        final RequestMessage msg = context.getRequestMessage();
+        final Settings settings = context.getSettings();
+//        final MessageSerializer<?> serializer = 
nettyContext.channel().attr(StateKey.SERIALIZER).get(); get from member variable
+        final boolean useBinary = true; // 
nettyContext.channel().attr(StateKey.USE_BINARY).get();
+        boolean warnOnce = false;
+
+        // we have an empty iterator - happens on stuff like: g.V().iterate()
+        if (!itty.hasNext()) {
+            final Map<String, Object> attributes = 
generateStatusAttributes(nettyContext, msg, ResponseStatusCode.NO_CONTENT, 
itty, settings);
+
+            // as there is nothing left to iterate if we are transaction 
managed then we should execute a
+            // commit here before we send back a NO_CONTENT which implies 
success
+            onTraversalSuccess(graph, context);
+            context.writeAndFlush(ResponseMessage.build(msg)
+                    .code(ResponseStatusCode.NO_CONTENT)
+                    .statusAttributes(attributes)
+                    .create());
+            return;
+        }
+
+        // the batch size can be overridden by the request
+        final int resultIterationBatchSize = (Integer) 
msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
+                .orElse(settings.resultIterationBatchSize);
+        List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
+
+        // use an external control to manage the loop as opposed to just 
checking hasNext() in the while.  this
+        // prevent situations where auto transactions create a new transaction 
after calls to commit() withing
+        // the loop on calls to hasNext().
+        boolean hasMore = itty.hasNext();
+
+        final HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+        response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
reqSerializer.getValue0());
+        HttpUtil.setTransferEncodingChunked(response, true);
+        nettyContext.write(response);
+
+        while (hasMore) {
+            if (Thread.interrupted()) throw new InterruptedException();
+
+            // check if an implementation needs to force flush the aggregated 
results before the iteration batch
+            // size is reached.
+            final boolean forceFlush = isForceFlushed(nettyContext, msg, itty);
+
+            // have to check the aggregate size because it is possible that 
the channel is not writeable (below)
+            // so iterating next() if the message is not written and flushed 
would bump the aggregate size beyond
+            // the expected resultIterationBatchSize.  Total serialization 
time for the response remains in
+            // effect so if the client is "slow" it may simply timeout.
+            //
+            // there is a need to check hasNext() on the iterator because if 
the channel is not writeable the
+            // previous pass through the while loop will have next()'d the 
iterator and if it is "done" then a
+            // NoSuchElementException will raise its head. also need a check 
to ensure that this iteration doesn't
+            // require a forced flush which can be forced by sub-classes.
+            //
+            // this could be placed inside the isWriteable() portion of the 
if-then below but it seems better to
+            // allow iteration to continue into a batch if that is possible 
rather than just doing nothing at all
+            // while waiting for the client to catch up
+            if (aggregate.size() < resultIterationBatchSize && itty.hasNext() 
&& !forceFlush) aggregate.add(itty.next());
+
+            // Don't keep executor busy if client has already given up; there 
is no way to catch up if the channel is
+            // not active, and hence we should break the loop.
+            if (!nettyContext.channel().isActive()) {
+                onError(graph, context, new ChannelException("Channel is not 
active - cannot write any more results"));
+                break;
+            }
+
+            // send back a page of results if batch size is met or if it's the 
end of the results being iterated.
+            // also check writeability of the channel to prevent OOME for slow 
clients.
+            //
+            // clients might decide to close the Netty channel to the server 
with a CloseWebsocketFrame after errors
+            // like CorruptedFrameException. On the server, although the 
channel gets closed, there might be some
+            // executor threads waiting for watermark to clear which will not 
clear in these cases since client has
+            // already given up on these requests. This leads to these 
executors waiting for the client to consume
+            // results till the timeout. checking for isActive() should help 
prevent that.
+            if (nettyContext.channel().isActive() && 
nettyContext.channel().isWritable()) {
+                if (forceFlush || aggregate.size() == resultIterationBatchSize 
|| !itty.hasNext()) {
+                    final ResponseStatusCode code = itty.hasNext() ? 
ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
+
+                    // serialize here because in sessionless requests the 
serialization must occur in the same
+                    // thread as the eval.  as eval occurs in the 
GremlinExecutor there's no way to get back to the
+                    // thread that processed the eval of the script so, we 
have to push serialization down into that
+                    final Map<String, Object> metadata = 
generateResultMetaData(nettyContext, msg, code, itty, settings);
+                    final Map<String, Object> statusAttrb = 
generateStatusAttributes(nettyContext, msg, code, itty, settings);
+                    Frame frame = null;
+                    try {
+                        frame = makeFrame(context, msg, 
reqSerializer.getValue1(), useBinary, aggregate, code,
+                                metadata, statusAttrb);
+                    } catch (Exception ex) {
+                        // a frame may use a Bytebuf which is a countable 
release - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+
+                        // exception is handled in makeFrame() - serialization 
error gets written back to driver
+                        // at that point
+                        onError(graph, context, ex);
+                        break;
+                    }
+
+                    // track whether there is anything left in the iterator 
because it needs to be accessed after
+                    // the transaction could be closed - in that case a call 
to hasNext() could open a new transaction
+                    // unintentionally
+                    hasMore = itty.hasNext();
+
+                    try {
+                        // only need to reset the aggregation list if there's 
more stuff to write
+                        if (hasMore)
+                            aggregate = new 
ArrayList<>(resultIterationBatchSize);
+                        else {
+                            // iteration and serialization are both complete 
which means this finished successfully. note that
+                            // errors internal to script eval or timeout will 
rollback given GremlinServer's global configurations.
+                            // local errors will get rolled back below because 
the exceptions aren't thrown in those cases to be
+                            // caught by the GremlinExecutor for global 
rollback logic. this only needs to be committed if
+                            // there are no more items to iterate and 
serialization is complete
+                            onTraversalSuccess(graph, context);
+                        }
+                    } catch (Exception ex) {
+                        // a frame may use a Bytebuf which is a countable 
release - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+                        throw ex;
+                    }
+
+                    if (!hasMore) iterateComplete(nettyContext, msg, itty);
+
+                    // the flush is called after the commit has potentially 
occurred.  in this way, if a commit was
+                    // required then it will be 100% complete before the 
client receives it. the "frame" at this point
+                    // should have completely detached objects from the 
transaction (i.e. serialization has occurred)
+                    // so a new one should not be opened on the flush down the 
netty pipeline
+//                    don't do this for the first part as we need to return 
http response not ws
+//                    context.writeAndFlush(code, frame);
+//                    final FullHttpResponse response = new 
DefaultFullHttpResponse(HTTP_1_1, OK, (ByteBuf) frame.getMsg());
+//                    response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
reqSerializer.getValue0());
+
+                    // handle cors business
+//                    if (origin != null) 
response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, origin);
+
+
+                    if (hasMore) {
+                        nettyContext.writeAndFlush(new PartialChunkedInput(new 
ChunkedStream(new ByteBufInputStream((ByteBuf) frame.getMsg()))));
+                    } else {
+                        nettyContext.writeAndFlush(new HttpChunkedInput(new 
ChunkedStream(new ByteBufInputStream((ByteBuf) frame.getMsg()))));
+                    }
+
+                }
+            } else {
+                // don't keep triggering this warning over and over again for 
the same request
+                if (!warnOnce) {
+                    logger.warn("Pausing response writing as 
writeBufferHighWaterMark exceeded on {} - writing will continue once client has 
caught up", msg);
+                    warnOnce = true;
+                }
+
+                // since the client is lagging we can hold here for a period 
of time for the client to catch up.
+                // this isn't blocking the IO thread - just a worker.
+                TimeUnit.MILLISECONDS.sleep(10);
+            }
+        }
+    }
+
+    protected static Optional<Throwable> determineIfSpecialException(final 
Throwable ex) {
+        return Stream.of(ExceptionUtils.getThrowables(ex)).
+                filter(i -> i instanceof TemporaryException || i instanceof 
Failure).findFirst();
+    }
+
+    protected void iterateComplete(final ChannelHandlerContext ctx, final 
RequestMessage msg, final Iterator itty) {
+        // do nothing by default
+    }
+
+    protected Map<String, Object> generateResultMetaData(final 
ChannelHandlerContext ctx, final RequestMessage msg,
+                                                         final 
ResponseStatusCode code, final Iterator itty,
+                                                         final Settings 
settings) {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Generates response status meta-data to put on a {@link ResponseMessage}.
+     *
+     * @param itty a reference to the current {@link Iterator} of results - it 
is not meant to be forwarded in
+     *             this method
+     */
+    protected Map<String, Object> generateStatusAttributes(final 
ChannelHandlerContext ctx, final RequestMessage msg,
+                                                           final 
ResponseStatusCode code, final Iterator itty,
+                                                           final Settings 
settings) {
+        // only return server metadata on the last message
+        if (itty.hasNext()) return Collections.emptyMap();
+
+        final Map<String, Object> metaData = new HashMap<>();
+        metaData.put(Tokens.ARGS_HOST, 
ctx.channel().remoteAddress().toString());
+
+        return metaData;
+    }
+
+    protected static Frame makeFrame(final Context ctx, final RequestMessage 
msg,
+                                     final MessageSerializer<?> serializer, 
final boolean useBinary, final List<Object> aggregate,
+                                     final ResponseStatusCode code, final 
Map<String,Object> responseMetaData,
+                                     final Map<String,Object> 
statusAttributes) throws Exception {
+        try {
+            final ChannelHandlerContext nettyContext = 
ctx.getChannelHandlerContext();
+
+            ctx.handleDetachment(aggregate);
+
+            if (useBinary) {
+                return new 
Frame(serializer.serializeResponseAsBinary(ResponseMessage.build(msg)
+                        .code(code)
+                        .statusAttributes(statusAttributes)
+                        .responseMetaData(responseMetaData)
+                        .result(aggregate).create(), nettyContext.alloc()));
+            } else {
+                // the expectation is that the GremlinTextRequestDecoder will 
have placed a MessageTextSerializer
+                // instance on the channel.
+                final MessageTextSerializer<?> textSerializer = 
(MessageTextSerializer<?>) serializer;
+                return new 
Frame(textSerializer.serializeResponseAsString(ResponseMessage.build(msg)
+                        .code(code)
+                        .statusAttributes(statusAttributes)
+                        .responseMetaData(responseMetaData)
+                        .result(aggregate).create(), nettyContext.alloc()));
+            }
+        } catch (Exception ex) {
+            logger.warn("The result [{}] in the request {} could not be 
serialized and returned.", aggregate, msg.getRequestId(), ex);
+            final String errorMessage = String.format("Error during 
serialization: %s", ExceptionHelper.getMessageFromExceptionOrCause(ex));
+            final ResponseMessage error = 
ResponseMessage.build(msg.getRequestId())
+                    .statusMessage(errorMessage)
+                    .statusAttributeException(ex)
+                    
.code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION).create();
+            ctx.writeAndFlush(error);
+            throw ex;
+        }
+    }
+
+    protected boolean isForceFlushed(final ChannelHandlerContext ctx, final 
RequestMessage msg, final Iterator itty) {
+        return false;
+    }
 }
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/PartialChunkedInput.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/PartialChunkedInput.java
new file mode 100644
index 0000000000..49ff95ce93
--- /dev/null
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/PartialChunkedInput.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.stream.ChunkedInput;
+
+public class PartialChunkedInput implements ChunkedInput<HttpContent> {
+
+    private final ChunkedInput<ByteBuf> input;
+
+
+    public PartialChunkedInput(ChunkedInput<ByteBuf> input) {
+        this.input = input;
+    }
+
+
+    @Override
+    public boolean isEndOfInput() throws Exception {
+        return input.isEndOfInput();
+    }
+
+    @Override
+    public void close() throws Exception {
+        input.close();
+    }
+
+    @Deprecated
+    @Override
+    public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception {
+        return readChunk(ctx.alloc());
+    }
+
+    @Override
+    public HttpContent readChunk(ByteBufAllocator allocator) throws Exception {
+        if (input.isEndOfInput()) {
+            return null;
+        } else {
+            ByteBuf buf = input.readChunk(allocator);
+            if (buf == null) {
+                return null;
+            }
+            return new DefaultHttpContent(buf);
+        }
+    }
+
+    @Override
+    public long length() {
+        return input.length();
+    }
+
+    @Override
+    public long progress() {
+        return input.progress();
+    }
+}
\ No newline at end of file
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
index db57b8e9f9..d99cd066f3 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/WsAndHttpChannelizerHandler.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.http.HttpMessage;
+import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.tinkerpop.gremlin.server.Channelizer;
 import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
 import org.apache.tinkerpop.gremlin.server.channel.WsAndHttpChannelizer;
@@ -82,6 +83,7 @@ public class WsAndHttpChannelizerHandler extends 
ChannelInboundHandlerAdapter {
             pipeline.remove(PIPELINE_REQUEST_HANDLER);
             if (null != pipeline.get(PIPELINE_HTTP_USER_AGENT_HANDLER)) {
                 pipeline.remove(PIPELINE_HTTP_USER_AGENT_HANDLER);
+                pipeline.remove("chunk-handler");
             }
             if (null != pipeline.get(PIPELINE_AUTHENTICATOR)) {
                 final ChannelHandler authenticator = 
pipeline.get(PIPELINE_AUTHENTICATOR);
@@ -91,7 +93,8 @@ public class WsAndHttpChannelizerHandler extends 
ChannelInboundHandlerAdapter {
                 pipeline.addAfter(PIPELINE_HTTP_USER_AGENT_HANDLER, 
PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
             } else {
                 pipeline.addAfter(PIPELINE_HTTP_AGGREGATOR, 
PIPELINE_HTTP_USER_AGENT_HANDLER, new HttpUserAgentHandler());
-                pipeline.addAfter(PIPELINE_HTTP_USER_AGENT_HANDLER, 
PIPELINE_REQUEST_HANDLER, this.httpGremlinEndpointHandler);
+                pipeline.addAfter(PIPELINE_HTTP_USER_AGENT_HANDLER, 
"chunk-handler", new ChunkedWriteHandler());
+                pipeline.addAfter("chunk-handler", PIPELINE_REQUEST_HANDLER, 
this.httpGremlinEndpointHandler);
                 // Note that channelRead()'s do not propagate down the 
pipeline past HttpGremlinEndpointHandler
             }
         }
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/NettyBuffer.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/NettyBuffer.java
index 3310313293..1ccdd95c7e 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/NettyBuffer.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/NettyBuffer.java
@@ -28,7 +28,7 @@ import java.nio.ByteBuffer;
 /**
  * Represents a {@link Buffer} backed by Netty's {@link ByteBuf}.
  */
-final class NettyBuffer implements Buffer {
+final public class NettyBuffer implements Buffer {
     private final ByteBuf buffer;
 
     /**
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/ResponseMessageSerializer.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/ResponseMessageSerializer.java
index f6cc304494..1d28a65c47 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/ResponseMessageSerializer.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/ResponseMessageSerializer.java
@@ -19,6 +19,8 @@
 package org.apache.tinkerpop.gremlin.util.ser.binary;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.tinkerpop.gremlin.util.ser.NettyBuffer;
 import org.apache.tinkerpop.gremlin.util.ser.NettyBufferFactory;
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
 import org.apache.tinkerpop.gremlin.util.message.ResponseResult;
@@ -34,29 +36,81 @@ import java.util.Map;
 import java.util.UUID;
 
 public class ResponseMessageSerializer {
+    public static final int RESPONSE_START_SIZE = -1;
     private static final NettyBufferFactory bufferFactory = new 
NettyBufferFactory();
+    ByteBuf retainedBuf = Unpooled.buffer(0);
+    int retainedSize = RESPONSE_START_SIZE;
 
     public ResponseMessage readValue(final ByteBuf byteBuf, final 
GraphBinaryReader context) throws SerializationException {
         // Wrap netty's buffer
-        final Buffer buffer = bufferFactory.create(byteBuf);
-        final int version = buffer.readByte() & 0xff;
+        final Buffer buffer = bufferFactory.create(retainedBuf.readableBytes() 
> 0 ? retainedBuf : byteBuf);
 
-        if (version >>> 7 != 1) {
-            // This is an indication that the response buffer was incorrectly 
built
-            // Or the buffer offsets are wrong
-            throw new SerializationException("The most significant bit should 
be set according to the format");
+        if (retainedBuf.readableBytes() == 0) {
+            retainedBuf.release();
+            retainedBuf = Unpooled.buffer(0);
         }
 
-        try {
-            return ResponseMessage.build(context.readValue(buffer, UUID.class, 
true))
-                    
.code(ResponseStatusCode.getFromValue(context.readValue(buffer, Integer.class, 
false)))
-                    .statusMessage(context.readValue(buffer, String.class, 
true))
-                    .statusAttributes(context.readValue(buffer, Map.class, 
false))
-                    .responseMetaData(context.readValue(buffer, Map.class, 
false))
-                    .result(context.read(buffer))
-                    .create();
-        } catch (IOException ex) {
-            throw new SerializationException(ex);
+        if (retainedSize == RESPONSE_START_SIZE && byteBuf.readableBytes() >= 
5) {
+            final int version = buffer.readByte() & 0xff;
+
+            if (version >>> 7 != 1) {
+                // This is an indication that the response buffer was 
incorrectly built
+                // Or the buffer offsets are wrong
+                throw new SerializationException("The most significant bit 
should be set according to the format");
+            }
+
+            retainedSize = buffer.readInt() - 5;
+
+            if ((retainedBuf.readableBytes() + buffer.readableBytes()) < 
retainedSize) {
+                retainedBuf.writeBytes(byteBuf);
+
+                return null;
+            } else {
+
+                if (byteBuf.readableBytes() > 0) {
+                    retainedBuf.writeBytes(byteBuf);
+                }
+
+                try {
+                    final Buffer messageBuffer = 
bufferFactory.create(retainedBuf.readSlice(retainedSize));
+                    ResponseMessage respMsg = 
ResponseMessage.build(context.readValue(messageBuffer, UUID.class, true))
+                            
.code(ResponseStatusCode.getFromValue(context.readValue(messageBuffer, 
Integer.class, false)))
+                            .statusMessage(context.readValue(messageBuffer, 
String.class, true))
+                            .statusAttributes(context.readValue(messageBuffer, 
Map.class, false))
+                            .responseMetaData(context.readValue(messageBuffer, 
Map.class, false))
+                            .result(context.read(messageBuffer))
+                            .create();
+
+                    retainedSize = RESPONSE_START_SIZE;
+
+                    return respMsg;
+                } catch (Exception ex) {
+                    throw new SerializationException(ex);
+                }
+            }
+        } else {
+            retainedBuf.writeBytes(byteBuf);
+
+            if (((retainedBuf.readableBytes()) < retainedSize) || retainedSize 
== RESPONSE_START_SIZE) {
+                return null;
+            } else {
+                try {
+                    final Buffer messageBuffer = 
bufferFactory.create(retainedBuf.readSlice(retainedSize));
+                    ResponseMessage respMsg = 
ResponseMessage.build(context.readValue(messageBuffer, UUID.class, true))
+                            
.code(ResponseStatusCode.getFromValue(context.readValue(messageBuffer, 
Integer.class, false)))
+                            .statusMessage(context.readValue(messageBuffer, 
String.class, true))
+                            .statusAttributes(context.readValue(messageBuffer, 
Map.class, false))
+                            .responseMetaData(context.readValue(messageBuffer, 
Map.class, false))
+                            .result(context.read(messageBuffer))
+                            .create();
+
+                    retainedSize = RESPONSE_START_SIZE;
+
+                    return respMsg;
+                } catch (Exception ex) {
+                    throw new SerializationException(ex);
+                }
+            }
         }
     }
 
@@ -70,6 +124,8 @@ public class ResponseMessageSerializer {
         try {
             // Version
             buffer.writeByte(GraphBinaryWriter.VERSION_BYTE);
+            // Make placeholder for size
+            buffer.writeInt(0);
             // Nullable request id
             context.writeValue(value.getRequestId(), buffer, true);
             // Status code
@@ -82,6 +138,13 @@ public class ResponseMessageSerializer {
             context.writeValue(result.getMeta(), buffer, false);
             // Fully-qualified value
             context.write(result.getData(), buffer);
+            // overwrite version
+            buffer.markWriterIndex();
+            final int readable = buffer.readableBytes(); // set this because 
changing writer index affects its value
+            buffer.writerIndex(1);
+            buffer.writeInt(readable);
+            buffer.resetWriterIndex();
+
         } catch (IOException ex) {
             throw new SerializationException(ex);
         }

Reply via email to