This is an automated email from the ASF dual-hosted git repository.
kenhuuu pushed a commit to branch master-http
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/master-http by this push:
new 64ec40b355 Update RequestInterceptor to better work with HTTP. (#2799)
64ec40b355 is described below
commit 64ec40b355da09c6d12e25ec41263e30416e56be
Author: kenhuuu <[email protected]>
AuthorDate: Wed Oct 16 11:35:38 2024 -0700
Update RequestInterceptor to better work with HTTP. (#2799)
The interceptor is updated to run for every request to allow modifying
each HTTP request. It works on a newly created abstraction of a request
called HttpRequest to prevent breaking changes in the event the underlying
library ever changes. A name is required when adding/removing
interceptors as it's the simplest way to identify interceptors that were
inserted from lambdas.
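
The named, ordered chain described above can be sketched in isolation as
follows. This is only an illustrative model, not the driver's code: the
classes Request, Named and InterceptorChainSketch and the "x-trace-id"
header are hypothetical stand-ins for HttpRequest, the name/interceptor
pairs held by Cluster.Builder, and a custom header. The sketch assumes that
"before" means immediately before the named entry and that the last
interceptor must leave a byte[] in the body.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class InterceptorChainSketch {

    /** Hypothetical stand-in for HttpRequest: mutable headers plus an untyped body. */
    static final class Request {
        final Map<String, String> headers = new HashMap<>();
        Object body;

        Request(final Object body) {
            this.body = body;
        }
    }

    /** Hypothetical (name, interceptor) entry; the name identifies lambdas later on. */
    static final class Named {
        final String name;
        final UnaryOperator<Request> interceptor;

        Named(final String name, final UnaryOperator<Request> interceptor) {
            this.name = name;
            this.interceptor = interceptor;
        }
    }

    private static final int NOT_FOUND = -1;
    private final List<Named> interceptors = new ArrayList<>();

    private int indexOf(final String name) {
        for (int i = 0; i < interceptors.size(); i++) {
            if (interceptors.get(i).name.equals(name)) return i;
        }
        return NOT_FOUND;
    }

    /** Appends an interceptor; names must be unique. */
    InterceptorChainSketch add(final String name, final UnaryOperator<Request> interceptor) {
        if (indexOf(name) != NOT_FOUND) throw new IllegalArgumentException(name + " interceptor already exists");
        interceptors.add(new Named(name, interceptor));
        return this;
    }

    /** Inserts an interceptor immediately before an existing one (assumed semantics). */
    InterceptorChainSketch addBefore(final String existing, final String name,
                                     final UnaryOperator<Request> interceptor) {
        final int index = indexOf(existing);
        if (index == NOT_FOUND) throw new IllegalArgumentException(existing + " interceptor not found");
        if (indexOf(name) != NOT_FOUND) throw new IllegalArgumentException(name + " interceptor already exists");
        interceptors.add(index, new Named(name, interceptor));
        return this;
    }

    /** Returns the interceptor names in execution order. */
    List<String> names() {
        final List<String> result = new ArrayList<>();
        for (final Named named : interceptors) result.add(named.name);
        return result;
    }

    /** Runs every interceptor in order, then checks that the body ended up as byte[]. */
    Request run(Request request) {
        for (final Named named : interceptors) {
            request = named.interceptor.apply(request);
        }
        if (!(request.body instanceof byte[])) {
            throw new IllegalStateException("final interceptor must leave a byte[] body");
        }
        return request;
    }

    public static void main(final String[] args) {
        final InterceptorChainSketch chain = new InterceptorChainSketch()
                // Plays the role of the default "serializer" interceptor.
                .add("serializer", r -> {
                    r.body = r.body.toString().getBytes(StandardCharsets.UTF_8);
                    return r;
                })
                // A lambda can only be located later through its name.
                .addBefore("serializer", "trace", r -> {
                    r.headers.put("x-trace-id", "abc");
                    return r;
                });
        final Request sent = chain.run(new Request("g.V().count()"));
        System.out.println(chain.names() + " " + sent.headers + " " + ((byte[]) sent.body).length);
    }
}
```

Keeping a name next to each interceptor is what makes relative insertion and
removal possible, since two lambdas cannot otherwise be told apart.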
---
CHANGELOG.asciidoc | 1 +
docs/src/upgrade/release-4.x.x.asciidoc | 21 +++
.../tinkerpop/gremlin/driver/Channelizer.java | 3 +-
.../apache/tinkerpop/gremlin/driver/Cluster.java | 95 ++++++++++++--
.../tinkerpop/gremlin/driver/HttpRequest.java | 136 ++++++++++++++++++++
.../gremlin/driver/RequestInterceptor.java | 11 +-
.../apache/tinkerpop/gremlin/driver/Settings.java | 5 +
.../apache/tinkerpop/gremlin/driver/auth/Auth.java | 10 +-
.../tinkerpop/gremlin/driver/auth/Basic.java | 7 +-
.../tinkerpop/gremlin/driver/auth/Sigv4.java | 92 +++++--------
.../driver/handler/HttpGremlinRequestEncoder.java | 59 ++++++---
.../GraphBinarySerializationInterceptor.java | 66 ++++++++++
.../gremlin/driver/simple/SimpleHttpClient.java | 8 +-
.../tinkerpop/gremlin/driver/ClusterTest.java | 143 +++++++++++++++++++++
.../gremlin/server/GremlinDriverIntegrateTest.java | 39 +++++-
.../server/GremlinServerAuthIntegrateTest.java | 19 +--
16 files changed, 590 insertions(+), 125 deletions(-)
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 5d8736bc44..5da058ab21 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -35,6 +35,7 @@
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Added new list filtering step `none()`.
* Added support for `Set` in GraphSON and GraphBinary serialization for
`gremlin-javascript`, where it previously just converted to array.
* Added `Set` syntax in `gremlin-language`.
+* Modified RequestInterceptor to be a `UnaryOperator<HttpRequest>` to abstract
the underlying implementation.
* Renamed `MergeStep` to `MergeElementStep` as it is a base class to
`mergeV()` and `mergeE()`.
* Renamed `TraversalMergeStep` of `merge()` to `MergeStep` for consistency.
* Removed the deprecated `withGraph()` option from `AnonymousTraversalSource`.
diff --git a/docs/src/upgrade/release-4.x.x.asciidoc
b/docs/src/upgrade/release-4.x.x.asciidoc
index 7e5e741a9f..e1f90c74b1 100644
--- a/docs/src/upgrade/release-4.x.x.asciidoc
+++ b/docs/src/upgrade/release-4.x.x.asciidoc
@@ -85,6 +85,27 @@ The properties file in the above example can either point to
a remote configurat
See: link:https://issues.apache.org/jira/browse/TINKERPOP-3017[TINKERPOP-3017]
+==== Changes to RequestInterceptor
+
+Because the underlying transport has changed from WebSockets to HTTP, the
usage of the `RequestInterceptor` has
+changed as well. The `RequestInterceptor` now runs for every request and
allows you to completely modify the HTTP
+request that is sent to the server. `Cluster.Builder` has four new methods:
`addInterceptorAfter`,
+`addInterceptorBefore`, `removeInterceptor` and `addInterceptor`. Each
interceptor requires a name, which is used
+to position new interceptors relative to existing ones and to remove them.
+
+The interceptors work with a new class called `HttpRequest`. This is a
basic abstraction over a request that also
+contains some useful strings for common headers. The initial `HttpRequest`
that is passed to the first interceptor will
+contain a `RequestMessage`. `RequestMessage` is immutable and only certain
keys can be added to it. If you want to
+customize the body by adding other fields, you will need to make a different
copy of the `RequestMessage` or completely
+change the body to contain a different data type. The final interceptor must
return an `HttpRequest` whose body contains
+a `byte[]`.
+
+After the initial HTTP request is generated, the interceptors are called
in order to allow the request to be
+modified. After all interceptors have run, the data from the final
`HttpRequest` is used to build the request
+that is sent to the endpoint. There is a default interceptor added to every
`Cluster` called "serializer". This
+interceptor is responsible for serializing the request body, which is what the
server normally expects. This is intended
+to be an advanced customization technique that should only be used when needed.
+
==== Changes to deserialization for gremlin-javascript
Starting from this version, `gremlin-javascript` will deserialize `Set` data
into an ECMAScript 2015 Set. Previously,
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
index 950699f17d..c43b80c7d2 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -191,7 +191,8 @@ public interface Channelizer extends ChannelHandler {
super.init(connection);
httpCompressionDecoder = new HttpContentDecompressionHandler();
- gremlinRequestEncoder = new
HttpGremlinRequestEncoder(cluster.getSerializer(),
cluster.getRequestInterceptor(), cluster.isUserAgentOnConnectEnabled(),
cluster.isBulkingEnabled());
+ gremlinRequestEncoder = new
HttpGremlinRequestEncoder(cluster.getSerializer(),
cluster.getRequestInterceptors(),
+ cluster.isUserAgentOnConnectEnabled(),
cluster.isBulkingEnabled(), connection.getUri());
gremlinResponseDecoder = new
HttpGremlinResponseStreamDecoder(cluster.getSerializer(),
cluster.getMaxResponseContentLength());
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index bfca36f9aa..6f18c394e1 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -29,7 +29,9 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.concurrent.Future;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.tinkerpop.gremlin.driver.auth.Auth;
+import
org.apache.tinkerpop.gremlin.driver.interceptor.GraphBinarySerializationInterceptor;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
import org.apache.tinkerpop.gremlin.util.ser.Serializers;
@@ -57,6 +59,7 @@ import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -75,6 +78,7 @@ import java.util.stream.Collectors;
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public final class Cluster {
+ public static final String SERIALIZER_INTERCEPTOR_NAME = "serializer";
private static final Logger logger =
LoggerFactory.getLogger(Cluster.class);
private final Manager manager;
@@ -364,8 +368,8 @@ public final class Cluster {
return manager.serializer;
}
- List<RequestInterceptor> getRequestInterceptor() {
- return manager.interceptor;
+ List<Pair<String, ? extends RequestInterceptor>> getRequestInterceptors() {
+ return manager.interceptors;
}
ScheduledExecutorService executor() {
@@ -473,6 +477,8 @@ public final class Cluster {
}
public final static class Builder {
+ private static int INTERCEPTOR_NOT_FOUND = -1;
+
private final List<InetAddress> addresses = new ArrayList<>();
private int port = 8182;
private String path = "/gremlin";
@@ -499,17 +505,18 @@ public final class Cluster {
private boolean sslSkipCertValidation = false;
private SslContext sslContext = null;
private LoadBalancingStrategy loadBalancingStrategy = new
LoadBalancingStrategy.RoundRobin();
- private List<RequestInterceptor> interceptors = new ArrayList<>();
+ private LinkedList<Pair<String, ? extends RequestInterceptor>>
interceptors = new LinkedList<>();
private long connectionSetupTimeoutMillis =
Connection.CONNECTION_SETUP_TIMEOUT_MILLIS;
private boolean enableUserAgentOnConnect = true;
private boolean enableBulkedResult = false;
private Builder() {
- // empty to prevent direct instantiation
+ addInterceptor(SERIALIZER_INTERCEPTOR_NAME, new
GraphBinarySerializationInterceptor());
}
private Builder(final String address) {
addContactPoint(address);
+ addInterceptor(SERIALIZER_INTERCEPTOR_NAME, new
GraphBinarySerializationInterceptor());
}
/**
@@ -741,16 +748,82 @@ public final class Cluster {
}
/**
- * Specifies an {@link RequestInterceptor} that will allow
manipulation of the {@code FullHttpRequest} prior
- * to its being sent to the server. For websockets the interceptor is
only called on the handshake.
+ * Adds a {@link RequestInterceptor} after another one that will allow
manipulation of the {@code HttpRequest}
+ * prior to its being sent to the server.
*/
- public Builder requestInterceptor(final RequestInterceptor
interceptor) {
- interceptors.add(interceptor);
+ public Builder addInterceptorAfter(final String priorInterceptorName,
final String nameOfInterceptor,
+ final RequestInterceptor
interceptor) {
+ final int index = getInterceptorIndex(priorInterceptorName);
+ if (INTERCEPTOR_NOT_FOUND == index) {
+ throw new IllegalArgumentException(priorInterceptorName + "
interceptor not found");
+ } else if (getInterceptorIndex(nameOfInterceptor) !=
INTERCEPTOR_NOT_FOUND) {
+ throw new IllegalArgumentException(nameOfInterceptor + "
interceptor already exists");
+ }
+ interceptors.add(index + 1, Pair.of(nameOfInterceptor,
interceptor));
+
return this;
}
+ /**
+ * Adds a {@link RequestInterceptor} before another one that will
allow manipulation of the {@code HttpRequest}
+ * prior to its being sent to the server.
+ */
+ public Builder addInterceptorBefore(final String
subsequentInterceptorName, final String nameOfInterceptor,
+ final RequestInterceptor
interceptor) {
+ final int index = getInterceptorIndex(subsequentInterceptorName);
+ if (INTERCEPTOR_NOT_FOUND == index) {
+ throw new IllegalArgumentException(subsequentInterceptorName +
" interceptor not found");
+ } else if (getInterceptorIndex(nameOfInterceptor) !=
INTERCEPTOR_NOT_FOUND) {
+ throw new IllegalArgumentException(nameOfInterceptor + "
interceptor already exists");
+ } else if (index == 0) {
+ interceptors.addFirst(Pair.of(nameOfInterceptor, interceptor));
+ } else {
+ interceptors.add(index, Pair.of(nameOfInterceptor,
interceptor));
+ }
+
+ return this;
+ }
+
+ /**
+ * Adds a {@link RequestInterceptor} to the end of the list that will
allow manipulation of the
+ * {@code HttpRequest} prior to its being sent to the server.
+ */
+ public Builder addInterceptor(final String name, final
RequestInterceptor interceptor) {
+ if (getInterceptorIndex(name) != INTERCEPTOR_NOT_FOUND) {
+ throw new IllegalArgumentException(name + " interceptor
already exists");
+ }
+ interceptors.add(Pair.of(name, interceptor));
+ return this;
+ }
+
+ /**
+ * Removes a {@link RequestInterceptor} from the list. This can be
used to remove the default interceptors that
+ * aren't needed.
+ */
+ public Builder removeInterceptor(final String name) {
+ final int index = getInterceptorIndex(name);
+ if (index == INTERCEPTOR_NOT_FOUND) {
+ throw new IllegalArgumentException(name + " interceptor not
found");
+ }
+ interceptors.remove(index);
+ return this;
+ }
+
+ private int getInterceptorIndex(final String name) {
+ for (int i = 0; i < interceptors.size(); i++) {
+ if (interceptors.get(i).getLeft().equals(name)) {
+ return i;
+ }
+ }
+
+ return INTERCEPTOR_NOT_FOUND;
+ }
+
+ /**
+ * Adds an Auth {@link RequestInterceptor} to the end of the list of
interceptors.
+ */
public Builder auth(final Auth auth) {
- interceptors.add(auth);
+ addInterceptor(auth.getClass().getSimpleName().toLowerCase() +
"-auth", auth);
return this;
}
@@ -861,7 +934,7 @@ public final class Cluster {
private final LoadBalancingStrategy loadBalancingStrategy;
private final Optional<SslContext> sslContextOptional;
private final Supplier<RequestMessage.Builder> validationRequest;
- private final List<RequestInterceptor> interceptor;
+ private final List<Pair<String, ? extends RequestInterceptor>>
interceptors;
/**
* Thread pool for requests.
@@ -894,7 +967,7 @@ public final class Cluster {
this.loadBalancingStrategy = builder.loadBalancingStrategy;
this.contactPoints = builder.getContactPoints();
- this.interceptor = builder.interceptors;
+ this.interceptors = builder.interceptors;
this.enableUserAgentOnConnect = builder.enableUserAgentOnConnect;
this.enableBulkedResult = builder.enableBulkedResult;
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HttpRequest.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HttpRequest.java
new file mode 100644
index 0000000000..0d10a1a1ca
--- /dev/null
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/HttpRequest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import java.net.URI;
+import java.util.Map;
+
+/**
+ * HttpRequest represents the data that will be used to create the actual
request to the remote endpoint. It will be
+ * passed to each {@link RequestInterceptor} in turn, which can update its values.
The body can be anything as the
+ * interceptor may change what the payload is. Also contains some convenience
Strings for common HTTP header keys and
+ * values and HTTP methods.
+ */
+public class HttpRequest {
+ public static class Headers {
+ // Add as needed. Headers are case-insensitive; lower case for now to
match Netty.
+ public static final String ACCEPT = "accept";
+ public static final String ACCEPT_ENCODING = "accept-encoding";
+ public static final String AUTHORIZATION = "authorization";
+ public static final String CONTENT_TYPE = "content-type";
+ public static final String CONTENT_LENGTH = "content-length";
+ public static final String DEFLATE = "deflate";
+ public static final String HOST = "host";
+ public static final String USER_AGENT = "user-agent";
+ }
+
+ public static class Method {
+ public static final String GET = "GET";
+ public static final String POST = "POST";
+ }
+
+ private final Map<String, String> headers;
+ private Object body;
+ private URI uri;
+ private String method;
+
+ /**
+ * Constructor that defaults the method to {@code POST}.
+ */
+ public HttpRequest(final Map<String, String> headers, final Object body,
final URI uri) {
+ this(headers, body, uri, Method.POST);
+ }
+
+ /**
+ * Full constructor.
+ */
+ public HttpRequest(final Map<String, String> headers, final Object body,
final URI uri, final String method) {
+ this.headers = headers;
+ this.body = body;
+ this.uri = uri;
+ this.method = method;
+ }
+
+ /**
+ * Get the headers of the request.
+ *
+ * @return a map of headers. This can be used to directly update the
entries.
+ */
+ public Map<String, String> headers() {
+ return headers;
+ }
+
+ /**
+ * Get the body of the request.
+ *
+ * @return an Object representing the body.
+ */
+ public Object getBody() {
+ return body;
+ }
+
+ /**
+ * Get the URI of the request.
+ *
+ * @return the request URI.
+ */
+ public URI getUri() {
+ return uri;
+ }
+
+ /**
+ * Get the HTTP method of the request. The standard {@code /gremlin}
endpoint only supports {@code POST}.
+ *
+ * @return the HTTP method.
+ */
+ public String getMethod() {
+ return method;
+ }
+
+ /**
+ * Set the HTTP body of the request. During processing, the body can be
any type but the final interceptor must set
+ * the body to a {@code byte[]}.
+ *
+ * @return this HttpRequest for method chaining.
+ */
+ public HttpRequest setBody(final Object body) {
+ this.body = body;
+ return this;
+ }
+
+ /**
+ * Set the HTTP method of the request.
+ *
+ * @return this HttpRequest for method chaining.
+ */
+ public HttpRequest setMethod(final String method) {
+ this.method = method;
+ return this;
+ }
+
+ /**
+ * Set the URI of the request.
+ *
+ * @return this HttpRequest for method chaining.
+ */
+ public HttpRequest setUri(final URI uri) {
+ this.uri = uri;
+ return this;
+ }
+}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java
index 2ffde9ac09..84fbc77880 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestInterceptor.java
@@ -18,15 +18,14 @@
*/
package org.apache.tinkerpop.gremlin.driver;
-import io.netty.handler.codec.http.FullHttpRequest;
-
import java.util.function.UnaryOperator;
/**
- * This function is called a {@code FullHttpRequest} constructed and allow it
to be modified as needed before it is
- * sent to the server. Implementations are supplied to {@link
Cluster.Builder#requestInterceptor(RequestInterceptor)}.
- * When this method is called is dependent on the {@link Channelizer}
implementation.
+ * Interceptors are run as a list to allow modification of the HTTP request
before it is sent to the server. The first
+ * interceptor will be provided with a {@link HttpRequest} that holds a
+ * {@link org.apache.tinkerpop.gremlin.util.message.RequestMessage} in the
body. The final interceptor should return a
+ * {@link HttpRequest} with a {@code byte[]} in the body.
*/
-public interface RequestInterceptor extends UnaryOperator<FullHttpRequest> {
+public interface RequestInterceptor extends UnaryOperator<HttpRequest> {
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 3f5f5e6f4a..a71983a2be 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -393,5 +393,10 @@ public final class Settings {
* The region setting for sigv4 authentication.
*/
public String region = null;
+
+ /**
+ * The service name setting for sigv4 authentication.
+ */
+ public String serviceName = null;
}
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/auth/Auth.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/auth/Auth.java
index b096fe742d..0593e62c8a 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/auth/Auth.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/auth/Auth.java
@@ -30,12 +30,8 @@ public interface Auth extends RequestInterceptor {
return new Basic(username, password);
}
- static Auth sigv4(final String regionName) {
- return new Sigv4(regionName);
- }
-
- static Auth sigv4(final String regionName, final AWSCredentialsProvider
awsCredentialsProvider) {
- return new Sigv4(regionName, awsCredentialsProvider);
+ static Auth sigv4(final String regionName, final String serviceName) {
+ return new Sigv4(regionName, serviceName);
}
static Auth sigv4(final String regionName, final AWSCredentialsProvider
awsCredentialsProvider, final String serviceName) {
@@ -47,7 +43,7 @@ public interface Auth extends RequestInterceptor {
return basic(settings.username, settings.password);
}
if (settings.type.equals(AUTH_SIGV4)) {
- return sigv4(settings.region);
+ return sigv4(settings.region, settings.serviceName);
}
throw new IllegalArgumentException("Unknown auth type: " +
settings.type);
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/auth/Basic.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/auth/Basic.java
index 578a241c92..f69d624fd6 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/auth/Basic.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/auth/Basic.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.driver.auth;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.tinkerpop.gremlin.driver.HttpRequest;
import java.util.Base64;
@@ -34,10 +35,10 @@ public class Basic implements Auth {
}
@Override
- public FullHttpRequest apply(final FullHttpRequest fullHttpRequest) {
+ public HttpRequest apply(final HttpRequest httpRequest) {
final String valueToEncode = username + ":" + password;
- fullHttpRequest.headers().add(HttpHeaderNames.AUTHORIZATION,
+ httpRequest.headers().put(HttpRequest.Headers.AUTHORIZATION,
"Basic " +
Base64.getEncoder().encodeToString(valueToEncode.getBytes()));
- return fullHttpRequest;
+ return httpRequest;
}
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/auth/Sigv4.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/auth/Sigv4.java
index 55f1bbd723..3404c9d5b6 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/auth/Sigv4.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/auth/Sigv4.java
@@ -33,6 +33,7 @@ import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import org.apache.http.entity.StringEntity;
+import org.apache.tinkerpop.gremlin.driver.HttpRequest;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -48,19 +49,16 @@ import static
com.amazonaws.auth.internal.SignerConstants.HOST;
import static com.amazonaws.auth.internal.SignerConstants.X_AMZ_DATE;
import static com.amazonaws.auth.internal.SignerConstants.X_AMZ_SECURITY_TOKEN;
+/**
+ * A {@link org.apache.tinkerpop.gremlin.driver.RequestInterceptor} that
provides headers required for SigV4. Because
+ * the signing process requires final header and body data, this interceptor
should almost always be last.
+ */
public class Sigv4 implements Auth {
-
- static final String NEPTUNE_SERVICE_NAME = "neptune-db";
private final AWSCredentialsProvider awsCredentialsProvider;
private final AWS4Signer aws4Signer;
-
- public Sigv4(final String regionName) {
- this(regionName, new DefaultAWSCredentialsProviderChain(),
NEPTUNE_SERVICE_NAME);
- }
-
- public Sigv4(final String regionName, final AWSCredentialsProvider
awsCredentialsProvider) {
- this(regionName, awsCredentialsProvider, NEPTUNE_SERVICE_NAME);
+ public Sigv4(final String regionName, final String serviceName) {
+ this(regionName, new DefaultAWSCredentialsProviderChain(),
serviceName);
}
public Sigv4(final String regionName, final AWSCredentialsProvider
awsCredentialsProvider, final String serviceName) {
@@ -72,10 +70,10 @@ public class Sigv4 implements Auth {
}
@Override
- public FullHttpRequest apply(final FullHttpRequest fullHttpRequest) {
+ public HttpRequest apply(final HttpRequest httpRequest) {
try {
// Convert Http request into an AWS SDK signable request
- final SignableRequest<?> awsSignableRequest =
toSignableRequest(fullHttpRequest);
+ final SignableRequest<?> awsSignableRequest =
toSignableRequest(httpRequest);
// Sign the AWS SDK signable request (which internally adds some
HTTP headers)
final AWSCredentials credentials =
awsCredentialsProvider.getCredentials();
@@ -87,82 +85,54 @@ public class Sigv4 implements Auth {
sessionToken = ((BasicSessionCredentials)
credentials).getSessionToken();
}
- // todo: confirm is needed to replace header `Host` with `host`
- fullHttpRequest.headers().remove(HttpHeaderNames.HOST);
- fullHttpRequest.headers().add(HOST,
awsSignableRequest.getHeaders().get(HOST));
- fullHttpRequest.headers().add(X_AMZ_DATE,
awsSignableRequest.getHeaders().get(X_AMZ_DATE));
- fullHttpRequest.headers().add(AUTHORIZATION,
awsSignableRequest.getHeaders().get(AUTHORIZATION));
+ final Map<String, String> headers = httpRequest.headers();
+ headers.remove(HttpRequest.Headers.HOST);
+ headers.put(HOST, awsSignableRequest.getHeaders().get(HOST));
+ headers.put(X_AMZ_DATE,
awsSignableRequest.getHeaders().get(X_AMZ_DATE));
+ headers.put(AUTHORIZATION,
awsSignableRequest.getHeaders().get(AUTHORIZATION));
if (!sessionToken.isEmpty()) {
- fullHttpRequest.headers().add(X_AMZ_SECURITY_TOKEN,
sessionToken);
+ headers.put(X_AMZ_SECURITY_TOKEN, sessionToken);
}
} catch (final Exception ex) {
throw new AuthenticationException(ex);
}
- return fullHttpRequest;
+ return httpRequest;
}
- private SignableRequest<?> toSignableRequest(final FullHttpRequest
request) throws IOException {
+ private SignableRequest<?> toSignableRequest(final HttpRequest request)
throws IOException {
// make sure the request contains the minimal required set of
information
- checkNotNull(request.uri(), "The request URI must not be null");
- checkNotNull(request.method(), "The request method must not be null");
+ checkNotNull(request.getUri(), "The request URI must not be null");
+ checkNotNull(request.getMethod(), "The request method must not be
null");
// convert the headers to the internal API format
- final HttpHeaders headers = request.headers();
+ final Map<String, String> headers = request.headers();
final Map<String, String> headersInternal = new HashMap<>();
- String hostName = "";
-
// we don't want to add the Host header as the Signer always adds the
host header.
- for (String header : headers.names()) {
+ for (Map.Entry<String, String> header : headers.entrySet()) {
// Skip adding the Host header as the signing process will add one.
- if (!header.equalsIgnoreCase(HOST)) {
- headersInternal.put(header, headers.get(header));
- } else {
- hostName = headers.get(header);
+ if (!header.getKey().equalsIgnoreCase(HttpRequest.Headers.HOST)) {
+ headersInternal.put(header.getKey(), header.getValue());
}
}
// convert the parameters to the internal API format
- final URI uri = URI.create(request.uri());
+ final URI uri = request.getUri();
final Map<String, List<String>> parametersInternal =
extractParametersFromQueryString(uri.getQuery());
// carry over the entity (or an empty entity, if no entity is provided)
- final InputStream content;
- final ByteBuf contentBuffer = request.content();
- boolean hasContent = false;
- try {
- if (contentBuffer != null && contentBuffer.isReadable()) {
- hasContent = true;
- contentBuffer.retain();
- final byte[] bytes = new byte[contentBuffer.readableBytes()];
- contentBuffer.getBytes(contentBuffer.readerIndex(), bytes);
- content = new ByteArrayInputStream(bytes);
- } else {
- content = new StringEntity("").getContent();
- }
- } finally {
- if (hasContent) {
- contentBuffer.release();
- }
- }
-
- if (StringUtils.isNullOrEmpty(hostName)) {
- // try to extract hostname from the uri since hostname was not
provided in the header.
- final String authority = uri.getAuthority();
- if (authority == null) {
- throw new IllegalArgumentException("Unable to identify host
information,"
- + " either hostname should be provided in the uri or
should be passed as a header");
- }
-
- hostName = authority;
+ if (!(request.getBody() instanceof byte[])) {
+ throw new IllegalArgumentException("Expected byte[] in HttpRequest
body but got " + request.getBody().getClass());
}
- final URI endpointUri = URI.create("http://" + hostName);
+ final byte[] body = (byte[]) request.getBody();
+ final InputStream content = (body.length != 0) ? new
ByteArrayInputStream(body) : new StringEntity("").getContent();
+ final URI endpointUri = URI.create(uri.getScheme() + "://" +
uri.getHost());
return convertToSignableRequest(
- request.method().name(),
+ request.getMethod(),
endpointUri,
uri.getPath(),
headersInternal,
@@ -216,7 +186,7 @@ public class Sigv4 implements Auth {
final InputStream httpContent) {
// create the HTTP AWS SDK Signable Request and carry over information
- final DefaultRequest<?> awsRequest = new
DefaultRequest<>(NEPTUNE_SERVICE_NAME);
+ final DefaultRequest<?> awsRequest = new
DefaultRequest<>(aws4Signer.getServiceName());
awsRequest.setHttpMethod(HttpMethodName.fromValue(httpMethodName));
awsRequest.setEndpoint(httpEndpointUri);
awsRequest.setResourcePath(resourcePath);
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
index 4e08f6def7..c1ea9ae98c 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
@@ -19,17 +19,18 @@
package org.apache.tinkerpop.gremlin.driver.handler;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.tinkerpop.gremlin.driver.HttpRequest;
import org.apache.tinkerpop.gremlin.driver.RequestInterceptor;
import org.apache.tinkerpop.gremlin.driver.UserAgent;
import org.apache.tinkerpop.gremlin.driver.auth.Auth.AuthenticationException;
@@ -38,10 +39,12 @@ import
org.apache.tinkerpop.gremlin.process.traversal.GremlinLang;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.util.Tokens;
import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
-import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static
org.apache.tinkerpop.gremlin.driver.handler.SslCheckHandler.REQUEST_SENT;
@@ -54,13 +57,17 @@ public final class HttpGremlinRequestEncoder extends
MessageToMessageEncoder<Req
private final MessageSerializer<?> serializer;
private final boolean userAgentEnabled;
private final boolean bulkedResultEnabled;
- private final List<RequestInterceptor> interceptors;
+ private final List<Pair<String, ? extends RequestInterceptor>> interceptors;
+ private final URI uri;
- public HttpGremlinRequestEncoder(final MessageSerializer<?> serializer, final List<RequestInterceptor> interceptors, boolean userAgentEnabled, boolean bulkedResultEnabled) {
+ public HttpGremlinRequestEncoder(final MessageSerializer<?> serializer,
+ final List<Pair<String, ? extends RequestInterceptor>> interceptors,
+ final boolean userAgentEnabled, boolean bulkedResultEnabled, final URI uri) {
this.serializer = serializer;
this.interceptors = interceptors;
this.userAgentEnabled = userAgentEnabled;
this.bulkedResultEnabled = bulkedResultEnabled;
+ this.uri = uri;
}
@Override
@@ -74,29 +81,28 @@ public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<Req
final InetSocketAddress remoteAddress = getRemoteAddress(channelHandlerContext.channel());
try {
- final ByteBuf buffer = serializer.serializeRequestAsBinary(requestMessage, channelHandlerContext.alloc());
- FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/", buffer);
- request.headers().add(HttpHeaderNames.CONTENT_TYPE, mimeType);
- request.headers().add(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes());
- request.headers().add(HttpHeaderNames.ACCEPT, mimeType);
- request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.DEFLATE);
- request.headers().add(HttpHeaderNames.HOST, remoteAddress.getAddress().getHostAddress());
+ Map<String, String> headersMap = new HashMap<>();
+ headersMap.put(HttpRequest.Headers.HOST, remoteAddress.getAddress().getHostAddress());
+ headersMap.put(HttpRequest.Headers.ACCEPT, mimeType);
+ headersMap.put(HttpRequest.Headers.ACCEPT_ENCODING, HttpRequest.Headers.DEFLATE);
if (userAgentEnabled) {
- request.headers().add(HttpHeaderNames.USER_AGENT, UserAgent.USER_AGENT);
+ headersMap.put(HttpRequest.Headers.USER_AGENT, UserAgent.USER_AGENT);
}
if (bulkedResultEnabled) {
- request.headers().add(Tokens.BULKED, "true");
+ headersMap.put(Tokens.BULKED, "true");
}
+ HttpRequest gremlinRequest = new HttpRequest(headersMap, requestMessage, uri);
- for (final RequestInterceptor interceptor : interceptors) {
- request = interceptor.apply(request);
+ for (final Pair<String, ? extends RequestInterceptor> interceptor : interceptors) {
+ gremlinRequest = interceptor.getRight().apply(gremlinRequest);
}
- objects.add(request);
+
+ final FullHttpRequest finalRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,
+ uri.getPath(), convertBody(gremlinRequest));
+ gremlinRequest.headers().forEach((k, v) -> finalRequest.headers().add(k, v));
+
+ objects.add(finalRequest);
channelHandlerContext.channel().attr(REQUEST_SENT).set(true);
- } catch (SerializationException ex) {
- throw new ResponseException(HttpResponseStatus.BAD_REQUEST, String.format(
- "An error occurred during serialization of this request [%s] - it could not be sent to the server - Reason: %s",
- requestMessage, ex));
} catch (AuthenticationException ex) {
throw new ResponseException(HttpResponseStatus.BAD_REQUEST, String.format(
"An error occurred during authentication [%s] - it could not be sent to the server - Reason: %s",
@@ -115,4 +121,15 @@ public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<Req
}
return remoteAddress;
}
+
+ private static ByteBuf convertBody(final HttpRequest request) {
+ final Object body = request.getBody();
+ if (body instanceof byte[]) {
+ request.headers().put(HttpRequest.Headers.CONTENT_LENGTH, String.valueOf(((byte[]) body).length));
+ return Unpooled.wrappedBuffer((byte[]) body);
+ } else {
+ throw new IllegalArgumentException("Final body must be byte[] but found "
+ + (body == null ? "null" : body.getClass().getSimpleName()));
+ }
+ }
}
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/interceptor/GraphBinarySerializationInterceptor.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/interceptor/GraphBinarySerializationInterceptor.java
new file mode 100644
index 0000000000..a6e656eea8
--- /dev/null
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/interceptor/GraphBinarySerializationInterceptor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver.interceptor;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.tinkerpop.gremlin.driver.HttpRequest;
+import org.apache.tinkerpop.gremlin.driver.RequestInterceptor;
+import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.util.MessageSerializer;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
+import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
+
+/**
+ * A {@link RequestInterceptor} that serializes the request body to the {@code GraphBinary} format. This interceptor
+ * should be run before other interceptors that need to calculate values based on the request body.
+ */
+public class GraphBinarySerializationInterceptor implements RequestInterceptor {
+ // Should be thread-safe as the GraphBinaryWriter doesn't maintain state.
+ private static final MessageSerializer serializer = new GraphBinaryMessageSerializerV4();
+
+ @Override
+ public HttpRequest apply(HttpRequest httpRequest) {
+ if (!(httpRequest.getBody() instanceof RequestMessage)) {
+ throw new IllegalArgumentException("Only RequestMessage serialization is supported");
+ }
+
+ final RequestMessage request = (RequestMessage) httpRequest.getBody();
+ final ByteBuf requestBuf;
+ try {
+ requestBuf = serializer.serializeRequestAsBinary(request, ByteBufAllocator.DEFAULT);
+ } catch (SerializationException se) {
+ throw new RuntimeException(new ResponseException(HttpResponseStatus.BAD_REQUEST, String.format(
+ "An error occurred during serialization of this request [%s] - it could not be sent to the server - Reason: %s",
+ request, se)));
+ }
+
+ // Convert from ByteBuf to byte[] because that's what the final request body should contain.
+ final byte[] requestBytes = ByteBufUtil.getBytes(requestBuf);
+ requestBuf.release();
+
+ httpRequest.setBody(requestBytes);
+ httpRequest.headers().put(HttpRequest.Headers.CONTENT_TYPE, serializer.mimeTypesSupported()[0]);
+
+ return httpRequest;
+ }
+}
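
Note on ordering: the Javadoc above says the serializer should run before
interceptors that compute values from the request body. The following is a
minimal standalone sketch of that constraint, not the driver's code.
SketchRequest, SERIALIZER, BODY_SIZE and the x-body-size header are
hypothetical stand-ins, and the body is encoded as UTF-8 text instead of
GraphBinary so the example needs nothing beyond the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Standalone sketch; SketchRequest is a hypothetical stand-in for the driver's HttpRequest.
final class InterceptorOrderSketch {

    static final class SketchRequest {
        final Map<String, String> headers = new HashMap<>();
        Object body;

        SketchRequest(final Object body) {
            this.body = body;
        }
    }

    // Replaces a String body with its UTF-8 bytes (the real interceptor writes GraphBinary).
    static final UnaryOperator<SketchRequest> SERIALIZER = r -> {
        if (!(r.body instanceof String)) {
            throw new IllegalArgumentException("Only String bodies are supported in this sketch");
        }
        r.body = ((String) r.body).getBytes(StandardCharsets.UTF_8);
        r.headers.put("content-type", "text/plain");
        return r;
    };

    // Reads the serialized bytes, so it only works when it runs after SERIALIZER.
    static final UnaryOperator<SketchRequest> BODY_SIZE = r -> {
        if (!(r.body instanceof byte[])) {
            throw new IllegalStateException("body is not serialized yet");
        }
        r.headers.put("x-body-size", String.valueOf(((byte[]) r.body).length));
        return r;
    };

    // Applies each interceptor in list order, passing the result of one to the next.
    static SketchRequest run(final List<UnaryOperator<SketchRequest>> chain, final Object body) {
        SketchRequest request = new SketchRequest(body);
        for (final UnaryOperator<SketchRequest> interceptor : chain) {
            request = interceptor.apply(request);
        }
        return request;
    }

    public static void main(final String[] args) {
        final SketchRequest ok = run(Arrays.asList(SERIALIZER, BODY_SIZE), "g.V()");
        System.out.println(ok.headers.get("x-body-size")); // 5

        try {
            run(Arrays.asList(BODY_SIZE, SERIALIZER), "g.V()");
        } catch (IllegalStateException e) {
            System.out.println("wrong order: " + e.getMessage());
        }
    }
}
```

Running the chain in the reverse order fails in BODY_SIZE because the body is
still the unserialized object, which is the situation the Javadoc warns about.
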
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
index 8a8cc623f3..b2d932b2e2 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
@@ -23,10 +23,12 @@ import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.tinkerpop.gremlin.driver.Channelizer;
import org.apache.tinkerpop.gremlin.driver.handler.HttpContentDecompressionHandler;
import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDecoder;
import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
+import org.apache.tinkerpop.gremlin.driver.interceptor.GraphBinarySerializationInterceptor;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
import io.netty.bootstrap.Bootstrap;
@@ -45,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -105,7 +108,10 @@ public class SimpleHttpClient extends AbstractClient {
new HttpClientCodec(),
new HttpContentDecompressionHandler(),
new HttpGremlinResponseStreamDecoder(serializer, Integer.MAX_VALUE),
- new HttpGremlinRequestEncoder(serializer, new ArrayList<>(), false, false),
+ new HttpGremlinRequestEncoder(serializer,
+ Collections.singletonList(
+ Pair.of("serializer", new GraphBinarySerializationInterceptor())),
+ false, false, uri),
callbackResponseHandler);
}
});
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterTest.java
new file mode 100644
index 0000000000..fff9485a2a
--- /dev/null
+++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ClusterTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.driver;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.tinkerpop.gremlin.driver.interceptor.GraphBinarySerializationInterceptor;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.apache.tinkerpop.gremlin.driver.Cluster.SERIALIZER_INTERCEPTOR_NAME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test Cluster and Cluster.Builder.
+ */
+public class ClusterTest {
+ private static final RequestInterceptor TEST_INTERCEPTOR = httpRequest -> new HttpRequest(null, null, null);
+
+ @Test
+ public void shouldNotAllowModifyingRelativeToNonExistentInterceptor() {
+ try {
+ Cluster.build().addInterceptorAfter("none", "test", req -> req);
+ fail("Should not have allowed interceptor to be added.");
+ } catch (Exception e) {
+ assertThat(e, instanceOf(IllegalArgumentException.class));
+ assertEquals("none interceptor not found", e.getMessage());
+ }
+
+ try {
+ Cluster.build().addInterceptorBefore("none", "test", req -> req);
+ fail("Should not have allowed interceptor to be added.");
+ } catch (Exception e) {
+ assertThat(e, instanceOf(IllegalArgumentException.class));
+ assertEquals("none interceptor not found", e.getMessage());
+ }
+
+ try {
+ Cluster.build().removeInterceptor("nonexistent");
+ fail("Should not have allowed interceptor to be removed.");
+ } catch (Exception e) {
+ assertThat(e, instanceOf(IllegalArgumentException.class));
+ assertEquals("nonexistent interceptor not found", e.getMessage());
+ }
+ }
+
+ @Test
+ public void shouldAddToInterceptorToBeginningIfBeforeFirst() {
+ final Cluster testCluster = Cluster.build()
+ .addInterceptor("b", req -> req)
+ .addInterceptorBefore(SERIALIZER_INTERCEPTOR_NAME, "a", TEST_INTERCEPTOR)
+ .create();
+ assertEquals("a", testCluster.getRequestInterceptors().get(0).getLeft());
+ assertEquals(TEST_INTERCEPTOR, testCluster.getRequestInterceptors().get(0).getRight());
+ assertEquals(SERIALIZER_INTERCEPTOR_NAME, testCluster.getRequestInterceptors().get(1).getLeft());
+ }
+
+ @Test
+ public void shouldAddToInterceptorAfter() {
+ final Cluster testCluster = Cluster.build()
+ .addInterceptor("b", req -> req)
+ .addInterceptorAfter(SERIALIZER_INTERCEPTOR_NAME, "a", TEST_INTERCEPTOR)
+ .create();
+ assertEquals(SERIALIZER_INTERCEPTOR_NAME, testCluster.getRequestInterceptors().get(0).getLeft());
+ assertEquals("a", testCluster.getRequestInterceptors().get(1).getLeft());
+ assertEquals(TEST_INTERCEPTOR, testCluster.getRequestInterceptors().get(1).getRight());
+ assertEquals("b", testCluster.getRequestInterceptors().get(2).getLeft());
+
+ }
+
+ @Test
+ public void shouldAddToInterceptorLast() {
+ final Cluster testCluster = Cluster.build()
+ .addInterceptor("c", req -> req)
+ .addInterceptor("b", req -> req)
+ .addInterceptor("a", req -> req)
+ .create();
+ assertEquals(SERIALIZER_INTERCEPTOR_NAME, testCluster.getRequestInterceptors().get(0).getLeft());
+ assertEquals("c", testCluster.getRequestInterceptors().get(1).getLeft());
+ assertEquals("b", testCluster.getRequestInterceptors().get(2).getLeft());
+ assertEquals("a", testCluster.getRequestInterceptors().get(3).getLeft());
+ }
+
+ @Test
+ public void shouldNotAllowAddingDuplicateName() {
+ try {
+ Cluster.build().addInterceptor("name", req -> req).addInterceptor("name", req -> req);
+ fail("Should not have allowed interceptor to be added.");
+ } catch (Exception e) {
+ assertThat(e, instanceOf(IllegalArgumentException.class));
+ assertEquals("name interceptor already exists", e.getMessage());
+ }
+
+ try {
+ Cluster.build().addInterceptor("name", req -> req).addInterceptorAfter("name", "name", req -> req);
+ fail("Should not have allowed interceptor to be added.");
+ } catch (Exception e) {
+ assertThat(e, instanceOf(IllegalArgumentException.class));
+ assertEquals("name interceptor already exists", e.getMessage());
+ }
+
+ try {
+ Cluster.build().addInterceptor("name", req -> req).addInterceptorBefore("name", "name", req -> req);
+ fail("Should not have allowed interceptor to be added.");
+ } catch (Exception e) {
+ assertThat(e, instanceOf(IllegalArgumentException.class));
+ assertEquals("name interceptor already exists", e.getMessage());
+ }
+ }
+
+ @Test
+ public void shouldContainBodySerializerByDefault() {
+ final List<Pair<String, ? extends RequestInterceptor>> interceptors = Cluster.build().create().getRequestInterceptors();
+ assertEquals(1, interceptors.size());
+ assertTrue(interceptors.get(0).getRight() instanceof GraphBinarySerializationInterceptor);
+ }
+
+ @Test
+ public void shouldRemoveDefaultSerializer() {
+ final Cluster testCluster = Cluster.build().removeInterceptor(SERIALIZER_INTERCEPTOR_NAME).create();
+ assertEquals(0, testCluster.getRequestInterceptors().size());
+ }
+}
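
Note on the named interceptor list: the tests above pin down the observable
behaviour (serializer present by default, append by default, insert before or
after a named entry, reject duplicates and unknown names). A minimal
standalone sketch of bookkeeping that would satisfy those tests follows. It is
an illustration only, not the Cluster.Builder implementation:
NamedInterceptorListSketch and its method names are hypothetical, and the
"serializer" value for the default name is an assumption since the constant's
value is not shown in this diff.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the interceptor bookkeeping exercised by ClusterTest.
final class NamedInterceptorListSketch {
    // Assumed name for the default entry; the real constant's value is not shown here.
    static final String SERIALIZER_NAME = "serializer";

    private final List<Map.Entry<String, UnaryOperator<Object>>> entries = new ArrayList<>();

    NamedInterceptorListSketch() {
        // Mirrors the tests: a fresh builder already holds the serializer at index 0.
        add(SERIALIZER_NAME, r -> r);
    }

    private int indexOf(final String name) {
        for (int i = 0; i < entries.size(); i++) {
            if (entries.get(i).getKey().equals(name)) return i;
        }
        return -1;
    }

    // Returns the index of an existing entry or fails with the message the tests expect.
    private int require(final String name) {
        final int index = indexOf(name);
        if (index < 0) throw new IllegalArgumentException(name + " interceptor not found");
        return index;
    }

    private void insert(final int index, final String name, final UnaryOperator<Object> interceptor) {
        if (indexOf(name) >= 0) throw new IllegalArgumentException(name + " interceptor already exists");
        entries.add(index, new SimpleImmutableEntry<>(name, interceptor));
    }

    NamedInterceptorListSketch add(final String name, final UnaryOperator<Object> interceptor) {
        insert(entries.size(), name, interceptor);
        return this;
    }

    NamedInterceptorListSketch addBefore(final String existing, final String name,
                                         final UnaryOperator<Object> interceptor) {
        insert(require(existing), name, interceptor);
        return this;
    }

    NamedInterceptorListSketch addAfter(final String existing, final String name,
                                        final UnaryOperator<Object> interceptor) {
        insert(require(existing) + 1, name, interceptor);
        return this;
    }

    NamedInterceptorListSketch remove(final String name) {
        entries.remove(require(name));
        return this;
    }

    List<String> names() {
        final List<String> names = new ArrayList<>();
        for (final Map.Entry<String, UnaryOperator<Object>> entry : entries) names.add(entry.getKey());
        return names;
    }

    public static void main(final String[] args) {
        final NamedInterceptorListSketch list = new NamedInterceptorListSketch()
                .add("b", r -> r)
                .addBefore(SERIALIZER_NAME, "a", r -> r);
        System.out.println(list.names()); // [a, serializer, b]
        try {
            list.add("a", r -> r);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // a interceptor already exists
        }
    }
}
```

Keying entries by name is what lets a lambda be located and removed later,
which matches the rationale given in the commit message.
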
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index a191146924..b14b49959a 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -19,11 +19,15 @@
package org.apache.tinkerpop.gremlin.server;
import ch.qos.logback.classic.Level;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
import io.netty.handler.codec.http.HttpResponseStatus;
import nl.altindag.log.LogCaptor;
import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.HttpRequest;
import org.apache.tinkerpop.gremlin.driver.RequestOptions;
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
@@ -35,8 +39,11 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
+import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.util.TimeUtil;
import org.apache.tinkerpop.gremlin.util.function.FunctionUtils;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
import org.apache.tinkerpop.gremlin.util.ser.Serializers;
import org.junit.AfterClass;
import org.junit.Before;
@@ -162,7 +169,7 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
final AtomicInteger httpRequests = new AtomicInteger(0);
final Cluster cluster = TestClientFactory.build().
- requestInterceptor(r -> {
+ addInterceptor("counter", r -> {
httpRequests.incrementAndGet();
return r;
}).create();
@@ -179,6 +186,28 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
assertEquals(requestsToMake, httpRequests.get());
}
+ @Test
+ public void shouldRunInterceptorsInOrder() throws Exception {
+ AtomicReference<Object> body = new AtomicReference<>();
+ final Cluster cluster = TestClientFactory.build().
+ addInterceptor("first", r -> {
+ body.set(r.getBody());
+ r.setBody(null);
+ return r;
+ }).
+ addInterceptor("second", r -> {
+ r.setBody(body.get());
+ return r;
+ }).create();
+
+ try {
+ final Client client = cluster.connect();
+ assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+ } finally {
+ cluster.close();
+ }
+ }
+
@Test
public void shouldInterceptRequestsWithHandshake() throws Exception {
final int requestsToMake = 32;
@@ -186,10 +215,10 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration
final Cluster cluster = TestClientFactory.build().
minConnectionPoolSize(1).maxConnectionPoolSize(1).
- requestInterceptor(r -> {
- handshakeRequests.incrementAndGet();
- return r;
- }).create();
+ addInterceptor("counter", r -> {
+ handshakeRequests.incrementAndGet();
+ return r;
+ }).create();
try {
final Client client = cluster.connect();
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
index edc7773cd8..33e1471a3e 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
@@ -23,6 +23,7 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import io.netty.handler.codec.http.FullHttpRequest;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.HttpRequest;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
@@ -94,22 +95,22 @@ public class GremlinServerAuthIntegrateTest extends AbstractGremlinServerIntegra
when(credentials.getAWSAccessKeyId()).thenReturn("I am AWSAccessKeyId");
when(credentials.getAWSSecretKey()).thenReturn("I am AWSSecretKey");
- final AtomicReference<FullHttpRequest> fullHttpRequest = new AtomicReference<>();
+ final AtomicReference<HttpRequest> httpRequest = new AtomicReference<>();
final Cluster cluster = TestClientFactory.build()
- .auth(sigv4("us-west2", credentialsProvider))
- .requestInterceptor(r -> {
- fullHttpRequest.set(r);
+ .auth(sigv4("us-west2", credentialsProvider, "service-name"))
+ .addInterceptor("header-checker", r -> {
+ httpRequest.set(r);
return r;
})
.create();
final Client client = cluster.connect();
client.submit("1+1").all().get();
- assertNotNull(fullHttpRequest.get().headers().get("X-Amz-Date"));
- assertThat(fullHttpRequest.get().headers().get("Authorization"),
- startsWith("AWS4-HMAC-SHA256 Credential=I am AWSAccessKeyId"));
- assertThat(fullHttpRequest.get().headers().get("Authorization"),
- allOf(containsString("/us-west2/neptune-db/aws4_request"), containsString("Signature=")));
+ Map<String, String> headers = httpRequest.get().headers();
+ assertNotNull(headers.get("X-Amz-Date"));
+ assertThat(headers.get("Authorization"), startsWith("AWS4-HMAC-SHA256 Credential=I am AWSAccessKeyId"));
+ assertThat(headers.get("Authorization"),
+ allOf(containsString("/us-west2/service-name/aws4_request"), containsString("Signature=")));
cluster.close();
}