came up with a clever idea for maintaining order in a distributed traversal. OrderedTraverser is a wrapper around a Traverser where all methods are delegated to the internal traverser. However, OrderedTraverser has an order() method which returns an int. This allows an ordered stream of traversers to be distributed across machines and then, on return, ordered accordingly. This is much better than the GraphComputer model we have. With OrderedTraverser, we have now exposed OrderTest, DedupTest, and TailTest to gremlin-akka. The final obstacle is nested group()s. There is something super fishy going on with cloning, and I'm scared of just banging my head against the wall all morning, so I will just let it simmer.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/7d12d21a Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/7d12d21a Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/7d12d21a Branch: refs/heads/TINKERPOP-1564 Commit: 7d12d21a4b1ccc07a2381833b4a21cd5b6c13378 Parents: 1d88355 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Wed Dec 14 05:54:03 2016 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Thu Jan 19 13:01:41 2017 -0700 ---------------------------------------------------------------------- .../akka/process/actor/ActorMailbox.java | 25 +-- .../akka/process/AkkaActorsProvider.java | 7 +- .../actor/traversal/TraversalMasterProgram.java | 33 +++- .../traverser/util/OrderedTraverser.java | 178 +++++++++++++++++++ 4 files changed, 222 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7d12d21a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java index 28afb22..c8e5fde 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java @@ -31,10 +31,8 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe import scala.Option; import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Queue; /** @@ -45,31 +43,34 @@ public final class ActorMailbox implements MailboxType, 
ProducesMessageQueue<Act private final List<Class> messagePriorities = new ArrayList<>(); public static class ActorMessageQueue implements MessageQueue, ActorSemantics { + private final List<Class> messagePriorities; private final List<Queue> messages; - private final Map<Class, Queue> priorities; private final Object MUTEX = new Object(); public ActorMessageQueue(final List<Class> messagePriorities) { - this.messages = new ArrayList<>(messagePriorities.size()); - this.priorities = new HashMap<>(messagePriorities.size()); - for (final Class clazz : messagePriorities) { + this.messagePriorities = messagePriorities; + this.messages = new ArrayList<>(this.messagePriorities.size()); + for (final Class clazz : this.messagePriorities) { final Queue queue; if (Traverser.class.isAssignableFrom(clazz)) queue = new TraverserSet<>(); else queue = new LinkedList<>(); this.messages.add(queue); - this.priorities.put(clazz, queue); } } public void enqueue(final ActorRef receiver, final Envelope handle) { synchronized (MUTEX) { - final Queue queue = this.priorities.get(handle.message() instanceof Traverser ? Traverser.class : handle.message().getClass()); - if (null == queue) - throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass()); - else - queue.offer(handle.message() instanceof Traverser ? handle.message() : handle); + final Object message = handle.message(); + for (int i = 0; i < this.messagePriorities.size(); i++) { + final Class clazz = this.messagePriorities.get(i); + if (clazz.isInstance(message)) { + this.messages.get(i).offer(message instanceof Traverser ? 
message : handle); + return; + } + } + throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass()); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7d12d21a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java index a0703bd..9c8b320 100644 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java @@ -63,20 +63,15 @@ import java.util.Set; */ public class AkkaActorsProvider extends AbstractGraphProvider { - protected static final boolean IMPORT_STATICS = new Random().nextBoolean(); - private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList( "g_V_hasLabelXpersonX_V_hasLabelXsoftwareX_name", "g_VX1X_repeatXbothEXcreatedX_whereXwithoutXeXX_aggregateXeX_otherVX_emit_path", - "g_V_outXfollowedByX_group_byXsongTypeX_byXbothE_group_byXlabelX_byXweight_sumXX", "g_withBulkXfalseX_withSackX1_sumX_V_out_barrier_sack", "g_V_both_groupCountXaX_out_capXaX_selectXkeysX_unfold_both_groupCountXaX_capXaX", + "g_V_both_both_dedup_byXoutE_countX_name", GraphTest.Traversals.class.getCanonicalName(), - DedupTest.Traversals.class.getCanonicalName(), - OrderTest.Traversals.class.getCanonicalName(), GroupTest.Traversals.class.getCanonicalName(), ComplexTest.Traversals.class.getCanonicalName(), - TailTest.Traversals.class.getCanonicalName(), SubgraphTest.Traversals.class.getCanonicalName(), SideEffectTest.Traversals.class.getCanonicalName(), SubgraphStrategyProcessTest.class.getCanonicalName(), 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7d12d21a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java index 1c44b51..723339d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java @@ -34,6 +34,11 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; import org.apache.tinkerpop.gremlin.structure.Element; @@ -47,6 +52,7 @@ import java.util.Map; */ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> { + private final Actor.Master master; private final Traversal.Admin<?, ?> traversal; private final TraversalMatrix<?, ?> matrix; @@ -54,6 +60,7 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> { 
private Map<String, Barrier> barriers = new HashMap<>(); private final TraverserSet<?> results; private Address.Worker leaderWorker; + private int orderCounter = -1; public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) { this.traversal = traversal; @@ -90,8 +97,12 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> { for (final Barrier barrier : this.barriers.values()) { final Step<?, ?> step = (Step) barrier; if (!(barrier instanceof LocalBarrier)) { + this.orderBarrier(step); + if (step instanceof OrderGlobalStep) this.orderCounter = 0; while (step.hasNext()) { - this.sendTraverser(step.next()); + this.sendTraverser(-1 == this.orderCounter ? + step.next() : + new OrderedTraverser<>(step.next(), this.orderCounter++)); } } else { this.traversal.getSideEffects().forEach((k, v) -> { @@ -107,6 +118,9 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> { while (this.traversal.hasNext()) { this.results.add((Traverser.Admin) this.traversal.nextTraverser()); } + if (this.orderCounter != -1) + this.results.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order())); + this.master.close(); } } else { @@ -132,8 +146,12 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> { final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); GraphComputing.atMaster(step, true); step.addStart(traverser); - while (step.hasNext()) { - this.processTraverser(step.next()); + if (step instanceof Barrier) { + this.barriers.put(step.getId(), (Barrier) step); + } else { + while (step.hasNext()) { + this.processTraverser(step.next()); + } } } } @@ -146,4 +164,13 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> { else this.master.send(this.master.address(), traverser); } + + private void orderBarrier(final Step 
step) { + if (this.orderCounter != -1 && step instanceof Barrier && (step instanceof RangeGlobalStep || step instanceof TailGlobalStep)) { + final Barrier barrier = (Barrier) step; + final TraverserSet<?> rangingBarrier = (TraverserSet<?>) barrier.nextBarrier(); + rangingBarrier.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order())); + barrier.addBarrier(rangingBarrier); + } + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7d12d21a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/OrderedTraverser.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/OrderedTraverser.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/OrderedTraverser.java new file mode 100644 index 0000000..3be67a2 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/OrderedTraverser.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.process.traversal.traverser.util; + +import org.apache.tinkerpop.gremlin.process.traversal.Path; +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.structure.util.Attachable; + +import java.util.Set; +import java.util.function.Function; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class OrderedTraverser<T> implements Traverser.Admin<T> { + + private Traverser.Admin<T> internal; + private final int order; + + public OrderedTraverser(final Traverser.Admin<T> internal, final int order) { + this.internal = internal instanceof OrderedTraverser ? ((OrderedTraverser) internal).internal : internal; + this.order = order; + } + + public int order() { + return this.order; + } + + @Override + public void merge(final Admin<?> other) { + this.internal.merge(other); + } + + @Override + public <R> Admin<R> split(R r, Step<T, R> step) { + return new OrderedTraverser<>(this.internal.split(r, step), this.order); + } + + @Override + public Admin<T> split() { + return new OrderedTraverser<>(this.internal.split(), this.order); + } + + @Override + public void addLabels(final Set<String> labels) { + this.internal.addLabels(labels); + } + + @Override + public void keepLabels(final Set<String> labels) { + this.internal.keepLabels(labels); + } + + @Override + public void dropLabels(final Set<String> labels) { + this.internal.dropLabels(labels); + } + + @Override + public void dropPath() { + this.internal.dropPath(); + } + + @Override + public void set(final T t) { + this.internal.set(t); + } + + @Override + public void incrLoops(final String stepLabel) { + this.internal.incrLoops(stepLabel); + } + + @Override + public void resetLoops() { + this.internal.resetLoops(); + } + + @Override + public String getStepId() { + 
return this.internal.getStepId(); + } + + @Override + public void setStepId(final String stepId) { + this.internal.setStepId(stepId); + } + + @Override + public void setBulk(final long count) { + this.internal.setBulk(count); + } + + @Override + public Admin<T> detach() { + return this.internal.detach(); + } + + @Override + public T attach(final Function<Attachable<T>, T> method) { + return this.internal.attach(method); + } + + @Override + public void setSideEffects(final TraversalSideEffects sideEffects) { + this.internal.setSideEffects(sideEffects); + } + + @Override + public TraversalSideEffects getSideEffects() { + return this.internal.getSideEffects(); + } + + @Override + public Set<String> getTags() { + return this.internal.getTags(); + } + + @Override + public T get() { + return this.internal.get(); + } + + @Override + public <S> S sack() { + return this.internal.sack(); + } + + @Override + public <S> void sack(final S object) { + this.internal.sack(object); + } + + @Override + public Path path() { + return this.internal.path(); + } + + @Override + public int loops() { + return this.internal.loops(); + } + + @Override + public long bulk() { + return this.internal.bulk(); + } + + @Override + public Traverser<T> clone() { + try { + final OrderedTraverser<T> clone = (OrderedTraverser<T>) super.clone(); + clone.internal = (Traverser.Admin<T>) this.internal.clone(); + return clone; + } catch (final CloneNotSupportedException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } +}