This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch TINKERPOP-2982
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 675fc4ff8f71c7d37366c26d5da672fcea75fb91
Author: Stephen Mallette <[email protected]>
AuthorDate: Tue Aug 15 10:48:32 2023 -0400

    TINKERPOP-2982 Allowed gremlin-driver to work over HTTP.
    
    HTTP is just an option. Websockets is still intact. There shouldn't be any 
breaking changes here. While there were some server changes, compatibility 
should be retained there...just added some extra parameters that were formerly 
only needed with TraversalOpProcessor. g.tx() and sessions will not work with 
this configuration.
---
 CHANGELOG.asciidoc                                 |   3 +-
 .../tinkerpop/gremlin/driver/Channelizer.java      |  91 +++++++++-
 .../apache/tinkerpop/gremlin/driver/Cluster.java   |  28 +++-
 .../gremlin/driver/HandshakeInterceptor.java       |   2 +
 .../org/apache/tinkerpop/gremlin/driver/Host.java  |  14 +-
 .../gremlin/driver/MessageSerializer.java          |   3 +-
 ...akeInterceptor.java => RequestInterceptor.java} |  12 +-
 .../driver/handler/HttpGremlinRequestEncoder.java  |  92 +++++++++++
 .../driver/handler/HttpGremlinResponseDecoder.java |  51 ++++++
 .../driver/handler/WebSocketClientHandler.java     |   5 +-
 .../gremlin/driver/message/RequestMessage.java     |   8 +
 .../driver/ser/GraphBinaryMessageSerializerV1.java |  10 +-
 .../binary/GraphBinaryMessageSerializerV1Test.java |  27 +++
 .../handler/HttpBasicAuthorizationHandler.java     |  16 +-
 .../server/handler/HttpGremlinEndpointHandler.java |  78 +++++----
 .../gremlin/server/handler/HttpHandlerUtil.java    |  46 ++++--
 .../gremlin/server/GremlinDriverIntegrateTest.java |  33 +++-
 .../gremlin/server/HttpDriverIntegrateTest.java    | 183 +++++++++++++++++++++
 18 files changed, 605 insertions(+), 97 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 41703a3990..4278e2ff2a 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,7 +26,8 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 This release also includes changes from <<release-3-5-8, 3.5.8>>.
 
 * Fix a javadoc comment in GraphTraversal.not method.
-
+* Allowed `gremlin-driver` to be used over HTTP.
+* Deprecated the `HandshakeInterceptor` in favor of a more generic 
`RequestInterceptor`.
 
 [[release-3-6-5]]
 === TinkerPop 3.6.5 (Release Date: July 31, 2023)
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index efe47c5977..4c87f7ec78 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -19,6 +19,8 @@
 package org.apache.tinkerpop.gremlin.driver;
 
 import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder;
 import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
 import 
org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
 import 
org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
@@ -77,6 +79,13 @@ public interface Channelizer extends ChannelHandler {
     public default void connected() {
     }
 
+    /**
+     * Gets the scheme to use to construct the URL and by default uses HTTP.
+     */
+    public default String getScheme(final boolean sslEnabled) {
+        return sslEnabled ? "https" : "http";
+    }
+
     /**
      * Base implementation of the client side {@link Channelizer}.
      */
@@ -126,7 +135,7 @@ public interface Channelizer extends ChannelHandler {
             }
 
             if (sslCtx.isPresent()) {
-                SslHandler sslHandler = 
sslCtx.get().newHandler(socketChannel.alloc(), connection.getUri().getHost(), 
connection.getUri().getPort());
+                final SslHandler sslHandler = 
sslCtx.get().newHandler(socketChannel.alloc(), connection.getUri().getHost(), 
connection.getUri().getPort());
                 // TINKERPOP-2814. Remove the SSL handshake timeout so that 
handshakes that take longer than 10000ms
                 // (Netty default) but less than connectionSetupTimeoutMillis 
can succeed. This means the SSL handshake
                 // will instead be capped by connectionSetupTimeoutMillis.
@@ -143,18 +152,18 @@ public interface Channelizer extends ChannelHandler {
     /**
      * WebSocket {@link Channelizer} implementation.
      */
-    public final class WebSocketChannelizer extends AbstractChannelizer {
+    final class WebSocketChannelizer extends AbstractChannelizer {
         private static final Logger logger = 
LoggerFactory.getLogger(WebSocketChannelizer.class);
         private WebSocketClientHandler handler;
 
-        private WebSocketGremlinRequestEncoder webSocketGremlinRequestEncoder;
-        private WebSocketGremlinResponseDecoder 
webSocketGremlinResponseDecoder;
+        private WebSocketGremlinRequestEncoder gremlinRequestEncoder;
+        private WebSocketGremlinResponseDecoder gremlinResponseDecoder;
 
         @Override
         public void init(final Connection connection) {
             super.init(connection);
-            webSocketGremlinRequestEncoder = new 
WebSocketGremlinRequestEncoder(true, cluster.getSerializer());
-            webSocketGremlinResponseDecoder = new 
WebSocketGremlinResponseDecoder(cluster.getSerializer());
+            gremlinRequestEncoder = new WebSocketGremlinRequestEncoder(true, 
cluster.getSerializer());
+            gremlinResponseDecoder = new 
WebSocketGremlinResponseDecoder(cluster.getSerializer());
         }
 
         /**
@@ -194,7 +203,7 @@ public interface Channelizer extends ChannelHandler {
                     new 
WebSocketClientHandler.InterceptedWebSocketClientHandshaker13(
                             connection.getUri(), WebSocketVersion.V13, null, 
true,
                             httpHeaders, maxContentLength, true, false, -1,
-                            cluster.getHandshakeInterceptor()), 
cluster.getConnectionSetupTimeout(), supportsSsl());
+                            cluster.getRequestInterceptor()), 
cluster.getConnectionSetupTimeout(), supportsSsl());
 
             final int keepAliveInterval = toIntExact(TimeUnit.SECONDS.convert(
                     cluster.connectionPoolSettings().keepAliveInterval, 
TimeUnit.MILLISECONDS));
@@ -205,8 +214,8 @@ public interface Channelizer extends ChannelHandler {
             pipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
             pipeline.addLast("idle-state-Handler", new IdleStateHandler(0, 
keepAliveInterval, 0));
             pipeline.addLast("ws-handler", handler);
-            pipeline.addLast("gremlin-encoder", 
webSocketGremlinRequestEncoder);
-            pipeline.addLast("gremlin-decoder", 
webSocketGremlinResponseDecoder);
+            pipeline.addLast("gremlin-encoder", gremlinRequestEncoder);
+            pipeline.addLast("gremlin-decoder", gremlinResponseDecoder);
         }
 
         @Override
@@ -231,5 +240,69 @@ public interface Channelizer extends ChannelHandler {
                 throw new ConnectionException(connection.getUri(), errMsg, ex);
             }
         }
+
+        @Override
+        public String getScheme(boolean sslEnabled) {
+            return sslEnabled ? "wss" : "ws";
+        }
+    }
+
+    /**
+     * Sends requests over the HTTP endpoint. Client functionality is governed 
by the limitations of the HTTP endpoint,
+     * meaning that sessions are not available and as such {@code tx()} (i.e. 
transactions) are not available over this
+     * channelizer. Only sessionless requests are possible.
+     */
+    final class HttpChannelizer extends AbstractChannelizer {
+
+        private HttpClientCodec handler;
+
+        private HttpGremlinRequestEncoder gremlinRequestEncoder;
+        private HttpGremlinResponseDecoder gremlinResponseDecoder;
+
+        @Override
+        public void init(final Connection connection) {
+            super.init(connection);
+
+            // server does not support sessions so this channerlizer can't 
support the SessionedClient
+            if (connection.getClient() instanceof Client.SessionedClient)
+                throw new IllegalStateException(String.format("Cannot use 
sessions or tx() with %s", HttpChannelizer.class.getSimpleName()));
+
+            gremlinRequestEncoder = new 
HttpGremlinRequestEncoder(cluster.getSerializer(), 
cluster.getRequestInterceptor());
+            gremlinResponseDecoder = new 
HttpGremlinResponseDecoder(cluster.getSerializer());
+        }
+
+        @Override
+        public void connected() {
+            super.connected();
+        }
+
+        @Override
+        public boolean supportsSsl() {
+            final String scheme = connection.getUri().getScheme();
+            return "https".equalsIgnoreCase(scheme);
+        }
+
+        @Override
+        public void configure(final ChannelPipeline pipeline) {
+            final String scheme = connection.getUri().getScheme();
+            if (!"http".equalsIgnoreCase(scheme) && 
!"https".equalsIgnoreCase(scheme))
+                throw new IllegalStateException("Unsupported scheme (only 
http: or https: supported): " + scheme);
+
+            if (!supportsSsl() && "https".equalsIgnoreCase(scheme))
+                throw new IllegalStateException("To use https scheme ensure 
that enableSsl is set to true in configuration");
+
+            final int maxContentLength = 
cluster.connectionPoolSettings().maxContentLength;
+            final HttpHeaders httpHeaders = new DefaultHttpHeaders();
+            if(connection.getCluster().isUserAgentOnConnectEnabled()) {
+                httpHeaders.set(UserAgent.USER_AGENT_HEADER_NAME, 
UserAgent.USER_AGENT);
+            }
+
+            handler = new HttpClientCodec();
+
+            pipeline.addLast("http-codec", handler);
+            pipeline.addLast("aggregator", new 
HttpObjectAggregator(maxContentLength));
+            pipeline.addLast("gremlin-encoder", gremlinRequestEncoder);
+            pipeline.addLast("gremlin-decoder", gremlinResponseDecoder);
+        }
     }
 }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 3ed867a3ea..702ecb0ddb 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver;
 
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelOption;
+import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslProvider;
@@ -61,12 +62,12 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
 /**
@@ -470,7 +471,7 @@ public final class Cluster {
         return manager.serializer;
     }
 
-    HandshakeInterceptor getHandshakeInterceptor() {
+    UnaryOperator<FullHttpRequest> getRequestInterceptor() {
         return manager.interceptor;
     }
 
@@ -608,7 +609,7 @@ public final class Cluster {
         private boolean sslSkipCertValidation = false;
         private SslContext sslContext = null;
         private LoadBalancingStrategy loadBalancingStrategy = new 
LoadBalancingStrategy.RoundRobin();
-        private HandshakeInterceptor interceptor = HandshakeInterceptor.NO_OP;
+        private UnaryOperator<FullHttpRequest> interceptor = 
HandshakeInterceptor.NO_OP;
         private AuthProperties authProps = new AuthProperties();
         private long connectionSetupTimeoutMillis = 
Connection.CONNECTION_SETUP_TIMEOUT_MILLIS;
         private boolean enableUserAgentOnConnect = true;
@@ -891,7 +892,7 @@ public final class Cluster {
          * Specify the {@link Channelizer} implementation to use on the client 
when creating a {@link Connection}.
          */
         public Builder channelizer(final Class channelizerClass) {
-            return channelizer(channelizerClass.getCanonicalName());
+            return channelizer(channelizerClass.getName());
         }
 
         /**
@@ -922,10 +923,23 @@ public final class Cluster {
         }
 
         /**
-         * Specifies an {@link HandshakeInterceptor} that will allow 
manipulation of the
-         * {@code FullHttpRequest} prior to its being sent over the websocket.
+         * Specifies an {@link HandshakeInterceptor} that will allow 
manipulation of the {@code FullHttpRequest} prior
+         * to its being sent to the server.
+         * @deprecated As of release 3.6.6, replaced with {@link 
#requestInterceptor(RequestInterceptor)}.
          */
+        @Deprecated
         public Builder handshakeInterceptor(final HandshakeInterceptor 
interceptor) {
+            // when this deprecated method is removed, the interceptor can 
have its type promoted from
+            // UnaryOperator<FullHttpRequest> to RequestInterceptor
+            this.interceptor = interceptor;
+            return this;
+        }
+
+        /**
+         * Specifies an {@link HandshakeInterceptor} that will allow 
manipulation of the {@code FullHttpRequest} prior
+         * to its being sent to the server. For websockets the interceptor is 
only called on the handshake.
+         */
+        public Builder requestInterceptor(final RequestInterceptor 
interceptor) {
             this.interceptor = interceptor;
             return this;
         }
@@ -1062,7 +1076,7 @@ public final class Cluster {
         private final AuthProperties authProps;
         private final Optional<SslContext> sslContextOptional;
         private final Supplier<RequestMessage.Builder> validationRequest;
-        private final HandshakeInterceptor interceptor;
+        private final UnaryOperator<FullHttpRequest> interceptor;
 
         /**
          * Thread pool for requests.
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java
index ea1ff87dbd..9b5132192e 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java
@@ -26,7 +26,9 @@ import java.util.function.UnaryOperator;
  * This function is called when the websocket handshake is attempted and the 
first {@code FullHttpRequest} is sent to
  * the server. The interceptor allows this message to be modified as needed 
before it is sent to the server.
  * Implementations are supplied to {@link 
Cluster.Builder#handshakeInterceptor(HandshakeInterceptor)}.
+ * @deprecated As of release 3.6.6, replaced by {@link RequestInterceptor}.
  */
+@Deprecated
 public interface HandshakeInterceptor extends UnaryOperator<FullHttpRequest> {
 
     /**
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
index 3d492fa612..ef65e3afc3 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java
@@ -49,7 +49,8 @@ public final class Host {
     Host(final InetSocketAddress address, final Cluster cluster) {
         this.cluster = cluster;
         this.address = address;
-        this.hostUri = makeUriFromAddress(address, cluster.getPath(), 
cluster.connectionPoolSettings().enableSsl);
+        this.hostUri = makeUriFromAddress(address, cluster.getPath(), 
cluster.connectionPoolSettings().enableSsl,
+                cluster.getChannelizer());
         hostLabel = String.format("Host{address=%s, hostUri=%s}", address, 
hostUri);
     }
 
@@ -112,9 +113,16 @@ public final class Host {
         makeAvailable();
     }
 
-    private static URI makeUriFromAddress(final InetSocketAddress addy, final 
String path, final boolean ssl) {
+    private static URI makeUriFromAddress(final InetSocketAddress addy, final 
String path, final boolean ssl, final String channelizerClass) {
+        final Channelizer channelizer;
         try {
-            final String scheme = ssl ? "wss" : "ws";
+            channelizer = (Channelizer) 
Class.forName(channelizerClass).newInstance();
+        } catch (Exception ex) {
+            throw new RuntimeException(String.format("Invalid Channelizer 
instance: %s", channelizerClass));
+        }
+
+        try {
+            final String scheme = channelizer.getScheme(ssl);
             return new URI(scheme, null, addy.getHostName(), addy.getPort(), 
path, null, null);
         } catch (URISyntaxException use) {
             throw new RuntimeException(String.format("URI for host could not 
be constructed from: %s", addy), use);
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/MessageSerializer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/MessageSerializer.java
index 615077b2d6..bb8423bc95 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/MessageSerializer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/MessageSerializer.java
@@ -79,7 +79,8 @@ public interface MessageSerializer<M> {
     public ResponseMessage deserializeResponse(final ByteBuf msg) throws 
SerializationException;
 
     /**
-     * The list of mime types that the serializer supports.
+     * The list of mime types that the serializer supports. They should be 
ordered in preferred ordered where the
+     * greatest fidelity match is first.
      */
     public String[] mimeTypesSupported();
 
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java
similarity index 63%
copy from 
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java
copy to 
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java
index ea1ff87dbd..0aae506e66 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HandshakeInterceptor.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java
@@ -23,14 +23,14 @@ import io.netty.handler.codec.http.FullHttpRequest;
 import java.util.function.UnaryOperator;
 
 /**
- * This function is called when the websocket handshake is attempted and the 
first {@code FullHttpRequest} is sent to
- * the server. The interceptor allows this message to be modified as needed 
before it is sent to the server.
- * Implementations are supplied to {@link 
Cluster.Builder#handshakeInterceptor(HandshakeInterceptor)}.
+ * This function is called a {@code FullHttpRequest} constructed and allow it 
to be modified as needed before it is
+ * sent to the server. Implementations are supplied to {@link 
Cluster.Builder#requestInterceptor(RequestInterceptor)}.
+ * When this method is called is dependent on the {@link Channelizer} 
implementation.
  */
-public interface HandshakeInterceptor extends UnaryOperator<FullHttpRequest> {
+public interface RequestInterceptor extends UnaryOperator<FullHttpRequest> {
 
     /**
-     * The default implementation of a {@link HandshakeInterceptor} and 
behaves as a no-op.
+     * The default implementation of a {@link RequestInterceptor} and behaves 
as a no-op.
      */
-    public static final HandshakeInterceptor NO_OP = o -> o;
+    public static final RequestInterceptor NO_OP = o -> o;
 }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
new file mode 100644
index 0000000000..f3a85e0f9f
--- /dev/null
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpVersion;
+import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
+import 
org.apache.tinkerpop.gremlin.process.traversal.translator.GroovyTranslator;
+import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.UnaryOperator;
+
+/**
+ * Converts {@link RequestMessage} to a {@code HttpRequest}.
+ */
[email protected]
+public final class HttpGremlinRequestEncoder extends 
MessageToMessageEncoder<RequestMessage> {
+    private final MessageSerializer<?> serializer;
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private final UnaryOperator<FullHttpRequest> interceptor;
+
+    public HttpGremlinRequestEncoder(final MessageSerializer<?> serializer, 
final UnaryOperator<FullHttpRequest> interceptor) {
+        this.serializer = serializer;
+        this.interceptor = interceptor;
+    }
+
+    @Override
+    protected void encode(final ChannelHandlerContext channelHandlerContext, 
final RequestMessage requestMessage, final List<Object> objects) throws 
Exception {
+        try {
+            final String gremlin;
+            final Object gremlinStringOrBytecode = 
requestMessage.getArg(Tokens.ARGS_GREMLIN);
+
+            // the gremlin key can contain a Gremlin script or bytecode. if 
it's bytecode we can't submit it over
+            // http as such. it has to be converted to a script and we can do 
that with the Groovy translator.
+            final boolean usesBytecode = gremlinStringOrBytecode instanceof 
Bytecode;
+            if (usesBytecode) {
+                gremlin = GroovyTranslator.of("g").translate((Bytecode) 
gremlinStringOrBytecode).getScript();
+            } else {
+                gremlin = gremlinStringOrBytecode.toString();
+            }
+            final byte[] payload = mapper.writeValueAsBytes(new 
HashMap<String,Object>() {{
+                put(Tokens.ARGS_GREMLIN, gremlin);
+                put(Tokens.REQUEST_ID, requestMessage.getRequestId());
+                if (usesBytecode) put("op", Tokens.OPS_BYTECODE);
+            }});
+            final ByteBuf bb = 
channelHandlerContext.alloc().buffer(payload.length);
+            bb.writeBytes(payload);
+            final FullHttpRequest request = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/", bb);
+            request.headers().add(HttpHeaderNames.CONTENT_TYPE, 
"application/json");
+            request.headers().add(HttpHeaderNames.CONTENT_LENGTH, 
payload.length);
+            request.headers().add(HttpHeaderNames.ACCEPT, 
serializer.mimeTypesSupported()[0]);
+
+            objects.add(interceptor.apply(request));
+        } catch (Exception ex) {
+            throw new 
ResponseException(ResponseStatusCode.REQUEST_ERROR_SERIALIZATION, String.format(
+                    "An error occurred during serialization of this request 
[%s] - it could not be sent to the server - Reason: %s",
+                    requestMessage, ex));
+        }
+    }
+}
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
new file mode 100644
index 0000000000..00c345828a
--- /dev/null
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinResponseDecoder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.handler;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.util.CharsetUtil;
+import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
+import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
+
+import java.util.List;
+
+/**
+ * Converts {@code HttpResponse} to a {@link ResponseMessage}.
+ */
[email protected]
+public final class HttpGremlinResponseDecoder extends 
MessageToMessageDecoder<FullHttpResponse> {
+    private final MessageSerializer<?> serializer;
+
+    public HttpGremlinResponseDecoder(final MessageSerializer<?> serializer) {
+        this.serializer = serializer;
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext channelHandlerContext, 
final FullHttpResponse httpResponse, final List<Object> objects) throws 
Exception {
+        final String content = 
httpResponse.content().toString(CharsetUtil.UTF_8);
+        objects.add(((MessageTextSerializer) 
serializer).deserializeResponse(content));
+    }
+}
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
index 82ae2fedd2..55df38f989 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/WebSocketClientHandler.java
@@ -37,6 +37,7 @@ import io.netty.util.concurrent.Promise;
 
 import java.net.URI;
 import java.util.concurrent.TimeoutException;
+import java.util.function.UnaryOperator;
 
 import javax.net.ssl.SSLHandshakeException;
 
@@ -140,13 +141,13 @@ public final class WebSocketClientHandler extends 
WebSocketClientProtocolHandler
      */
     public static class InterceptedWebSocketClientHandshaker13 extends 
WebSocketClientHandshaker13 {
 
-        private final HandshakeInterceptor interceptor;
+        private final UnaryOperator<FullHttpRequest> interceptor;
 
         public InterceptedWebSocketClientHandshaker13(final URI webSocketURL, 
final WebSocketVersion version,
                                                       final String 
subprotocol, final boolean allowExtensions,
                                                       final HttpHeaders 
customHeaders, final int maxFramePayloadLength,
                                                       final boolean 
performMasking, final boolean allowMaskMismatch,
-                                                      final long 
forceCloseTimeoutMillis, final HandshakeInterceptor interceptor) {
+                                                      final long 
forceCloseTimeoutMillis, final UnaryOperator<FullHttpRequest> interceptor) {
             super(webSocketURL, version, subprotocol, allowExtensions, 
customHeaders, maxFramePayloadLength,
                     performMasking, allowMaskMismatch, 
forceCloseTimeoutMillis);
             this.interceptor = interceptor;
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java
index aeebf35d0d..21fa12c18b 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/message/RequestMessage.java
@@ -96,6 +96,14 @@ public final class RequestMessage {
         return o == null ? Optional.empty() : Optional.of((T) o);
     }
 
+    public <T> T getArg(final String key) {
+        return (T) args.get(key);
+    }
+
+    public <T> T getArgOrDefault(final String key, final T def) {
+        return (T) optionalArgs(key).orElse(def);
+    }
+
     public static Builder from(final RequestMessage msg) {
         final Builder builder = build(msg.op)
                 .overrideRequestId(msg.requestId)
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/GraphBinaryMessageSerializerV1.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/GraphBinaryMessageSerializerV1.java
index 1c7a69a238..c983a89543 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/GraphBinaryMessageSerializerV1.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/GraphBinaryMessageSerializerV1.java
@@ -198,7 +198,15 @@ public class GraphBinaryMessageSerializerV1 extends 
AbstractMessageSerializer<Gr
 
     @Override
     public String serializeRequestAsString(final RequestMessage 
requestMessage, final ByteBufAllocator allocator) throws SerializationException 
{
-        final ByteBuf bb = serializeRequestAsBinary(requestMessage, allocator);
+        final ByteBuf bb = allocator.buffer();
+
+        try {
+            requestSerializer.writeValue(requestMessage, bb, writer);
+        } catch (Exception ex) {
+            bb.release();
+            throw ex;
+        }
+
         return base64Encoder.encodeToString(convertToBytes(bb));
     }
 
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/GraphBinaryMessageSerializerV1Test.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/GraphBinaryMessageSerializerV1Test.java
index 6cd37b87ce..052bfce456 100644
--- 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/GraphBinaryMessageSerializerV1Test.java
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/binary/GraphBinaryMessageSerializerV1Test.java
@@ -57,6 +57,19 @@ public class GraphBinaryMessageSerializerV1Test {
         assertThat(request, reflectionEquals(deserialized));
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeRequestOverText() throws 
SerializationException {
+        final RequestMessage request = RequestMessage.build("op1")
+                .processor("proc1")
+                .overrideRequestId(UUID.randomUUID())
+                .addArg("arg1", "value1")
+                .create();
+
+        final String base64 = serializer.serializeRequestAsString(request, 
allocator);
+        final RequestMessage deserialized = 
serializer.deserializeRequest(base64);
+        assertThat(request, reflectionEquals(deserialized));
+    }
+
     @Test
     public void shouldSerializeAndDeserializeRequestWithoutArgs() throws 
SerializationException {
         final RequestMessage request = RequestMessage.build("op1")
@@ -99,6 +112,20 @@ public class GraphBinaryMessageSerializerV1Test {
         assertResponseEquals(response, deserialized);
     }
 
+    @Test
+    public void shouldSerializeAndDeserializeResponseOverText() throws 
SerializationException {
+        final ResponseMessage response = 
ResponseMessage.build(UUID.randomUUID())
+                .code(ResponseStatusCode.SUCCESS)
+                .statusMessage("Found")
+                .statusAttribute("k1", 1)
+                .result("This is a fine message with a string")
+                .create();
+
+        final String base64 = serializer.serializeResponseAsString(response, 
allocator);
+        final ResponseMessage deserialized = 
serializer.deserializeResponse(base64);
+        assertResponseEquals(response, deserialized);
+    }
+
     @Test
     public void shouldSerializeAndDeserializeResponseWithoutStatusMessage() 
throws SerializationException {
         final ResponseMessage response = 
ResponseMessage.build(UUID.randomUUID())
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthorizationHandler.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthorizationHandler.java
index 0951a5d561..e58a2b3a46 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthorizationHandler.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpBasicAuthorizationHandler.java
@@ -31,12 +31,9 @@ import org.apache.tinkerpop.gremlin.server.GremlinServer;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.authz.AuthorizationException;
 import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
-import org.javatuples.Quartet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
 import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
 import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
@@ -64,20 +61,13 @@ public class HttpBasicAuthorizationHandler extends 
ChannelInboundHandlerAdapter
         if (msg instanceof FullHttpMessage){
             final FullHttpMessage request = (FullHttpMessage) msg;
             final boolean keepAlive = HttpUtil.isKeepAlive(request);
+
             try {
                 user = ctx.channel().attr(StateKey.AUTHENTICATED_USER).get();
                 if (null == user) {    // This is expected when using the 
AllowAllAuthenticator
                     user = AuthenticatedUser.ANONYMOUS_USER;
                 }
-                final Quartet<String, Map<String, Object>, String, Map<String, 
String>> requestArguments =
-                        HttpHandlerUtil.getRequestArguments((FullHttpRequest) 
request);
-                final RequestMessage requestMessage = 
RequestMessage.build(Tokens.OPS_EVAL).
-                        processor("").
-                        addArg(Tokens.ARGS_GREMLIN, 
requestArguments.getValue0()).
-                        addArg(Tokens.ARGS_BINDINGS, 
requestArguments.getValue1()).
-                        addArg(Tokens.ARGS_LANGUAGE, 
requestArguments.getValue2()).
-                        addArg(Tokens.ARGS_ALIASES, 
requestArguments.getValue3()).
-                        create();
+                final RequestMessage requestMessage = 
HttpHandlerUtil.getRequestMessageFromHttpRequest((FullHttpRequest) request);
                 authorizer.authorize(user, requestMessage);
                 ctx.fireChannelRead(request);
             } catch (AuthorizationException ex) {  // Expected: users can 
alternate between allowed and disallowed requests
@@ -85,7 +75,7 @@ public class HttpBasicAuthorizationHandler extends 
ChannelInboundHandlerAdapter
                 if (address.startsWith("/") && address.length() > 1) address = 
address.substring(1);
                 final String script;
                 try {
-                    script = 
HttpHandlerUtil.getRequestArguments((FullHttpRequest) request).getValue0();
+                    script = 
HttpHandlerUtil.getRequestMessageFromHttpRequest((FullHttpRequest) 
request).getArgOrDefault(Tokens.ARGS_GREMLIN, "");
                 } catch (IllegalArgumentException iae) {
                     HttpHandlerUtil.sendError(ctx, BAD_REQUEST, 
iae.getMessage(), keepAlive);
                     return;
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index 6995b21583..47d1e68186 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -19,12 +19,13 @@
 package org.apache.tinkerpop.gremlin.server.handler;
 
 import com.codahale.metrics.Timer;
-import io.netty.buffer.ByteBufAllocator;
+import io.netty.handler.codec.http.*;
+import org.apache.tinkerpop.gremlin.driver.Tokens;
 import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.driver.ser.SerializationException;
+import 
org.apache.tinkerpop.gremlin.process.remote.traversal.DefaultRemoteTraverser;
 import org.apache.tinkerpop.gremlin.server.util.TextPlainMessageSerializer;
 import org.javatuples.Pair;
-import org.javatuples.Quartet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
@@ -49,33 +50,21 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.codec.TooLongFrameException;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpUtil;
 import io.netty.util.ReferenceCountUtil;
 
 import javax.script.Bindings;
 import javax.script.SimpleBindings;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static com.codahale.metrics.MetricRegistry.name;
-import static io.netty.handler.codec.http.HttpHeaderNames.*;
 import static io.netty.handler.codec.http.HttpMethod.GET;
 import static io.netty.handler.codec.http.HttpMethod.POST;
 import static 
io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
@@ -147,32 +136,34 @@ public class HttpGremlinEndpointHandler extends 
ChannelInboundHandlerAdapter {
                 return;
             }
 
-            final Quartet<String, Map<String, Object>, String, Map<String, 
String>> requestArguments;
+            final RequestMessage requestMessage;
             try {
-                requestArguments = HttpHandlerUtil.getRequestArguments(req);
+                requestMessage = 
HttpHandlerUtil.getRequestMessageFromHttpRequest(req);
             } catch (IllegalArgumentException iae) {
                 HttpHandlerUtil.sendError(ctx, BAD_REQUEST, iae.getMessage(), 
keepAlive);
                 ReferenceCountUtil.release(msg);
                 return;
             }
 
-            final String acceptString = 
Optional.ofNullable(req.headers().get("Accept")).orElse("application/json");
-            final Pair<String, MessageTextSerializer<?>> serializer = 
chooseSerializer(acceptString);
+            final String acceptMime = 
Optional.ofNullable(req.headers().get(HttpHeaderNames.ACCEPT)).orElse("application/json");
+            final Pair<String, MessageTextSerializer<?>> serializer = 
chooseSerializer(acceptMime);
             if (null == serializer) {
-                HttpHandlerUtil.sendError(ctx, BAD_REQUEST, String.format("no 
serializer for requested Accept header: %s", acceptString),
+                HttpHandlerUtil.sendError(ctx, BAD_REQUEST, String.format("no 
serializer for requested Accept header: %s", acceptMime),
                         keepAlive);
                 ReferenceCountUtil.release(msg);
                 return;
             }
 
-            final String origin = req.headers().get(ORIGIN);
+            final String origin = req.headers().get(HttpHeaderNames.ORIGIN);
 
-            // not using the req any where below here - assume it is safe to 
release at this point.
+            // not using the req anywhere below here - assume it is safe to 
release at this point.
             ReferenceCountUtil.release(msg);
 
             try {
                 logger.debug("Processing request containing script [{}] and 
bindings of [{}] on {}",
-                        requestArguments.getValue0(), 
requestArguments.getValue1(), Thread.currentThread().getName());
+                        requestMessage.getArgOrDefault(Tokens.ARGS_GREMLIN, 
""),
+                        requestMessage.getArgOrDefault(Tokens.ARGS_BINDINGS, 
Collections.emptyMap()),
+                        Thread.currentThread().getName());
                 if (settings.enableAuditLog) {
                     AuthenticatedUser user = 
ctx.channel().attr(StateKey.AUTHENTICATED_USER).get();
                     if (null == user) {    // This is expected when using the 
AllowAllAuthenticator
@@ -180,7 +171,8 @@ public class HttpGremlinEndpointHandler extends 
ChannelInboundHandlerAdapter {
                     }
                     String address = ctx.channel().remoteAddress().toString();
                     if (address.startsWith("/") && address.length() > 1) 
address = address.substring(1);
-                    auditLogger.info("User {} with address {} requested: {}", 
user.getName(), address, requestArguments.getValue0());
+                    auditLogger.info("User {} with address {} requested: {}", 
user.getName(), address,
+                            
requestMessage.getArgOrDefault(Tokens.ARGS_GREMLIN, ""));
                 }
                 final ChannelPromise promise = ctx.channel().newPromise();
                 final AtomicReference<Object> resultHolder = new 
AtomicReference<>();
@@ -189,12 +181,14 @@ public class HttpGremlinEndpointHandler extends 
ChannelInboundHandlerAdapter {
                     // processing of the exception
                     if (future.isSuccess()) {
                         logger.debug("Preparing HTTP response for request with 
script [{}] and bindings of [{}] with result of [{}] on [{}]",
-                                requestArguments.getValue0(), 
requestArguments.getValue1(), resultHolder.get(), 
Thread.currentThread().getName());
+                                
requestMessage.getArgOrDefault(Tokens.ARGS_GREMLIN, ""),
+                                
requestMessage.getArgOrDefault(Tokens.ARGS_BINDINGS, Collections.emptyMap()),
+                                resultHolder.get(), 
Thread.currentThread().getName());
                         final FullHttpResponse response = new 
DefaultFullHttpResponse(HTTP_1_1, OK, (ByteBuf) resultHolder.get());
-                        response.headers().set(CONTENT_TYPE, 
serializer.getValue0());
+                        response.headers().set(HttpHeaderNames.CONTENT_TYPE, 
serializer.getValue0());
 
                         // handle cors business
-                        if (origin != null) 
response.headers().set(ACCESS_CONTROL_ALLOW_ORIGIN, origin);
+                        if (origin != null) 
response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, origin);
 
                         HttpHandlerUtil.sendAndCleanupConnection(ctx, 
keepAlive, response);
                     }
@@ -204,7 +198,8 @@ public class HttpGremlinEndpointHandler extends 
ChannelInboundHandlerAdapter {
 
                 final Bindings bindings;
                 try {
-                    bindings = createBindings(requestArguments.getValue1(), 
requestArguments.getValue3());
+                    bindings = 
createBindings(requestMessage.getArgOrDefault(Tokens.ARGS_BINDINGS, 
Collections.emptyMap()),
+                            
requestMessage.getArgOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap()));
                 } catch (IllegalStateException iae) {
                     HttpHandlerUtil.sendError(ctx, BAD_REQUEST, 
iae.getMessage(), keepAlive);
                     ReferenceCountUtil.release(msg);
@@ -214,23 +209,33 @@ public class HttpGremlinEndpointHandler extends 
ChannelInboundHandlerAdapter {
                 // provide a transform function to serialize to message - this 
will force serialization to occur
                 // in the same thread as the eval. after the CompletableFuture 
is returned from the eval the result
                 // is ready to be written as a ByteBuf directly to the 
response.  nothing should be blocking here.
-                final CompletableFuture<Object> evalFuture = 
gremlinExecutor.eval(requestArguments.getValue0(), 
requestArguments.getValue2(), bindings,
+                final CompletableFuture<Object> evalFuture = 
gremlinExecutor.eval(
+                        requestMessage.getArg(Tokens.ARGS_GREMLIN), 
requestMessage.getArg(Tokens.ARGS_LANGUAGE), bindings,
                         FunctionUtils.wrapFunction(o -> {
                             // stopping the timer here is roughly equivalent 
to where the timer would have been stopped for
                             // this metric in other contexts.  we just want to 
measure eval time not serialization time.
                             timerContext.stop();
 
                             logger.debug("Transforming result of request with 
script [{}] and bindings of [{}] with result of [{}] on [{}]",
-                                    requestArguments.getValue0(), 
requestArguments.getValue1(), o, Thread.currentThread().getName());
-                            final ResponseMessage responseMessage = 
ResponseMessage.build(UUID.randomUUID())
+                                    requestMessage.getArg(Tokens.ARGS_GREMLIN),
+                                    
requestMessage.getArg(Tokens.ARGS_BINDINGS), o, 
Thread.currentThread().getName());
+
+                            // need to replicate what TraversalOpProcessor 
does with the bytecode op. it converts
+                            // results to Traverser so that GLVs can handle 
the results. don't quite get the same
+                            // benefit here because the bulk has to be 1 since 
we've already resolved the result,
+                            // but at least http is compatible
+                            final List<Object> results = 
requestMessage.getProcessor().equals(Tokens.OPS_BYTECODE) ?
+                                    (List<Object>) 
IteratorUtils.asList(o).stream().map(r -> new DefaultRemoteTraverser<Object>(r, 
1)).collect(Collectors.toList()) :
+                                    IteratorUtils.asList(o);
+                            final ResponseMessage responseMessage = 
ResponseMessage.build(requestMessage.getRequestId())
                                     .code(ResponseStatusCode.SUCCESS)
-                                    .result(IteratorUtils.asList(o)).create();
+                                    .result(results).create();
 
                             // http server is sessionless and must handle 
commit on transactions. the commit occurs
                             // before serialization to be consistent with how 
things work for websocket based
                             // communication.  this means that failed 
serialization does not mean that you won't get
                             // a commit to the database
-                            attemptCommit(requestArguments.getValue3(), 
graphManager, settings.strictTransactionManagement);
+                            
attemptCommit(requestMessage.getArg(Tokens.ARGS_ALIASES), graphManager, 
settings.strictTransactionManagement);
 
                             try {
                                 return 
Unpooled.wrappedBuffer(serializer.getValue1().serializeResponseAsString(responseMessage,
 ctx.alloc()).getBytes(UTF8));
@@ -256,7 +261,8 @@ public class HttpGremlinEndpointHandler extends 
ChannelInboundHandlerAdapter {
                     if (t.getMessage() != null)
                         HttpHandlerUtil.sendError(ctx, INTERNAL_SERVER_ERROR, 
t.getMessage(), Optional.of(t), keepAlive);
                     else
-                        HttpHandlerUtil.sendError(ctx, INTERNAL_SERVER_ERROR, 
String.format("Error encountered evaluating script: %s", 
requestArguments.getValue0())
+                        HttpHandlerUtil.sendError(ctx, INTERNAL_SERVER_ERROR, 
String.format("Error encountered evaluating script: %s",
+                                        
requestMessage.getArg(Tokens.ARGS_GREMLIN))
                                 , Optional.of(t), keepAlive);
                     promise.setFailure(t);
                     return null;
@@ -326,8 +332,8 @@ public class HttpGremlinEndpointHandler extends 
ChannelInboundHandlerAdapter {
         return bindings;
     }
 
-    private Pair<String, MessageTextSerializer<?>> chooseSerializer(final 
String acceptString) {
-        final List<Pair<String, Double>> ordered = 
Stream.of(acceptString.split(",")).map(mediaType -> {
+    private Pair<String, MessageTextSerializer<?>> chooseSerializer(final 
String mimeType) {
+        final List<Pair<String, Double>> ordered = 
Stream.of(mimeType.split(",")).map(mediaType -> {
             // parse out each mediaType with its params - keeping it simple 
and just looking for "quality".  if
             // that value isn't there, default it to 1.0.  not really 
validating here so users better get their
             // accept headers straight
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
index c1ae1e54ea..fc82820e8c 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
@@ -23,32 +23,23 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpUtil;
-import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.codec.http.*;
 import io.netty.util.CharsetUtil;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.Tokens;
+import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.server.GremlinServer;
+import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
 import org.apache.tinkerpop.gremlin.server.util.MetricManager;
 import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
 import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
 import org.apache.tinkerpop.shaded.jackson.databind.node.ArrayNode;
 import org.apache.tinkerpop.shaded.jackson.databind.node.ObjectNode;
-import org.javatuples.Quartet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 
 import static com.codahale.metrics.MetricRegistry.name;
 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
@@ -71,7 +62,16 @@ public class HttpHandlerUtil {
      */
     private static final ObjectMapper mapper = new ObjectMapper();
 
-    static Quartet<String, Map<String, Object>, String, Map<String, String>> 
getRequestArguments(final FullHttpRequest request) {
+    /**
+     * Convert a http request into a {@link RequestMessage}.
+     */
+    public static RequestMessage getRequestMessageFromHttpRequest(final 
FullHttpRequest request) {
+        final String contentType = 
Optional.ofNullable(request.headers().get(HttpHeaderNames.CONTENT_TYPE)).orElse("application/json");
+
+        // default is just the StandardOpProcessor which maintains 
compatibility with older versions which only
+        // processed scripts.
+        final RequestMessage.Builder msgBuilder = 
RequestMessage.build(StandardOpProcessor.OP_PROCESSOR_NAME);
+
         if (request.method() == GET) {
             final QueryStringDecoder decoder = new 
QueryStringDecoder(request.uri());
             final List<String> gremlinParms = 
decoder.parameters().get(Tokens.ARGS_GREMLIN);
@@ -81,6 +81,11 @@ public class HttpHandlerUtil {
             final String script = gremlinParms.get(0);
             if (script.isEmpty()) throw new IllegalArgumentException("no 
gremlin script supplied");
 
+            final List<String> requestIdParms = 
decoder.parameters().get(Tokens.REQUEST_ID);
+            if (requestIdParms != null && requestIdParms.size() > 0) {
+                
msgBuilder.overrideRequestId(UUID.fromString(requestIdParms.get(0)));
+            }
+
             // query string parameters - take the first instance of a key only 
- ignore the rest
             final Map<String, Object> bindings = new HashMap<>();
             decoder.parameters().entrySet().stream().filter(kv -> 
kv.getKey().startsWith(ARGS_BINDINGS_DOT))
@@ -93,7 +98,8 @@ public class HttpHandlerUtil {
             final List<String> languageParms = 
decoder.parameters().get(Tokens.ARGS_LANGUAGE);
             final String language = (null == languageParms || 
languageParms.size() == 0) ? null : languageParms.get(0);
 
-            return Quartet.with(script, bindings, language, aliases);
+            return msgBuilder.addArg(Tokens.ARGS_GREMLIN, 
script).addArg(Tokens.ARGS_LANGUAGE, language)
+                    .addArg(Tokens.ARGS_BINDINGS, 
bindings).addArg(Tokens.ARGS_ALIASES, aliases).create();
         } else {
             final JsonNode body;
             try {
@@ -124,7 +130,15 @@ public class HttpHandlerUtil {
             final JsonNode languageNode = body.get(Tokens.ARGS_LANGUAGE);
             final String language = null == languageNode ? null : 
languageNode.asText();
 
-            return Quartet.with(scriptNode.asText(), bindings, language, 
aliases);
+            final JsonNode requestIdNode = body.get(Tokens.REQUEST_ID);
+            final UUID requestId = null == requestIdNode ? UUID.randomUUID() : 
UUID.fromString(requestIdNode.asText());
+
+            final JsonNode opNode = body.get("op");
+            final String op = null == opNode ? "" : opNode.asText();
+
+            return msgBuilder.overrideRequestId(requestId).processor(op)
+                    .addArg(Tokens.ARGS_GREMLIN, 
scriptNode.asText()).addArg(Tokens.ARGS_LANGUAGE, language)
+                    .addArg(Tokens.ARGS_BINDINGS, 
bindings).addArg(Tokens.ARGS_ALIASES, aliases).create();
         }
     }
 
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 72a529e646..caca8609be 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -21,6 +21,8 @@ package org.apache.tinkerpop.gremlin.server;
 import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.Logger;
 import nl.altindag.log.LogCaptor;
+import org.apache.tinkerpop.gremlin.driver.Channelizer;
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
 import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
 import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.driver.Client;
@@ -54,7 +56,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
@@ -159,6 +160,9 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
         final String nameOfTest = name.getMethodName();
 
         switch (nameOfTest) {
+            case "shouldInterceptRequests":
+                settings.channelizer = HttpChannelizer.class.getName();
+                break;
             case "shouldAliasTraversalSourceVariables":
             case "shouldAliasTraversalSourceVariablesInSession":
                 try {
@@ -208,11 +212,36 @@ public class GremlinDriverIntegrateTest extends 
AbstractGremlinServerIntegration
 
     @Test
     public void shouldInterceptRequests() throws Exception {
+        final int requestsToMake = 32;
+        final AtomicInteger httpRequests = new AtomicInteger(0);
+
+        final Cluster cluster = TestClientFactory.build().
+                channelizer(Channelizer.HttpChannelizer.class).
+                requestInterceptor(r -> {
+                    httpRequests.incrementAndGet();
+                    return r;
+                }).create();
+
+        try {
+            final Client client = cluster.connect();
+            for (int ix = 0; ix < requestsToMake; ix++) {
+                assertEquals(ix + 1, client.submit(ix + 
"+1").all().get().get(0).getInt());
+            }
+        } finally {
+            cluster.close();
+        }
+
+        assertEquals(requestsToMake, httpRequests.get());
+    }
+
+    @Test
+    public void shouldInterceptRequestsWithHandshake() throws Exception {
         final int requestsToMake = 32;
         final AtomicInteger websocketHandshakeRequests = new AtomicInteger(0);
 
         final Cluster cluster = TestClientFactory.build().
-                
minConnectionPoolSize(1).maxConnectionPoolSize(1).handshakeInterceptor(r -> {
+                minConnectionPoolSize(1).maxConnectionPoolSize(1).
+                handshakeInterceptor(r -> {
             websocketHandshakeRequests.incrementAndGet();
             return r;
         }).create();
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
new file mode 100644
index 0000000000..1d0fbc6fb5
--- /dev/null
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.server;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.tinkerpop.gremlin.driver.Channelizer;
+import org.apache.tinkerpop.gremlin.driver.Client;
+import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
+import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
+import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer;
+import org.apache.tinkerpop.gremlin.structure.Transaction;
+import org.junit.Test;
+
+import static 
org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class HttpDriverIntegrateTest extends 
AbstractGremlinServerIntegrationTest {
+
+    @Override
+    public Settings overrideSettings(final Settings settings) {
+        settings.channelizer = HttpChannelizer.class.getName();
+        return settings;
+    }
+
+    @Test
+    public void shouldSubmitScriptWithGraphSON() throws Exception {
+        final Cluster cluster = TestClientFactory.build()
+                .channelizer(Channelizer.HttpChannelizer.class)
+                .serializer(Serializers.GRAPHSON_V3D0)
+                .create();
+        try {
+            final Client client = cluster.connect();
+            assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+        } catch (Exception ex) {
+            throw ex;
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldSubmitScriptWithGraphBinary() throws Exception {
+        final Cluster cluster = TestClientFactory.build()
+                .channelizer(Channelizer.HttpChannelizer.class)
+                .serializer(Serializers.GRAPHBINARY_V1D0)
+                .create();
+        try {
+            final Client client = cluster.connect();
+            assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+        } catch (Exception ex) {
+            throw ex;
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldSubmitBytecodeWithGraphSON() throws Exception {
+        final Cluster cluster = TestClientFactory.build()
+                .channelizer(Channelizer.HttpChannelizer.class)
+                .serializer(Serializers.GRAPHSON_V3D0)
+                .create();
+        try {
+            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+            assertEquals("2", g.inject("2").toList().get(0));
+        } catch (Exception ex) {
+            throw ex;
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldSubmitBytecodeWithGraphBinary() throws Exception {
+        final Cluster cluster = TestClientFactory.build()
+                .channelizer(Channelizer.HttpChannelizer.class)
+                .serializer(Serializers.GRAPHBINARY_V1D0)
+                .create();
+        try {
+            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+            assertEquals("2", g.inject("2").toList().get(0));
+        } catch (Exception ex) {
+            throw ex;
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldSubmitMultipleRequestsOverSingleConnection() throws 
Exception {
+        final Cluster cluster = TestClientFactory.build()
+                .channelizer(Channelizer.HttpChannelizer.class)
+                .minConnectionPoolSize(1).maxConnectionPoolSize(1)
+                .serializer(Serializers.GRAPHBINARY_V1D0)
+                .create();
+        try {
+            for (int ix = 0; ix < 100; ix++) {
+                final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+                assertEquals(ix, g.inject(ix).toList().get(0).intValue());
+            }
+        } catch (Exception ex) {
+            throw ex;
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldSubmitMultipleRequestsOverMultiConnection() throws 
Exception {
+        final Cluster cluster = TestClientFactory.build()
+                .channelizer(Channelizer.HttpChannelizer.class)
+                .minConnectionPoolSize(1).maxConnectionPoolSize(8)
+                .serializer(Serializers.GRAPHBINARY_V1D0)
+                .create();
+        try {
+            for (int ix = 0; ix < 100; ix++) {
+                final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+                assertEquals(ix, g.inject(ix).toList().get(0).intValue());
+            }
+        } catch (Exception ex) {
+            throw ex;
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldFailToUseSession() throws Exception {
+        final Cluster cluster = TestClientFactory.build()
+                .channelizer(Channelizer.HttpChannelizer.class)
+                .serializer(Serializers.GRAPHBINARY_V1D0)
+                .create();
+        try {
+            final Client client = cluster.connect("shouldFailToUseSession");
+            client.submit("1+1").all().get();
+            fail("Can't use session with HTTP");
+        } catch (Exception ex) {
+            final Throwable t = ExceptionUtils.getRootCause(ex);
+            assertEquals("Cannot use sessions or tx() with HttpChannelizer", 
t.getMessage());
+        } finally {
+            cluster.close();
+        }
+    }
+
+    @Test
+    public void shouldFailToUseTx() throws Exception {
+        final Cluster cluster = TestClientFactory.build()
+                .channelizer(Channelizer.HttpChannelizer.class)
+                .serializer(Serializers.GRAPHBINARY_V1D0)
+                .create();
+        try {
+            final GraphTraversalSource g = 
traversal().withRemote(DriverRemoteConnection.using(cluster));
+            final Transaction tx = g.tx();
+            final GraphTraversalSource gtx = tx.begin();
+            gtx.inject("1").toList();
+            fail("Can't use tx() with HTTP");
+        } catch (Exception ex) {
+            final Throwable t = ExceptionUtils.getRootCause(ex);
+            assertEquals("Cannot use sessions or tx() with HttpChannelizer", 
t.getMessage());
+        } finally {
+            cluster.close();
+        }
+    }
+}

Reply via email to