This is an automated email from the ASF dual-hosted git repository.
kenhuuu pushed a commit to branch master-http
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/master-http by this push:
new 88048316d6 Minimal changes to get gremlin-driver working with
integration tests (#2543)
88048316d6 is described below
commit 88048316d6ae916320a1a9dc4f09d18f24577ee2
Author: kenhuuu <[email protected]>
AuthorDate: Thu Apr 4 13:56:40 2024 -0700
Minimal changes to get gremlin-driver working with integration tests (#2543)
---
.../apache/tinkerpop/gremlin/driver/Client.java | 107 +++----
.../apache/tinkerpop/gremlin/driver/Cluster.java | 12 +-
.../tinkerpop/gremlin/driver/Connection.java | 17 +-
.../tinkerpop/gremlin/driver/ConnectionPool.java | 8 +-
.../gremlin/driver/LoadBalancingStrategy.java | 10 +-
.../tinkerpop/gremlin/driver/RequestOptions.java | 6 +-
.../apache/tinkerpop/gremlin/driver/ResultSet.java | 8 +-
.../apache/tinkerpop/gremlin/driver/Settings.java | 10 +-
.../driver/handler/HttpGremlinRequestEncoder.java | 11 +-
.../gremlin/driver/simple/AbstractClient.java | 10 +-
.../gremlin/driver/simple/SimpleClient.java | 14 +-
...{WebSocketClient.java => SimpleHttpClient.java} | 46 +--
.../gremlin/driver/simple/WebSocketClient.java | 170 +++++-----
.../tinkerpop/gremlin/driver/ResultSetTest.java | 4 +-
.../server/handler/HttpGremlinEndpointHandler.java | 217 ++++++-------
.../gremlin/server/handler/HttpHandlerUtil.java | 5 +
.../driver/ClientConnectionIntegrateTest.java | 8 +-
.../gremlin/server/GremlinDriverIntegrateTest.java | 72 ++---
.../gremlin/server/GremlinServerIntegrateTest.java | 342 ++++++++++-----------
.../server/GremlinServerSslIntegrateTest.java | 11 +-
.../gremlin/server/TestClientFactory.java | 14 +-
.../server/handler/HttpHandlerUtilTest.java | 4 +-
.../gremlin/util/message/RequestMessageV4.java | 26 +-
23 files changed, 546 insertions(+), 586 deletions(-)
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index 303269c5b4..88f935fd20 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -20,11 +20,11 @@ package org.apache.tinkerpop.gremlin.driver;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.util.Tokens;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
@@ -81,11 +81,11 @@ public abstract class Client {
}
/**
- * Makes any initial changes to the builder and returns the constructed
{@link RequestMessage}. Implementers
+ * Makes any initial changes to the builder and returns the constructed
{@link RequestMessageV4}. Implementers
* may choose to override this message to append data to the request
before sending. By default, this method
* will simply return the {@code builder} passed in by the caller.
*/
- public RequestMessage.Builder buildMessage(final RequestMessage.Builder
builder) {
+ public RequestMessageV4.Builder buildMessage(final
RequestMessageV4.Builder builder) {
return builder;
}
@@ -97,7 +97,7 @@ public abstract class Client {
/**
* Chooses a {@link Connection} to write the message to.
*/
- protected abstract Connection chooseConnection(final RequestMessage msg)
throws TimeoutException, ConnectionException;
+ protected abstract Connection chooseConnection(final RequestMessageV4 msg)
throws TimeoutException, ConnectionException;
/**
* Asynchronous close of the {@code Client}.
@@ -352,26 +352,25 @@ public abstract class Client {
// need to call buildMessage() right away to get client specific
configurations, that way request specific
// ones can override as needed
- final RequestMessage.Builder request =
buildMessage(RequestMessage.build(Tokens.OPS_EVAL))
- .add(Tokens.ARGS_GREMLIN, gremlin)
- .add(Tokens.ARGS_BATCH_SIZE, batchSize);
+ final RequestMessageV4.Builder request =
buildMessage(RequestMessageV4.build(gremlin))
+ .addChunkSize(batchSize);
// apply settings if they were made available
- options.getTimeout().ifPresent(timeout ->
request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
- options.getParameters().ifPresent(params ->
request.addArg(Tokens.ARGS_BINDINGS, params));
- options.getAliases().ifPresent(aliases ->
request.addArg(Tokens.ARGS_ALIASES, aliases));
+// options.getTimeout().ifPresent(timeout ->
request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
+ options.getParameters().ifPresent(params ->
request.addBindings(params));
+ options.getAliases().ifPresent(aliases -> {if (aliases.get("g") !=
null) request.addG(aliases.get("g")); });
options.getOverrideRequestId().ifPresent(request::overrideRequestId);
- options.getUserAgent().ifPresent(userAgent ->
request.addArg(Tokens.ARGS_USER_AGENT, userAgent));
- options.getLanguage().ifPresent(lang ->
request.addArg(Tokens.ARGS_LANGUAGE, lang));
- options.getMaterializeProperties().ifPresent(mp ->
request.addArg(Tokens.ARGS_MATERIALIZE_PROPERTIES, mp));
+// options.getUserAgent().ifPresent(userAgent ->
request.addArg(Tokens.ARGS_USER_AGENT, userAgent));
+ options.getLanguage().ifPresent(lang -> request.addLanguage(lang));
+// options.getMaterializeProperties().ifPresent(mp ->
request.addArg(Tokens.ARGS_MATERIALIZE_PROPERTIES, mp));
return submitAsync(request.create());
}
/**
- * A low-level method that allows the submission of a manually constructed
{@link RequestMessage}.
+ * A low-level method that allows the submission of a manually constructed
{@link RequestMessageV4}.
*/
- public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) {
+ public CompletableFuture<ResultSet> submitAsync(final RequestMessageV4
msg) {
if (isClosing()) throw new IllegalStateException("Client is closed");
if (!initialized)
@@ -497,20 +496,20 @@ public abstract class Client {
* from that host's connection pool.
*/
@Override
- protected Connection chooseConnection(final RequestMessage msg) throws
TimeoutException, ConnectionException {
+ protected Connection chooseConnection(final RequestMessageV4 msg)
throws TimeoutException, ConnectionException {
final Iterator<Host> possibleHosts;
- if (msg.optionalArgs(Tokens.ARGS_HOST).isPresent()) {
- // looking at this code about putting the Host on the
RequestMessage in light of 3.5.4, not sure
- // this is being used as intended here. server side usage is
to place the channel.remoteAddress
- // in this token in the status metadata for the response.
can't remember why it is being used this
- // way here exactly. created TINKERPOP-2821 to examine this
more carefully to clean this up in a
- // future version.
- final Host host = (Host) msg.getArgs().get(Tokens.ARGS_HOST);
- msg.getArgs().remove(Tokens.ARGS_HOST);
- possibleHosts = IteratorUtils.of(host);
- } else {
+// if (msg.optionalArgs(Tokens.ARGS_HOST).isPresent()) {
+// // looking at this code about putting the Host on the
RequestMessage in light of 3.5.4, not sure
+// // this is being used as intended here. server side usage is
to place the channel.remoteAddress
+// // in this token in the status metadata for the response.
can't remember why it is being used this
+// // way here exactly. created TINKERPOP-2821 to examine this
more carefully to clean this up in a
+// // future version.
+// final Host host = (Host) msg.getArgs().get(Tokens.ARGS_HOST);
+// msg.getArgs().remove(Tokens.ARGS_HOST);
+// possibleHosts = IteratorUtils.of(host);
+// } else {
possibleHosts =
this.cluster.loadBalancingStrategy().select(msg);
- }
+// }
// try a random host if none are marked available. maybe it will
reconnect in the meantime. better than
// going straight to a fast NoHostAvailableException as was the
case in versions 3.5.4 and earlier
@@ -652,16 +651,14 @@ public abstract class Client {
try {
// need to call buildMessage() right away to get client
specific configurations, that way request specific
// ones can override as needed
- final RequestMessage.Builder request =
buildMessage(RequestMessage.build(Tokens.OPS_BYTECODE)
- .processor("traversal")
- .addArg(Tokens.ARGS_GREMLIN, bytecode));
+ final RequestMessageV4.Builder request =
buildMessage(RequestMessageV4.build(bytecode));
// apply settings if they were made available
- options.getBatchSize().ifPresent(batchSize ->
request.add(Tokens.ARGS_BATCH_SIZE, batchSize));
- options.getTimeout().ifPresent(timeout ->
request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
+ options.getBatchSize().ifPresent(batchSize ->
request.addChunkSize(batchSize));
+// options.getTimeout().ifPresent(timeout ->
request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
options.getOverrideRequestId().ifPresent(request::overrideRequestId);
- options.getUserAgent().ifPresent(userAgent ->
request.add(Tokens.ARGS_USER_AGENT, userAgent));
- options.getMaterializeProperties().ifPresent(mp ->
request.addArg(Tokens.ARGS_MATERIALIZE_PROPERTIES, mp));
+// options.getUserAgent().ifPresent(userAgent ->
request.add(Tokens.ARGS_USER_AGENT, userAgent));
+// options.getMaterializeProperties().ifPresent(mp ->
request.addArg(Tokens.ARGS_MATERIALIZE_PROPERTIES, mp));
return submitAsync(request.create());
} catch (RuntimeException re) {
@@ -672,18 +669,20 @@ public abstract class Client {
}
@Override
- public CompletableFuture<ResultSet> submitAsync(final RequestMessage
msg) {
- final RequestMessage.Builder builder = RequestMessage.from(msg);
+ public CompletableFuture<ResultSet> submitAsync(final RequestMessageV4
msg) {
+ final RequestMessageV4.Builder builder =
RequestMessageV4.from(msg);
// only add aliases which aren't already present. if they are
present then they represent request level
// overrides which should be mucked with
- if (!aliases.isEmpty()) {
- final Map original = (Map)
msg.getArgs().getOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap());
- aliases.forEach((k, v) -> {
- if (!original.containsKey(k))
- builder.addArg(Tokens.ARGS_ALIASES, aliases);
- });
- }
+ // TODO: replaced this with ARGS_G as we don't allow a map of
aliases anymore.
+// if (!aliases.isEmpty()) {
+// final Map original = (Map)
msg.getArgs().getOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap());
+// aliases.forEach((k, v) -> {
+// if (!original.containsKey(k))
+// builder.addArg(Tokens.ARGS_ALIASES, aliases);
+// });
+// }
+ builder.addG(aliases.get("g"));
return super.submitAsync(builder.create());
}
@@ -704,10 +703,11 @@ public abstract class Client {
}
@Override
- public RequestMessage.Builder buildMessage(final
RequestMessage.Builder builder) {
+ public RequestMessageV4.Builder buildMessage(final
RequestMessageV4.Builder builder) {
if (close.isDone()) throw new IllegalStateException("Client is
closed");
- if (!aliases.isEmpty())
- builder.addArg(Tokens.ARGS_ALIASES, aliases);
+// TODO: aliases not supported. replace with ARG_G.
+// if (!aliases.isEmpty())
+// builder.addArg(Tokens.ARGS_ALIASES, aliases);
return client.buildMessage(builder);
}
@@ -726,7 +726,7 @@ public abstract class Client {
* Delegates to the underlying {@link Client.ClusteredClient}.
*/
@Override
- protected Connection chooseConnection(final RequestMessage msg) throws
TimeoutException, ConnectionException {
+ protected Connection chooseConnection(final RequestMessageV4 msg)
throws TimeoutException, ConnectionException {
if (close.isDone()) throw new IllegalStateException("Client is
closed");
return client.chooseConnection(msg);
}
@@ -783,13 +783,14 @@ public abstract class Client {
}
/**
- * Adds the {@link Tokens#ARGS_SESSION} value to every {@link
RequestMessage}.
+ * Adds the {@link Tokens#ARGS_SESSION} value to every {@link
RequestMessageV4}.
*/
@Override
- public RequestMessage.Builder buildMessage(final
RequestMessage.Builder builder) {
- builder.processor("session");
- builder.addArg(Tokens.ARGS_SESSION, sessionId);
- builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions);
+ public RequestMessageV4.Builder buildMessage(final
RequestMessageV4.Builder builder) {
+// TODO: replace this with new Transaction API later.
+// builder.processor("session");
+// builder.addArg(Tokens.ARGS_SESSION, sessionId);
+// builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION,
manageTransactions);
return builder;
}
@@ -797,7 +798,7 @@ public abstract class Client {
* Since the session is bound to a single host, simply borrow a
connection from that pool.
*/
@Override
- protected Connection chooseConnection(final RequestMessage msg) throws
TimeoutException, ConnectionException {
+ protected Connection chooseConnection(final RequestMessageV4 msg)
throws TimeoutException, ConnectionException {
return
connectionPool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection,
TimeUnit.MILLISECONDS);
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 870bb8bdcc..c5dc54ca60 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -29,7 +29,7 @@ import io.netty.util.concurrent.Future;
import org.apache.commons.configuration2.Configuration;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.util.Tokens;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.ser.Serializers;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.nio.NioEventLoopGroup;
@@ -501,7 +501,7 @@ public final class Cluster {
return manager.authProps;
}
- RequestMessage.Builder validationRequest() {
+ RequestMessageV4.Builder validationRequest() {
return manager.validationRequest.get();
}
@@ -597,7 +597,7 @@ public final class Cluster {
private int reconnectInterval = Connection.RECONNECT_INTERVAL;
private int resultIterationBatchSize =
Connection.RESULT_ITERATION_BATCH_SIZE;
private long keepAliveInterval = Connection.KEEP_ALIVE_INTERVAL;
- private String channelizer =
Channelizer.WebSocketChannelizer.class.getName();
+ private String channelizer =
Channelizer.HttpChannelizer.class.getName();
private boolean enableSsl = false;
private String keyStore = null;
private String keyStorePassword = null;
@@ -1037,7 +1037,7 @@ public final class Cluster {
public Cluster create() {
if (addresses.size() == 0) addContactPoint("localhost");
- if (null == serializer) serializer =
Serializers.GRAPHBINARY_V1.simpleInstance();
+ if (null == serializer) serializer =
Serializers.GRAPHBINARY_V4.simpleInstance();
return new Cluster(this);
}
}
@@ -1077,7 +1077,7 @@ public final class Cluster {
private final LoadBalancingStrategy loadBalancingStrategy;
private final AuthProperties authProps;
private final Optional<SslContext> sslContextOptional;
- private final Supplier<RequestMessage.Builder> validationRequest;
+ private final Supplier<RequestMessageV4.Builder> validationRequest;
private final UnaryOperator<FullHttpRequest> interceptor;
/**
@@ -1167,7 +1167,7 @@ public final class Cluster {
this.connectionScheduler = new
ScheduledThreadPoolExecutor(contactPoints.size() + 1,
new
BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build());
- validationRequest = () ->
RequestMessage.build(Tokens.OPS_EVAL).add(Tokens.ARGS_GREMLIN,
builder.validationRequest);
+ validationRequest = () ->
RequestMessageV4.build(builder.validationRequest);
}
private void validateBuilder(final Builder builder) {
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
index db79fe065f..c5c76e83b3 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -22,7 +22,7 @@ import org.apache.tinkerpop.gremlin.util.Tokens;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
@@ -61,8 +61,8 @@ final class Connection {
private final String creatingThread;
private final String createdTimestamp;
- public static final int MAX_IN_PROCESS = 4;
- public static final int MIN_IN_PROCESS = 1;
+ public static final int MAX_IN_PROCESS = 1;
+ public static final int MIN_IN_PROCESS = 0;
public static final int MAX_WAIT_FOR_CONNECTION = 16000;
public static final int MAX_WAIT_FOR_CLOSE = 3000;
public static final int MAX_CONTENT_LENGTH = 10 * 1024 * 1024;
@@ -209,7 +209,7 @@ final class Connection {
return future;
}
- public ChannelPromise write(final RequestMessage requestMessage, final
CompletableFuture<ResultSet> resultQueueSetup) {
+ public ChannelPromise write(final RequestMessageV4 requestMessage, final
CompletableFuture<ResultSet> resultQueueSetup) {
// dont allow the same request id to be used as one that is already in
the queue
if (pending.containsKey(requestMessage.getRequestId()))
throw new IllegalStateException(String.format("There is already a
request pending with an id of: %s", requestMessage.getRequestId()));
@@ -310,10 +310,11 @@ final class Connection {
// the session close message was removed in 3.5.0 after
deprecation at 3.3.11. That removal was perhaps
// a bit hasty as session semantics may still require this message
in certain cases. Until we can look
// at this in more detail, it seems best to bring back the old
functionality to the driver.
- if (client instanceof Client.SessionedClient) {
+ // TODO: commented due to not supporting sessions with HTTP.
+ /*if (client instanceof Client.SessionedClient) {
final boolean forceClose =
client.getSettings().getSession().get().isForceClosed();
- final RequestMessage closeMessage = client.buildMessage(
-
RequestMessage.build(Tokens.OPS_CLOSE).addArg(Tokens.ARGS_FORCE,
forceClose)).create();
+ final RequestMessageV4 closeMessage = client.buildMessage(
+
RequestMessageV4.build(Tokens.OPS_CLOSE).addArg(Tokens.ARGS_FORCE,
forceClose)).create();
final CompletableFuture<ResultSet> closed = new
CompletableFuture<>();
@@ -337,7 +338,7 @@ final class Connection {
((Client.SessionedClient) client).getSessionId());
logger.warn(msg, ex);
}
- }
+ }*/
// take a defensive posture here in the event the channelizer
didn't get initialized somehow and a
// close() on the Connection is still called
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
index ee31353f4b..78b1c81391 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java
@@ -22,7 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.TimeUtil;
import java.util.ArrayList;
@@ -51,8 +51,8 @@ final class ConnectionPool {
public static final int MIN_POOL_SIZE = 2;
public static final int MAX_POOL_SIZE = 8;
- public static final int MIN_SIMULTANEOUS_USAGE_PER_CONNECTION = 8;
- public static final int MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 16;
+ public static final int MIN_SIMULTANEOUS_USAGE_PER_CONNECTION = 0;
+ public static final int MAX_SIMULTANEOUS_USAGE_PER_CONNECTION = 1;
// A small buffer in millis used for comparing if a connection was created
within a certain amount of time.
private static final int CONNECTION_SETUP_TIME_DELTA = 25;
@@ -523,7 +523,7 @@ final class ConnectionPool {
// pool needs it. for now that seems like an unnecessary added bit
of complexity for dealing with this
// error state
connection = connectionFactory.create(this);
- final RequestMessage ping =
client.buildMessage(cluster.validationRequest()).create();
+ final RequestMessageV4 ping =
client.buildMessage(cluster.validationRequest()).create();
final CompletableFuture<ResultSet> f = new CompletableFuture<>();
connection.write(ping, f);
f.get().all().get();
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java
index 4455a4ed6e..153ac65fd7 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/LoadBalancingStrategy.java
@@ -18,7 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.driver;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import java.util.ArrayList;
import java.util.Collection;
@@ -41,13 +41,13 @@ public interface LoadBalancingStrategy extends
Host.Listener {
public void initialize(final Cluster cluster, final Collection<Host>
hosts);
/**
- * Provide an ordered list of hosts to send the given {@link
RequestMessage} to.
+ * Provide an ordered list of hosts to send the given {@link
RequestMessageV4} to.
*/
- public Iterator<Host> select(final RequestMessage msg);
+ public Iterator<Host> select(final RequestMessageV4 msg);
/**
* A simple round-robin strategy that simply selects the next host in the
{@link Cluster} to send the
- * {@link RequestMessage} to.
+ * {@link RequestMessageV4} to.
*/
public static class RoundRobin implements LoadBalancingStrategy {
@@ -61,7 +61,7 @@ public interface LoadBalancingStrategy extends Host.Listener {
}
@Override
- public Iterator<Host> select(final RequestMessage msg) {
+ public Iterator<Host> select(final RequestMessageV4 msg) {
final List<Host> hosts = new ArrayList<>();
// a host could be marked as dead in which case we dont need to
send messages to it - just skip it for
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
index 63d7dad91a..3f66e2fbb0 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java
@@ -41,7 +41,7 @@ public final class RequestOptions {
private final UUID overrideRequestId;
private final String userAgent;
private final String language;
- private final String materializeProperties;
+// private final String materializeProperties;
private RequestOptions(final Builder builder) {
this.aliases = builder.aliases;
@@ -51,7 +51,7 @@ public final class RequestOptions {
this.overrideRequestId = builder.overrideRequestId;
this.userAgent = builder.userAgent;
this.language = builder.language;
- this.materializeProperties = builder.materializeProperties;
+// this.materializeProperties = builder.materializeProperties;
}
public Optional<UUID> getOverrideRequestId() {
@@ -82,7 +82,7 @@ public final class RequestOptions {
return Optional.ofNullable(language);
}
- public Optional<String> getMaterializeProperties() { return
Optional.ofNullable(materializeProperties); }
+// public Optional<String> getMaterializeProperties() { return
Optional.ofNullable(materializeProperties); }
public static Builder build() {
return new Builder();
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
index 4d60b8dfb9..b41e416da0 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ResultSet.java
@@ -18,7 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.driver;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import java.util.ArrayList;
import java.util.Collections;
@@ -53,13 +53,13 @@ import java.util.stream.StreamSupport;
public final class ResultSet implements Iterable<Result> {
private final ResultQueue resultQueue;
private final ExecutorService executor;
- private final RequestMessage originalRequestMessage;
+ private final RequestMessageV4 originalRequestMessage;
private final Host host;
private final CompletableFuture<Void> readCompleted;
public ResultSet(final ResultQueue resultQueue, final ExecutorService
executor,
- final CompletableFuture<Void> readCompleted, final
RequestMessage originalRequestMessage,
+ final CompletableFuture<Void> readCompleted, final
RequestMessageV4 originalRequestMessage,
final Host host) {
this.executor = executor;
this.host = host;
@@ -68,7 +68,7 @@ public final class ResultSet implements Iterable<Result> {
this.originalRequestMessage = originalRequestMessage;
}
- public RequestMessage getOriginalRequestMessage() {
+ public RequestMessageV4 getOriginalRequestMessage() {
return originalRequestMessage;
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
index 0845dbf547..9c0a72ceb6 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -20,7 +20,7 @@ package org.apache.tinkerpop.gremlin.driver;
import org.apache.commons.configuration2.Configuration;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
-import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.yaml.snakeyaml.LoaderOptions;
import org.yaml.snakeyaml.TypeDescription;
@@ -396,9 +396,9 @@ final class Settings {
/**
* The constructor for the channel that connects to the server. This
value should be the fully qualified
* class name of a Gremlin Driver {@link Channelizer} implementation.
By default this value is set to
- * {@link Channelizer.WebSocketChannelizer}.
+ * {@link Channelizer.HttpChannelizer}.
*/
- public String channelizer =
Channelizer.WebSocketChannelizer.class.getName();
+ public String channelizer =
Channelizer.HttpChannelizer.class.getName();
/**
* A valid Gremlin script that can be used to test remote operations.
@@ -417,9 +417,9 @@ final class Settings {
/**
* The fully qualified class name of the {@link MessageSerializer}
that will be used to communicate with the
* server. Note that the serializer configured on the client should be
supported by the server configuration.
- * By default the setting is configured to {@link
GraphBinaryMessageSerializerV1}.
+ * By default the setting is configured to {@link
GraphBinaryMessageSerializerV4}.
*/
- public String className =
GraphBinaryMessageSerializerV1.class.getCanonicalName();
+ public String className =
GraphBinaryMessageSerializerV4.class.getCanonicalName();
/**
* The configuration for the specified serializer with the {@link
#className}.
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
index 67f7e3d95c..06377e31be 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java
@@ -31,18 +31,19 @@ import org.apache.tinkerpop.gremlin.driver.UserAgent;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.util.ser.MessageTextSerializerV4;
import org.apache.tinkerpop.gremlin.util.ser.SerTokens;
import java.util.List;
import java.util.function.UnaryOperator;
/**
- * Converts {@link RequestMessage} to a {@code HttpRequest}.
+ * Converts {@link RequestMessageV4} to a {@code HttpRequest}.
*/
@ChannelHandler.Sharable
-public final class HttpGremlinRequestEncoder extends
MessageToMessageEncoder<RequestMessage> {
+public final class HttpGremlinRequestEncoder extends
MessageToMessageEncoder<RequestMessageV4> {
private final MessageSerializer<?> serializer;
private final boolean userAgentEnabled;
private final UnaryOperator<FullHttpRequest> interceptor;
@@ -61,7 +62,7 @@ public final class HttpGremlinRequestEncoder extends
MessageToMessageEncoder<Req
}
@Override
- protected void encode(final ChannelHandlerContext channelHandlerContext,
final RequestMessage requestMessage, final List<Object> objects) throws
Exception {
+ protected void encode(final ChannelHandlerContext channelHandlerContext,
final RequestMessageV4 requestMessage, final List<Object> objects) throws
Exception {
final String mimeType = serializer.mimeTypesSupported()[0];
// only GraphSON3 and GraphBinary recommended for serialization of
Bytecode requests
if (requestMessage.getArg("gremlin") instanceof Bytecode &&
@@ -73,7 +74,7 @@ public final class HttpGremlinRequestEncoder extends
MessageToMessageEncoder<Req
}
try {
- final ByteBuf buffer =
serializer.serializeRequestAsBinary(requestMessage,
channelHandlerContext.alloc());
+ final ByteBuf buffer =
((MessageTextSerializerV4)serializer).serializeRequestMessageV4(requestMessage,
channelHandlerContext.alloc());
final FullHttpRequest request = new
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/", buffer);
request.headers().add(HttpHeaderNames.CONTENT_TYPE, mimeType);
request.headers().add(HttpHeaderNames.CONTENT_LENGTH,
buffer.readableBytes());
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
index 00a67b42eb..3574cd2c91 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
@@ -23,7 +23,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
@@ -45,16 +45,16 @@ public abstract class AbstractClient implements
SimpleClient {
group = new NioEventLoopGroup(1, threadFactory);
}
- public abstract void writeAndFlush(final RequestMessage requestMessage)
throws Exception;
+ public abstract void writeAndFlush(final RequestMessageV4 requestMessage)
throws Exception;
@Override
- public void submit(final RequestMessage requestMessage, final
Consumer<ResponseMessage> callback) throws Exception {
+ public void submit(final RequestMessageV4 requestMessage, final
Consumer<ResponseMessage> callback) throws Exception {
callbackResponseHandler.callback = callback;
writeAndFlush(requestMessage);
}
@Override
- public List<ResponseMessage> submit(final RequestMessage requestMessage)
throws Exception {
+ public List<ResponseMessage> submit(final RequestMessageV4 requestMessage)
throws Exception {
// this is just a test client to force certain behaviors of the
server. hanging tests are a pain to deal with
// especially in travis as it's not always clear where the hang is. a
few reasonable timeouts might help
// make debugging easier when we look at logs
@@ -62,7 +62,7 @@ public abstract class AbstractClient implements SimpleClient {
}
@Override
- public CompletableFuture<List<ResponseMessage>> submitAsync(final
RequestMessage requestMessage) throws Exception {
+ public CompletableFuture<List<ResponseMessage>> submitAsync(final
RequestMessageV4 requestMessage) throws Exception {
final List<ResponseMessage> results = new ArrayList<>();
final CompletableFuture<List<ResponseMessage>> f = new
CompletableFuture<>();
callbackResponseHandler.callback = response -> {
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleClient.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleClient.java
index 43ace8237e..c6edb20d33 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleClient.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleClient.java
@@ -19,7 +19,7 @@
package org.apache.tinkerpop.gremlin.driver.simple;
import org.apache.tinkerpop.gremlin.util.Tokens;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
import java.io.Closeable;
@@ -33,20 +33,20 @@ import java.util.function.Consumer;
public interface SimpleClient extends Closeable {
public default void submit(final String gremlin, final
Consumer<ResponseMessage> callback) throws Exception {
-
submit(RequestMessage.build(Tokens.OPS_EVAL).addArg(Tokens.ARGS_GREMLIN,
gremlin).create(), callback);
+ submit(RequestMessageV4.build(gremlin).create(), callback);
}
- public void submit(final RequestMessage requestMessage, final
Consumer<ResponseMessage> callback) throws Exception;
+ public void submit(final RequestMessageV4 requestMessage, final
Consumer<ResponseMessage> callback) throws Exception;
public default List<ResponseMessage> submit(final String gremlin) throws
Exception {
- return
submit(RequestMessage.build(Tokens.OPS_EVAL).addArg(Tokens.ARGS_GREMLIN,
gremlin).create());
+ return submit(RequestMessageV4.build(gremlin).create());
}
- public List<ResponseMessage> submit(final RequestMessage requestMessage)
throws Exception;
+ public List<ResponseMessage> submit(final RequestMessageV4 requestMessage)
throws Exception;
public default CompletableFuture<List<ResponseMessage>> submitAsync(final
String gremlin) throws Exception {
- return
submitAsync(RequestMessage.build(Tokens.OPS_EVAL).addArg(Tokens.ARGS_GREMLIN,
gremlin).create());
+ return submitAsync(RequestMessageV4.build(gremlin).create());
}
- public CompletableFuture<List<ResponseMessage>> submitAsync(final
RequestMessage requestMessage) throws Exception;
+ public CompletableFuture<List<ResponseMessage>> submitAsync(final
RequestMessageV4 requestMessage) throws Exception;
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
similarity index 74%
copy from
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
copy to
gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
index a5de7fb82e..fd97fa3d6e 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/SimpleHttpClient.java
@@ -24,11 +24,14 @@ import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import org.apache.tinkerpop.gremlin.driver.HandshakeInterceptor;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder;
+import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseDecoder;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
import
org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
import
org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -39,7 +42,7 @@ import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
-import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1;
+import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,28 +56,28 @@ import java.util.concurrent.TimeUnit;
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
-public class WebSocketClient extends AbstractClient {
- private static final Logger logger =
LoggerFactory.getLogger(WebSocketClient.class);
+public class SimpleHttpClient extends AbstractClient {
+ private static final Logger logger =
LoggerFactory.getLogger(SimpleHttpClient.class);
private final Channel channel;
- public WebSocketClient() {
- this(URI.create("ws://localhost:8182/gremlin"));
+ public SimpleHttpClient() {
+ this(URI.create("http://localhost:8182/gremlin"));
}
- public WebSocketClient(final URI uri) {
- super("ws-client-%d");
+ public SimpleHttpClient(final URI uri) {
+ super("simple-http-client-%d");
final Bootstrap b = new Bootstrap().group(group);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
final String protocol = uri.getScheme();
- if (!"ws".equalsIgnoreCase(protocol) &&
!"wss".equalsIgnoreCase(protocol))
+ if (!"http".equalsIgnoreCase(protocol) &&
!"https".equalsIgnoreCase(protocol))
throw new IllegalArgumentException("Unsupported protocol: " +
protocol);
final String host = uri.getHost();
final int port;
if (uri.getPort() == -1) {
- if ("ws".equalsIgnoreCase(protocol)) {
+ if ("http".equalsIgnoreCase(protocol)) {
port = 80;
- } else if ("wss".equalsIgnoreCase(protocol)) {
+ } else if ("https".equalsIgnoreCase(protocol)) {
port = 443;
} else {
port = -1;
@@ -84,7 +87,7 @@ public class WebSocketClient extends AbstractClient {
}
try {
- final boolean ssl = "wss".equalsIgnoreCase(protocol);
+ final boolean ssl = "https".equalsIgnoreCase(protocol);
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient()
@@ -92,9 +95,10 @@ public class WebSocketClient extends AbstractClient {
} else {
sslCtx = null;
}
- final WebSocketClientHandler wsHandler = new
WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(
- uri, WebSocketVersion.V13, null, true,
EmptyHttpHeaders.INSTANCE, 65536), 10000, false);
- final MessageSerializer<GraphBinaryMapper> serializer = new
GraphBinaryMessageSerializerV1();
+
+// final WebSocketClientHandler wsHandler = new
WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(
+// uri, WebSocketVersion.V13, null, true,
EmptyHttpHeaders.INSTANCE, 65536), 10000, false);
+ final MessageSerializer<GraphBinaryMapper> serializer = new
GraphBinaryMessageSerializerV4();
b.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
@@ -106,22 +110,24 @@ public class WebSocketClient extends AbstractClient {
p.addLast(
new HttpClientCodec(),
new HttpObjectAggregator(65536),
- wsHandler,
- new WebSocketGremlinRequestEncoder(true,
serializer),
- new
WebSocketGremlinResponseDecoder(serializer),
+// wsHandler,
+// new WebSocketGremlinRequestEncoder(true,
serializer),
+// new
WebSocketGremlinResponseDecoder(serializer),
+ new HttpGremlinRequestEncoder(serializer,
HandshakeInterceptor.NO_OP, false),
+ new HttpGremlinResponseDecoder(serializer),
callbackResponseHandler);
}
});
channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
- wsHandler.handshakeFuture().get(30, TimeUnit.SECONDS);
+// wsHandler.handshakeFuture().get(30, TimeUnit.SECONDS);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@Override
- public void writeAndFlush(final RequestMessage requestMessage) throws
Exception {
+ public void writeAndFlush(final RequestMessageV4 requestMessage) throws
Exception {
channel.writeAndFlush(requestMessage);
}
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
index a5de7fb82e..76cc274efb 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/WebSocketClient.java
@@ -53,88 +53,88 @@ import java.util.concurrent.TimeUnit;
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
-public class WebSocketClient extends AbstractClient {
- private static final Logger logger =
LoggerFactory.getLogger(WebSocketClient.class);
- private final Channel channel;
-
- public WebSocketClient() {
- this(URI.create("ws://localhost:8182/gremlin"));
- }
-
- public WebSocketClient(final URI uri) {
- super("ws-client-%d");
- final Bootstrap b = new Bootstrap().group(group);
- b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
-
- final String protocol = uri.getScheme();
- if (!"ws".equalsIgnoreCase(protocol) &&
!"wss".equalsIgnoreCase(protocol))
- throw new IllegalArgumentException("Unsupported protocol: " +
protocol);
- final String host = uri.getHost();
- final int port;
- if (uri.getPort() == -1) {
- if ("ws".equalsIgnoreCase(protocol)) {
- port = 80;
- } else if ("wss".equalsIgnoreCase(protocol)) {
- port = 443;
- } else {
- port = -1;
- }
- } else {
- port = uri.getPort();
- }
-
- try {
- final boolean ssl = "wss".equalsIgnoreCase(protocol);
- final SslContext sslCtx;
- if (ssl) {
- sslCtx = SslContextBuilder.forClient()
-
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
- } else {
- sslCtx = null;
- }
- final WebSocketClientHandler wsHandler = new
WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(
- uri, WebSocketVersion.V13, null, true,
EmptyHttpHeaders.INSTANCE, 65536), 10000, false);
- final MessageSerializer<GraphBinaryMapper> serializer = new
GraphBinaryMessageSerializerV1();
- b.channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(final SocketChannel ch) {
- final ChannelPipeline p = ch.pipeline();
- if (sslCtx != null) {
- p.addLast(sslCtx.newHandler(ch.alloc(), host,
port));
- }
- p.addLast(
- new HttpClientCodec(),
- new HttpObjectAggregator(65536),
- wsHandler,
- new WebSocketGremlinRequestEncoder(true,
serializer),
- new
WebSocketGremlinResponseDecoder(serializer),
- callbackResponseHandler);
- }
- });
-
- channel = b.connect(uri.getHost(), uri.getPort()).sync().channel();
- wsHandler.handshakeFuture().get(30, TimeUnit.SECONDS);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- @Override
- public void writeAndFlush(final RequestMessage requestMessage) throws
Exception {
- channel.writeAndFlush(requestMessage);
- }
-
- @Override
- public void close() throws IOException {
- try {
- channel.close().get(30, TimeUnit.SECONDS);
- } catch (Exception ex) {
- logger.error("Failure closing simple WebSocketClient", ex);
- } finally {
- if (!group.shutdownGracefully().awaitUninterruptibly(30,
TimeUnit.SECONDS)) {
- logger.error("Could not cleanly shutdown thread pool on
WebSocketClient");
- }
- }
- }
-}
+//public class WebSocketClient extends AbstractClient {
+// private static final Logger logger =
LoggerFactory.getLogger(WebSocketClient.class);
+// private final Channel channel;
+//
+// public WebSocketClient() {
+// this(URI.create("ws://localhost:8182/gremlin"));
+// }
+//
+// public WebSocketClient(final URI uri) {
+// super("ws-client-%d");
+// final Bootstrap b = new Bootstrap().group(group);
+// b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+//
+// final String protocol = uri.getScheme();
+// if (!"ws".equalsIgnoreCase(protocol) &&
!"wss".equalsIgnoreCase(protocol))
+// throw new IllegalArgumentException("Unsupported protocol: " +
protocol);
+// final String host = uri.getHost();
+// final int port;
+// if (uri.getPort() == -1) {
+// if ("ws".equalsIgnoreCase(protocol)) {
+// port = 80;
+// } else if ("wss".equalsIgnoreCase(protocol)) {
+// port = 443;
+// } else {
+// port = -1;
+// }
+// } else {
+// port = uri.getPort();
+// }
+//
+// try {
+// final boolean ssl = "wss".equalsIgnoreCase(protocol);
+// final SslContext sslCtx;
+// if (ssl) {
+// sslCtx = SslContextBuilder.forClient()
+//
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
+// } else {
+// sslCtx = null;
+// }
+// final WebSocketClientHandler wsHandler = new
WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(
+// uri, WebSocketVersion.V13, null, true,
EmptyHttpHeaders.INSTANCE, 65536), 10000, false);
+// final MessageSerializer<GraphBinaryMapper> serializer = new
GraphBinaryMessageSerializerV1();
+// b.channel(NioSocketChannel.class)
+// .handler(new ChannelInitializer<SocketChannel>() {
+// @Override
+// protected void initChannel(final SocketChannel ch) {
+// final ChannelPipeline p = ch.pipeline();
+// if (sslCtx != null) {
+// p.addLast(sslCtx.newHandler(ch.alloc(),
host, port));
+// }
+// p.addLast(
+// new HttpClientCodec(),
+// new HttpObjectAggregator(65536),
+// wsHandler,
+// new WebSocketGremlinRequestEncoder(true,
serializer),
+// new
WebSocketGremlinResponseDecoder(serializer),
+// callbackResponseHandler);
+// }
+// });
+//
+// channel = b.connect(uri.getHost(),
uri.getPort()).sync().channel();
+// wsHandler.handshakeFuture().get(30, TimeUnit.SECONDS);
+// } catch (Exception ex) {
+// throw new RuntimeException(ex);
+// }
+// }
+//
+// @Override
+// public void writeAndFlush(final RequestMessage requestMessage) throws
Exception {
+// channel.writeAndFlush(requestMessage);
+// }
+//
+// @Override
+// public void close() throws IOException {
+// try {
+// channel.close().get(30, TimeUnit.SECONDS);
+// } catch (Exception ex) {
+// logger.error("Failure closing simple WebSocketClient", ex);
+// } finally {
+// if (!group.shutdownGracefully().awaitUninterruptibly(30,
TimeUnit.SECONDS)) {
+// logger.error("Could not cleanly shutdown thread pool on
WebSocketClient");
+// }
+// }
+// }
+//}
diff --git
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
index 50e8769a2b..1a301dfa33 100644
---
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
+++
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ResultSetTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.driver;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +51,7 @@ public class ResultSetTest extends AbstractResultQueueTest {
@Before
public void setupThis() {
- resultSet = new ResultSet(resultQueue, pool, readCompleted,
RequestMessage.build("traversal").create(), null);
+ resultSet = new ResultSet(resultQueue, pool, readCompleted,
RequestMessageV4.build("traversal").create(), null);
}
@Test
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index 2b2f5342a4..ca58c9253c 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -273,52 +273,106 @@ public class HttpGremlinEndpointHandler extends
ChannelInboundHandlerAdapter {
}
ctx.writeAndFlush(responseHeader);
- try {
- switch (requestMessage.getOp()) {
- case "":
- case Tokens.OPS_EVAL:
- iterateScriptEvalResult(requestCtx,
serializer.getValue1(), requestMessage);
- break;
- case Tokens.OPS_BYTECODE:
- iterateTraversal(requestCtx,
serializer.getValue1(), translateBytecodeToTraversal(requestCtx));
- break;
- case Tokens.OPS_INVALID:
- final String msgInvalid =
- String.format("Message could not be
parsed. Check the format of the request. [%s]", requestMessage);
- throw new ProcessingException(msgInvalid,
- ResponseMessage.build(requestMessage)
-
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST)
- .statusMessage(msgInvalid)
- .create());
- default:
- final String msgDefault =
- String.format("Message with op code
[%s] is not recognized.", requestMessage.getOp());
- throw new ProcessingException(msgDefault,
- ResponseMessage.build(requestMessage)
-
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST)
- .statusMessage(msgDefault)
- .create());
- }
- } catch (ProcessingException ope) {
- logger.warn(ope.getMessage(), ope);
- writeError(requestCtx, ope.getResponseMessage(),
serializer.getValue1());
+ switch (requestMessage.getOp()) {
+ case "":
+ case Tokens.OPS_EVAL:
+ iterateScriptEvalResult(requestCtx,
serializer.getValue1(), requestMessage);
+ break;
+ case Tokens.OPS_BYTECODE:
+ iterateTraversal(requestCtx,
serializer.getValue1(), translateBytecodeToTraversal(requestCtx));
+ break;
+ case Tokens.OPS_INVALID:
+ final String msgInvalid =
+ String.format("Message could not be
parsed. Check the format of the request. [%s]", requestMessage);
+ throw new ProcessingException(msgInvalid,
+ ResponseMessage.build(requestMessage)
+
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST)
+ .statusMessage(msgInvalid)
+ .create());
+ default:
+ final String msgDefault =
+ String.format("Message with op code [%s]
is not recognized.", requestMessage.getOp());
+ throw new ProcessingException(msgDefault,
+ ResponseMessage.build(requestMessage)
+
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST)
+ .statusMessage(msgDefault)
+ .create());
}
} catch (Exception ex) {
- // send the error response here and don't rely on
exception caught because it might not have the
- // context on whether to close the connection or not,
based on keepalive.
- final Throwable t = ExceptionHelper.getRootCause(ex);
- if (t instanceof TooLongFrameException) {
+ Throwable t = ex;
+ if (ex instanceof UndeclaredThrowableException)
+ t = t.getCause();
+
+ // if any exception in the chain is TemporaryException or
Failure then we should respond with the
+ // right error code so that the client knows to retry
+ final Optional<Throwable> possibleSpecialException =
determineIfSpecialException(ex);
+ if (possibleSpecialException.isPresent()) {
+ final Throwable special =
possibleSpecialException.get();
+ final ResponseMessage.Builder specialResponseMsg =
ResponseMessage.build(requestMessage).
+ statusMessage(special.getMessage()).
+ statusAttributeException(special);
+ if (special instanceof TemporaryException) {
+
specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_TEMPORARY);
+ } else if (special instanceof Failure) {
+ final Failure failure = (Failure) special;
+
specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_FAIL_STEP).
+
statusAttribute(Tokens.STATUS_ATTRIBUTE_FAIL_STEP_MESSAGE, failure.format());
+ }
+ writeError(requestCtx, specialResponseMsg.create(),
serializer.getValue1());
+ } else if (t instanceof ProcessingException) {
+ final ProcessingException pe = (ProcessingException) t;
+ logger.warn(pe.getMessage(), pe);
+ writeError(requestCtx, pe.getResponseMessage(),
serializer.getValue1());
+ } else if (ExceptionHelper.getRootCause(ex) instanceof
TooLongFrameException) {
writeError(requestCtx,
ResponseMessage.build(requestId)
.code(ResponseStatusCode.SERVER_ERROR)
.statusMessage(t.getMessage() + " -
increase the maxContentLength")
.create(),
serializer.getValue1());
+ } else if (t instanceof InterruptedException || t
instanceof TraversalInterruptedException) {
+ final String errorMessage = String.format("A timeout
occurred during traversal evaluation of [%s] - consider increasing the limit
given to evaluationTimeout", msg);
+ logger.warn(errorMessage);
+ writeError(requestCtx,
+ ResponseMessage.build(requestMessage)
+
.code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+ .statusMessage(errorMessage)
+ .statusAttributeException(ex)
+ .create(),
+ serializer.getValue1());
+ } else if (t instanceof TimedInterruptTimeoutException) {
+ // occurs when the TimedInterruptCustomizerProvider is
in play
+ final String errorMessage = String.format("A timeout
occurred within the script during evaluation of [%s] - consider increasing the
limit given to TimedInterruptCustomizerProvider", msg);
+ logger.warn(errorMessage);
+ writeError(requestCtx,
+
ResponseMessage.build(requestMessage).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+ .statusMessage("Timeout during script
evaluation triggered by TimedInterruptCustomizerProvider")
+ .statusAttributeException(t).create(),
+ serializer.getValue1());
+ } else if (t instanceof TimeoutException) {
+ final String errorMessage = String.format("Script
evaluation exceeded the configured threshold for request [%s]", msg);
+ logger.warn(errorMessage, t);
+ writeError(requestCtx,
+
ResponseMessage.build(requestMessage).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
+ .statusMessage(t.getMessage())
+ .statusAttributeException(t).create(),
+ serializer.getValue1());
+ } else if (t instanceof MultipleCompilationErrorsException
&& t.getMessage().contains("Method too large") &&
+ ((MultipleCompilationErrorsException)
t).getErrorCollector().getErrorCount() == 1) {
+ final String errorMessage = String.format("The Gremlin
statement that was submitted exceeds the maximum compilation size allowed by
the JVM, please split it into multiple smaller statements - %s",
trimMessage(requestMessage));
+ logger.warn(errorMessage);
+ writeError(requestCtx,
+
ResponseMessage.build(requestMessage).code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
+ .statusMessage(errorMessage)
+ .statusAttributeException(t).create(),
+ serializer.getValue1());
} else {
+ logger.warn(String.format("Exception processing a
Traversal on iteration for request [%s].", requestId), ex);
writeError(requestCtx,
- ResponseMessage.build(requestId)
- .code(ResponseStatusCode.SERVER_ERROR)
+ ResponseMessage.build(requestMessage)
+
.code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
.statusMessage((t != null) ?
t.getMessage() : ex.getMessage())
+ .statusAttributeException(ex)
.create(),
serializer.getValue1());
}
@@ -529,97 +583,14 @@ public class HttpGremlinEndpointHandler extends
ChannelInboundHandlerAdapter {
return null;
}
- private void iterateTraversal(final Context context, MessageSerializer<?>
serializer, Traversal.Admin<?, ?> traversal) {
+ private void iterateTraversal(final Context context, MessageSerializer<?>
serializer, Traversal.Admin<?, ?> traversal)
+ throws InterruptedException {
final RequestMessage msg = context.getRequestMessage();
logger.debug("Traversal request {} for in thread {}",
msg.getRequestId(), Thread.currentThread().getName());
- try {
- try {
- // compile the traversal - without it getEndStep() has nothing
in it
- traversal.applyStrategies();
- handleIterator(context, new TraverserIterator(traversal),
serializer);
- } catch (Exception ex) {
- Throwable t = ex;
- if (ex instanceof UndeclaredThrowableException)
- t = t.getCause();
-
- // if any exception in the chain is TemporaryException or
Failure then we should respond with the
- // right error code so that the client knows to retry
- final Optional<Throwable> possibleSpecialException =
determineIfSpecialException(ex);
- if (possibleSpecialException.isPresent()) {
- final Throwable special = possibleSpecialException.get();
- final ResponseMessage.Builder specialResponseMsg =
ResponseMessage.build(msg).
- statusMessage(special.getMessage()).
- statusAttributeException(special);
- if (special instanceof TemporaryException) {
-
specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_TEMPORARY);
- } else if (special instanceof Failure) {
- final Failure failure = (Failure) special;
-
specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_FAIL_STEP).
-
statusAttribute(Tokens.STATUS_ATTRIBUTE_FAIL_STEP_MESSAGE, failure.format());
- }
- writeError(context, specialResponseMsg.create(),
serializer);
- } else if (t instanceof InterruptedException || t instanceof
TraversalInterruptedException) {
- final String errorMessage = String.format("A timeout
occurred during traversal evaluation of [%s] - consider increasing the limit
given to evaluationTimeout", msg);
- logger.warn(errorMessage);
- writeError(context,
- ResponseMessage.build(msg)
-
.code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
- .statusMessage(errorMessage)
- .statusAttributeException(ex)
- .create(),
- serializer);
- } else if (t instanceof TimedInterruptTimeoutException) {
- // occurs when the TimedInterruptCustomizerProvider is in
play
- final String errorMessage = String.format("A timeout
occurred within the script during evaluation of [%s] - consider increasing the
limit given to TimedInterruptCustomizerProvider", msg);
- logger.warn(errorMessage);
- writeError(context,
-
ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
- .statusMessage("Timeout during script
evaluation triggered by TimedInterruptCustomizerProvider")
- .statusAttributeException(t).create(),
- serializer);
- } else if (t instanceof TimeoutException) {
- final String errorMessage = String.format("Script
evaluation exceeded the configured threshold for request [%s]", msg);
- logger.warn(errorMessage, t);
- writeError(context,
-
ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
- .statusMessage(t.getMessage())
- .statusAttributeException(t).create(),
- serializer);
- } else if (t instanceof MultipleCompilationErrorsException &&
t.getMessage().contains("Method too large") &&
- ((MultipleCompilationErrorsException)
t).getErrorCollector().getErrorCount() == 1) {
- final String errorMessage = String.format("The Gremlin
statement that was submitted exceeds the maximum compilation size allowed by
the JVM, please split it into multiple smaller statements - %s",
trimMessage(msg));
- logger.warn(errorMessage);
- writeError(context,
-
ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
- .statusMessage(errorMessage)
- .statusAttributeException(t).create(),
- serializer);
- } else {
- logger.warn(String.format("Exception processing a
Traversal on iteration for request [%s].", msg.getRequestId()), ex);
- writeError(context,
- ResponseMessage.build(msg)
- .code(ResponseStatusCode.SERVER_ERROR)
- .statusMessage(ex.getMessage())
- .statusAttributeException(ex)
- .create(),
- serializer);
- }
- }
- } catch (Throwable t) {
- logger.warn(String.format("Exception processing a Traversal on
request [%s].", msg.getRequestId()), t);
- writeError(context,
- ResponseMessage.build(msg)
- .code(ResponseStatusCode.SERVER_ERROR)
- .statusMessage(t.getMessage())
- .statusAttributeException(t)
- .create(),
- serializer);
- if (t instanceof Error) {
- //Re-throw any errors to be handled by and set as the result
of evalFuture
- throw t;
- }
- }
+ // compile the traversal - without it getEndStep() has nothing in it
+ traversal.applyStrategies();
+ handleIterator(context, new TraverserIterator(traversal), serializer);
}
private void handleIterator(final Context context, final Iterator itty,
final MessageSerializer<?> serializer) throws InterruptedException {
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
index 6bd68669ad..466f0195fc 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtil.java
@@ -36,6 +36,7 @@ import io.netty.util.CharsetUtil;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.server.Context;
import org.apache.tinkerpop.gremlin.server.GremlinServer;
+import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.util.MetricManager;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.util.Tokens;
@@ -177,9 +178,13 @@ public class HttpHandlerUtil {
final JsonNode requestIdNode = body.get(Tokens.REQUEST_ID);
final UUID requestId = null == requestIdNode ? UUID.randomUUID() :
UUID.fromString(requestIdNode.asText());
+ final JsonNode chunkSizeNode = body.get(Tokens.ARGS_BATCH_SIZE);
+ final Integer chunkSize = null == chunkSizeNode ? null :
chunkSizeNode.asInt();
+
final RequestMessageV4.Builder builder =
RequestMessageV4.build(scriptNode.asText()).overrideRequestId(requestId)
.addBindings(bindings).addLanguage(language);
if (null != g) builder.addG(g);
+ if (null != chunkSize) builder.addChunkSize(chunkSize);
return builder.create();
}
diff --git
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
index 861b62465b..7a1e1750aa 100644
---
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
+++
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java
@@ -23,7 +23,7 @@ import ch.qos.logback.classic.Logger;
import io.netty.handler.codec.CorruptedFrameException;
import nl.altindag.log.LogCaptor;
import org.apache.tinkerpop.gremlin.util.Tokens;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.util.ser.Serializers;
@@ -151,8 +151,7 @@ public class ClientConnectionIntegrateTest extends
AbstractGremlinServerIntegrat
final ExecutorService executorServiceForTesting = cluster.executor();
try {
- final RequestMessage.Builder request =
client.buildMessage(RequestMessage.build(Tokens.OPS_EVAL))
- .add(Tokens.ARGS_GREMLIN, "Thread.sleep(5000)");
+ final RequestMessageV4.Builder request =
client.buildMessage(RequestMessageV4.build("Thread.sleep(5000)"));
final Callable<Connection> sendQueryCallable = () ->
client.chooseConnection(request.create());
final List<Callable<Connection>> listOfTasks = new ArrayList<>();
for (int i = 0; i < connPoolSize; i++) {
@@ -201,8 +200,7 @@ public class ClientConnectionIntegrateTest extends
AbstractGremlinServerIntegrat
final ExecutorService executorServiceForTesting = cluster.executor();
try {
- final RequestMessage.Builder request =
client.buildMessage(RequestMessage.build(Tokens.OPS_EVAL))
- .add(Tokens.ARGS_GREMLIN, "Thread.sleep(5000)");
+ final RequestMessageV4.Builder request =
client.buildMessage(RequestMessageV4.build("Thread.sleep(5000)"));
final Callable<Connection> sendQueryCallable = () ->
client.chooseConnection(request.create());
final List<Callable<Connection>> listOfTasks = new ArrayList<>();
for (int i = 0; i < operations; i++) {
diff --git
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
index 0b85f6f5b5..1426453107 100644
---
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
+++
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java
@@ -35,7 +35,7 @@ import
org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1;
@@ -1705,38 +1705,40 @@ public class GremlinDriverIntegrateTest extends
AbstractGremlinServerIntegration
cluster.close();
}
- @Test
- public void shouldSendUserAgent() throws Exception {
- final Cluster cluster =
TestClientFactory.build().serializer(Serializers.GRAPHSON_V3).create();
- final Client client = Mockito.spy(cluster.connect().alias("g"));
- client.submit("",
RequestOptions.build().userAgent("test").create()).all().get();
- cluster.close();
-
- final ArgumentCaptor<RequestMessage> requestMessageCaptor =
ArgumentCaptor.forClass(RequestMessage.class);
- verify(client).submitAsync(requestMessageCaptor.capture());
- final RequestMessage requestMessage = requestMessageCaptor.getValue();
- assertEquals("test",
requestMessage.getArgs().get(Tokens.ARGS_USER_AGENT));
- }
-
- @Test
- public void shouldSendUserAgentBytecode() {
- final Cluster cluster =
TestClientFactory.build().serializer(Serializers.GRAPHSON_V3).create();
- final Client client = Mockito.spy(cluster.connect().alias("g"));
- Mockito.when(client.alias("g")).thenReturn(client);
- final GraphTraversalSource g =
traversal().withRemote(DriverRemoteConnection.using(client));
- g.with(Tokens.ARGS_USER_AGENT, "test").V().iterate();
- cluster.close();
-
- final ArgumentCaptor<RequestOptions> requestOptionsCaptor =
ArgumentCaptor.forClass(RequestOptions.class);
- verify(client).submitAsync(Mockito.any(Bytecode.class),
requestOptionsCaptor.capture());
- final RequestOptions requestOptions = requestOptionsCaptor.getValue();
- assertEquals("test", requestOptions.getUserAgent().get());
-
- final ArgumentCaptor<RequestMessage> requestMessageCaptor =
ArgumentCaptor.forClass(RequestMessage.class);
- verify(client).submitAsync(requestMessageCaptor.capture());
- final RequestMessage requestMessage = requestMessageCaptor.getValue();
- assertEquals("test",
requestMessage.getArgs().getOrDefault(Tokens.ARGS_USER_AGENT, null));
- }
+// TODO: should probably remove and deprecate this old user-agent.
+// @Test
+// public void shouldSendUserAgent() throws Exception {
+// final Cluster cluster =
TestClientFactory.build().serializer(Serializers.GRAPHSON_V3).create();
+// final Client client = Mockito.spy(cluster.connect().alias("g"));
+// client.submit("",
RequestOptions.build().userAgent("test").create()).all().get();
+// cluster.close();
+//
+// final ArgumentCaptor<RequestMessage> requestMessageCaptor =
ArgumentCaptor.forClass(RequestMessage.class);
+// verify(client).submitAsync(requestMessageCaptor.capture());
+// final RequestMessage requestMessage =
requestMessageCaptor.getValue();
+// assertEquals("test",
requestMessage.getArgs().get(Tokens.ARGS_USER_AGENT));
+// }
+
+// TODO: should probably remove and deprecate this old user-agent.
+// @Test
+// public void shouldSendUserAgentBytecode() {
+// final Cluster cluster =
TestClientFactory.build().serializer(Serializers.GRAPHSON_V3).create();
+// final Client client = Mockito.spy(cluster.connect().alias("g"));
+// Mockito.when(client.alias("g")).thenReturn(client);
+// final GraphTraversalSource g =
traversal().withRemote(DriverRemoteConnection.using(client));
+// g.with(Tokens.ARGS_USER_AGENT, "test").V().iterate();
+// cluster.close();
+//
+// final ArgumentCaptor<RequestOptions> requestOptionsCaptor =
ArgumentCaptor.forClass(RequestOptions.class);
+// verify(client).submitAsync(Mockito.any(Bytecode.class),
requestOptionsCaptor.capture());
+// final RequestOptions requestOptions =
requestOptionsCaptor.getValue();
+// assertEquals("test", requestOptions.getUserAgent().get());
+//
+// final ArgumentCaptor<RequestMessage> requestMessageCaptor =
ArgumentCaptor.forClass(RequestMessage.class);
+// verify(client).submitAsync(requestMessageCaptor.capture());
+// final RequestMessage requestMessage =
requestMessageCaptor.getValue();
+// assertEquals("test",
requestMessage.getArgs().getOrDefault(Tokens.ARGS_USER_AGENT, null));
+// }
@Test
public void shouldSendRequestIdBytecode() {
@@ -1754,9 +1756,9 @@ public class GremlinDriverIntegrateTest extends
AbstractGremlinServerIntegration
assertTrue(requestOptions.getOverrideRequestId().isPresent());
assertEquals(overrideRequestId,
requestOptions.getOverrideRequestId().get());
- final ArgumentCaptor<RequestMessage> requestMessageCaptor =
ArgumentCaptor.forClass(RequestMessage.class);
+ final ArgumentCaptor<RequestMessageV4> requestMessageCaptor =
ArgumentCaptor.forClass(RequestMessageV4.class);
verify(client).submitAsync(requestMessageCaptor.capture());
- final RequestMessage requestMessage = requestMessageCaptor.getValue();
+ final RequestMessageV4 requestMessage =
requestMessageCaptor.getValue();
assertEquals(overrideRequestId, requestMessage.getRequestId());
}
diff --git
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
index 5c15c5e152..761e3bc4f0 100644
---
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
+++
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java
@@ -35,7 +35,7 @@ import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.util.Tokens;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
@@ -89,6 +89,7 @@ import static
org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalS
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.core.AllOf.allOf;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
@@ -182,9 +183,10 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
case
"shouldReturnInvalidRequestArgsWhenBindingCountExceedsAllowable":
settings.maxParameters = 1;
break;
- case "shouldTimeOutRemoteTraversal":
- settings.evaluationTimeout = 500;
- break;
+// TODO: evalTimeout needs be part of script now
+// case "shouldTimeOutRemoteTraversal":
+// settings.evaluationTimeout = 500;
+// break;
case "shouldBlowTheWorkQueueSize":
settings.gremlinPool = 1;
settings.maxWorkQueueSize = 1;
@@ -342,35 +344,35 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
g.close();
}
- @Test
- public void shouldTimeOutRemoteTraversal() throws Exception {
- final GraphTraversalSource g = traversal().withRemote(conf);
-
- try {
- // tests sleeping thread
-
g.inject(1).sideEffect(Lambda.consumer("Thread.sleep(10000)")).iterate();
- fail("This traversal should have timed out");
- } catch (Exception ex) {
- final Throwable t = ex.getCause();
- assertThat(t, instanceOf(ResponseException.class));
- assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT,
((ResponseException) t).getResponseStatusCode());
- }
-
- // make a graph with a cycle in it to force a long run traversal
-
graphGetter.get().traversal().addV("person").as("p").addE("self").to("p").iterate();
-
- try {
- // tests an "unending" traversal
- g.V().repeat(__.out()).until(__.outE().count().is(0)).iterate();
- fail("This traversal should have timed out");
- } catch (Exception ex) {
- final Throwable t = ex.getCause();
- assertThat(t, instanceOf(ResponseException.class));
- assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT,
((ResponseException) t).getResponseStatusCode());
- }
-
- g.close();
- }
+// @Test
+// public void shouldTimeOutRemoteTraversal() throws Exception {
+// final GraphTraversalSource g = traversal().withRemote(conf);
+//
+// try {
+// // tests sleeping thread
+//
g.inject(1).sideEffect(Lambda.consumer("Thread.sleep(10000)")).iterate();
+// fail("This traversal should have timed out");
+// } catch (Exception ex) {
+// final Throwable t = ex.getCause();
+// assertThat(t, instanceOf(ResponseException.class));
+// assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT,
((ResponseException) t).getResponseStatusCode());
+// }
+//
+// // make a graph with a cycle in it to force a long run traversal
+//
graphGetter.get().traversal().addV("person").as("p").addE("self").to("p").iterate();
+//
+// try {
+// // tests an "unending" traversal
+// g.V().repeat(__.out()).until(__.outE().count().is(0)).iterate();
+// fail("This traversal should have timed out");
+// } catch (Exception ex) {
+// final Throwable t = ex.getCause();
+// assertThat(t, instanceOf(ResponseException.class));
+// assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT,
((ResponseException) t).getResponseStatusCode());
+// }
+//
+// g.close();
+// }
@Test
public void shouldTimeOutRemoteTraversalWithPerRequestOption() throws
Exception {
@@ -402,44 +404,45 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
g.close();
}
- @Test
- public void shouldProduceProperExceptionOnTimeout() throws Exception {
- final Cluster cluster = TestClientFactory.open();
- final Client client = cluster.connect(name.getMethodName());
-
- boolean success = false;
- // Run a short test script a few times with progressively longer
timeouts.
- // Each submissions should either succeed or fail with a timeout.
- // Note: the range of timeouts is intended to cover the case when the
script finishes at about the
- // same time when the timeout occurs. In this situation either a
timeout response or a successful
- // response is acceptable, however no other processing errors should
occur.
- // Note: the timeout of 30 ms is generally sufficient for running a
simple groovy script, so using longer
- // timeouts are not likely to results in a success/timeout response
collision, which is the purpose
- // of this test.
- // Note: this test may have a false negative result, but a failure
would indicate a real problem.
- for(int i = 0; i < 30; i++) {
- int timeout = 1 + i;
- overrideEvaluationTimeout(timeout);
-
- try {
- client.submit("x = 1 + 1").all().get().get(0).getInt();
- success = true;
- } catch (Exception ex) {
- final Throwable t = ex.getCause();
- assertThat("Unexpected exception with script evaluation
timeout: " + timeout, t, instanceOf(ResponseException.class));
- assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT,
((ResponseException) t).getResponseStatusCode());
- }
- }
-
- assertTrue("Some script submissions should succeed", success);
-
- cluster.close();
- }
+// TODO: can't set evalTimeout anymore
+// @Test
+// public void shouldProduceProperExceptionOnTimeout() throws Exception {
+// final Cluster cluster = TestClientFactory.open();
+// final Client client = cluster.connect(name.getMethodName());
+//
+// boolean success = false;
+// // Run a short test script a few times with progressively longer
timeouts.
+// // Each submissions should either succeed or fail with a timeout.
+// // Note: the range of timeouts is intended to cover the case when
the script finishes at about the
+// // same time when the timeout occurs. In this situation either a
timeout response or a successful
+// // response is acceptable, however no other processing errors should
occur.
+// // Note: the timeout of 30 ms is generally sufficient for running a
simple groovy script, so using longer
+// // timeouts are not likely to results in a success/timeout response
collision, which is the purpose
+// // of this test.
+// // Note: this test may have a false negative result, but a failure
would indicate a real problem.
+// for(int i = 0; i < 30; i++) {
+// int timeout = 1 + i;
+// overrideEvaluationTimeout(timeout);
+//
+// try {
+// client.submit("x = 1 + 1").all().get().get(0).getInt();
+// success = true;
+// } catch (Exception ex) {
+// final Throwable t = ex.getCause();
+// assertThat("Unexpected exception with script evaluation
timeout: " + timeout, t, instanceOf(ResponseException.class));
+// assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT,
((ResponseException) t).getResponseStatusCode());
+// }
+// }
+//
+// assertTrue("Some script submissions should succeed", success);
+//
+// cluster.close();
+// }
@Test
public void shouldUseBaseScript() throws Exception {
final Cluster cluster = TestClientFactory.open();
- final Client client = cluster.connect(name.getMethodName());
+ final Client client = cluster.connect();
assertEquals("hello, stephen",
client.submit("hello('stephen')").all().get().get(0).getString());
@@ -449,7 +452,7 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
@Test
public void shouldUseInterpreterMode() throws Exception {
final Cluster cluster = TestClientFactory.open();
- final Client client = cluster.connect(name.getMethodName());
+ final Client client = cluster.connect();
client.submit("def subtractAway(x,y){x-y};[]").all().get();
client.submit("multiplyIt = { x,y -> x * y};[]").all().get();
@@ -490,23 +493,24 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
cluster.close();
}
- @Test
- public void shouldUseSimpleSandbox() throws Exception {
- final Cluster cluster = TestClientFactory.open();
- final Client client = cluster.connect();
-
- assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
-
- try {
- // this should return "nothing" - there should be no exception
- client.submit("java.lang.System.exit(0)").all().get();
- fail("The above should not have executed in any successful way as
sandboxing is enabled");
- } catch (Exception ex) {
- assertThat(ex.getCause().getMessage(), containsString("[Static
type checking] - Not authorized to call this method:
java.lang.System#exit(int)"));
- } finally {
- cluster.close();
- }
- }
+// TODO: re-enable
+// @Test
+// public void shouldUseSimpleSandbox() throws Exception {
+// final Cluster cluster = TestClientFactory.open();
+// final Client client = cluster.connect();
+//
+// assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
+//
+// try {
+// // this should return "nothing" - there should be no exception
+// client.submit("java.lang.System.exit(0)").all().get();
+// fail("The above should not have executed in any successful way
as sandboxing is enabled");
+// } catch (Exception ex) {
+// assertThat(ex.getCause().getMessage(), containsString("[Static
type checking] - Not authorized to call this method:
java.lang.System#exit(int)"));
+// } finally {
+// cluster.close();
+// }
+// }
@Test
public void shouldRespectHighWaterMarkSettingAndSucceed() throws Exception
{
@@ -528,9 +532,8 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
final CountDownLatch latch = new
CountDownLatch(resultCountToGenerate);
final AtomicBoolean expected = new AtomicBoolean(false);
final AtomicBoolean faulty = new AtomicBoolean(false);
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_BATCH_SIZE, batchSize)
- .addArg(Tokens.ARGS_GREMLIN, fattyX).create();
+ final RequestMessageV4 request = RequestMessageV4.build(fattyX)
+ .addChunkSize(batchSize).create();
client.submitAsync(request).thenAcceptAsync(r -> {
r.stream().forEach(item -> {
@@ -562,8 +565,8 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
@Test
public void shouldReturnInvalidRequestArgsWhenGremlinArgIsNotSupplied()
throws Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL).create();
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient())
{
+ final RequestMessageV4 request =
RequestMessageV4.build(Tokens.OPS_EVAL).create();
final ResponseMessage result = client.submit(request).get(0);
assertThat(result.getStatus().getCode(),
is(not(ResponseStatusCode.PARTIAL_CONTENT)));
assertEquals(result.getStatus().getCode(),
ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS);
@@ -572,12 +575,11 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
@Test
public void
shouldReturnInvalidRequestArgsWhenInvalidReservedBindingKeyIsUsed() throws
Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient())
{
final Map<String, Object> bindings = new HashMap<>();
bindings.put(T.id.getAccessor(), "123");
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]")
- .addArg(Tokens.ARGS_BINDINGS, bindings).create();
+ final RequestMessageV4 request =
RequestMessageV4.build("[1,2,3,4,5,6,7,8,9,0]")
+ .addBindings(bindings).create();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean pass = new AtomicBoolean(false);
client.submit(request, result -> {
@@ -592,12 +594,11 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
assertThat(pass.get(), is(true));
}
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient())
{
final Map<String, Object> bindings = new HashMap<>();
bindings.put("id", "123");
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]")
- .addArg(Tokens.ARGS_BINDINGS, bindings).create();
+ final RequestMessageV4 request =
RequestMessageV4.build("[1,2,3,4,5,6,7,8,9,0]")
+ .addBindings(bindings).create();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean pass = new AtomicBoolean(false);
client.submit(request, result -> {
@@ -613,38 +614,38 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
}
}
- @Test
- public void
shouldReturnInvalidRequestArgsWhenInvalidTypeBindingKeyIsUsed() throws
Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
- final Map<Object, Object> bindings = new HashMap<>();
- bindings.put(1, "123");
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]")
- .addArg(Tokens.ARGS_BINDINGS, bindings).create();
- final CountDownLatch latch = new CountDownLatch(1);
- final AtomicBoolean pass = new AtomicBoolean(false);
- client.submit(request, result -> {
- if (result.getStatus().getCode() !=
ResponseStatusCode.PARTIAL_CONTENT) {
-
pass.set(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS ==
result.getStatus().getCode());
- latch.countDown();
- }
- });
-
- if (!latch.await(3000, TimeUnit.MILLISECONDS))
- fail("Request should have returned error, but instead timed
out");
- assertThat(pass.get(), is(true));
- }
- }
+// TODO: probably invalid test now that builder requires proper type.
+// @Test
+// public void
shouldReturnInvalidRequestArgsWhenInvalidTypeBindingKeyIsUsed() throws
Exception {
+// try (SimpleClient client =
TestClientFactory.createWebSocketClient()) {
+// final Map<Object, Object> bindings = new HashMap<>();
+// bindings.put(1, "123");
+// final RequestMessageV4 request =
RequestMessageV4.build(Tokens.OPS_EVAL)
+// .addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]")
+// .addArg(Tokens.ARGS_BINDINGS, bindings).create();
+// final CountDownLatch latch = new CountDownLatch(1);
+// final AtomicBoolean pass = new AtomicBoolean(false);
+// client.submit(request, result -> {
+// if (result.getStatus().getCode() !=
ResponseStatusCode.PARTIAL_CONTENT) {
+//
pass.set(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS ==
result.getStatus().getCode());
+// latch.countDown();
+// }
+// });
+//
+// if (!latch.await(3000, TimeUnit.MILLISECONDS))
+// fail("Request should have returned error, but instead timed
out");
+// assertThat(pass.get(), is(true));
+// }
+// }
@Test
public void
shouldReturnInvalidRequestArgsWhenBindingCountExceedsAllowable() throws
Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
- final Map<Object, Object> bindings = new HashMap<>();
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient())
{
+ final Map<String, Object> bindings = new HashMap<>();
bindings.put("x", 123);
bindings.put("y", 123);
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_GREMLIN, "x+y")
- .addArg(Tokens.ARGS_BINDINGS, bindings).create();
+ final RequestMessageV4 request = RequestMessageV4.build("x+y")
+ .addBindings(bindings).create();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean pass = new AtomicBoolean(false);
client.submit(request, result -> {
@@ -659,12 +660,11 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
assertThat(pass.get(), is(true));
}
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
- final Map<Object, Object> bindings = new HashMap<>();
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient())
{
+ final Map<String, Object> bindings = new HashMap<>();
bindings.put("x", 123);
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_GREMLIN, "x+123")
- .addArg(Tokens.ARGS_BINDINGS, bindings).create();
+ final RequestMessageV4 request = RequestMessageV4.build("x+123")
+ .addBindings(bindings).create();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean pass = new AtomicBoolean(false);
client.submit(request, result -> {
@@ -682,12 +682,11 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
@Test
public void
shouldReturnInvalidRequestArgsWhenInvalidNullBindingKeyIsUsed() throws
Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient())
{
final Map<String, Object> bindings = new HashMap<>();
bindings.put(null, "123");
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]")
- .addArg(Tokens.ARGS_BINDINGS, bindings).create();
+ final RequestMessageV4 request =
RequestMessageV4.build("[1,2,3,4,5,6,7,8,9,0]")
+ .addBindings(bindings).create();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean pass = new AtomicBoolean(false);
client.submit(request, result -> {
@@ -706,9 +705,8 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
@Test
@SuppressWarnings("unchecked")
public void shouldBatchResultsByTwos() throws Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_GREMLIN,
"[0,1,2,3,4,5,6,7,8,9]").create();
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient())
{
+ final RequestMessageV4 request =
RequestMessageV4.build("[0,1,2,3,4,5,6,7,8,9]").create();
final List<ResponseMessage> msgs = client.submit(request);
assertEquals(5, client.submit(request).size());
@@ -740,23 +738,9 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
// }
// }
- @Test
- @SuppressWarnings("unchecked")
- public void shouldBatchResultsByOnesByOverridingFromClientSide() throws
Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_GREMLIN, "[0,1,2,3,4,5,6,7,8,9]")
- .addArg(Tokens.ARGS_BATCH_SIZE, 1).create();
-
- final List<ResponseMessage> msgs = client.submit(request);
- assertEquals(10, msgs.size());
- IntStream.rangeClosed(0, 9).forEach(i -> assertEquals(i,
((List<Integer>) msgs.get(i).getResult().getData()).get(0).intValue()));
- }
- }
-
@Test
public void shouldNotThrowNoSuchElementException() throws Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()){
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient()){
// this should return "nothing" - there should be no exception
final List<ResponseMessage> responses =
client.submit("g.V().has('name','kadfjaldjfla')");
assertNull(responses.get(0).getResult().getData());
@@ -766,34 +750,35 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
@Test
@SuppressWarnings("unchecked")
public void shouldReceiveFailureTimeOutOnScriptEval() throws Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()){
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient()){
final List<ResponseMessage> responses =
client.submit("Thread.sleep(3000);'some-stuff-that-should not return'");
- assertThat(responses.get(0).getStatus().getMessage(),
allOf(startsWith("Evaluation exceeded"), containsString("1000 ms")));
+ assertThat(responses.get(0).getStatus().getMessage(),
oneOf(containsString("timeout occurred"), containsString("evaluation
exceeded")));
// validate that we can still send messages to the server
assertEquals(2, ((List<Integer>)
client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
}
}
- @Test
- @SuppressWarnings("unchecked")
- public void shouldReceiveFailureTimeOutOnEvalUsingOverride() throws
Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
- final RequestMessage msg = RequestMessage.build("eval")
- .addArg(Tokens.ARGS_EVAL_TIMEOUT, 100L)
- .addArg(Tokens.ARGS_GREMLIN,
"Thread.sleep(3000);'some-stuff-that-should not return'")
- .create();
- final List<ResponseMessage> responses = client.submit(msg);
- assertThat(responses.get(0).getStatus().getMessage(),
allOf(startsWith("Evaluation exceeded"), containsString("100 ms")));
-
- // validate that we can still send messages to the server
- assertEquals(2, ((List<Integer>)
client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
- }
- }
+ // TODO: evalTimeout no longer set in RequestMessage
+// @Test
+// @SuppressWarnings("unchecked")
+// public void shouldReceiveFailureTimeOutOnEvalUsingOverride() throws
Exception {
+// try (SimpleClient client =
TestClientFactory.createWebSocketClient()) {
+// final RequestMessageV4 msg = RequestMessageV4.build("eval")
+// .addArg(Tokens.ARGS_EVAL_TIMEOUT, 100L)
+// .addArg(Tokens.ARGS_GREMLIN,
"Thread.sleep(3000);'some-stuff-that-should not return'")
+// .create();
+// final List<ResponseMessage> responses = client.submit(msg);
+// assertThat(responses.get(0).getStatus().getMessage(),
allOf(startsWith("Evaluation exceeded"), containsString("100 ms")));
+//
+// // validate that we can still send messages to the server
+// assertEquals(2, ((List<Integer>)
client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
+// }
+// }
@Test
public void shouldReceiveFailureTimeOutOnScriptEvalOfOutOfControlLoop()
throws Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()){
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient()){
// timeout configured for 1 second so the timed interrupt should
trigger prior to the
// evaluationTimeout which is at 30 seconds by default
final List<ResponseMessage> responses =
client.submit("while(true){}");
@@ -807,7 +792,7 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
@Test
@SuppressWarnings("unchecked")
public void shouldLoadInitScript() throws Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()){
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient()){
assertEquals(2, ((List<Integer>)
client.submit("addItUp(1,1)").get(0).getResult().getData()).get(0).intValue());
}
}
@@ -937,9 +922,8 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
@Test
public void shouldNotHavePartialContentWithOneResult() throws Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_GREMLIN, "10").create();
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient())
{
+ final RequestMessageV4 request =
RequestMessageV4.build("10").create();
final List<ResponseMessage> responses = client.submit(request);
assertEquals(1, responses.size());
assertEquals(ResponseStatusCode.SUCCESS,
responses.get(0).getStatus().getCode());
@@ -948,9 +932,8 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
@Test
public void shouldHavePartialContentWithLongResultsCollection() throws
Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_GREMLIN, "new String[100]").create();
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient())
{
+ final RequestMessageV4 request = RequestMessageV4.build("new
String[100]").create();
final List<ResponseMessage> responses = client.submit(request);
assertThat(responses.size(), Matchers.greaterThan(1));
for (Iterator<ResponseMessage> it = responses.iterator();
it.hasNext(); ) {
@@ -963,9 +946,8 @@ public class GremlinServerIntegrateTest extends
AbstractGremlinServerIntegration
@Test
public void shouldFailWithBadScriptEval() throws Exception {
- try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_GREMLIN, "new
String().doNothingAtAllBecauseThis is a syntax error").create();
+ try (SimpleClient client = TestClientFactory.createSimpleHttpClient())
{
+ final RequestMessageV4 request = RequestMessageV4.build("new
String().doNothingAtAllBecauseThis is a syntax error").create();
final List<ResponseMessage> responses = client.submit(request);
assertEquals(ResponseStatusCode.SERVER_ERROR_EVALUATION,
responses.get(0).getStatus().getCode());
assertEquals(1, responses.size());
diff --git
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
index ba2a238155..846dc549a7 100644
---
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
+++
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerSslIntegrateTest.java
@@ -30,7 +30,7 @@ import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.util.Tokens;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
import org.junit.Test;
@@ -164,13 +164,12 @@ public class GremlinServerSslIntegrateTest extends
AbstractGremlinServerIntegrat
@Test
public void shouldEnableWebSocketSsl() throws Exception {
- try (SimpleClient client =
TestClientFactory.createSSLWebSocketClient()) {
- final Map<Object, Object> bindings = new HashMap<>();
+ try (SimpleClient client =
TestClientFactory.createSSLSimpleHttpClient()) {
+ final Map<String, Object> bindings = new HashMap<>();
bindings.put("x", 123);
bindings.put("y", 123);
- final RequestMessage request =
RequestMessage.build(Tokens.OPS_EVAL)
- .addArg(Tokens.ARGS_GREMLIN, "x+y")
- .addArg(Tokens.ARGS_BINDINGS, bindings).create();
+ final RequestMessageV4 request = RequestMessageV4.build("x+y")
+ .addBindings(bindings).create();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean pass = new AtomicBoolean(false);
client.submit(request, result -> {
diff --git
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/TestClientFactory.java
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/TestClientFactory.java
index a2554a292a..c8f1f4c6f1 100644
---
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/TestClientFactory.java
+++
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/TestClientFactory.java
@@ -19,7 +19,7 @@
package org.apache.tinkerpop.gremlin.server;
import org.apache.tinkerpop.gremlin.driver.Cluster;
-import org.apache.tinkerpop.gremlin.driver.simple.WebSocketClient;
+import org.apache.tinkerpop.gremlin.driver.simple.SimpleHttpClient;
import java.net.URI;
@@ -29,8 +29,8 @@ import java.net.URI;
public final class TestClientFactory {
public static final int PORT = 45940;
- public static final URI WEBSOCKET_URI = URI.create("ws://localhost:" +
PORT + "/gremlin");
- public static final URI SSL_WEBSOCKET_URI = URI.create("wss://localhost:"
+ PORT + "/gremlin");
+ public static final URI HTTP_URI = URI.create("http://localhost:" + PORT +
"/gremlin");
+ public static final URI SSL_HTTP_URI = URI.create("https://localhost:" +
PORT + "/gremlin");
public static final String HTTP = "http://localhost:" + PORT;
public static final String RESOURCE_PATH = "conf/remote-objects.yaml";
@@ -46,12 +46,12 @@ public final class TestClientFactory {
return build().create();
}
- public static WebSocketClient createWebSocketClient() {
- return new WebSocketClient(WEBSOCKET_URI);
+ public static SimpleHttpClient createSimpleHttpClient() {
+ return new SimpleHttpClient(HTTP_URI);
}
- public static WebSocketClient createSSLWebSocketClient() {
- return new WebSocketClient(SSL_WEBSOCKET_URI);
+ public static SimpleHttpClient createSSLSimpleHttpClient() {
+ return new SimpleHttpClient(SSL_HTTP_URI);
}
public static String createURLString() {
diff --git
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtilTest.java
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtilTest.java
index f1cc004804..88f4f7b131 100644
---
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtilTest.java
+++
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtilTest.java
@@ -189,7 +189,7 @@ public class HttpHandlerUtilTest {
final String gremlin = "g.V(x)";
final ByteBuf buffer = allocator.buffer();
buffer.writeCharSequence("{ \"gremlin\": \"" + gremlin +
- "\", \"language\": \"gremlin-groovy\"}",
+ "\" \"language\": \"gremlin-groovy\"}",
CharsetUtil.UTF_8);
final HttpHeaders headers = new DefaultHttpHeaders();
@@ -200,7 +200,7 @@ public class HttpHandlerUtilTest {
try {
HttpHandlerUtil.getRequestMessageV4FromHttpRequest(httpRequest);
- fail("Expected an error because of incorrect UUID format.");
+ fail("Expected an error because of bad JSON request.");
} catch (Exception e) {
assertTrue(e.getMessage().contains("body could not be parsed"));
}
diff --git
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessageV4.java
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessageV4.java
index 71c34bb9e8..44a88f5c47 100644
---
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessageV4.java
+++
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/RequestMessageV4.java
@@ -45,12 +45,6 @@ public final class RequestMessageV4 {
private Object gremlin; // Should be either a String or Bytecode type.
private Map<String, Object> fields;
-//
-// private String language;
-//
-// private Map<String, Object> bindings;
-//
-// private String g;
private RequestMessageV4(final Object gremlin, final Map<String, Object>
fields) {
if (null == gremlin) throw new
IllegalArgumentException("RequestMessage requires gremlin argument");
@@ -119,20 +113,14 @@ public final class RequestMessageV4 {
}
public static Builder from(final RequestMessageV4 msg) {
- final Builder builder = build(msg.gremlin)
- .overrideRequestId(msg.requestId)
- .addLanguage(msg.getArg(Tokens.ARGS_LANGUAGE))
- .addG(msg.getArg(Tokens.ARGS_G))
- .addBindings(msg.getArg(Tokens.ARGS_BINDINGS));
+ final Builder builder = build(msg.gremlin);
+ builder.fields.putAll(msg.getFields());
return builder;
}
public static Builder from(final RequestMessageV4 msg, final Object
gremlin) {
- final Builder builder = build(gremlin)
- .overrideRequestId(msg.requestId)
- .addLanguage(msg.getArg(Tokens.ARGS_LANGUAGE))
- .addG(msg.getArg(Tokens.ARGS_G))
- .addBindings(msg.getArg(Tokens.ARGS_BINDINGS));
+ final Builder builder = build(gremlin);
+ builder.fields.putAll(msg.getFields());
return builder;
}
@@ -208,6 +196,12 @@ public final class RequestMessageV4 {
return this;
}
+ public Builder addChunkSize(final int chunkSize) {
+ Objects.requireNonNull(chunkSize, "chunkSize argument cannot be
null.");
+ this.fields.put(Tokens.ARGS_BATCH_SIZE, chunkSize);
+ return this;
+ }
+
/**
* Create the request message given the settings provided to the
{@link Builder}.
*/