TINKERPOP-1490 Restructured Traversal.promise() No longer uses an ExecutorService and is only applicable to "remote" traversals. Moved the commons-lang dependency back to gremlin-groovy for now.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/ee6a3589 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/ee6a3589 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/ee6a3589 Branch: refs/heads/TINKERPOP-1130 Commit: ee6a35893661b015dbb827463f175ddcecf1bcb8 Parents: eb08976 Author: Stephen Mallette <[email protected]> Authored: Fri Nov 11 12:51:40 2016 -0500 Committer: Stephen Mallette <[email protected]> Committed: Fri Dec 16 10:00:40 2016 -0500 ---------------------------------------------------------------------- gremlin-core/pom.xml | 5 - .../process/remote/RemoteConnection.java | 12 +- .../remote/traversal/RemoteTraversal.java | 2 +- .../remote/traversal/step/map/RemoteStep.java | 32 +++- .../gremlin/process/traversal/Traversal.java | 57 ++------ .../traversal/util/DefaultTraversal.java | 37 ----- .../process/traversal/TraversalTest.java | 145 ------------------- .../tinkerpop/gremlin/driver/Connection.java | 6 +- .../driver/remote/DriverRemoteConnection.java | 14 ++ .../driver/remote/DriverRemoteTraversal.java | 16 +- .../DriverRemoteTraversalSideEffects.java | 22 ++- .../DriverRemoteTraversalSideEffectsTest.java | 12 +- gremlin-groovy/pom.xml | 5 + .../server/GremlinServerIntegrateTest.java | 25 ++++ .../process/traversal/CoreTraversalTest.java | 42 ------ 15 files changed, 131 insertions(+), 301 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/pom.xml ---------------------------------------------------------------------- diff --git a/gremlin-core/pom.xml b/gremlin-core/pom.xml index 0594448..e8f3a34 100644 --- a/gremlin-core/pom.xml +++ b/gremlin-core/pom.xml @@ -61,11 +61,6 @@ limitations under the License. </exclusion> </exclusions> </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>3.3.1</version> - </dependency> <!-- LOGGING --> <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java index 8506ad7..f4e3976 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/RemoteConnection.java @@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import java.util.Iterator; +import java.util.concurrent.CompletableFuture; /** * A simple abstraction of a "connection" to a "server" that is capable of processing a {@link Traversal} and @@ -43,9 +44,16 @@ public interface RemoteConnection extends AutoCloseable { public <E> Iterator<Traverser.Admin<E>> submit(final Traversal<?, E> traversal) throws RemoteConnectionException; /** - * Submits {@link Traversal} {@link Bytecode} to a server and returns a {@link Traversal}. - * The {@link Traversal} is an abstraction over two types of results that can be returned as part of the + * Submits {@link Traversal} {@link Bytecode} to a server and returns a {@link RemoteTraversal}. + * The {@link RemoteTraversal} is an abstraction over two types of results that can be returned as part of the * response from the server: the results of the {@link Traversal} itself and the side-effects that it produced. */ public <E> RemoteTraversal<?,E> submit(final Bytecode bytecode) throws RemoteConnectionException; + + /** + * Submits {@link Traversal} {@link Bytecode} to a server and returns a promise of a {@link RemoteTraversal}. + * The {@link RemoteTraversal} is an abstraction over two types of results that can be returned as part of the + * response from the server: the results of the {@link Traversal} itself and the side-effects that it produced. + */ + public <E> CompletableFuture<RemoteTraversal<?, E>> submitAsync(final Bytecode bytecode) throws RemoteConnectionException; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java index 9c893c2..57b0cda 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/RemoteTraversal.java @@ -39,7 +39,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep; public interface RemoteTraversal<S,E> extends Traversal.Admin<S,E> { /** - * Returns remote side-effects generated by the traversal so that they can accessible to the client. Note that + * Returns remote side-effects generated by the traversal so that they can be accessible to the client. Note that * "side-effect" refers to the value in "a" in the traversal {@code g.V().aggregate('a').values('name')}. */ @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java index 6b2be96..3e19097 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/remote/traversal/step/map/RemoteStep.java @@ -21,12 +21,17 @@ package org.apache.tinkerpop.gremlin.process.remote.traversal.step.map; import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection; import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException; import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal; +import org.apache.tinkerpop.gremlin.process.traversal.Bytecode; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep; +import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * Sends a {@link Traversal} to a {@link RemoteConnection} and iterates back the results. @@ -38,6 +43,7 @@ public final class RemoteStep<S, E> extends AbstractStep<S, E> { private transient RemoteConnection remoteConnection; private RemoteTraversal<?, E> remoteTraversal; + private final AtomicReference<CompletableFuture<Traversal<?, E>>> traversalFuture = new AtomicReference<>(null); public RemoteStep(final Traversal.Admin traversal, final RemoteConnection remoteConnection) { super(traversal); @@ -51,14 +57,26 @@ public final class RemoteStep<S, E> extends AbstractStep<S, E> { @Override protected Traverser.Admin<E> processNextStart() throws NoSuchElementException { - if (null == this.remoteTraversal) { - try { - this.remoteTraversal = this.remoteConnection.submit(this.traversal.getBytecode()); - this.traversal.setSideEffects(this.remoteTraversal.getSideEffects()); - } catch (final RemoteConnectionException sce) { - throw new IllegalStateException(sce); + if (null == this.remoteTraversal) promise().join(); + return this.remoteTraversal.nextTraverser(); + } + + /** + * Submits the traversal asynchronously to a "remote" using {@link RemoteConnection#submitAsync(Bytecode)}. + */ + public CompletableFuture<Traversal<?, E>> promise() { + try { + if (null == traversalFuture.get()) { + traversalFuture.set(this.remoteConnection.submitAsync(this.traversal.getBytecode()).<Traversal<?, E>>thenApply(t -> { + this.remoteTraversal = (RemoteTraversal<?, E>) t; + this.traversal.setSideEffects(this.remoteTraversal.getSideEffects()); + return traversal; + })); } + + return traversalFuture.get(); + } catch (RemoteConnectionException rce) { + throw new IllegalStateException(rce); } - return this.remoteTraversal.nextTraverser(); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java index e4ba5a6..04f5127 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/Traversal.java @@ -18,8 +18,9 @@ */ package org.apache.tinkerpop.gremlin.process.traversal; -import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; +import org.apache.tinkerpop.gremlin.process.remote.traversal.step.map.RemoteStep; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.ProfileSideEffectStep; @@ -43,11 +44,7 @@ import java.util.Optional; import java.util.Set; import java.util.Spliterator; import java.util.Spliterators; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; @@ -148,39 +145,21 @@ public interface Traversal<S, E> extends Iterator<E>, Serializable, Cloneable, A /** * Starts a promise to execute a function on the current {@code Traversal} that will be completed in the future. - * This implementation uses {@link Admin#traversalExecutorService} to execute the supplied - * {@code traversalFunction}. + * Note that this method can only be used if the {@code Traversal} is constructed using + * {@link TraversalSource#withRemote(Configuration)}. Calling this method otherwise will yield an + * {@code IllegalStateException}. */ public default <T> CompletableFuture<T> promise(final Function<Traversal, T> traversalFunction) { - return promise(traversalFunction, Admin.traversalExecutorService); - } - - /** - * Starts a promise to execute a function on the current {@code Traversal} that will be completed in the future. - * This implementation uses the caller supplied {@code ExecutorService} to execute the {@code traversalFunction}. - */ - public default <T> CompletableFuture<T> promise(final Function<Traversal, T> traversalFunction, final ExecutorService service) { - final CompletableFuture<T> promise = new CompletableFuture<>(); - final Future iterationFuture = service.submit(() -> { - try { - promise.complete(traversalFunction.apply(this)); - } catch (Exception ex) { - // the promise may have been cancelled by the caller, in which case, there is no need to attempt - // another write on completion - if (!promise.isDone()) promise.completeExceptionally(ex); - } - }); - - // if the user cancels the promise then attempt to kill the iteration. - promise.exceptionally(t -> { - if (t instanceof CancellationException) { - iterationFuture.cancel(true); - } - - return null; - }); - - return promise; + // apply strategies to see if RemoteStrategy has any effect (i.e. add RemoteStep) + if (!this.asAdmin().isLocked()) this.asAdmin().applyStrategies(); + + // use the end step so the results are bulked + final Step<?, E> endStep = this.asAdmin().getEndStep(); + if (endStep instanceof RemoteStep) { + return ((RemoteStep) endStep).promise().thenApply(traversalFunction); + } else { + throw new IllegalStateException("Only traversals created using withRemote() can be used in an async way"); + } } /** @@ -297,12 +276,6 @@ public interface Traversal<S, E> extends Iterator<E>, Serializable, Cloneable, A public interface Admin<S, E> extends Traversal<S, E> { /** - * Service that handles promises. - */ - static final ExecutorService traversalExecutorService = Executors.newCachedThreadPool( - new BasicThreadFactory.Builder().namingPattern("traversal-executor-%d").build()); - - /** * Get the {@link Bytecode} associated with the construction of this traversal. * * @return the byte code representation of the traversal http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java index 6ce6dfe..3c21e37 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/DefaultTraversal.java @@ -43,9 +43,6 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.function.Function; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -328,40 +325,6 @@ public class DefaultTraversal<S, E> implements Traversal.Admin<S, E> { this.graph = graph; } - /** - * Override of {@link Traversal#promise(Function)} that is aware of graph transactions. - */ - @Override - public <T2> CompletableFuture<T2> promise(final Function<Traversal, T2> traversalFunction) { - return this.promise(traversalFunction, Traversal.Admin.traversalExecutorService); - } - - /** - * Override of {@link Traversal#promise(Function)} that is aware of graph transactions. In a transactional graph - * a promise represents the full scope of a transaction, even if the graph is only partially iterated. - */ - @Override - public <T2> CompletableFuture<T2> promise(final Function<Traversal, T2> traversalFunction, final ExecutorService service) { - if (graph != null && graph.features().graph().supportsTransactions()) { - final Function<Traversal, T2> transactionAware = traversal -> { - - try { - if (graph.tx().isOpen()) graph.tx().rollback(); - final T2 obj = traversalFunction.apply(traversal); - if (graph.tx().isOpen()) graph.tx().commit(); - return obj; - } catch (Exception ex) { - if (graph.tx().isOpen()) graph.tx().rollback(); - throw ex; - } - }; - - return Traversal.Admin.super.promise(transactionAware, service); - } else { - return Traversal.Admin.super.promise(traversalFunction, service); - } - } - @Override public boolean equals(final Object other) { return other != null && other.getClass().equals(this.getClass()) && this.equals(((Traversal.Admin) other)); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java index aa1b99b..c427d8e 100644 --- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java +++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalTest.java @@ -30,34 +30,22 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.NoSuchElementException; import java.util.Optional; -import java.util.Random; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsCollectionContaining.hasItems; -import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; /** * @author Stephen Mallette (http://stephen.genoprime.com) */ public class TraversalTest { - private final ExecutorService service = Executors.newFixedThreadPool(2); - @Test public void shouldTryNext() { final MockTraversal<Integer> t = new MockTraversal<>(1, 2, 3); @@ -117,139 +105,6 @@ public class TraversalTest { } @Test - public void shouldPromiseNextThreeUsingForkJoin() throws Exception { - final MockTraversal<Integer> t = new MockTraversal<>(1, 2, 3, 4, 5, 6, 7); - final CompletableFuture<List<Integer>> promiseFirst = t.promise(traversal -> traversal.next(3)); - final List<Integer> listFirst = promiseFirst.get(); - assertEquals(3, listFirst.size()); - assertThat(listFirst, hasItems(1 ,2, 3)); - assertThat(t.hasNext(), is(true)); - assertThat(promiseFirst.isDone(), is(true)); - - final CompletableFuture<List<Integer>> promiseSecond = t.promise(traversal -> traversal.next(3)); - final List<Integer> listSecond = promiseSecond.get(); - assertEquals(3, listSecond.size()); - assertThat(listSecond, hasItems(4, 5, 6)); - assertThat(t.hasNext(), is(true)); - assertThat(promiseSecond.isDone(), is(true)); - - final CompletableFuture<List<Integer>> promiseThird = t.promise(traversal -> traversal.next(3)); - final List<Integer> listThird = promiseThird.get(); - assertEquals(1, listThird.size()); - assertThat(listThird, hasItems(7)); - assertThat(t.hasNext(), is(false)); - assertThat(promiseThird.isDone(), is(true)); - - final CompletableFuture<Integer> promiseDead = t.promise(traversal -> (Integer) traversal.next()); - final AtomicBoolean dead = new AtomicBoolean(false); - promiseDead.exceptionally(tossed -> { - dead.set(tossed instanceof NoSuchElementException); - return null; - }); - - try { - promiseDead.get(10000, TimeUnit.MILLISECONDS); - fail("Should have gotten an exception"); - } catch (Exception ex) { - if (ex instanceof TimeoutException) { - fail("This should not have timed out but should have gotten an exception caught above in the exceptionally() clause"); - } - - assertThat(ex.getCause(), instanceOf(NoSuchElementException.class)); - } - - assertThat(dead.get(), is(true)); - assertThat(t.hasNext(), is(false)); - assertThat(promiseDead.isDone(), is(true)); - } - - @Test - public void shouldPromiseNextThreeUsingSpecificExecutor() throws Exception { - final MockTraversal<Integer> t = new MockTraversal<>(1, 2, 3, 4, 5, 6, 7); - final CompletableFuture<List<Integer>> promiseFirst = t.promise(traversal -> traversal.next(3), service); - final List<Integer> listFirst = promiseFirst.get(); - assertEquals(3, listFirst.size()); - assertThat(listFirst, hasItems(1 ,2, 3)); - assertThat(t.hasNext(), is(true)); - assertThat(promiseFirst.isDone(), is(true)); - - final CompletableFuture<List<Integer>> promiseSecond = t.promise(traversal -> traversal.next(3), service); - final List<Integer> listSecond = promiseSecond.get(); - assertEquals(3, listSecond.size()); - assertThat(listSecond, hasItems(4, 5, 6)); - assertThat(t.hasNext(), is(true)); - assertThat(promiseSecond.isDone(), is(true)); - - final CompletableFuture<List<Integer>> promiseThird = t.promise(traversal -> traversal.next(3), service); - final List<Integer> listThird = promiseThird.get(); - assertEquals(1, listThird.size()); - assertThat(listThird, hasItems(7)); - assertThat(t.hasNext(), is(false)); - assertThat(promiseThird.isDone(), is(true)); - - final CompletableFuture<Integer> promiseDead = t.promise(traversal -> (Integer) traversal.next(), service); - final AtomicBoolean dead = new AtomicBoolean(false); - promiseDead.exceptionally(tossed -> { - dead.set(tossed instanceof NoSuchElementException); - return null; - }); - - try { - promiseDead.get(10000, TimeUnit.MILLISECONDS); - fail("Should have gotten an exception"); - } catch (Exception ex) { - if (ex instanceof TimeoutException) { - fail("This should not have timed out but should have gotten an exception caught above in the exceptionally() clause"); - } - - assertThat(ex.getCause(), instanceOf(NoSuchElementException.class)); - } - - assertThat(dead.get(), is(true)); - assertThat(t.hasNext(), is(false)); - assertThat(promiseDead.isDone(), is(true)); - } - - @Test - public void shouldInterruptTraversalFunction() throws Exception { - final Random rand = new Random(1234567890); - - // infinite traversal - final MockTraversal<Integer> t = new MockTraversal<>(IntStream.generate(rand::nextInt).iterator()); - - // iterate a bunch of it - final CompletableFuture<List<Integer>> promise10 = t.promise(traversal -> traversal.next(10), service); - assertEquals(10, promise10.get(10000, TimeUnit.MILLISECONDS).size()); - final CompletableFuture<List<Integer>> promise100 = t.promise(traversal -> traversal.next(100), service); - assertEquals(100, promise100.get(10000, TimeUnit.MILLISECONDS).size()); - final CompletableFuture<List<Integer>> promise1000 = t.promise(traversal -> traversal.next(1000), service); - assertEquals(1000, promise1000.get(10000, TimeUnit.MILLISECONDS).size()); - - // this is endless, so let's cancel - final CompletableFuture<List<Integer>> promiseForevers = t.promise(traversal -> traversal.next(Integer.MAX_VALUE), service); - - // specify what to do on exception - final AtomicBoolean failed = new AtomicBoolean(false); - promiseForevers.exceptionally(ex -> { - failed.set(true); - return null; - }); - - try { - // let it actually iterate a moment - promiseForevers.get(500, TimeUnit.MILLISECONDS); - fail("This should have timed out because the traversal has infinite items in it"); - } catch (TimeoutException tex) { - - } - - assertThat(promiseForevers.isDone(), is(false)); - promiseForevers.cancel(true); - assertThat(failed.get(), is(true)); - assertThat(promiseForevers.isDone(), is(true)); - } - - @Test public void shouldIterate() { final MockTraversal<Integer> t = new MockTraversal<>(1, 2, 3, 4, 5, 6, 7); assertThat(t.hasNext(), is(true)); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 972e838..9a2180e 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -208,7 +208,7 @@ final class Connection { logger.debug(String.format("Write on connection %s failed", thisConnection.getConnectionInfo()), f.cause()); thisConnection.isDead = true; thisConnection.returnToPool(); - future.completeExceptionally(f.cause()); + cluster.executor().submit(() -> future.completeExceptionally(f.cause())); } else { final LinkedBlockingQueue<Result> resultLinkedBlockingQueue = new LinkedBlockingQueue<>(); final CompletableFuture<Void> readCompleted = new CompletableFuture<>(); @@ -250,8 +250,8 @@ final class Connection { final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted); pending.put(requestMessage.getRequestId(), handler); - future.complete(new ResultSet(handler, cluster.executor(), readCompleted, - requestMessage, pool.host)); + cluster.executor().submit(() -> future.complete( + new ResultSet(handler, cluster.executor(), readCompleted, requestMessage, pool.host))); } }); channel.writeAndFlush(requestMessage, requestPromise); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java index bb2d33d..be3fa28 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java @@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import java.util.Iterator; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** @@ -163,6 +164,10 @@ public class DriverRemoteConnection implements RemoteConnection { } } + /** + * @deprecated As of release 3.2.2, replaced by {@link #submit(Bytecode)}. + */ + @Deprecated @Override public <E> Iterator<Traverser.Admin<E>> submit(final Traversal<?, E> t) throws RemoteConnectionException { try { @@ -189,6 +194,15 @@ public class DriverRemoteConnection implements RemoteConnection { } @Override + public <E> CompletableFuture<RemoteTraversal<?, E>> submitAsync(final Bytecode bytecode) throws RemoteConnectionException { + try { + return client.submitAsync(bytecode).thenApply(rs -> new DriverRemoteTraversal<>(rs, client, attachElements, conf)); + } catch (Exception ex) { + throw new RemoteConnectionException(ex); + } + } + + @Override public void close() throws Exception { try { client.close(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java index 88ee794..d3f290c 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversal.java @@ -66,28 +66,18 @@ public class DriverRemoteTraversal<S, E> extends AbstractRemoteTraversal<S, E> { } this.rs = rs; - this.sideEffects = new DriverRemoteTraversalSideEffects( - client, + this.sideEffects = new DriverRemoteTraversalSideEffects(client, rs.getOriginalRequestMessage().getRequestId(), - rs.getHost()); + rs.getHost(), rs.allItemsAvailableAsync()); } /** * Gets a side-effect from the server. Do not call this method prior to completing the iteration of the - * {@link DriverRemoteTraversal} that spawned this as the side-effect will not be ready. If this method is called - * prior to iteration being complete, then it will block until the traversal notifies it of completion. Generally + * {@link DriverRemoteTraversal} that spawned this as the side-effect will not be ready. Generally * speaking, the common user would not get side-effects this way - they would use a call to {@code cap()}. */ @Override public RemoteTraversalSideEffects getSideEffects() { - // wait for the read to complete (i.e. iteration on the server) before allowing the caller to get the - // side-effect. calling prior to this will result in the side-effect not being found. of course, the - // bad part here is that the method blocks indefinitely waiting for the result, but it prevents the - // test failure problems that happen on slower systems. in practice, it's unlikely that a user would - // try to get a side-effect prior to iteration, but since the API allows it, this at least prevents - // the error. - rs.allItemsAvailableAsync().join(); - return this.sideEffects; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java index 8d6fa98..4305567 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffects.java @@ -33,6 +33,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; /** * Java driver implementation of {@link TraversalSideEffects}. This class is not thread safe. @@ -50,15 +51,26 @@ public class DriverRemoteTraversalSideEffects extends AbstractRemoteTraversalSid private boolean closed = false; private boolean retrievedAllKeys = false; + private final CompletableFuture<Void> ready; - public DriverRemoteTraversalSideEffects(final Client client, final UUID serverSideEffect, final Host host) { + public DriverRemoteTraversalSideEffects(final Client client, final UUID serverSideEffect, final Host host, + final CompletableFuture<Void> ready) { this.client = client; this.serverSideEffect = serverSideEffect; this.host = host; + this.ready = ready; } @Override public <V> V get(final String key) throws IllegalArgumentException { + // wait for the read to complete (i.e. iteration on the server) before allowing the caller to get the + // side-effect. calling prior to this will result in the side-effect not being found. of course, the + // bad part here is that the method blocks indefinitely waiting for the result, but it prevents the + // test failure problems that happen on slower systems. in practice, it's unlikely that a user would + // try to get a side-effect prior to iteration, but since the API allows it, this at least prevents + // the error. + ready.join(); + if (!keys().contains(key)) throw TraversalSideEffects.Exceptions.sideEffectKeyDoesNotExist(key); if (!sideEffects.containsKey(key)) { @@ -91,6 +103,14 @@ public class DriverRemoteTraversalSideEffects extends AbstractRemoteTraversalSid @Override public Set<String> keys() { + // wait for the read to complete (i.e. iteration on the server) before allowing the caller to get the + // side-effect. calling prior to this will result in the side-effect not being found. of course, the + // bad part here is that the method blocks indefinitely waiting for the result, but it prevents the + // test failure problems that happen on slower systems. in practice, it's unlikely that a user would + // try to get a side-effect prior to iteration, but since the API allows it, this at least prevents + // the error. + ready.join(); + if (closed && !retrievedAllKeys) throw new IllegalStateException("Traversal has been closed - side-effect keys cannot be retrieved"); if (!retrievedAllKeys) { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java ---------------------------------------------------------------------- diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java index 27d0079..4e6df93 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteTraversalSideEffectsTest.java @@ -52,7 +52,9 @@ public class DriverRemoteTraversalSideEffectsTest extends AbstractResultQueueTes mockClientForCall(client); final UUID sideEffectKey = UUID.fromString("31dec2c6-b214-4a6f-a68b-996608dce0d9"); - final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null); + final CompletableFuture<Void> ready = new CompletableFuture<>(); + ready.complete(null); + final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null, ready); assertEquals(1, sideEffects.keys().size()); sideEffects.close(); @@ -73,7 +75,9 @@ public class DriverRemoteTraversalSideEffectsTest extends AbstractResultQueueTes mockClientForCall(client); mockClientForCall(client); final UUID sideEffectKey = UUID.fromString("31dec2c6-b214-4a6f-a68b-996608dce0d9"); - final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null); + final CompletableFuture<Void> ready = new CompletableFuture<>(); + ready.complete(null); + final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null, ready); assertNotNull(sideEffects.get("test-0")); sideEffects.close(); @@ -93,7 +97,9 @@ public class DriverRemoteTraversalSideEffectsTest extends AbstractResultQueueTes mockClientForCall(client); final UUID sideEffectKey = UUID.fromString("31dec2c6-b214-4a6f-a68b-996608dce0d9"); - final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null); + final CompletableFuture<Void> ready = new CompletableFuture<>(); + ready.complete(null); + final TraversalSideEffects sideEffects = new DriverRemoteTraversalSideEffects(client, sideEffectKey, null, ready); sideEffects.close(); sideEffects.close(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-groovy/pom.xml ---------------------------------------------------------------------- diff --git a/gremlin-groovy/pom.xml b/gremlin-groovy/pom.xml index dae5e8a..b82c986 100644 --- a/gremlin-groovy/pom.xml +++ b/gremlin-groovy/pom.xml @@ -65,6 +65,11 @@ limitations under the License. <artifactId>jBCrypt</artifactId> <version>jbcrypt-0.4</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.3.1</version> + </dependency> <!-- TEST --> <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java ---------------------------------------------------------------------- diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java index b3dbe29..420bd05 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerIntegrateTest.java @@ -46,12 +46,14 @@ import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.InterpreterModeCust import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.SimpleSandboxExtension; import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.TimedInterruptCustomizerProvider; import org.apache.tinkerpop.gremlin.process.remote.RemoteGraph; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.T; import org.apache.tinkerpop.gremlin.server.channel.NioChannelizer; +import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex; import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph; import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender; @@ -76,6 +78,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -990,4 +993,26 @@ public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegration final BulkSet localBSideEffects = se.get("b"); assertThat(localBSideEffects.isEmpty(), is(false)); } + + @Test + public void shouldDoNonBlockingPromiseWithRemote() throws Exception { + final Graph graph = EmptyGraph.instance(); + final GraphTraversalSource g = graph.traversal().withRemote(conf); + g.addV("person").property("age", 20).promise(Traversal::iterate).join(); + g.addV("person").property("age", 10).promise(Traversal::iterate).join(); + assertEquals(50L, g.V().hasLabel("person").map(Lambda.function("it.get().value('age') + 10")).sum().promise(t -> t.next()).join()); + g.addV("person").property("age", 20).promise(Traversal::iterate).join(); + + final Traversal traversal = g.V().hasLabel("person").has("age", 20).values("age"); + assertEquals(20, traversal.promise(t -> ((Traversal) t).next(1).get(0)).join()); + assertEquals(20, traversal.next()); + assertThat(traversal.hasNext(), is(false)); + + final Traversal traversalCloned = g.V().hasLabel("person").has("age", 20).values("age"); + assertEquals(20, traversalCloned.next()); + assertEquals(20, traversalCloned.promise(t -> ((Traversal) t).next(1).get(0)).join()); + assertThat(traversalCloned.promise(t -> ((Traversal) t).hasNext()).join(), is(false)); + + assertEquals(3, g.V().promise(Traversal::toList).join().size()); + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ee6a3589/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java ---------------------------------------------------------------------- diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java index 050f9de..68f8217 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/CoreTraversalTest.java @@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.process.traversal; import org.apache.tinkerpop.gremlin.ExceptionCoverage; import org.apache.tinkerpop.gremlin.FeatureRequirement; -import org.apache.tinkerpop.gremlin.FeatureRequirementSet; import org.apache.tinkerpop.gremlin.LoadGraphWith; import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; @@ -41,9 +40,6 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Random; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN; @@ -311,42 +307,4 @@ public class CoreTraversalTest extends AbstractGremlinProcessTest { } } - - @Test - @FeatureRequirementSet(FeatureRequirementSet.Package.SIMPLE) - public void shouldUsePromiseAndControlTransactionsIfAvailable() throws Exception { - // this test will validate that transactional graphs can properly open/close transactions within a promise. - // as there is a feature check, non-transactional graphs can use this to simply exercise the promise API - final Vertex vAdded = g.addV("person").property("name", "stephen").promise(t -> (Vertex) t.next()).get(10000, TimeUnit.MILLISECONDS); - final Vertex vRead = g.V().has("name", "stephen").next(); - assertEquals(vAdded.id(), vRead.id()); - - // transaction should have been committed at this point so test the count in this thread to validate persistence - assertVertexEdgeCounts(graph, 1, 0); - - // cancel a promise and ensure the transaction ended in failure. hold the traversal in park until it can be - // interrupted, then the promise will have to rollback the transaction. - final CompletableFuture promiseToCancel = g.addV("person").property("name", "marko").sideEffect(traverser -> { - try { - Thread.sleep(100000); - } catch (Exception ignored) { - - } - }).promise(t -> (Vertex) t.next()); - - try { - promiseToCancel.get(500, TimeUnit.MILLISECONDS); - fail("Should have timed out"); - } catch (TimeoutException te) { - - } - - promiseToCancel.cancel(true); - - // graphs that support transactions will rollback the transaction - if (graph.features().graph().supportsTransactions()) - assertVertexEdgeCounts(graph, 1, 0); - else - assertVertexEdgeCounts(graph, 2, 0); - } }
