Learned about the preStart() and postStop() methods in Akka Actors. Lined those up with actor.setup() and actor.terminate(), respectively. JavaDoc here and there... nada much. Done for the night.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/0d171503 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/0d171503 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/0d171503 Branch: refs/heads/TINKERPOP-1564 Commit: 0d171503ebfd3695b8ea1fee01ef060fba7226f4 Parents: 2d02406 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Wed Dec 14 18:22:14 2016 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Thu Jan 19 13:01:41 2017 -0700 ---------------------------------------------------------------------- .../gremlin/akka/process/actor/MasterActor.java | 16 ++++++++++--- .../gremlin/akka/process/actor/WorkerActor.java | 16 ++++++++++--- .../gremlin/process/actor/GraphActors.java | 3 +++ .../actor/traversal/TraversalWorkerProgram.java | 13 ++++------ .../step/map/TraversalActorProgramStep.java | 6 ----- .../gremlin/process/computer/Computer.java | 25 +++++++++++++++++--- 6 files changed, 56 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d171503/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java index 05bedbc..29cd212 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java @@ -42,6 +42,7 @@ import java.util.Map; */ public final class MasterActor extends AbstractActor implements RequiresMessageQueue<ActorMailbox.ActorSemantics>, Actor.Master { + private final ActorProgram.Master masterProgram; 
private final Address.Master master; private final List<Address.Worker> workers; private final Map<Address, ActorSelection> actors = new HashMap<>(); @@ -59,9 +60,18 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ this.workers.add(new Address.Worker(workerPathString, partition.location())); context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner), workerPathString); } - final ActorProgram.Master masterProgram = program.createMasterProgram(this); - receive(ReceiveBuilder.matchAny(masterProgram::execute).build()); - masterProgram.setup(); + this.masterProgram = program.createMasterProgram(this); + receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build()); + } + + @Override + public void preStart() { + this.masterProgram.setup(); + } + + @Override + public void postStop() { + this.masterProgram.terminate(); } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d171503/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java index d83252e..e043c20 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java @@ -39,6 +39,7 @@ import java.util.Map; */ public final class WorkerActor extends AbstractActor implements RequiresMessageQueue<ActorMailbox.ActorSemantics>, Actor.Worker { + private final ActorProgram.Worker workerProgram; private final Partition localPartition; private final Address.Worker self; private final Address.Master master; @@ -53,9 +54,18 @@ public final class WorkerActor extends AbstractActor implements 
RequiresMessageQ for (final Partition partition : partitioner.getPartitions()) { this.workers.add(new Address.Worker(this.createWorkerAddress(partition), partition.location())); } - final ActorProgram.Worker workerProgram = program.createWorkerProgram(this); - receive(ReceiveBuilder.matchAny(workerProgram::execute).build()); - workerProgram.setup(); + this.workerProgram = program.createWorkerProgram(this); + receive(ReceiveBuilder.matchAny(this.workerProgram::execute).build()); + } + + @Override + public void preStart() { + this.workerProgram.setup(); + } + + @Override + public void postStop() { + this.workerProgram.terminate(); } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d171503/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java index 1b01f36..d018397 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java @@ -24,6 +24,9 @@ import org.apache.tinkerpop.gremlin.process.Processor; import java.util.concurrent.Future; /** + * GraphActors is a message-passing based graph {@link Processor} that is: + * asynchronous, distributed, partition-bound, and traverser-centric. + * * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ public interface GraphActors<R> extends Processor { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d171503/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java index f01c138..ef80f03 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java @@ -52,10 +52,8 @@ import java.util.Map; */ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { - private final Actor.Worker self; private final TraversalMatrix<?, ?> matrix; - private final Partition localPartition; private final Partitioner partitioner; private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>(); // @@ -70,7 +68,6 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { // System.out.println("worker[created]: " + this.self.address().getId()); // set up partition and traversal information this.partitioner = partitioner; - this.localPartition = self.partition(); final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self); TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal); this.matrix = new TraversalMatrix<>(traversal); @@ -79,14 +76,14 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { ////// final GraphStep graphStep = (GraphStep) traversal.getStartStep(); if (0 == graphStep.getIds().length) - ((GraphStep) 
traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges); + ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.self.partition()::vertices : this.self.partition()::edges); else { if (graphStep.returnsVertex()) ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier( - () -> IteratorUtils.filter(this.localPartition.vertices(graphStep.getIds()), this.localPartition::contains)); + () -> IteratorUtils.filter(self.partition().vertices(graphStep.getIds()), this.self.partition()::contains)); else ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier( - () -> IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), this.localPartition::contains)); + () -> IteratorUtils.filter(self.partition().edges(graphStep.getIds()), this.self.partition()::contains)); } } @@ -153,7 +150,7 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { ////////////// private void processTraverser(final Traverser.Admin traverser) { - assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get()); + assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.self.partition().contains((Element) traverser.get()); final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); step.addStart(traverser); if (step instanceof Barrier) { @@ -169,7 +166,7 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { this.voteToHalt = false; if (traverser.isHalted()) this.self.send(this.self.master(), traverser); - else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) + else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get())) 
this.self.send(this.partitionToWorkerMap.get(this.partitioner.getPartition((Element) traverser.get())), traverser); else this.self.send(this.self.address(), traverser); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d171503/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java index de82599..ba2a08e 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java @@ -19,10 +19,6 @@ package org.apache.tinkerpop.gremlin.process.actor.traversal.step.map; -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ - import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; import org.apache.tinkerpop.gremlin.process.actor.GraphActors; import org.apache.tinkerpop.gremlin.process.actor.traversal.TraversalActorProgram; @@ -44,10 +40,8 @@ public final class TraversalActorProgramStep<S, E> extends AbstractStep<E, E> { private final Class<? extends GraphActors> actorsClass; private final Traversal.Admin<S, E> actorsTraversal; private final Partitioner partitioner; - private boolean first = true; - public TraversalActorProgramStep(final Traversal.Admin<?, ?> traversal, final Class<? 
extends GraphActors> actorsClass, final Partitioner partitioner) { super(traversal); this.actorsClass = actorsClass; http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0d171503/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java index d0baec0..9214a5e 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java @@ -23,11 +23,14 @@ import org.apache.tinkerpop.gremlin.process.Processor; import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Vertex; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Function; @@ -60,11 +63,18 @@ public final class Computer implements Processor.Description<GraphComputer>, Fun return new Computer(graphComputerClass); } + + /** + * @deprecated As of release 3.3.0, replaced by using {@link Computer#of()}. + */ @Deprecated public static Computer compute() { return new Computer(GraphComputer.class); } + /** + * @deprecated As of release 3.3.0, replaced by using {@link Computer#of(Class)}. + */ @Deprecated public static Computer compute(final Class<? 
extends GraphComputer> graphComputerClass) { return new Computer(graphComputerClass); @@ -159,9 +169,18 @@ public final class Computer implements Processor.Description<GraphComputer>, Fun @Override public void addTraversalStrategies(final TraversalSource traversalSource) { - final VertexProgramStrategy vertexProgramStrategy = new VertexProgramStrategy(this); - traversalSource.getStrategies().addStrategies(vertexProgramStrategy); - vertexProgramStrategy.addGraphComputerStrategies(traversalSource); + Class<? extends GraphComputer> graphComputerClass; + if (this.getGraphComputerClass().equals(GraphComputer.class)) { + try { + graphComputerClass = this.apply(traversalSource.getGraph()).getClass(); + } catch (final Exception e) { + graphComputerClass = GraphComputer.class; + } + } else + graphComputerClass = this.getGraphComputerClass(); + final List<TraversalStrategy<?>> graphComputerStrategies = TraversalStrategies.GlobalCache.getStrategies(graphComputerClass).toList(); + traversalSource.getStrategies().addStrategies(graphComputerStrategies.toArray(new TraversalStrategy[graphComputerStrategies.size()])); + traversalSource.getStrategies().addStrategies(new VertexProgramStrategy(this)); } /////////////////