Lots of cleanup and organization of the interfaces. Trying to line up the GraphActors interfaces as much as possible with the GraphComputer interfaces. Added JavaDoc.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/389ee9bf Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/389ee9bf Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/389ee9bf Branch: refs/heads/TINKERPOP-1564 Commit: 389ee9bf54ec1d13e75d2fb677e967921db14020 Parents: 10773cb Author: Marko A. Rodriguez <[email protected]> Authored: Thu Dec 15 10:54:54 2016 -0700 Committer: Marko A. Rodriguez <[email protected]> Committed: Wed Jan 4 05:07:59 2017 -0700 ---------------------------------------------------------------------- .../akka/process/actor/AkkaGraphActors.java | 11 +-- .../gremlin/akka/process/actor/MasterActor.java | 11 ++- .../tinkerpop/gremlin/process/actor/Actor.java | 2 +- .../gremlin/process/actor/ActorProgram.java | 72 ++++++++++++++++++-- .../gremlin/process/actor/ActorsResult.java | 30 ++++++++ .../actor/traversal/TraversalActorProgram.java | 19 ++++-- .../actor/traversal/TraversalMasterProgram.java | 2 +- .../process/actor/util/DefaultActorsResult.java | 42 ++++++++++++ 8 files changed, 171 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/389ee9bf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java index 8cfe56f..2f62beb 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java @@ -25,8 +25,10 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import 
com.typesafe.config.ConfigValueFactory; import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actor.ActorsResult; import org.apache.tinkerpop.gremlin.process.actor.Address; import org.apache.tinkerpop.gremlin.process.actor.GraphActors; +import org.apache.tinkerpop.gremlin.process.actor.util.DefaultActorsResult; import org.apache.tinkerpop.gremlin.structure.Partitioner; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; @@ -41,18 +43,17 @@ import java.util.stream.Collectors; */ public final class AkkaGraphActors<R> implements GraphActors<R> { - private final ActorProgram<R> actorProgram; private final ActorSystem system; private final Address.Master master; + private final ActorsResult<R> result = new DefaultActorsResult<>(); public AkkaGraphActors(final ActorProgram<R> actorProgram, final Partitioner partitioner) { - this.actorProgram = actorProgram; final Config config = ConfigFactory.defaultApplication(). withValue("message-priorities", - ConfigValueFactory.fromAnyRef(this.actorProgram.getMessagePriorities().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString())); + ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities().get().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString())); this.system = ActorSystem.create("traversal-" + actorProgram.hashCode(), config); try { - this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, this.actorProgram, partitioner), "master").path().toString(), InetAddress.getLocalHost()); + this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost()); } catch (final UnknownHostException e) { throw new IllegalStateException(e.getMessage(), e); } @@ -74,7 +75,7 @@ public final class AkkaGraphActors<R> implements GraphActors<R> { while (!this.system.isTerminated()) { } - 
return this.actorProgram.getResult(); + return this.result.getResult(); }); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/389ee9bf/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java index 29cd212..11069f2 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java @@ -26,6 +26,7 @@ import akka.dispatch.RequiresMessageQueue; import akka.japi.pf.ReceiveBuilder; import org.apache.tinkerpop.gremlin.process.actor.Actor; import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actor.ActorsResult; import org.apache.tinkerpop.gremlin.process.actor.Address; import org.apache.tinkerpop.gremlin.structure.Partition; import org.apache.tinkerpop.gremlin.structure.Partitioner; @@ -46,8 +47,10 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ private final Address.Master master; private final List<Address.Worker> workers; private final Map<Address, ActorSelection> actors = new HashMap<>(); + private final ActorsResult<?> result; - public MasterActor(final ActorProgram program, final Partitioner partitioner) { + public MasterActor(final ActorProgram program, final Partitioner partitioner, final ActorsResult<?> result) { + this.result = result; try { this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost()); } catch (final UnknownHostException e) { @@ -97,6 +100,12 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ @Override public void close() { context().system().terminate(); + 
+ } + + @Override + public <R> ActorsResult<R> result() { + return (ActorsResult<R>) this.result; } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/389ee9bf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java index ed627de..e2f596e 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java @@ -33,7 +33,6 @@ public interface Actor { public <M> void send(final Address toActor, final M message); - public interface Master extends Actor { public List<Address.Worker> workers(); @@ -42,6 +41,7 @@ public interface Actor { public void close(); + public <R> ActorsResult<R> result(); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/389ee9bf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java index 3ae54d1..dd0d7e7 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java @@ -19,21 +19,85 @@ package org.apache.tinkerpop.gremlin.process.actor; +import org.apache.commons.configuration.Configuration; +import org.apache.tinkerpop.gremlin.structure.Graph; + import java.util.List; +import java.util.Optional; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ -public interface ActorProgram<M> { +public interface ActorProgram<M> extends Cloneable { + + public static final String ACTOR_PROGRAM = "gremlin.actorProgram"; + + /** + * When it is necessary to store the state of the ActorProgram, this method is called. + * This is typically required when the ActorProgram needs to be serialized to another machine. + * Note that what is stored is simply the instance/configuration state, not any processed data. + * The default implementation provided simply stores the ActorProgram class name for reflective reconstruction. + * It is typically a good idea to call ActorProgram.super.storeState(). + * + * @param configuration the configuration to store the state of the ActorProgram in. + */ + public default void storeState(final Configuration configuration) { + configuration.setProperty(ACTOR_PROGRAM, this.getClass().getName()); + } + /** + * When it is necessary to load the state of the ActorProgram, this method is called. + * This is typically required when the ActorProgram needs to be serialized to another machine. + * Note that what is loaded is simply the instance state, not any processed data. + * + * @param graph the graph that the ActorProgram will run against + * @param configuration the configuration to load the state of the ActorProgram from. + */ + public default void loadState(final Graph graph, final Configuration configuration) { + + } + + /** + * Create the {@link org.apache.tinkerpop.gremlin.process.actor.Actor.Worker} program. + * This is typically used by {@link Worker} to spawn its program. + * + * @param worker the worker actor creating the worker program + * @return the worker program + */ public Worker createWorkerProgram(final Actor.Worker worker); + /** + * Create the {@link org.apache.tinkerpop.gremlin.process.actor.Actor.Master} program. + * This is typically used by {@link Master} to spawn its program. 
+ * + * @param master the master actor creating the master program + * @return the master program + */ public Master createMasterProgram(final Actor.Master master); - public List<Class> getMessagePriorities(); - - public M getResult(); + /** + * Get the ordered list of message classes where order determines the priority + * of message reception by the respective {@link Actor}. For instance, + * if an {@link Actor} has a message of type {@code X} and a message of type {@code Y} + * in its message buffer, and {@code X} has a higher priority, it will be fetched + * first from the buffer. If no list is provided, then message reception is FIFO. + * The default implementation returns an {@link Optional#empty()}. + * + * @return the optional ordered list of message priorities. + */ + public default Optional<List<Class>> getMessagePriorities() { + return Optional.empty(); + } + /** + * When multiple workers on a single machine need ActorProgram instances, it is possible to use clone. + * This will provide a speedier way of generating instances, over the {@link ActorProgram#storeState} and {@link ActorProgram#loadState} model. + * The default implementation simply returns the object as it assumes that the ActorProgram instance is a stateless singleton. 
+ * + * @return A clone of the ActorProgram object + */ + @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException") + public ActorProgram<M> clone(); public static interface Worker<M> { public void setup(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/389ee9bf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorsResult.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorsResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorsResult.java new file mode 100644 index 0000000..c9db36a --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorsResult.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actor; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public interface ActorsResult<R> { + + public R getResult(); + + public void setResult(final R result); +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/389ee9bf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java index f9f86da..6bfdff7 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.actor.traversal; import org.apache.tinkerpop.gremlin.process.actor.Actor; import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actor.ActorsResult; import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage; import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage; import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage; @@ -45,6 +46,7 @@ import org.apache.tinkerpop.gremlin.structure.Partitioner; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) @@ -60,7 +62,7 @@ public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet BarrierDoneMessage.class, Terminate.class); - private final Traversal.Admin<?, R> traversal; + private Traversal.Admin<?, R> traversal; private final Partitioner partitioner; public TraverserSet<R> result = new TraverserSet<>(); @@ -96,13 +98,18 @@ public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet } @Override - public List<Class> getMessagePriorities() { - return MESSAGE_PRIORITIES; + public Optional<List<Class>> getMessagePriorities() { + return Optional.of(MESSAGE_PRIORITIES); } - @Override - public TraverserSet<R> getResult() { - return this.result; + public ActorProgram<TraverserSet<R>> clone() { + try { + final TraversalActorProgram<R> clone = (TraversalActorProgram<R>) super.clone(); + clone.traversal = this.traversal.clone(); + return clone; + } catch (final CloneNotSupportedException e) { + throw new IllegalStateException(e.getMessage(), e); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/389ee9bf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java index 87ed4e6..e15106f 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java @@ -138,7 +138,7 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> { @Override public void terminate() { - + this.master.result().setResult(this.results); } private void 
broadcast(final Object message) { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/389ee9bf/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/DefaultActorsResult.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/DefaultActorsResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/DefaultActorsResult.java new file mode 100644 index 0000000..c650ba1 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/DefaultActorsResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actor.util; + +import org.apache.tinkerpop.gremlin.process.actor.ActorsResult; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class DefaultActorsResult<R> implements ActorsResult<R> { + + private R result; + + public DefaultActorsResult() { + + } + + public R getResult() { + return this.result; + } + + public void setResult(final R result) { + this.result = result; + } +}
