TINKERPOP-1342 Allow better per-request settings in driver
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/ccb5bcc1 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/ccb5bcc1 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/ccb5bcc1 Branch: refs/heads/TINKERPOP-1342 Commit: ccb5bcc148faafd1a7b74b20b0d9f78429bf8598 Parents: 1250129 Author: Stephen Mallette <[email protected]> Authored: Wed Aug 8 15:56:38 2018 -0400 Committer: Stephen Mallette <[email protected]> Committed: Thu Aug 16 12:11:32 2018 -0400 ---------------------------------------------------------------------- .../apache/tinkerpop/gremlin/driver/Client.java | 148 ++++++++++--------- .../gremlin/driver/RequestOptions.java | 120 +++++++++++++++ .../server/GremlinDriverIntegrateTest.java | 21 +++ 3 files changed, 216 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ccb5bcc1/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index 6e604cb..c6ccf7e 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -67,7 +67,7 @@ public abstract class Client { } /** - * Makes any final changes to the builder and returns the constructed {@link RequestMessage}. Implementers + * Makes any initial changes to the builder and returns the constructed {@link RequestMessage}. Implementers * may choose to override this message to append data to the request before sending. By default, this method * will simply return the {@code builder} passed in by the caller. 
*/ @@ -184,7 +184,7 @@ public abstract class Client { * @param gremlin the gremlin script to execute */ public ResultSet submit(final String gremlin) { - return submit(gremlin, null); + return submit(gremlin, RequestOptions.EMPTY); } /** @@ -205,13 +205,28 @@ public abstract class Client { } /** + * Submits a Gremlin script to the server and returns a {@link ResultSet} once the write of the request is + * complete. + * + * @param gremlin the gremlin script to execute + * @param options for the request + */ + public ResultSet submit(final String gremlin, final RequestOptions options) { + try { + return submitAsync(gremlin, options).get(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** * The asynchronous version of {@link #submit(String)} where the returned future will complete when the * write of the request completes. * * @param gremlin the gremlin script to execute */ public CompletableFuture<ResultSet> submitAsync(final String gremlin) { - return submitAsync(gremlin, null); + return submitAsync(gremlin, RequestOptions.build().create()); } /** @@ -222,13 +237,12 @@ public abstract class Client { * @param parameters a map of parameters that will be bound to the script on execution */ public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, Object> parameters) { - final RequestMessage.Builder request = RequestMessage.build(Tokens.OPS_EVAL) - .add(Tokens.ARGS_GREMLIN, gremlin) - .add(Tokens.ARGS_BATCH_SIZE, cluster.connectionPoolSettings().resultIterationBatchSize); - - Optional.ofNullable(parameters).ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, parameters)); + final RequestOptions.Builder options = RequestOptions.build(); + if (parameters != null && !parameters.isEmpty()) { + parameters.forEach(options::addParameter); + } - return submitAsync(buildMessage(request).create()); + return submitAsync(gremlin, options.create()); } /** @@ -238,19 +252,17 @@ public abstract class Client { * @param 
gremlin the gremlin script to execute * @param parameters a map of parameters that will be bound to the script on execution * @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g" + * @deprecated As of release 3.4.0, replaced by {@link #submitAsync(String, RequestOptions)}. */ + @Deprecated public CompletableFuture<ResultSet> submitAsync(final String gremlin, final String graphOrTraversalSource, final Map<String, Object> parameters) { - final RequestMessage.Builder request = RequestMessage.build(Tokens.OPS_EVAL) - .add(Tokens.ARGS_GREMLIN, gremlin) - .add(Tokens.ARGS_BATCH_SIZE, cluster.connectionPoolSettings().resultIterationBatchSize); - - Optional.ofNullable(parameters).ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, parameters)); - - if (graphOrTraversalSource != null && !graphOrTraversalSource.isEmpty()) - request.addArg(Tokens.ARGS_ALIASES, makeDefaultAliasMap(graphOrTraversalSource)); + Map<String,String> aliases = null; + if (graphOrTraversalSource != null && !graphOrTraversalSource.isEmpty()) { + aliases = makeDefaultAliasMap(graphOrTraversalSource); + } - return submitAsync(buildMessage(request).create()); + return submitAsync(gremlin, aliases, parameters); } /** @@ -262,19 +274,47 @@ public abstract class Client { * @param aliases aliases the specified global Gremlin Server variable some other name that then be used in the * script where the key is the alias name and the value represents the global variable on the * server + * @deprecated As of release 3.4.0, replaced by {@link #submitAsync(String, RequestOptions)}. 
*/ + @Deprecated public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String,String> aliases, final Map<String, Object> parameters) { - final RequestMessage.Builder request = RequestMessage.build(Tokens.OPS_EVAL) - .add(Tokens.ARGS_GREMLIN, gremlin) - .add(Tokens.ARGS_BATCH_SIZE, cluster.connectionPoolSettings().resultIterationBatchSize); + final RequestOptions.Builder options = RequestOptions.build(); + if (aliases != null && !aliases.isEmpty()) { + aliases.forEach(options::addAlias); + } + + if (parameters != null && !parameters.isEmpty()) { + parameters.forEach(options::addParameter); + } + + options.batchSize(cluster.connectionPoolSettings().resultIterationBatchSize); + + return submitAsync(gremlin, options.create()); + } + + /** + * The asynchronous version of {@link #submit(String, RequestOptions)} where the returned future will complete when the + * write of the request completes. + * + * @param gremlin the gremlin script to execute + * @param options the options to supply for this request + */ + public CompletableFuture<ResultSet> submitAsync(final String gremlin, final RequestOptions options) { + final int batchSize = options.getBatchSize().orElse(cluster.connectionPoolSettings().resultIterationBatchSize); - Optional.ofNullable(parameters).ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, parameters)); + // need to call buildMessage() right away to get client specific configurations, that way request specific + // ones can override as needed + final RequestMessage.Builder request = buildMessage(RequestMessage.build(Tokens.OPS_EVAL)) + .add(Tokens.ARGS_GREMLIN, gremlin) + .add(Tokens.ARGS_BATCH_SIZE, batchSize); - if (aliases != null && !aliases.isEmpty()) - request.addArg(Tokens.ARGS_ALIASES, aliases); + // apply settings if they were made available + options.getTimeout().ifPresent(timeout -> request.add(Tokens.ARGS_SCRIPT_EVAL_TIMEOUT, timeout)); + options.getParameters().ifPresent(params -> 
request.addArg(Tokens.ARGS_BINDINGS, params)); + options.getAliases().ifPresent(aliases -> request.addArg(Tokens.ARGS_ALIASES, aliases)); - return submitAsync(buildMessage(request).create()); + return submitAsync(request.create()); } /** @@ -384,52 +424,6 @@ public abstract class Client { } /** - * The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the - * write of the request completes. - * - * @param gremlin the gremlin script to execute - * @param parameters a map of parameters that will be bound to the script on execution - * @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g" - */ - public CompletableFuture<ResultSet> submitAsync(final String gremlin, final String graphOrTraversalSource, - final Map<String, Object> parameters) { - final RequestMessage.Builder request = RequestMessage.build(Tokens.OPS_EVAL) - .add(Tokens.ARGS_GREMLIN, gremlin) - .add(Tokens.ARGS_BATCH_SIZE, cluster.connectionPoolSettings().resultIterationBatchSize); - - Optional.ofNullable(parameters).ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, parameters)); - - if (graphOrTraversalSource != null && !graphOrTraversalSource.isEmpty()) - request.addArg(Tokens.ARGS_ALIASES, makeDefaultAliasMap(graphOrTraversalSource)); - - return submitAsync(buildMessage(request).create()); - } - - /** - * The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the - * write of the request completes. 
- * - * @param gremlin the gremlin script to execute - * @param parameters a map of parameters that will be bound to the script on execution - * @param aliases aliases the specified global Gremlin Server variable some other name that then be used in the - * script where the key is the alias name and the value represents the global variable on the - * server - */ - public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String,String> aliases, - final Map<String, Object> parameters) { - final RequestMessage.Builder request = RequestMessage.build(Tokens.OPS_EVAL) - .add(Tokens.ARGS_GREMLIN, gremlin) - .add(Tokens.ARGS_BATCH_SIZE, cluster.connectionPoolSettings().resultIterationBatchSize); - - Optional.ofNullable(parameters).ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, parameters)); - - if (aliases != null && !aliases.isEmpty()) - request.addArg(Tokens.ARGS_ALIASES, aliases); - - return submitAsync(buildMessage(request).create()); - } - - /** * {@inheritDoc} */ @Override @@ -535,8 +529,16 @@ public abstract class Client { @Override public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) { final RequestMessage.Builder builder = RequestMessage.from(msg); - if (!aliases.isEmpty()) - builder.addArg(Tokens.ARGS_ALIASES, aliases); + + // only add aliases which aren't already present. 
if they are present then they represent request level + // overrides which should not be mucked with + if (!aliases.isEmpty()) { + final Map original = (Map) msg.getArgs().getOrDefault(Tokens.ARGS_ALIASES, Collections.emptyMap()); + aliases.forEach((k,v) -> { + if (!original.containsKey(k)) + builder.addArg(Tokens.ARGS_ALIASES, aliases); + }); + } return super.submitAsync(builder.create()); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ccb5bcc1/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java new file mode 100644 index 0000000..20c5ab2 --- /dev/null +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/RequestOptions.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tinkerpop.gremlin.driver; + +import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Options that can be supplied on a per request basis. + * + * @author Stephen Mallette (http://stephen.genoprime.com) + */ +public final class RequestOptions { + + public static final RequestOptions EMPTY = RequestOptions.build().create(); + + private final Map<String,String> aliases; + private final Map<String, Object> parameters; + private final Integer batchSize; + private final Long timeout; + + private RequestOptions(final Builder builder) { + this.aliases = builder.aliases; + this.parameters = builder.parameters; + this.batchSize = builder.batchSize; + this.timeout = builder.timeout; + } + + public Optional<Map<String, String>> getAliases() { + return Optional.ofNullable(aliases); + } + + public Optional<Map<String, Object>> getParameters() { + return Optional.ofNullable(parameters); + } + + public Optional<Integer> getBatchSize() { + return Optional.ofNullable(batchSize); + } + + public Optional<Long> getTimeout() { + return Optional.ofNullable(timeout); + } + + public static Builder build() { + return new Builder(); + } + + public static final class Builder { + private Map<String,String> aliases = null; + private Map<String, Object> parameters = null; + private Integer batchSize = null; + private Long timeout = null; + + /** + * The aliases to set on the request. + */ + public Builder addAlias(final String aliasName, final String actualName) { + if (null == aliases) + aliases = new HashMap<>(); + + aliases.put(aliasName, actualName); + return this; + } + + /** + * The parameters to pass on the request. 
+ */ + public Builder addParameter(final String name, final Object value) { + if (null == parameters) + parameters = new HashMap<>(); + + parameters.put(name, value); + return this; + } + + /** + * The per client request override for the client and server configured {@code resultIterationBatchSize}. If + * this value is not set, then the configuration for the {@link Cluster} is used unless the + * {@link RequestMessage} is configured completely by the user. + */ + public Builder batchSize(final int batchSize) { + this.batchSize = batchSize; + return this; + } + + /** + * The per client request override in milliseconds for the server configured {@code scriptEvaluationTimeout}. + * If this value is not set, then the configuration for the server is used. + */ + public Builder timeout(final long timeout) { + this.timeout = timeout; + return this; + } + + public RequestOptions create() { + return new RequestOptions(this); + } + + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ccb5bcc1/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java index c7e7bb2..6343e4d 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java @@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.TestHelper; import org.apache.tinkerpop.gremlin.driver.Channelizer; import org.apache.tinkerpop.gremlin.driver.Client; import org.apache.tinkerpop.gremlin.driver.Cluster; +import org.apache.tinkerpop.gremlin.driver.RequestOptions; import org.apache.tinkerpop.gremlin.driver.Result; import 
org.apache.tinkerpop.gremlin.driver.ResultSet; import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; @@ -179,12 +180,32 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration case "shouldProcessEvalInterruption": settings.scriptEvaluationTimeout = 1500; break; + case "shouldProcessEvalTimeoutOverride": + settings.scriptEvaluationTimeout = 15000; + break; } return settings; } @Test + public void shouldProcessEvalTimeoutOverride() throws Exception { + final Cluster cluster = TestClientFactory.open(); + final Client client = cluster.connect(); + final RequestOptions options = RequestOptions.build().timeout(500).create(); + + try { + client.submit("Thread.sleep(5000);'done'", options).all().get(); + fail("Should have timed out"); + } catch (Exception ex) { + final ResponseException re = (ResponseException) ex.getCause(); + assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, re.getResponseStatusCode()); + } + + cluster.close(); + } + + @Test public void shouldProcessTraversalInterruption() throws Exception { final Cluster cluster = TestClientFactory.open(); final Client client = cluster.connect();
