more alignment between GraphActors and GraphComputer. GraphActors.partitioner().program().submit(). Unlike GraphComputer, there is no Graph.actors(). HOWEVER -- since GraphActors is Partition-centric, perhaps we do Graph.partitioner().actors(). eek. hehe..
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/fc7da3fc Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/fc7da3fc Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/fc7da3fc Branch: refs/heads/TINKERPOP-1564 Commit: fc7da3fca30d2fdaa0ba4614aec43ec0aef4dca6 Parents: e9751da Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Thu Dec 15 11:59:48 2016 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Thu Jan 19 10:26:57 2017 -0700 ---------------------------------------------------------------------- .../akka/process/actor/AkkaGraphActors.java | 48 +++++++++++++------- .../gremlin/akka/process/actor/MasterActor.java | 7 +++ .../gremlin/akka/process/actor/WorkerActor.java | 7 +++ .../tinkerpop/gremlin/process/actor/Actor.java | 46 ++++++++++++++++--- .../gremlin/process/actor/GraphActors.java | 7 ++- .../actor/traversal/TraversalActorProgram.java | 10 ++-- .../actor/traversal/TraversalMasterProgram.java | 14 ++---- .../actor/traversal/TraversalWorkerProgram.java | 12 ++--- .../step/map/TraversalActorProgramStep.java | 6 +-- 9 files changed, 105 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fc7da3fc/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java index 51747ac..5739369 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java @@ -44,20 +44,12 @@ import java.util.stream.Collectors; */ public final class AkkaGraphActors<R> implements GraphActors<R> { - private final ActorSystem system; - private final Address.Master master; - private final ActorsResult<R> result = new DefaultActorsResult<>(); + private ActorProgram<R> actorProgram; + private Partitioner partitioner; + private boolean executed = false; + + public AkkaGraphActors() { - public AkkaGraphActors(final ActorProgram<R> actorProgram, final Partitioner partitioner) { - final Config config = ConfigFactory.defaultApplication(). - withValue("message-priorities", - ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities().get().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString())); - this.system = ActorSystem.create("traversal-" + UUID.randomUUID(), config); - try { - this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost()); - } catch (final UnknownHostException e) { - throw new IllegalStateException(e.getMessage(), e); - } } @Override @@ -66,17 +58,39 @@ public final class AkkaGraphActors<R> implements GraphActors<R> { } @Override - public Address.Master master() { - return this.master; + public GraphActors<R> program(final ActorProgram<R> actorProgram) { + this.actorProgram = actorProgram; + return this; + } + + @Override + public GraphActors<R> partitioner(final Partitioner partitioner) { + this.partitioner = partitioner; + return this; } @Override public Future<R> submit() { + if (this.executed) + throw new IllegalStateException("Can not execute twice"); + this.executed = true; + final ActorSystem system; + final ActorsResult<R> result = new DefaultActorsResult<>(); + + final Config config = ConfigFactory.defaultApplication(). + withValue("message-priorities", + ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities().get().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString())); + system = ActorSystem.create("traversal-" + UUID.randomUUID(), config); + try { + new Address.Master(system.actorOf(Props.create(MasterActor.class, actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost()); + } catch (final UnknownHostException e) { + throw new IllegalStateException(e.getMessage(), e); + } return CompletableFuture.supplyAsync(() -> { - while (!this.system.isTerminated()) { + while (!system.isTerminated()) { } - return this.result.getResult(); + return result.getResult(); }); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fc7da3fc/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java index 11069f2..a4ef639 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java @@ -48,8 +48,10 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ private final List<Address.Worker> workers; private final Map<Address, ActorSelection> actors = new HashMap<>(); private final ActorsResult<?> result; + private final Partitioner partitioner; public MasterActor(final ActorProgram program, final Partitioner partitioner, final ActorsResult<?> result) { + this.partitioner = partitioner; this.result = result; try { this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost()); @@ -93,6 +95,11 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ } @Override + public Partitioner partitioner() { + return this.partitioner; + } + + @Override public Address.Master address() { return this.master; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fc7da3fc/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java index e043c20..35b5a4f 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java @@ -41,6 +41,7 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ private final ActorProgram.Worker workerProgram; private final Partition localPartition; + private final Partitioner partitioner; private final Address.Worker self; private final Address.Master master; private final List<Address.Worker> workers; @@ -48,6 +49,7 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ public WorkerActor(final ActorProgram program, final Address.Master master, final Partition localPartition, final Partitioner partitioner) { this.localPartition = localPartition; + this.partitioner = partitioner; this.self = new Address.Worker(this.createWorkerAddress(localPartition), localPartition.location()); this.master = master; this.workers = new ArrayList<>(); @@ -89,6 +91,11 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ } @Override + public Partitioner partitioner() { + return this.partitioner; + } + + @Override public Address.Worker address() { return this.self; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fc7da3fc/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java index e2f596e..5a0b869 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java @@ -20,23 +20,53 @@ package org.apache.tinkerpop.gremlin.process.actor; import org.apache.tinkerpop.gremlin.structure.Partition; +import org.apache.tinkerpop.gremlin.structure.Partitioner; import java.util.List; /** + * An Actor represents an isolated processing unit that can only be interacted with via messages. + * Actors are able to send and receive messages. The {@link GraphActors} framework has two types of actors: + * {@link Master} and {@link Worker}. A master actor is not associated with a particular graph {@link Partition}. + * Instead, its role is to coordinate the workers and ultimately, yield the final result of the submitted + * {@link ActorProgram}. + * * @author Marko A. Rodriguez (http://markorodriguez.com) */ public interface Actor { + /** + * Get the {@link Partitioner} associated with the {@link GraphActors} system. + * + * @return the partitioner used to partition (logically and/or physically) the {@link org.apache.tinkerpop.gremlin.structure.Graph} + */ + public Partitioner partitioner(); + + /** + * Get the {@link Address} of the actor. + * + * @return the actor's address + */ public Address address(); + /** + * Get a list of the {@link Address} values of all the workers in {@link GraphActors} system. + * + * @return the worker's addresses + */ + public List<Address.Worker> workers(); + + /** + * Send a message from this actor to another actor given their {@link Address}. + * + * @param toActor the actor to receive the messages + * @param message the message being sent + * @param <M> the message type + */ public <M> void send(final Address toActor, final M message); - public interface Master extends Actor { - public List<Address.Worker> workers(); - public Address.Master address(); public void close(); @@ -51,10 +81,14 @@ public interface Actor { public Address.Master master(); - public List<Address.Worker> workers(); - + /** + * Get the {@link Partition} associated with this worker. + * In principle, this is the subset of the {@link org.apache.tinkerpop.gremlin.structure.Graph} that + * the worker is "data-local" to. + * + * @return the worker's partition + */ public Partition partition(); - } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fc7da3fc/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java index d018397..c19dbf7 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java @@ -20,18 +20,21 @@ package org.apache.tinkerpop.gremlin.process.actor; import org.apache.tinkerpop.gremlin.process.Processor; +import org.apache.tinkerpop.gremlin.structure.Partitioner; import java.util.concurrent.Future; /** * GraphActors is a message-passing based graph {@link Processor} that is: - * asynchronous, distributed, partition-bound, and traverser-centric. + * asynchronous, distributed, and partition centric. * * @author Marko A. Rodriguez (http://markorodriguez.com) */ public interface GraphActors<R> extends Processor { - public Address.Master master(); + public GraphActors<R> program(final ActorProgram<R> program); + + public GraphActors<R> partitioner(final Partitioner partitioner); public Future<R> submit(); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fc7da3fc/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java index 6bfdff7..b584322 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java @@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.actor.traversal; import org.apache.tinkerpop.gremlin.process.actor.Actor; import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; -import org.apache.tinkerpop.gremlin.process.actor.ActorsResult; import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage; import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage; import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage; @@ -41,7 +40,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.Path import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.RepeatUnrollStrategy; import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; -import org.apache.tinkerpop.gremlin.structure.Partitioner; import java.util.ArrayList; import java.util.Arrays; @@ -63,11 +61,9 @@ public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet Terminate.class); private Traversal.Admin<?, R> traversal; - private final Partitioner partitioner; public TraverserSet<R> result = new TraverserSet<>(); - public TraversalActorProgram(final Traversal.Admin<?, R> traversal, final Partitioner partitioner) { - this.partitioner = partitioner; + public TraversalActorProgram(final Traversal.Admin<?, R> traversal) { this.traversal = traversal; final TraversalStrategies strategies = this.traversal.getStrategies().clone(); strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance()); @@ -89,12 +85,12 @@ public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet @Override public Worker createWorkerProgram(final Actor.Worker worker) { - return new TraversalWorkerProgram<>(worker, this.traversal.clone(), this.partitioner); + return new TraversalWorkerProgram<>(worker, this.traversal.clone()); } @Override public Master createMasterProgram(final Actor.Master master) { - return new TraversalMasterProgram<>(master, this.traversal.clone(), this.partitioner, this.result); + return new TraversalMasterProgram<>(master, this.traversal.clone(), this.result); } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fc7da3fc/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java index e15106f..2aaf686 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java @@ -44,7 +44,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Partition; -import org.apache.tinkerpop.gremlin.structure.Partitioner; import java.util.HashMap; import java.util.Map; @@ -54,23 +53,20 @@ import java.util.Map; */ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> { - private final Actor.Master master; private final Traversal.Admin<?, ?> traversal; private final TraversalMatrix<?, ?> matrix; - private final Partitioner partitioner; private Map<String, Barrier> barriers = new HashMap<>(); private final TraverserSet<?> results; private Address.Worker leaderWorker; private int orderCounter = -1; - private final Map<Partition,Address.Worker> partitionToWorkerMap = new HashMap<>(); + private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>(); - public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) { + public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final TraverserSet<?> results) { this.traversal = traversal; // System.out.println("master[created]: " + master.address().getId()); // System.out.println(this.traversal); this.matrix = new TraversalMatrix<>(this.traversal); - this.partitioner = partitioner; this.results = results; this.master = master; Distributing.configure(this.traversal, true, true); @@ -80,8 +76,8 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> { @Override public void setup() { this.leaderWorker = this.master.workers().get(0); - for(int i=0; i<this.partitioner.getPartitions().size(); i++) { - this.partitionToWorkerMap.put(this.partitioner.getPartitions().get(i),this.master.workers().get(i)); + for (int i = 0; i < this.master.partitioner().getPartitions().size(); i++) { + this.partitionToWorkerMap.put(this.master.partitioner().getPartitions().get(i), this.master.workers().get(i)); } this.broadcast(StartMessage.instance()); this.master.send(this.leaderWorker, Terminate.MAYBE); @@ -167,7 +163,7 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> { if (traverser.isHalted()) this.results.add(traverser); else if (traverser.get() instanceof Element) - this.master.send(this.partitionToWorkerMap.get(this.partitioner.getPartition((Element) traverser.get())), traverser); + this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element) traverser.get())), traverser); else this.master.send(this.master.address(), traverser); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fc7da3fc/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java index ef80f03..001219a 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java @@ -31,7 +31,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; -import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing; import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing; import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; @@ -40,7 +39,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Element; import org.apache.tinkerpop.gremlin.structure.Partition; -import org.apache.tinkerpop.gremlin.structure.Partitioner; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; @@ -54,7 +52,6 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { private final Actor.Worker self; private final TraversalMatrix<?, ?> matrix; - private final Partitioner partitioner; private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>(); // private Address.Worker neighborWorker; @@ -63,11 +60,10 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { private boolean voteToHalt = false; private Map<String, Barrier> barriers = new HashMap<>(); - public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) { + public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal) { this.self = self; // System.out.println("worker[created]: " + this.self.address().getId()); // set up partition and traversal information - this.partitioner = partitioner; final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self); TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal); this.matrix = new TraversalMatrix<>(traversal); @@ -93,8 +89,8 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { final int i = this.self.workers().indexOf(this.self.address()); this.neighborWorker = this.self.workers().get(i == this.self.workers().size() - 1 ? 0 : i + 1); this.isLeader = i == 0; - for (int j = 0; j < this.partitioner.getPartitions().size(); j++) { - this.partitionToWorkerMap.put(this.partitioner.getPartitions().get(j), this.self.workers().get(j)); + for (int j = 0; j < this.self.partitioner().getPartitions().size(); j++) { + this.partitionToWorkerMap.put(this.self.partitioner().getPartitions().get(j), this.self.workers().get(j)); } } @@ -167,7 +163,7 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { if (traverser.isHalted()) this.self.send(this.self.master(), traverser); else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get())) - this.self.send(this.partitionToWorkerMap.get(this.partitioner.getPartition((Element) traverser.get())), traverser); + this.self.send(this.partitionToWorkerMap.get(this.self.partitioner().getPartition((Element) traverser.get())), traverser); else this.self.send(this.self.address(), traverser); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fc7da3fc/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java index ba2a08e..5d643af 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java @@ -60,9 +60,9 @@ public final class TraversalActorProgramStep<S, E> extends AbstractStep<E, E> { if (this.first) { this.first = false; try { - final GraphActors<TraverserSet<E>> graphActors = this.actorsClass.getConstructor(ActorProgram.class, Partitioner.class). - newInstance(new TraversalActorProgram<E>(this.actorsTraversal, this.partitioner), this.partitioner); - graphActors.submit().get().forEach(this.starts::add); + final GraphActors<TraverserSet<E>> graphActors = this.actorsClass.newInstance(); + final ActorProgram<TraverserSet<E>> actorProgram = new TraversalActorProgram<>(this.actorsTraversal); + graphActors.partitioner(this.partitioner).program(actorProgram).submit().get().forEach(this.starts::add); } catch (final Exception e) { throw new IllegalStateException(e.getMessage(), e); }