Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1564 a2dbe7b0b -> 6b6df35d0
got LocalBarriers with SideEffects working in GraphActors. There are only 3 tests that fail -- having to do with nested group().groupCount()-style things. Almost done. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/6b6df35d Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/6b6df35d Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/6b6df35d Branch: refs/heads/TINKERPOP-1564 Commit: 6b6df35d053aad1807ae9d72454c3cf3c4e96e9a Parents: a2dbe7b Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Thu Jan 19 14:58:59 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Thu Jan 19 14:58:59 2017 -0700 ---------------------------------------------------------------------- .../akka/process/actors/AkkaActorsProvider.java | 7 +--- .../traversal/TraversalMasterProgram.java | 35 ++++++++++++-------- .../traversal/TraversalWorkerProgram.java | 8 ++++- .../traversal/WorkerTraversalSideEffects.java | 1 + .../traversal/message/BarrierAddMessage.java | 3 +- 5 files changed, 33 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6b6df35d/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java index a9b5820..7bc88fb 100644 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java @@ -29,15 +29,11 @@ import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.step.ComplexTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphTest; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderTest; import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest; -import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTest; import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectTest; -import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest; import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest; import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest; import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.VertexProperty; import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONResourceAccess; @@ -68,12 +64,11 @@ public class AkkaActorsProvider extends AbstractGraphProvider { private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList( "g_V_hasLabelXpersonX_V_hasLabelXsoftwareX_name", - "g_VX1X_repeatXbothEXcreatedX_whereXwithoutXeXX_aggregateXeX_otherVX_emit_path", "g_withBulkXfalseX_withSackX1_sumX_V_out_barrier_sack", "g_V_both_groupCountXaX_out_capXaX_selectXkeysX_unfold_both_groupCountXaX_capXaX", "g_V_repeatXdedupX_timesX2X_count", "g_withSackXmap__map_cloneX_V_out_out_sackXmap_a_nameX_sack", - //"g_V_out_group_byXlabelX_selectXpersonX_unfold_outXcreatedX_name_limitX2X", + "g_V_out_group_byXlabelX_selectXpersonX_unfold_outXcreatedX_name_limitX2X", "g_V_group_byXlabelX_byXbothE_groupXaX_byXlabelX_byXweight_sumX_weight_sumX", GraphTest.Traversals.class.getCanonicalName(), ComplexTest.Traversals.class.getCanonicalName(), http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6b6df35d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java index a6bb94e..e28919b 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java @@ -46,7 +46,9 @@ import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Partition; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -57,6 +59,7 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> { private final Traversal.Admin<?, ?> traversal; private final TraversalMatrix<?, ?> matrix; private Map<String, Barrier> barriers = new HashMap<>(); + private Set<String> sideEffects = new HashSet<>(); private final TraverserSet<?> results; private Address.Worker leaderWorker; private int orderCounter = -1; @@ -89,17 +92,29 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> { this.processTraverser((Traverser.Admin) message); } else if (message instanceof BarrierAddMessage) { final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId()); - final Step<?, ?> step = (Step) barrier; - barrier.addBarrier(TraversalActorProgram.attach(((BarrierAddMessage) message).getBarrier(), this.master.partitioner().getGraph())); - this.barriers.put(step.getId(), barrier); + if (!(barrier instanceof LocalBarrier)) + barrier.addBarrier(TraversalActorProgram.attach(((BarrierAddMessage) message).getBarrier(), this.master.partitioner().getGraph())); + if (barrier instanceof SideEffectCapable) + this.sideEffects.add(((SideEffectCapable) barrier).getSideEffectKey()); + this.barriers.put(((Step) barrier).getId(), barrier); } else if (message instanceof SideEffectAddMessage) { - this.traversal.getSideEffects().add(((SideEffectAddMessage) message).getKey(), ((SideEffectAddMessage) message).getValue()); + final SideEffectAddMessage sideEffectAddMessage = (SideEffectAddMessage) message; + this.traversal.getSideEffects().add(sideEffectAddMessage.getKey(), sideEffectAddMessage.getValue()); + this.sideEffects.add(sideEffectAddMessage.getKey()); } else if (message instanceof Terminate) { assert Terminate.YES == message; - if (!this.barriers.isEmpty()) { + if (!this.barriers.isEmpty() || !this.sideEffects.isEmpty()) { + // process all side-effect updates + for (final String key : this.sideEffects) { + this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key))); + } + // process all barriers for (final Barrier barrier : this.barriers.values()) { final Step<?, ?> step = (Step) barrier; - if (!(barrier instanceof LocalBarrier)) { + if (barrier instanceof LocalBarrier) { // the barriers are distributed amongst the workers + this.broadcast(new BarrierDoneMessage(barrier)); + barrier.done(); + } else { // the barrier is at the master this.orderBarrier(step); if (step instanceof OrderGlobalStep) this.orderCounter = 0; while (step.hasNext()) { @@ -107,15 +122,9 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> { step.next() : new OrderedTraverser<>(step.next(), this.orderCounter++)); } - } else { - if (step instanceof SideEffectCapable) { - final String key = ((SideEffectCapable) step).getSideEffectKey(); - this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key))); - } - this.broadcast(new BarrierDoneMessage(barrier)); - barrier.done(); } } + this.sideEffects.clear(); this.barriers.clear(); this.master.send(this.leaderWorker, Terminate.MAYBE); } else { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6b6df35d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java index b4df2b6..05735a4 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java @@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing; +import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier; import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; @@ -118,8 +119,13 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> { this.terminate = (Terminate) message; if (!this.barriers.isEmpty()) { for (final Barrier barrier : this.barriers.values()) { - while (barrier.hasNextBarrier()) { + if (barrier instanceof LocalBarrier) { + barrier.processAllStarts(); this.self.send(this.self.master(), new BarrierAddMessage(barrier)); + } else { + while (barrier.hasNextBarrier()) { + this.self.send(this.self.master(), new BarrierAddMessage(barrier)); + } } } this.barriers.clear(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6b6df35d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java index 7337316..6dcde9d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java @@ -43,6 +43,7 @@ public final class WorkerTraversalSideEffects implements TraversalSideEffects { } public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final Actor.Worker worker) { + assert !(sideEffects instanceof WorkerTraversalSideEffects); this.sideEffects = sideEffects; this.worker = worker; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6b6df35d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java index b17e83c..4da7aac 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.actors.traversal.message; import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -35,7 +36,7 @@ public final class BarrierAddMessage { } public BarrierAddMessage(final Barrier barrier) { - this.barrier = barrier.nextBarrier(); + this.barrier = barrier instanceof LocalBarrier ? null : barrier.nextBarrier(); this.stepId = ((Step) barrier).getId(); }