We now have a fully generalized process/actor interface system, where akka-gremlin/ implements those interfaces. Next, we have ActorProgram, with TraversalActorProgram executing a traversal. This is identical in form to GraphComputer, in that people can submit arbitrary ActorPrograms. This is really clean and consistent with our other work.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/1d728207 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/1d728207 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/1d728207 Branch: refs/heads/TINKERPOP-1564 Commit: 1d72820748c0a51354e62f858e39c316c3901970 Parents: aea7fc4 Author: Marko A. Rodriguez <[email protected]> Authored: Tue Dec 13 09:52:41 2016 -0700 Committer: Marko A. Rodriguez <[email protected]> Committed: Wed Jan 4 05:07:59 2017 -0700 ---------------------------------------------------------------------- .../gremlin/akka/process/actor/AkkaActors.java | 22 ++- .../process/actor/MasterTraversalActor.java | 162 +++------------ .../akka/process/actor/TraverserMailbox.java | 5 +- .../process/actor/WorkerTraversalActor.java | 180 ++++------------- .../actor/WorkerTraversalSideEffects.java | 147 -------------- .../actor/message/BarrierAddMessage.java | 47 ----- .../actor/message/BarrierDoneMessage.java | 41 ---- .../actor/message/SideEffectAddMessage.java | 43 ---- .../actor/message/SideEffectSetMessage.java | 42 ---- .../process/actor/message/StartMessage.java | 35 ---- .../actor/message/VoteToHaltMessage.java | 36 ---- .../gremlin/akka/process/AkkaPlayTest.java | 2 +- .../tinkerpop/gremlin/process/actor/Actor.java | 30 ++- .../gremlin/process/actor/ActorProgram.java | 50 +++++ .../tinkerpop/gremlin/process/actor/Actors.java | 2 + .../gremlin/process/actor/Address.java | 62 ++++++ .../gremlin/process/actor/MasterActor.java | 35 ---- .../gremlin/process/actor/WorkerActor.java | 35 ---- .../actor/traversal/TraversalActorProgram.java | 66 +++++++ .../actor/traversal/TraversalMasterProgram.java | 154 +++++++++++++++ .../actor/traversal/TraversalWorkerProgram.java | 196 +++++++++++++++++++ .../traversal/WorkerTraversalSideEffects.java | 147 ++++++++++++++ .../traversal/message/BarrierAddMessage.java | 47 +++++ .../traversal/message/BarrierDoneMessage.java | 41 
++++ .../traversal/message/SideEffectAddMessage.java | 43 ++++ .../traversal/message/SideEffectSetMessage.java | 42 ++++ .../actor/traversal/message/StartMessage.java | 35 ++++ .../traversal/message/VoteToHaltMessage.java | 36 ++++ .../actor/traversal/step/map/ActorStep.java | 4 +- 29 files changed, 1035 insertions(+), 752 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java index 8ef96bb..db024f6 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java @@ -21,8 +21,9 @@ package org.apache.tinkerpop.gremlin.akka.process.actor; import akka.actor.ActorSystem; import akka.actor.Props; +import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; import org.apache.tinkerpop.gremlin.process.actor.Actors; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.actor.Address; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.structure.Partitioner; @@ -34,12 +35,19 @@ import java.util.concurrent.Future; */ public final class AkkaActors<S, E> implements Actors<S, E> { - public final ActorSystem system; - private TraverserSet<E> results = new TraverserSet<>(); + private final ActorProgram actorProgram; + private final ActorSystem system; + private final Address.Master master; - public AkkaActors(final Traversal.Admin<S, E> traversal, final Partitioner partitioner) 
{ - this.system = ActorSystem.create("traversal-" + traversal.hashCode()); - this.system.actorOf(Props.create(MasterTraversalActor.class, traversal.clone(), partitioner, this.results), "master"); + public AkkaActors(final ActorProgram actorProgram, final Partitioner partitioner) { + this.actorProgram = actorProgram; + this.system = ActorSystem.create("traversal-" + actorProgram.hashCode()); + this.master = new Address.Master(this.system.actorOf(Props.create(MasterTraversalActor.class, this.actorProgram, partitioner), "master").path().toString()); + } + + @Override + public Address.Master master() { + return this.master; } @Override @@ -48,7 +56,7 @@ public final class AkkaActors<S, E> implements Actors<S, E> { while (!this.system.isTerminated()) { } - return this.results; + return (TraverserSet) this.actorProgram.getResult(); }); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java index 5009554..6799a28 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java @@ -20,35 +20,17 @@ package org.apache.tinkerpop.gremlin.akka.process.actor; import akka.actor.AbstractActor; -import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; import akka.dispatch.RequiresMessageQueue; import akka.japi.pf.ReceiveBuilder; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage; -import 
org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectSetMessage; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy; -import org.apache.tinkerpop.gremlin.process.actor.MasterActor; -import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; -import org.apache.tinkerpop.gremlin.process.traversal.Step; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; -import org.apache.tinkerpop.gremlin.process.traversal.Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; -import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; -import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; -import org.apache.tinkerpop.gremlin.structure.Element; +import org.apache.tinkerpop.gremlin.process.actor.Actor; +import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actor.Address; import org.apache.tinkerpop.gremlin.structure.Partition; import org.apache.tinkerpop.gremlin.structure.Partitioner; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -56,131 +38,47 @@ import java.util.Map; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, MasterActor { +public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Master { - private final Traversal.Admin<?, ?> traversal; - private final TraversalMatrix<?, ?> matrix; - private final Partitioner partitioner; - private final Map<String, ActorSelection> workers = new HashMap<>(); - private Map<String, Barrier> barriers = new HashMap<>(); - private final TraverserSet<?> results; - private final String leaderWorker; + private final Address.Master master; + private final List<Address.Worker> workers; + private final Map<Address, ActorSelection> actors = new HashMap<>(); - public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) { - System.out.println("master[created]: " + self().path()); - final TraversalStrategies strategies = traversal.getStrategies().clone(); - strategies.removeStrategies(ComputerVerificationStrategy.class, StandardVerificationStrategy.class); - strategies.addStrategies(ActorVerificationStrategy.instance()); - traversal.setStrategies(strategies); - traversal.applyStrategies(); - - this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get(); - System.out.println(this.traversal); - this.matrix = new TraversalMatrix<>(this.traversal); - this.partitioner = partitioner; - this.results = results; - this.initializeWorkers(); - this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode(); - - receive(ReceiveBuilder. - match(Traverser.Admin.class, this::processTraverser). - match(BarrierAddMessage.class, barrierMerge -> this.processBarrierAdd((Barrier) this.matrix.getStepById(barrierMerge.getStepId()), barrierMerge.getBarrier())). 
- match(SideEffectAddMessage.class, sideEffect -> this.processSideEffectAdd(((SideEffectAddMessage) sideEffect).getKey(), ((SideEffectAddMessage) sideEffect).getValue())). - match(VoteToHaltMessage.class, voteToHalt -> this.processVoteToHalt()). - build()); - } - - private void initializeWorkers() { - final List<Partition> partitions = this.partitioner.getPartitions(); + public MasterTraversalActor(final ActorProgram program, final Partitioner partitioner) { + this.master = new Address.Master(self().path().toString()); + this.workers = new ArrayList<>(); + final List<Partition> partitions = partitioner.getPartitions(); for (final Partition partition : partitions) { - final String workerPathString = "worker-" + partition.hashCode(); - final ActorRef worker = context().actorOf(Props.create(WorkerTraversalActor.class, this.traversal.clone(), partition, this.partitioner), workerPathString); - this.workers.put(workerPathString, context().actorSelection(worker.path())); - } - for (final ActorSelection worker : this.workers.values()) { - worker.tell(StartMessage.instance(), self()); + this.workers.add(new Address.Worker("worker-" + partition.hashCode())); + context().actorOf(Props.create(WorkerTraversalActor.class, program, partitioner, partition), "worker-" + partition.hashCode()); } - this.workers.clear(); + final ActorProgram.Master masterProgram = program.createMasterProgram(this); + receive(ReceiveBuilder.matchAny(masterProgram::execute).build()); + masterProgram.setup(); } @Override - public void processBarrierAdd(final Barrier barrier, final Object barrierAddition) { - final Step<?, ?> step = (Step) barrier; - GraphComputing.atMaster(step, true); - barrier.addBarrier(barrierAddition); - this.barriers.put(step.getId(), barrier); + public <M> void send(final Address toActor, final M message) { + ActorSelection actor = this.actors.get(toActor); + if (null == actor) { + actor = context().actorSelection(toActor.location()); + this.actors.put(toActor, actor); + } + 
actor.tell(message, self()); } @Override - public void processSideEffectAdd(final String key, final Object value) { - this.traversal.getSideEffects().add(key, value); + public List<Address.Worker> workers() { + return this.workers; } @Override - public void processVoteToHalt() { - assert !sender().equals(self()); - if (!this.barriers.isEmpty()) { - for (final Barrier barrier : this.barriers.values()) { - final Step<?, ?> step = (Step) barrier; - if (!(barrier instanceof LocalBarrier)) { - while (step.hasNext()) { - this.sendTraverser(step.next()); - } - } else { - this.traversal.getSideEffects().forEach((k, v) -> { - this.broadcast(new SideEffectSetMessage(k, v)); - }); - this.broadcast(new BarrierDoneMessage(barrier)); - barrier.done(); - } - } - this.barriers.clear(); - worker(this.leaderWorker).tell(StartMessage.instance(), self()); - } else { - while (this.traversal.hasNext()) { - this.results.add((Traverser.Admin) this.traversal.nextTraverser()); - } - context().system().terminate(); - } + public Address.Master address() { + return this.master; } @Override - public void processTraverser(final Traverser.Admin traverser) { - if (traverser.isHalted() || traverser.get() instanceof Element) { - this.sendTraverser(traverser); - } else { - final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); - GraphComputing.atMaster(step, true); - step.addStart(traverser); - while (step.hasNext()) { - this.processTraverser(step.next()); - } - } - } - - //////////////// - - private void broadcast(final Object message) { - for (final Partition partition : this.partitioner.getPartitions()) { - worker("worker-" + partition.hashCode()).tell(message, self()); - } - } - - private void sendTraverser(final Traverser.Admin traverser) { - if (traverser.isHalted()) - this.results.add(traverser); - else if (traverser.get() instanceof Element) - worker("worker-" + this.partitioner.getPartition((Element) 
traverser.get()).hashCode()).tell(traverser, self()); - else - self().tell(traverser, self()); - } - - private ActorSelection worker(final String workerPath) { - ActorSelection worker = this.workers.get(workerPath); - if (null == worker) { - worker = context().actorSelection(workerPath); - this.workers.put(workerPath, worker); - } - return worker; + public void close() { + context().system().terminate(); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java index 6a6c0f4..671f3a2 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/TraverserMailbox.java @@ -26,7 +26,8 @@ import akka.dispatch.MailboxType; import akka.dispatch.MessageQueue; import akka.dispatch.ProducesMessageQueue; import com.typesafe.config.Config; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage; +import org.apache.tinkerpop.gremlin.process.actor.traversal.TraversalWorkerProgram; +import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import scala.Option; @@ -58,7 +59,7 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue else if (handle.message() instanceof VoteToHaltMessage) { assert null == this.haltMessage; this.haltMessage = handle; - } else if (handle.message() instanceof WorkerTraversalActor.Terminate) { + } else if (handle.message() 
instanceof TraversalWorkerProgram.Terminate) { assert null == this.terminateToken; this.terminateToken = handle; } else http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java index 862b6b3..5a6bae7 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java @@ -20,184 +20,72 @@ package org.apache.tinkerpop.gremlin.akka.process.actor; import akka.actor.AbstractActor; -import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.dispatch.RequiresMessageQueue; import akka.japi.pf.ReceiveBuilder; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierAddMessage; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.BarrierDoneMessage; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectSetMessage; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.StartMessage; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.VoteToHaltMessage; -import org.apache.tinkerpop.gremlin.process.actor.WorkerActor; -import org.apache.tinkerpop.gremlin.process.traversal.Step; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; -import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing; -import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; -import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; -import org.apache.tinkerpop.gremlin.structure.Edge; -import org.apache.tinkerpop.gremlin.structure.Element; +import org.apache.tinkerpop.gremlin.process.actor.Actor; +import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actor.Address; import org.apache.tinkerpop.gremlin.structure.Partition; import org.apache.tinkerpop.gremlin.structure.Partitioner; -import org.apache.tinkerpop.gremlin.structure.Vertex; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, WorkerActor { +public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Worker { - // terminate token is passed around worker ring to gather termination consensus (dual-ring termination algorithm) - public enum Terminate { - MAYBE, YES, NO - } - - private final TraversalMatrix<?, ?> matrix; private final Partition localPartition; - private final Partitioner partitioner; - // - private final Map<String, ActorSelection> workers = new HashMap<>(); - private final String neighborWorker; - private boolean isLeader; - private Terminate terminate = null; - private boolean voteToHalt = false; - private Map<String, Barrier> barriers = new HashMap<>(); + private final Address.Worker self; + private final Address.Master master; + private final List<Address.Worker> workers; + private final Map<Address, ActorSelection> actors = new HashMap<>(); - public 
WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) { - System.out.println("worker[created]: " + self().path()); - // set up partition and traversal information + public WorkerTraversalActor(final ActorProgram program, final Partitioner partitioner, final Partition localPartition) { this.localPartition = localPartition; - this.partitioner = partitioner; - final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), context()); - TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal); - this.matrix = new TraversalMatrix<>(traversal); - final GraphStep graphStep = (GraphStep) traversal.getStartStep(); - if (0 == graphStep.getIds().length) - ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges); - else { - if (graphStep.returnsVertex()) - ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier( - () -> IteratorUtils.filter(this.localPartition.vertices(graphStep.getIds()), this.localPartition::contains)); - else - ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier( - () -> IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), this.localPartition::contains)); + this.self = new Address.Worker(self().path().toString()); + this.master = new Address.Master(context().parent().path().toString()); + this.workers = new ArrayList<>(); + for (final Partition partition : partitioner.getPartitions()) { + this.workers.add(new Address.Worker("../worker-" + partition.hashCode())); } - // create termination ring topology - final int i = this.partitioner.getPartitions().indexOf(this.localPartition); - this.neighborWorker = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 
0 : i + 1).hashCode(); - this.isLeader = i == 0; - - receive(ReceiveBuilder. - match(StartMessage.class, start -> this.processStart()). - match(Traverser.Admin.class, this::processTraverser). - match(SideEffectSetMessage.class, sideEffect -> this.processSideEffectSet(sideEffect.getKey(), sideEffect.getValue())). - match(BarrierDoneMessage.class, barrierDone -> this.processBarrierDone(this.matrix.getStepById(barrierDone.getStepId()))). - match(Terminate.class, terminate -> { - assert this.isLeader || this.terminate != Terminate.MAYBE; - this.terminate = terminate; - self().tell(VoteToHaltMessage.instance(), self()); - }). - match(VoteToHaltMessage.class, haltSync -> { - // if there is a barrier and thus, halting at barrier, then process barrier - if (!this.barriers.isEmpty()) { - for (final Barrier barrier : this.barriers.values()) { - while (barrier.hasNextBarrier()) { - master().tell(new BarrierAddMessage(barrier), self()); - } - } - this.barriers.clear(); - this.voteToHalt = false; - } - // use termination token to determine termination condition - if (null != this.terminate) { - if (this.isLeader) { - if (this.voteToHalt && Terminate.YES == this.terminate) - master().tell(VoteToHaltMessage.instance(), self()); - else - worker(this.neighborWorker).tell(Terminate.YES, self()); - } else - worker(this.neighborWorker).tell(this.voteToHalt ? 
this.terminate : Terminate.NO, self()); - this.terminate = null; - this.voteToHalt = true; - } - }).build() - ); + ActorProgram.Worker workerProgram = program.createWorkerProgram(this); + receive(ReceiveBuilder.matchAny(workerProgram::execute).build()); + workerProgram.setup(); } @Override - public void processStart() { - // initial message from master that says: "start processing" - final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep(); - while (step.hasNext()) { - this.sendTraverser(step.next()); - } - // internal vote to have in mailbox as final message to process - assert null == this.terminate; - if (this.isLeader) { - this.terminate = Terminate.MAYBE; - self().tell(VoteToHaltMessage.instance(), self()); + public <M> void send(final Address toActor, final M message) { + ActorSelection actor = this.actors.get(toActor); + if (null == actor) { + actor = context().actorSelection(toActor.location()); + this.actors.put(toActor, actor); } + actor.tell(message, self()); } @Override - public void processTraverser(final Traverser.Admin traverser) { - assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get()); - final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); - if (step instanceof Bypassing) ((Bypassing) step).setBypass(true); - GraphComputing.atMaster(step, false); - step.addStart(traverser); - if (step instanceof Barrier) { - this.barriers.put(step.getId(), (Barrier) step); - } else { - while (step.hasNext()) { - this.sendTraverser(step.next()); - } - } + public List<Address.Worker> workers() { + return this.workers; } @Override - public void processBarrierDone(final Barrier barrier) { - final Step<?, ?> step = (Step) barrier; - while (step.hasNext()) { - sendTraverser(step.next()); - } + public Partition partition() { + return this.localPartition; } @Override - public void processSideEffectSet(final String key, 
final Object value) { - this.matrix.getTraversal().getSideEffects().set(key, value); + public Address.Worker address() { + return this.self; } - ////////////////////// - - private void sendTraverser(final Traverser.Admin traverser) { - this.voteToHalt = false; - if (traverser.isHalted()) - master().tell(traverser, self()); - else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) - worker("../worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()).tell(traverser, self()); - else - self().tell(traverser, self()); - } - - private ActorSelection worker(final String workerPath) { - ActorSelection worker = this.workers.get(workerPath); - if (null == worker) { - worker = context().actorSelection(workerPath); - this.workers.put(workerPath, worker); - } - return worker; - } - - private ActorRef master() { - return context().parent(); + @Override + public Address.Master master() { + return this.master; } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java deleted file mode 100644 index 9c03298..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalSideEffects.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor; - -import akka.actor.ActorContext; -import org.apache.tinkerpop.gremlin.akka.process.actor.message.SideEffectAddMessage; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects; - -import java.util.Optional; -import java.util.Set; -import java.util.function.BinaryOperator; -import java.util.function.Supplier; -import java.util.function.UnaryOperator; - -/** - * @author Marko A. 
Rodriguez (http://markorodriguez.com) - */ -public final class WorkerTraversalSideEffects implements TraversalSideEffects { - - private TraversalSideEffects sideEffects; - private ActorContext context; - - - private WorkerTraversalSideEffects() { - // for serialization - } - - public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final ActorContext context) { - this.sideEffects = sideEffects; - this.context = context; - } - - public TraversalSideEffects getSideEffects() { - return this.sideEffects; - } - - @Override - public void set(final String key, final Object value) { - this.sideEffects.set(key, value); - } - - @Override - public <V> V get(final String key) throws IllegalArgumentException { - return this.sideEffects.get(key); - } - - @Override - public void remove(final String key) { - this.sideEffects.remove(key); - } - - @Override - public Set<String> keys() { - return this.sideEffects.keys(); - } - - @Override - public void add(final String key, final Object value) { - this.context.parent().tell(new SideEffectAddMessage(key, value), this.context.self()); - } - - @Override - public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) { - this.sideEffects.register(key, initialValue, reducer); - } - - @Override - public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) { - this.sideEffects.registerIfAbsent(key, initialValue, reducer); - } - - @Override - public <V> BinaryOperator<V> getReducer(final String key) { - return this.sideEffects.getReducer(key); - } - - @Override - public <V> Supplier<V> getSupplier(final String key) { - return this.sideEffects.getSupplier(key); - } - - @Override - @Deprecated - public void registerSupplier(final String key, final Supplier supplier) { - this.sideEffects.registerSupplier(key, supplier); - } - - @Override - @Deprecated - public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) { 
- return this.sideEffects.getRegisteredSupplier(key); - } - - @Override - public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) { - this.sideEffects.setSack(initialValue, splitOperator, mergeOperator); - } - - @Override - public <S> Supplier<S> getSackInitialValue() { - return this.sideEffects.getSackInitialValue(); - } - - @Override - public <S> UnaryOperator<S> getSackSplitter() { - return this.sideEffects.getSackSplitter(); - } - - @Override - public <S> BinaryOperator<S> getSackMerger() { - return this.sideEffects.getSackMerger(); - } - - @Override - public TraversalSideEffects clone() { - try { - final WorkerTraversalSideEffects clone = (WorkerTraversalSideEffects) super.clone(); - clone.sideEffects = this.sideEffects.clone(); - return clone; - } catch (final CloneNotSupportedException e) { - throw new IllegalStateException(e.getMessage(), e); - } - } - - @Override - public void mergeInto(final TraversalSideEffects sideEffects) { - this.sideEffects.mergeInto(sideEffects); - } - -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java deleted file mode 100644 index 4a351c1..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierAddMessage.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor.message; - -import org.apache.tinkerpop.gremlin.process.traversal.Step; -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class BarrierAddMessage { - - private final Object barrier; - private final String stepId; - - public BarrierAddMessage(final Barrier barrier) { - this.barrier = barrier.nextBarrier(); - this.stepId = ((Step) barrier).getId(); - } - - public Object getBarrier() { - return this.barrier; - } - - public String getStepId() { - return this.stepId; - } - - -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java deleted file mode 100644 index 208b346..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/BarrierDoneMessage.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor.message; - -import org.apache.tinkerpop.gremlin.process.traversal.Step; -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class BarrierDoneMessage { - - private final String stepId; - - public BarrierDoneMessage(final Barrier barrier) { - this.stepId = ((Step) barrier).getId(); - - } - - public String getStepId() { - return this.stepId; - } -} - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java deleted file mode 100644 index 2d97bfa..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectAddMessage.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor.message; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class SideEffectAddMessage { - - private final String key; - private final Object value; - - public SideEffectAddMessage(final String key, final Object value) { - this.value = value; - this.key = key; - } - - public String getKey() { - return this.key; - } - - public Object getValue() { - return this.value; - } -} - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java deleted file mode 100644 index 023133b..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/SideEffectSetMessage.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor.message; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class SideEffectSetMessage { - - private final String key; - private final Object value; - - public SideEffectSetMessage(final String key, final Object value) { - this.key = key; - this.value = value; - } - - public String getKey() { - return this.key; - } - - public Object getValue() { - return this.value; - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java deleted file mode 100644 index ebc469c..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/StartMessage.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor.message; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class StartMessage { - - private static final StartMessage INSTANCE = new StartMessage(); - - private StartMessage() { - } - - public static StartMessage instance() { - return INSTANCE; - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java deleted file mode 100644 index 8bfa4c9..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/message/VoteToHaltMessage.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor.message; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class VoteToHaltMessage { - - private static final VoteToHaltMessage INSTANCE = new VoteToHaltMessage(); - - private VoteToHaltMessage() { - } - - public static VoteToHaltMessage instance() { - return INSTANCE; - } -} - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java index bed7636..f94083c 100644 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java @@ -42,7 +42,7 @@ public class AkkaPlayTest { final Graph graph = TinkerGraph.open(); graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo"); GraphTraversalSource g = graph.traversal().withStrategies(new ActorStrategy(AkkaActors.class, new HashPartitioner(graph.partitioner(), 3))); - System.out.println(g.V(1, 2).union(outE().count(), inE().count(), (Traversal) 
outE().values("weight").sum()).toList()); + System.out.println(g.V().values("name").toList()); //3, 1.9, 1 /*for (int i = 0; i < 10000; i++) { final Graph graph = TinkerGraph.open(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java index aa2f429..2552883 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java @@ -19,13 +19,39 @@ package org.apache.tinkerpop.gremlin.process.actor; -import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.structure.Partition; + +import java.util.List; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ public interface Actor { - public <S> void processTraverser(final Traverser.Admin<S> traverser); + public Address address(); + + public <M> void send(final Address toActor, final M message); + + public interface Master extends Actor { + + public List<Address.Worker> workers(); + + public Address.Master address(); + + public void close(); + + } + + public interface Worker extends Actor { + + public Address.Worker address(); + + public Address.Master master(); + + public List<Address.Worker> workers(); + + public Partition partition(); + } + } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java new file mode 100644 index 0000000..b8f7ac1 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.process.actor; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface ActorProgram<M> { + + public Worker createWorkerProgram(final Actor.Worker worker); + + public Master createMasterProgram(final Actor.Master master); + + public M getResult(); + + public static interface Worker<M> { + public void setup(); + + public void execute(final M message); + + public void terminate(); + + } + + public static interface Master<M> { + public void setup(); + + public void execute(final M message); + + public void terminate(); + } + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java index d9e257e..2e410ec 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java @@ -28,5 +28,7 @@ import java.util.concurrent.Future; */ public interface Actors<S, E> { + public Address.Master master(); + public Future<TraverserSet<E>> submit(); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java new file mode 100644 index 0000000..c598eb7 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actor; + +import java.io.Serializable; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public abstract class Address implements Serializable { + + private final String location; + + public Address(final String location) { + this.location = location; + } + + public String location() { + return this.location; + } + + public boolean equals(final Object other) { + return other instanceof Address && ((Address) other).location.equals(this.location); + } + + public int hashCode() { + return this.location.hashCode(); + } + + public static class Master extends Address { + + public Master(final String location) { + super(location); + } + + } + + public static class Worker extends Address { + + public Worker(final String location) { + super(location); + } + + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java deleted file mode 100644 index 87efe51..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/MasterActor.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor; - -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; - -/** - * @author Marko A. 
Rodriguez (http://markorodriguez.com) - */ -public interface MasterActor extends Actor { - - public <V> void processBarrierAdd(final Barrier barrier, final V barrierAddition); - - public <V> void processSideEffectAdd(final String key, final V value); - - public void processVoteToHalt(); - -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java deleted file mode 100644 index 6d4ca64..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/WorkerActor.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor; - -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; - -/** - * @author Marko A. 
Rodriguez (http://markorodriguez.com) - */ -public interface WorkerActor extends Actor { - - public void processStart(); - - public void processBarrierDone(final Barrier barrier); - - public void processSideEffectSet(final String key, final Object value); - -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java new file mode 100644 index 0000000..278fb3b --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.process.actor.traversal; + +import org.apache.tinkerpop.gremlin.process.actor.Actor; +import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy; +import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.StandardVerificationStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import org.apache.tinkerpop.gremlin.structure.Partitioner; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class TraversalActorProgram<M> implements ActorProgram<M> { + + private final Traversal.Admin<?, ?> traversal; + private final Partitioner partitioner; + public TraverserSet<?> result = new TraverserSet<>(); + + public TraversalActorProgram(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) { + this.partitioner = partitioner; + final TraversalStrategies strategies = traversal.getStrategies().clone(); + strategies.removeStrategies(ComputerVerificationStrategy.class, StandardVerificationStrategy.class); + strategies.addStrategies(ActorVerificationStrategy.instance()); + traversal.setStrategies(strategies); + traversal.applyStrategies(); + this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get(); + } + + @Override + public Worker<M> createWorkerProgram(final Actor.Worker worker) { + return new TraversalWorkerProgram<>(worker, this.traversal.clone(), this.partitioner); + } + + @Override + public Master createMasterProgram(final 
Actor.Master master) { + return new TraversalMasterProgram<>(master, this.traversal.clone(), this.partitioner, this.result); + } + + @Override + public M getResult() { + return (M) this.result; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java new file mode 100644 index 0000000..654969b --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.process.actor.traversal; + +import org.apache.tinkerpop.gremlin.process.actor.Actor; +import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actor.Address; +import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage; +import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage; +import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage; +import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage; +import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage; +import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage; +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; +import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; +import org.apache.tinkerpop.gremlin.structure.Element; +import org.apache.tinkerpop.gremlin.structure.Partitioner; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public class TraversalMasterProgram<M> implements ActorProgram.Master<M> { + + private final Actor.Master master; + private final Map<String, Address.Worker> workers = new HashMap<>(); + private final Traversal.Admin<?, ?> traversal; + private final TraversalMatrix<?, ?> matrix; + private final Partitioner partitioner; + private Map<String, Barrier> barriers = new HashMap<>(); + private final TraverserSet<?> results; + private final String leaderWorker; + + public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) { + this.traversal = traversal; + System.out.println("master[created]: " + master.address().location()); + System.out.println(this.traversal); + this.matrix = new TraversalMatrix<>(this.traversal); + this.partitioner = partitioner; + this.results = results; + this.master = master; + this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode(); + } + + @Override + public void setup() { + for (final Address.Worker worker : master.workers()) { + this.workers.put(worker.location(), worker); + } + this.broadcast(StartMessage.instance()); + + } + + @Override + public void execute(final M message) { + if (message instanceof Traverser.Admin) { + this.processTraverser((Traverser.Admin) message); + } else if (message instanceof BarrierAddMessage) { + final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId()); + final Step<?, ?> step = (Step) barrier; + GraphComputing.atMaster(step, true); + barrier.addBarrier(((BarrierAddMessage) message).getBarrier()); + this.barriers.put(step.getId(), barrier); + } else if (message instanceof SideEffectAddMessage) { + this.traversal.getSideEffects().add(((SideEffectAddMessage) message).getKey(), ((SideEffectAddMessage) message).getValue()); + } else if (message instanceof VoteToHaltMessage) { + if (!this.barriers.isEmpty()) { 
+ for (final Barrier barrier : this.barriers.values()) { + final Step<?, ?> step = (Step) barrier; + if (!(barrier instanceof LocalBarrier)) { + while (step.hasNext()) { + this.sendTraverser(step.next()); + } + } else { + this.traversal.getSideEffects().forEach((k, v) -> { + this.broadcast(new SideEffectSetMessage(k, v)); + }); + this.broadcast(new BarrierDoneMessage(barrier)); + barrier.done(); + } + } + this.barriers.clear(); + this.master.send(this.workers.get(this.leaderWorker), StartMessage.instance()); + } else { + while (this.traversal.hasNext()) { + this.results.add((Traverser.Admin) this.traversal.nextTraverser()); + } + this.master.close(); + } + } else { + throw new IllegalStateException("Unknown message:" + message); + } + } + + @Override + public void terminate() { + + } + + private void broadcast(final Object message) { + for (final Address.Worker worker : this.workers.values()) { + this.master.send(worker, message); + } + } + + private void processTraverser(final Traverser.Admin traverser) { + if (traverser.isHalted() || traverser.get() instanceof Element) { + this.sendTraverser(traverser); + } else { + final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); + GraphComputing.atMaster(step, true); + step.addStart(traverser); + while (step.hasNext()) { + this.processTraverser(step.next()); + } + } + } + + private void sendTraverser(final Traverser.Admin traverser) { + if (traverser.isHalted()) + this.results.add(traverser); + else if (traverser.get() instanceof Element) + this.master.send(this.workers.get("worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()), traverser); + else + this.master.send(this.master.address(), traverser); + } + + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java 
---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java new file mode 100644 index 0000000..58e06d6 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.process.actor.traversal; + +import org.apache.tinkerpop.gremlin.process.actor.Actor; +import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actor.Address; +import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage; +import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage; +import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage; +import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage; +import org.apache.tinkerpop.gremlin.process.actor.traversal.message.VoteToHaltMessage; +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing; +import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Element; +import org.apache.tinkerpop.gremlin.structure.Partition; +import org.apache.tinkerpop.gremlin.structure.Partitioner; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { + + // terminate token is passed around worker ring to gather termination consensus (dual-ring termination algorithm) + public enum Terminate { + MAYBE, YES, NO + } + + private final Actor.Worker self; + private final TraversalMatrix<?, ?> matrix; + private final Partition localPartition; + private final Partitioner partitioner; + // + private final Map<String, Address.Worker> workers = new HashMap<>(); + private final String neighborWorker; + private boolean isLeader; + private Terminate terminate = null; + private boolean voteToHalt = false; + private Map<String, Barrier> barriers = new HashMap<>(); + + public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) { + this.self = self; + System.out.println("worker[created]: " + this.self.address().location()); + // set up partition and traversal information + this.localPartition = self.partition(); + this.partitioner = partitioner; + final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self); + TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal); + this.matrix = new TraversalMatrix<>(traversal); + final GraphStep graphStep = (GraphStep) traversal.getStartStep(); + if (0 == graphStep.getIds().length) + ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? 
this.localPartition::vertices : this.localPartition::edges); + else { + if (graphStep.returnsVertex()) + ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier( + () -> IteratorUtils.filter(this.localPartition.vertices(graphStep.getIds()), this.localPartition::contains)); + else + ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier( + () -> IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), this.localPartition::contains)); + } + // create termination ring topology + final int i = this.partitioner.getPartitions().indexOf(this.localPartition); + this.neighborWorker = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode(); + this.isLeader = i == 0; + for (final Address.Worker worker : self.workers()) { + //if (!worker.equals(this.self.address())) + this.workers.put(worker.location(), worker); + } + } + + @Override + public void setup() { + + } + + @Override + public void execute(final M message) { + //System.out.println(message + "::" + this.isLeader); + if (message instanceof StartMessage) { + // initial message from master that says: "start processing" + final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep(); + while (step.hasNext()) { + this.sendTraverser(step.next()); + } + // internal vote to have in mailbox as final message to process + // assert null == this.terminate; + if (this.isLeader) { + this.terminate = Terminate.MAYBE; + this.self.send(this.self.address(), VoteToHaltMessage.instance()); + } + } else if (message instanceof Traverser.Admin) { + final Traverser.Admin<?> traverser = (Traverser.Admin) message; + this.processTraverser(traverser); + + } else if (message instanceof SideEffectSetMessage) { + this.matrix.getTraversal().getSideEffects().set(((SideEffectSetMessage) message).getKey(), ((SideEffectSetMessage) message).getValue()); + } else if (message instanceof Terminate) { + // assert this.isLeader || 
this.terminate != Terminate.MAYBE; + this.terminate = (Terminate) message; + this.self.send(this.self.address(), VoteToHaltMessage.instance()); + } else if (message instanceof VoteToHaltMessage) { + // if there is a barrier and thus, halting at barrier, then process barrier + if (!this.barriers.isEmpty()) { + for (final Barrier barrier : this.barriers.values()) { + while (barrier.hasNextBarrier()) { + this.self.send(this.self.master(), new BarrierAddMessage(barrier)); + } + } + this.barriers.clear(); + this.voteToHalt = false; + } + // use termination token to determine termination condition + if (null != this.terminate) { + if (this.isLeader) { + if (this.voteToHalt && Terminate.YES == this.terminate) + this.self.send(this.self.master(), VoteToHaltMessage.instance()); + else + this.self.send(this.workers.get(this.neighborWorker), Terminate.YES); + } else + this.self.send(this.workers.get(this.neighborWorker), this.voteToHalt ? this.terminate : Terminate.NO); + this.terminate = null; + this.voteToHalt = true; + } + } else if (message instanceof BarrierDoneMessage) { + final Step<?, ?> step = (Step) this.matrix.getStepById(((BarrierDoneMessage) message).getStepId()); + while (step.hasNext()) { + sendTraverser(step.next()); + } + } else { + throw new IllegalArgumentException("The following message is unknown: " + message); + } + } + + @Override + public void terminate() { + + } + + ////////////// + + private void processTraverser(final Traverser.Admin traverser) { + // assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get()); + final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); + if (step instanceof Bypassing) ((Bypassing) step).setBypass(true); + GraphComputing.atMaster(step, false); + step.addStart(traverser); + if (step instanceof Barrier) { + this.barriers.put(step.getId(), (Barrier) step); + } else { + while (step.hasNext()) { + 
this.sendTraverser(step.next()); + } + } + } + + private void sendTraverser(final Traverser.Admin traverser) { + this.voteToHalt = false; + if (traverser.isHalted()) + this.self.send(this.self.master(), traverser); + else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) + this.self.send(this.workers.get("../worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()), traverser); + else + this.self.send(this.self.address(), traverser); + } + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java new file mode 100644 index 0000000..6ab66c4 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.tinkerpop.gremlin.process.actor.traversal;

import org.apache.tinkerpop.gremlin.process.actor.Actor;
import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;

import java.util.Optional;
import java.util.Set;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

/**
 * A {@link TraversalSideEffects} decorator used on workers. Every operation is forwarded to
 * the wrapped side effects except {@link #add(String, Object)}, which does not touch the
 * wrapped instance and instead sends a {@link SideEffectAddMessage} to the worker's master.
 *
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public final class WorkerTraversalSideEffects implements TraversalSideEffects {

    // the decorated side effects; replaced by a clone of itself in clone()
    private TraversalSideEffects sideEffects;
    // the worker through which additions are sent to the master
    private Actor.Worker worker;

    private WorkerTraversalSideEffects() {
        // for serialization
    }

    /**
     * @param sideEffects the side effects to decorate
     * @param worker      the worker used to send additions to its master
     */
    public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final Actor.Worker worker) {
        this.worker = worker;
        this.sideEffects = sideEffects;
    }

    /**
     * @return the decorated side effects
     */
    public TraversalSideEffects getSideEffects() {
        return this.sideEffects;
    }

    /** Forwards to the decorated side effects. */
    @Override
    public void set(final String key, final Object value) {
        this.sideEffects.set(key, value);
    }

    /** Forwards to the decorated side effects. */
    @Override
    public <V> V get(final String key) throws IllegalArgumentException {
        return this.sideEffects.get(key);
    }

    /** Forwards to the decorated side effects. */
    @Override
    public void remove(final String key) {
        this.sideEffects.remove(key);
    }

    /** Forwards to the decorated side effects. */
    @Override
    public Set<String> keys() {
        return this.sideEffects.keys();
    }

    /**
     * Sends the addition to the master as a {@link SideEffectAddMessage}; the decorated
     * side effects are not modified by this call.
     */
    @Override
    public void add(final String key, final Object value) {
        final SideEffectAddMessage addition = new SideEffectAddMessage(key, value);
        this.worker.send(this.worker.master(), addition);
    }

    /** Forwards to the decorated side effects. */
    @Override
    public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
        this.sideEffects.register(key, initialValue, reducer);
    }

    /** Forwards to the decorated side effects. */
    @Override
    public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
        this.sideEffects.registerIfAbsent(key, initialValue, reducer);
    }

    /** Forwards to the decorated side effects. */
    @Override
    public <V> BinaryOperator<V> getReducer(final String key) {
        return this.sideEffects.getReducer(key);
    }

    /** Forwards to the decorated side effects. */
    @Override
    public <V> Supplier<V> getSupplier(final String key) {
        return this.sideEffects.getSupplier(key);
    }

    /** Forwards to the decorated side effects. */
    @Override
    @Deprecated
    public void registerSupplier(final String key, final Supplier supplier) {
        this.sideEffects.registerSupplier(key, supplier);
    }

    /** Forwards to the decorated side effects. */
    @Override
    @Deprecated
    public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
        return this.sideEffects.getRegisteredSupplier(key);
    }

    /** Forwards to the decorated side effects. */
    @Override
    public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
        this.sideEffects.setSack(initialValue, splitOperator, mergeOperator);
    }

    /** Forwards to the decorated side effects. */
    @Override
    public <S> Supplier<S> getSackInitialValue() {
        return this.sideEffects.getSackInitialValue();
    }

    /** Forwards to the decorated side effects. */
    @Override
    public <S> UnaryOperator<S> getSackSplitter() {
        return this.sideEffects.getSackSplitter();
    }

    /** Forwards to the decorated side effects. */
    @Override
    public <S> BinaryOperator<S> getSackMerger() {
        return this.sideEffects.getSackMerger();
    }

    /**
     * Copies this decorator via {@code super.clone()} and replaces the copy's decorated side
     * effects with a clone of them; the worker reference is carried over by
     * {@code super.clone()}.
     *
     * @throws IllegalStateException if {@code super.clone()} reports that cloning is unsupported
     */
    @Override
    public TraversalSideEffects clone() {
        final WorkerTraversalSideEffects copy;
        try {
            copy = (WorkerTraversalSideEffects) super.clone();
        } catch (final CloneNotSupportedException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        copy.sideEffects = this.sideEffects.clone();
        return copy;
    }

    /** Forwards to the decorated side effects. */
    @Override
    public void mergeInto(final TraversalSideEffects sideEffects) {
        this.sideEffects.mergeInto(sideEffects);
    }

}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1d728207/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java
new file mode 100644
index 0000000..dba9b86
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java
@@ -0,0 +1,47 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.tinkerpop.gremlin.process.actor.traversal.message;

import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;

/**
 * Immutable message carrying one barrier value together with the id of the barrier step
 * it was taken from.
 *
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public final class BarrierAddMessage {

    private final Object barrier;
    private final String stepId;

    /**
     * Builds the message from a barrier step. The constructor calls
     * {@link Barrier#nextBarrier()} once and stores the returned value, then records the id
     * of the step (the barrier is cast to {@link Step} for that).
     *
     * @param barrier the barrier step to take the value and the step id from
     */
    public BarrierAddMessage(final Barrier barrier) {
        final Object payload = barrier.nextBarrier();
        final Step<?, ?> step = (Step<?, ?>) barrier;
        this.barrier = payload;
        this.stepId = step.getId();
    }

    /**
     * @return the value obtained from {@code nextBarrier()} when the message was created
     */
    public Object getBarrier() {
        return this.barrier;
    }

    /**
     * @return the id of the step the barrier value came from
     */
    public String getStepId() {
        return this.stepId;
    }

}
