http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java deleted file mode 100644 index 89002fe..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor; - -import org.apache.commons.configuration.Configuration; -import org.apache.tinkerpop.gremlin.structure.Graph; - -import java.util.List; -import java.util.Optional; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public interface ActorProgram<M> extends Cloneable { - - public static final String ACTOR_PROGRAM = "gremlin.actorProgram"; - - /** - * When it is necessary to store the state of the ActorProgram, this method is called. - * This is typically required when the ActorProgram needs to be serialized to another machine. - * Note that what is stored is simply the instance/configuration state, not any processed data. - * The default implementation provided simply stores the ActorProgarm class name for reflective reconstruction. - * It is typically a good idea to ActorProgram.super.storeState(). - * - * @param configuration the configuration to store the state of the ActorProgram in. - */ - public default void storeState(final Configuration configuration) { - configuration.setProperty(ACTOR_PROGRAM, this.getClass().getName()); - } - - /** - * When it is necessary to load the state of the ActorProgram, this method is called. - * This is typically required when the ActorProgram needs to be serialized to another machine. - * Note that what is loaded is simply the instance state, not any processed data. - * - * @param graph the graph that the ActorProgram will run against - * @param configuration the configuration to load the state of the ActorProgram from. - */ - public default void loadState(final Graph graph, final Configuration configuration) { - - } - - /** - * Create the {@link org.apache.tinkerpop.gremlin.process.actor.Actor.Worker} program. - * This is typically used by {@link Worker} to spawn its program. - * - * @param worker the worker actor creating the worker program - * @return the worker program - */ - public ActorProgram.Worker createWorkerProgram(final Actor.Worker worker); - - /** - * Create the {@link org.apache.tinkerpop.gremlin.process.actor.Actor.Master} program. - * This is typically used by {@link Master} to spawn its program. - * - * @param master the master actor creating the master program - * @return the master program - */ - public ActorProgram.Master createMasterProgram(final Actor.Master master); - - /** - * Get the ordered list of message classes where order determines the priority - * of message reception by the respective {@link Actor}. For instance, - * if an {@link Actor} has a message of type {@code X} and a message of type {@code Y} - * in its message buffer, and {@code X} has a higher priority, it will be fetched - * first from the buffer. If no list is provided then its FIFO. - * The default implementation returns an {@link Optional#empty()}. - * - * @return the optional ordered list of message priorities. - */ - public default Optional<List<Class>> getMessagePriorities() { - return Optional.empty(); - } - - /** - * When multiple workers on a single machine need ActorProgram instances, it is possible to use clone. - * This will provide a speedier way of generating instances, over the {@link ActorProgram#storeState} and {@link ActorProgram#loadState} model. - * The default implementation simply returns the object as it assumes that the ActorProgram instance is a stateless singleton. - * - * @return A clone of the VertexProgram object - */ - @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException") - public ActorProgram<M> clone(); - - /** - * The Worker program is executed by a worker process in the {@link GraphActors} system. - * There are many workers and a single master. - * All workers execute the same program. - * - * @param <M> The message type accepted by the worker - */ - public static interface Worker<M> { - - /** - * This method is evaluated when the worker process is spawned. - */ - public void setup(); - - /** - * This method is evaluated when the worker receives a new message. - * - * @param message the received message - */ - public void execute(final M message); - - /** - * This method is evaluated when the worker process is destroyed. - */ - public void terminate(); - - } - - /** - * The Master program is executed by the master process in the {@link GraphActors} system. - * There are many workers and a single master. - * - * @param <M> The message type accepted by the master - */ - public static interface Master<M> { - public void setup(); - - public void execute(final M message); - - public void terminate(); - - } - -}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorsResult.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorsResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorsResult.java deleted file mode 100644 index c9db36a..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorsResult.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public interface ActorsResult<R> { - - public R getResult(); - - public void setResult(final R result); -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java deleted file mode 100644 index 9f59e5e..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor; - -import java.io.Serializable; -import java.net.InetAddress; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public abstract class Address implements Serializable { - - private final String id; - private final InetAddress location; - - public Address(final String id, final InetAddress location) { - this.id = id; - this.location = location; - } - - public InetAddress getLocation() { - return this.location; - } - - public String getId() { - return this.id; - } - - @Override - public boolean equals(final Object other) { - return other instanceof Address && ((Address) other).id.equals(this.id); - } - - @Override - public int hashCode() { - return this.id.hashCode(); - } - - @Override - public String toString() { - return this.id; - } - - public static final class Master extends Address { - - public Master(final String id, final InetAddress location) { - super(id, location); - } - - } - - public static final class Worker extends Address { - - public Worker(final String id, final InetAddress location) { - super(id, location); - } - - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java deleted file mode 100644 index 51f4c4a..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor; - -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; -import org.apache.tinkerpop.gremlin.process.Processor; -import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorProgramStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.ProcessorTraversalStrategy; -import org.apache.tinkerpop.gremlin.structure.Graph; - -import java.util.concurrent.Future; - -/** - * GraphActors is a message-passing based graph {@link Processor} that is: - * asynchronous, distributed, and partition centric. - * - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public interface GraphActors<R> extends Processor { - - public static final String GRAPH_ACTORS = "gremlin.graphActors"; - public static final String GRAPH_ACTORS_WORKERS = "gremlin.graphActors.workers"; - - /** - * Provide the {@link ActorProgram} that the GraphActors will execute. - * - * @param program the program to execute - * @return the updated GraphActors with newly defined program - */ - public GraphActors<R> program(final ActorProgram<R> program); - - /** - * Specify the number of workers per {@link Graph} {@link org.apache.tinkerpop.gremlin.structure.Partition}. - * - * @param workers the number of workers per partition - * @return the updated GraphActors with newly defined workers - */ - public GraphActors<R> workers(final int workers); - - /** - * Add an arbitrary configuration to the GraphActors system. - * Typically, these configurations are provider-specific and do not generalize across all GraphActor implementations. - * - * @param key the key of the configuration - * @param value the value of the configuration - * @return the updated GraphActors with newly defined configuration - */ - public GraphActors<R> configure(final String key, final Object value); - - /** - * Submit the {@link ActorProgram} for execution by the {@link GraphActors}. - * - * @return a {@link Future} denoting a reference to the asynchronous computation's result - */ - public Future<R> submit(final Graph graph); - - /** - * Returns an {@link ActorProgramStrategy} which enables a {@link Traversal} to execute on {@link GraphActors}. - * - * @return a traversal strategy capable of executing traversals on a GraphActors - */ - public default ProcessorTraversalStrategy<GraphActors> getProcessorTraversalStrategy() { - return new ActorProgramStrategy(this); - } - - public static <A extends GraphActors> A open(final Configuration configuration) { - try { - return (A) Class.forName(configuration.getString(GRAPH_ACTORS)).getMethod("open", Configuration.class).invoke(null, configuration); - } catch (final Exception e) { - throw new IllegalArgumentException(e.getMessage(), e); - } - } - - public static <A extends GraphActors> A open(final Class<A> graphActorsClass) { - final BaseConfiguration configuration = new BaseConfiguration(); - configuration.setProperty(GRAPH_ACTORS, graphActorsClass.getCanonicalName()); - return GraphActors.open(configuration); - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java deleted file mode 100644 index b584322..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal; - -import org.apache.tinkerpop.gremlin.process.actor.Actor; -import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.Terminate; -import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorProgramStrategy; -import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.LazyBarrierStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.MatchPredicateStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.PathRetractionStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.RepeatUnrollStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet<R>> { - - private static final List<Class> MESSAGE_PRIORITIES = Arrays.asList( - StartMessage.class, - Traverser.class, - SideEffectAddMessage.class, - BarrierAddMessage.class, - SideEffectSetMessage.class, - BarrierDoneMessage.class, - Terminate.class); - - private Traversal.Admin<?, R> traversal; - public TraverserSet<R> result = new TraverserSet<>(); - - public TraversalActorProgram(final Traversal.Admin<?, R> traversal) { - this.traversal = traversal; - final TraversalStrategies strategies = this.traversal.getStrategies().clone(); - strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance()); - // TODO: make TinkerGraph/etc. strategies smart about actors - new ArrayList<>(strategies.toList()).stream(). - filter(s -> s instanceof TraversalStrategy.ProviderOptimizationStrategy). - map(TraversalStrategy::getClass). - forEach(strategies::removeStrategies); - strategies.removeStrategies( - ActorProgramStrategy.class, - LazyBarrierStrategy.class, - RepeatUnrollStrategy.class, - MatchPredicateStrategy.class, - InlineFilterStrategy.class, - PathRetractionStrategy.class); - this.traversal.setStrategies(strategies); - this.traversal.applyStrategies(); - } - - @Override - public Worker createWorkerProgram(final Actor.Worker worker) { - return new TraversalWorkerProgram<>(worker, this.traversal.clone()); - } - - @Override - public Master createMasterProgram(final Actor.Master master) { - return new TraversalMasterProgram<>(master, this.traversal.clone(), this.result); - } - - @Override - public Optional<List<Class>> getMessagePriorities() { - return Optional.of(MESSAGE_PRIORITIES); - } - - @Override - public ActorProgram<TraverserSet<R>> clone() { - try { - final TraversalActorProgram<R> clone = (TraversalActorProgram<R>) super.clone(); - clone.traversal = this.traversal.clone(); - return clone; - } catch (final CloneNotSupportedException e) { - throw new IllegalStateException(e.getMessage(), e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java deleted file mode 100644 index 2aaf686..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal; - -import org.apache.tinkerpop.gremlin.process.actor.Actor; -import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; -import org.apache.tinkerpop.gremlin.process.actor.Address; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.Terminate; -import org.apache.tinkerpop.gremlin.process.traversal.Step; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; -import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing; -import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier; -import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing; -import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable; -import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; -import org.apache.tinkerpop.gremlin.structure.Element; -import org.apache.tinkerpop.gremlin.structure.Partition; - -import java.util.HashMap; -import java.util.Map; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -final class TraversalMasterProgram<M> implements ActorProgram.Master<M> { - - private final Actor.Master master; - private final Traversal.Admin<?, ?> traversal; - private final TraversalMatrix<?, ?> matrix; - private Map<String, Barrier> barriers = new HashMap<>(); - private final TraverserSet<?> results; - private Address.Worker leaderWorker; - private int orderCounter = -1; - private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>(); - - public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final TraverserSet<?> results) { - this.traversal = traversal; - // System.out.println("master[created]: " + master.address().getId()); - // System.out.println(this.traversal); - this.matrix = new TraversalMatrix<>(this.traversal); - this.results = results; - this.master = master; - Distributing.configure(this.traversal, true, true); - Pushing.configure(this.traversal, true, false); - } - - @Override - public void setup() { - this.leaderWorker = this.master.workers().get(0); - for (int i = 0; i < this.master.partitioner().getPartitions().size(); i++) { - this.partitionToWorkerMap.put(this.master.partitioner().getPartitions().get(i), this.master.workers().get(i)); - } - this.broadcast(StartMessage.instance()); - this.master.send(this.leaderWorker, Terminate.MAYBE); - } - - @Override - public void execute(final M message) { - if (message instanceof Traverser.Admin) { - this.processTraverser((Traverser.Admin) message); - } else if (message instanceof BarrierAddMessage) { - final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId()); - final Step<?, ?> step = (Step) barrier; - barrier.addBarrier(((BarrierAddMessage) message).getBarrier()); - this.barriers.put(step.getId(), barrier); - } else if (message instanceof SideEffectAddMessage) { - this.traversal.getSideEffects().add(((SideEffectAddMessage) message).getKey(), ((SideEffectAddMessage) message).getValue()); - } else if (message instanceof Terminate) { - assert Terminate.YES == message; - if (!this.barriers.isEmpty()) { - for (final Barrier barrier : this.barriers.values()) { - final Step<?, ?> step = (Step) barrier; - if (!(barrier instanceof LocalBarrier)) { - this.orderBarrier(step); - if (step instanceof OrderGlobalStep) this.orderCounter = 0; - while (step.hasNext()) { - this.sendTraverser(-1 == this.orderCounter ? - step.next() : - new OrderedTraverser<>(step.next(), this.orderCounter++)); - } - } else { - if (step instanceof SideEffectCapable) { - final String key = ((SideEffectCapable) step).getSideEffectKey(); - this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key))); - } - this.broadcast(new BarrierDoneMessage(barrier)); - barrier.done(); - } - } - this.barriers.clear(); - this.master.send(this.leaderWorker, Terminate.MAYBE); - } else { - while (this.traversal.hasNext()) { - this.results.add((Traverser.Admin) this.traversal.nextTraverser()); - } - if (this.orderCounter != -1) - this.results.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order())); - - this.master.close(); - } - } else { - throw new IllegalStateException("Unknown message:" + message); - } - } - - @Override - public void terminate() { - this.master.result().setResult(this.results); - } - - private void broadcast(final Object message) { - for (final Address.Worker worker : this.master.workers()) { - this.master.send(worker, message); - } - } - - private void processTraverser(final Traverser.Admin traverser) { - if (traverser.isHalted() || traverser.get() instanceof Element) { - this.sendTraverser(traverser); - } else { - final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); - step.addStart(traverser); - if (step instanceof Barrier) { - this.barriers.put(step.getId(), (Barrier) step); - } else { - while (step.hasNext()) { - this.processTraverser(step.next()); - } - } - } - } - - private void sendTraverser(final Traverser.Admin traverser) { - if (traverser.isHalted()) - this.results.add(traverser); - else if (traverser.get() instanceof Element) - this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element) traverser.get())), traverser); - else - this.master.send(this.master.address(), traverser); - } - - private void orderBarrier(final Step step) { - if (this.orderCounter != -1 && step instanceof Barrier && (step instanceof RangeGlobalStep || step instanceof TailGlobalStep)) { - final Barrier barrier = (Barrier) step; - final TraverserSet<?> rangingBarrier = (TraverserSet<?>) barrier.nextBarrier(); - rangingBarrier.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order())); - barrier.addBarrier(rangingBarrier); - } - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java deleted file mode 100644 index 001219a..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal; - -import org.apache.tinkerpop.gremlin.process.actor.Actor; -import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; -import org.apache.tinkerpop.gremlin.process.actor.Address; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.Terminate; -import org.apache.tinkerpop.gremlin.process.traversal.Step; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; -import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing; -import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; -import org.apache.tinkerpop.gremlin.structure.Edge; -import org.apache.tinkerpop.gremlin.structure.Element; -import org.apache.tinkerpop.gremlin.structure.Partition; -import org.apache.tinkerpop.gremlin.structure.Vertex; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; - -import java.util.HashMap; -import java.util.Map; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { - - private final Actor.Worker self; - private final TraversalMatrix<?, ?> matrix; - private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>(); - // - private Address.Worker neighborWorker; - private boolean isLeader; - private Terminate terminate = null; - private boolean voteToHalt = false; - private Map<String, Barrier> barriers = new HashMap<>(); - - public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal) { - this.self = self; - // System.out.println("worker[created]: " + this.self.address().getId()); - // set up partition and traversal information - final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self); - TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal); - this.matrix = new TraversalMatrix<>(traversal); - Distributing.configure(traversal, false, true); - Pushing.configure(traversal, true, false); - ////// - final GraphStep graphStep = (GraphStep) traversal.getStartStep(); - if (0 == graphStep.getIds().length) - ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.self.partition()::vertices : this.self.partition()::edges); - else { - if (graphStep.returnsVertex()) - ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier( - () -> IteratorUtils.filter(self.partition().vertices(graphStep.getIds()), this.self.partition()::contains)); - else - ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier( - () -> IteratorUtils.filter(self.partition().edges(graphStep.getIds()), this.self.partition()::contains)); - } - } - - @Override - public void setup() { - // create termination ring topology - final int i = this.self.workers().indexOf(this.self.address()); - this.neighborWorker = this.self.workers().get(i == this.self.workers().size() - 1 ? 0 : i + 1); - this.isLeader = i == 0; - for (int j = 0; j < this.self.partitioner().getPartitions().size(); j++) { - this.partitionToWorkerMap.put(this.self.partitioner().getPartitions().get(j), this.self.workers().get(j)); - } - } - - @Override - public void execute(final M message) { - //System.out.println(message + "::" + this.isLeader); - if (message instanceof StartMessage) { - // initial message from master that says: "start processing" - final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep(); - while (step.hasNext()) { - this.sendTraverser(step.next()); - } - } else if (message instanceof Traverser.Admin) { - this.processTraverser((Traverser.Admin) message); - } else if (message instanceof SideEffectSetMessage) { - this.matrix.getTraversal().getSideEffects().set(((SideEffectSetMessage) message).getKey(), ((SideEffectSetMessage) message).getValue()); - } else if (message instanceof BarrierDoneMessage) { - final Step<?, ?> step = (Step) this.matrix.getStepById(((BarrierDoneMessage) message).getStepId()); - while (step.hasNext()) { - sendTraverser(step.next()); - } - } else if (message instanceof Terminate) { - assert null == this.terminate; - this.terminate = (Terminate) message; - if (!this.barriers.isEmpty()) { - for (final Barrier barrier : this.barriers.values()) { - while (barrier.hasNextBarrier()) { - this.self.send(this.self.master(), new BarrierAddMessage(barrier)); - } - } - this.barriers.clear(); - } - // use termination token to determine termination condition - if (this.isLeader) { - if (this.voteToHalt && Terminate.YES == this.terminate) - this.self.send(this.self.master(), Terminate.YES); - else - this.self.send(this.neighborWorker, Terminate.YES); - } else - this.self.send(this.neighborWorker, this.voteToHalt ? this.terminate : Terminate.NO); - this.terminate = null; - this.voteToHalt = true; - } else { - throw new IllegalArgumentException("The following message is unknown: " + message); - } - } - - @Override - public void terminate() { - - } - - ////////////// - - private void processTraverser(final Traverser.Admin traverser) { - assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.self.partition().contains((Element) traverser.get()); - final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); - step.addStart(traverser); - if (step instanceof Barrier) { - this.barriers.put(step.getId(), (Barrier) step); - } else { - while (step.hasNext()) { - this.sendTraverser(step.next()); - } - } - } - - private void sendTraverser(final Traverser.Admin traverser) { - this.voteToHalt = false; - if (traverser.isHalted()) - this.self.send(this.self.master(), traverser); - else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get())) - this.self.send(this.partitionToWorkerMap.get(this.self.partitioner().getPartition((Element) traverser.get())), traverser); - else - this.self.send(this.self.address(), traverser); - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java deleted file mode 100644 index 0950435..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal; - -import org.apache.tinkerpop.gremlin.process.actor.Actor; -import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects; - -import java.util.Optional; -import java.util.Set; -import java.util.function.BinaryOperator; -import java.util.function.Supplier; -import java.util.function.UnaryOperator; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class WorkerTraversalSideEffects implements TraversalSideEffects { - - private TraversalSideEffects sideEffects; - private Actor.Worker worker; - - - private WorkerTraversalSideEffects() { - // for serialization - } - - public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final Actor.Worker worker) { - this.sideEffects = sideEffects; - this.worker = worker; - } - - public TraversalSideEffects getSideEffects() { - return this.sideEffects; - } - - @Override - public void set(final String key, final Object value) { - this.sideEffects.set(key, value); - } - - @Override - public <V> V get(final String key) throws IllegalArgumentException { - return this.sideEffects.get(key); - } - - @Override - public void remove(final String key) { - this.sideEffects.remove(key); - } - - @Override - public Set<String> keys() { - return this.sideEffects.keys(); - } - - @Override - public void add(final String key, final Object value) { - this.sideEffects.add(key, value); - this.worker.send(this.worker.master(), new SideEffectAddMessage(key, value)); - } - - @Override - public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) { - this.sideEffects.register(key, initialValue, reducer); - } - - @Override - public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) { - this.sideEffects.registerIfAbsent(key, initialValue, reducer); - } - - @Override - public <V> BinaryOperator<V> getReducer(final String key) { - return this.sideEffects.getReducer(key); - } - - @Override - public <V> Supplier<V> getSupplier(final String key) { - return this.sideEffects.getSupplier(key); - } - - @Override - @Deprecated - public void registerSupplier(final String key, final Supplier supplier) { - this.sideEffects.registerSupplier(key, supplier); - } - - @Override - @Deprecated - public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) { - return this.sideEffects.getRegisteredSupplier(key); - } - - @Override - public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) { - this.sideEffects.setSack(initialValue, splitOperator, mergeOperator); - } - - @Override - public <S> Supplier<S> getSackInitialValue() { - return this.sideEffects.getSackInitialValue(); - } - - @Override - public <S> UnaryOperator<S> getSackSplitter() { - return this.sideEffects.getSackSplitter(); - } - - @Override - public <S> BinaryOperator<S> getSackMerger() { - return this.sideEffects.getSackMerger(); - } - - @Override - public TraversalSideEffects clone() { - try { - final WorkerTraversalSideEffects clone = (WorkerTraversalSideEffects) super.clone(); - clone.sideEffects = this.sideEffects.clone(); - return clone; - } catch (final CloneNotSupportedException e) { - throw new IllegalStateException(e.getMessage(), e); - } - } - - @Override - public void mergeInto(final TraversalSideEffects sideEffects) { - this.sideEffects.mergeInto(sideEffects); - } - -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java deleted file mode 100644 index dba9b86..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal.message; - -import org.apache.tinkerpop.gremlin.process.traversal.Step; -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class BarrierAddMessage { - - private final Object barrier; - private final String stepId; - - public BarrierAddMessage(final Barrier barrier) { - this.barrier = barrier.nextBarrier(); - this.stepId = ((Step) barrier).getId(); - } - - public Object getBarrier() { - return this.barrier; - } - - public String getStepId() { - return this.stepId; - } - - -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierDoneMessage.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierDoneMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierDoneMessage.java deleted file mode 100644 index 837a55f..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierDoneMessage.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal.message; - -import org.apache.tinkerpop.gremlin.process.traversal.Step; -import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class BarrierDoneMessage { - - private final String stepId; - - public BarrierDoneMessage(final Barrier barrier) { - this.stepId = ((Step) barrier).getId(); - - } - - public String getStepId() { - return this.stepId; - } -} - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectAddMessage.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectAddMessage.java deleted file mode 100644 index 511c125..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectAddMessage.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal.message; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class SideEffectAddMessage { - - private final String key; - private final Object value; - - public SideEffectAddMessage(final String key, final Object value) { - this.value = value; - this.key = key; - } - - public String getKey() { - return this.key; - } - - public Object getValue() { - return this.value; - } -} - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectSetMessage.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectSetMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectSetMessage.java deleted file mode 100644 index 31f83c2..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectSetMessage.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal.message; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class SideEffectSetMessage { - - private final String key; - private final Object value; - - public SideEffectSetMessage(final String key, final Object value) { - this.key = key; - this.value = value; - } - - public String getKey() { - return this.key; - } - - public Object getValue() { - return this.value; - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/StartMessage.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/StartMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/StartMessage.java deleted file mode 100644 index 1b4292e..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/StartMessage.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal.message; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class StartMessage { - - private static final StartMessage INSTANCE = new StartMessage(); - - private StartMessage() { - } - - public static StartMessage instance() { - return INSTANCE; - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java deleted file mode 100644 index 4ab789d..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal.message; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public enum Terminate { - - MAYBE, YES, NO -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java deleted file mode 100644 index e4520aa..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal.step.map; - -import org.apache.commons.configuration.Configuration; -import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; -import org.apache.tinkerpop.gremlin.process.actor.GraphActors; -import org.apache.tinkerpop.gremlin.process.actor.traversal.TraversalActorProgram; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; -import org.apache.tinkerpop.gremlin.structure.util.StringFactory; - -import java.util.NoSuchElementException; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class TraversalActorProgramStep<S, E> extends AbstractStep<E, E> { - - - private final Traversal.Admin<S, E> actorsTraversal; - private final Configuration graphActorsConfiguration; - private boolean first = true; - - public TraversalActorProgramStep(final Traversal.Admin<?, ?> traversal, final Configuration graphActorsConfiguration) { - super(traversal); - this.graphActorsConfiguration = graphActorsConfiguration; - this.actorsTraversal = (Traversal.Admin) traversal.clone(); - this.actorsTraversal.setParent(EmptyStep.instance()); - } - - @Override - protected Traverser.Admin<E> processNextStart() throws NoSuchElementException { - if (this.first) { - this.first = false; - try { - final GraphActors<TraverserSet<E>> graphActors = GraphActors.open(this.graphActorsConfiguration); - final ActorProgram<TraverserSet<E>> actorProgram = new TraversalActorProgram<>(this.actorsTraversal); - graphActors.program(actorProgram).submit(this.getTraversal().getGraph().get()).get().forEach(this.starts::add); - } catch (final Exception e) { - throw new IllegalStateException(e.getMessage(), e); - } - } - return this.starts.next(); - } - - @Override - public String toString() { - return StringFactory.stepString(this, this.actorsTraversal); - } - -} - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java deleted file mode 100644 index 7e713de..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration; - -import org.apache.commons.configuration.Configuration; -import org.apache.tinkerpop.gremlin.process.actor.GraphActors; -import org.apache.tinkerpop.gremlin.process.actor.traversal.step.map.TraversalActorProgramStep; -import org.apache.tinkerpop.gremlin.process.remote.traversal.strategy.decoration.RemoteStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.ProcessorTraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; - -import java.util.Collections; -import java.util.Set; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class ActorProgramStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy> - implements TraversalStrategy.DecorationStrategy, ProcessorTraversalStrategy<GraphActors> { - - private static final Set<Class<? extends DecorationStrategy>> PRIORS = Collections.singleton(RemoteStrategy.class); - - private final Configuration graphActorsConfiguration; - - public ActorProgramStrategy(final GraphActors graphActors) { - this.graphActorsConfiguration = graphActors.configuration(); - } - - @Override - public void apply(final Traversal.Admin<?, ?> traversal) { - ReadOnlyStrategy.instance().apply(traversal); - - if (!(traversal.getParent() instanceof EmptyStep)) - return; - - final TraversalActorProgramStep<?, ?> actorStep = new TraversalActorProgramStep<>(traversal, this.graphActorsConfiguration); - TraversalHelper.removeAllSteps(traversal); - traversal.addStep(actorStep); - - // validations - assert traversal.getStartStep().equals(actorStep); - assert traversal.getSteps().size() == 1; - assert traversal.getEndStep() == actorStep; - } - - @Override - public Set<Class<? extends DecorationStrategy>> applyPrior() { - return PRIORS; - } - - //////////////////////////////////////////////////////////// - - @Override - public Configuration getConfiguration() { - return this.graphActorsConfiguration; - } - - public static ActorProgramStrategy create(final Configuration configuration) { - try { - return new ActorProgramStrategy(GraphActors.open(configuration)); - } catch (final Exception e) { - throw new IllegalArgumentException(e.getMessage(), e); - } - } - - @Override - public GraphActors getProcessor() { - return GraphActors.open(this.graphActorsConfiguration); - } - -} - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java deleted file mode 100644 index f6e93ef..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification; - -import org.apache.tinkerpop.gremlin.process.traversal.Step; -import org.apache.tinkerpop.gremlin.process.traversal.Traversal; -import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; -import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy; -import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; -import org.apache.tinkerpop.gremlin.structure.Graph; - -import java.util.Collections; -import java.util.Set; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class ActorVerificationStrategy extends AbstractTraversalStrategy<TraversalStrategy.VerificationStrategy> implements TraversalStrategy.VerificationStrategy { - - private static final ActorVerificationStrategy INSTANCE = new ActorVerificationStrategy(); - - private ActorVerificationStrategy() { - } - - @Override - public void apply(final Traversal.Admin<?, ?> traversal) { - if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, traversal).isEmpty()) - throw new VerificationException("Inject traversal currently not supported", traversal); - } - - public static ActorVerificationStrategy instance() { - return INSTANCE; - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/DefaultActorsResult.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/DefaultActorsResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/DefaultActorsResult.java deleted file mode 100644 index c650ba1..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/DefaultActorsResult.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.util; - -import org.apache.tinkerpop.gremlin.process.actor.ActorsResult; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class DefaultActorsResult<R> implements ActorsResult<R> { - - private R result; - - public DefaultActorsResult() { - - } - - public R getResult() { - return this.result; - } - - public void setResult(final R result) { - this.result = result; - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/GraphActorsHelper.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/GraphActorsHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/GraphActorsHelper.java deleted file mode 100644 index eebee17..0000000 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/GraphActorsHelper.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.process.actor.util; - -import org.apache.commons.configuration.Configuration; -import org.apache.tinkerpop.gremlin.process.actor.GraphActors; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; - -import java.util.Iterator; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class GraphActorsHelper { - - private GraphActorsHelper() { - - } - - public static GraphActors configure(GraphActors actors, final Configuration configuration) { - final Iterator<String> keys = IteratorUtils.asList(configuration.getKeys()).iterator(); - while (keys.hasNext()) { - final String key = keys.next(); - if (key.equals(GraphActors.GRAPH_ACTORS_WORKERS)) - actors = actors.workers(configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS)); - else if (!key.equals(GraphActors.GRAPH_ACTORS)) - actors = actors.configure(key, configuration.getProperty(key)); - } - return actors; - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java new file mode 100644 index 0000000..0445968 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actors; + +import org.apache.tinkerpop.gremlin.structure.Partition; +import org.apache.tinkerpop.gremlin.structure.Partitioner; + +import java.util.List; + +/** + * An Actor represents an isolated processing unit that can only be interacted with via messages. + * Actors are able to send and receive messages. The {@link GraphActors} framework has two types of actors: + * {@link Master} and {@link Worker}. A master actors is not associated with a particular graph {@link Partition}. + * Instead, its role is to coordinate the workers and ultimately, yield the final result of the submitted + * {@link ActorProgram}. + * + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface Actor { + + /** + * Get the {@link Partitioner} associated with the {@link GraphActors} system. + * + * @return the partitioner used to partition (logically and/or physically) the {@link org.apache.tinkerpop.gremlin.structure.Graph} + */ + public Partitioner partitioner(); + + /** + * Get the {@link Address} of the actors. + * + * @return the actors's address + */ + public Address address(); + + /** + * Get a list of the {@link Address} values of all the workers in {@link GraphActors} system. + * + * @return the worker's addresses + */ + public List<Address.Worker> workers(); + + /** + * Send a message from this actors to another actors given their {@link Address}. + * + * @param toActor the actors to receive the messages + * @param message the message being sent + * @param <M> the message type + */ + public <M> void send(final Address toActor, final M message); + + public interface Master extends Actor { + + public Address.Master address(); + + public void close(); + + public <R> ActorsResult<R> result(); + + } + + public interface Worker extends Actor { + + public Address.Worker address(); + + public Address.Master master(); + + /** + * Get the {@link Partition} associated with this worker. + * In principle, this is the subset of the {@link org.apache.tinkerpop.gremlin.structure.Graph} that + * the worker is "data-local" to. + * + * @return the worker's partition + */ + public Partition partition(); + } + + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java new file mode 100644 index 0000000..b1e3065 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actors; + +import org.apache.commons.configuration.Configuration; +import org.apache.tinkerpop.gremlin.structure.Graph; + +import java.util.List; +import java.util.Optional; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface ActorProgram extends Cloneable { + + public static final String ACTOR_PROGRAM = "gremlin.actorProgram"; + + /** + * When it is necessary to store the state of the ActorProgram, this method is called. + * This is typically required when the ActorProgram needs to be serialized to another machine. + * Note that what is stored is simply the instance/configuration state, not any processed data. + * The default implementation provided simply stores the ActorProgarm class name for reflective reconstruction. + * It is typically a good idea to ActorProgram.super.storeState(). + * + * @param configuration the configuration to store the state of the ActorProgram in. + */ + public default void storeState(final Configuration configuration) { + configuration.setProperty(ACTOR_PROGRAM, this.getClass().getCanonicalName()); + } + + /** + * When it is necessary to load the state of the ActorProgram, this method is called. + * This is typically required when the ActorProgram needs to be serialized to another machine. + * Note that what is loaded is simply the instance state, not any processed data. + * + * @param graph the graph that the ActorProgram will run against + * @param configuration the configuration to load the state of the ActorProgram from. + */ + public default void loadState(final Graph graph, final Configuration configuration) { + + } + + /** + * Create the {@link org.apache.tinkerpop.gremlin.process.actors.Actor.Worker} program. + * This is typically used by {@link Worker} to spawn its program. + * + * @param worker the worker actors creating the worker program + * @return the worker program + */ + public ActorProgram.Worker createWorkerProgram(final Actor.Worker worker); + + /** + * Create the {@link org.apache.tinkerpop.gremlin.process.actors.Actor.Master} program. + * This is typically used by {@link Master} to spawn its program. + * + * @param master the master actors creating the master program + * @return the master program + */ + public ActorProgram.Master createMasterProgram(final Actor.Master master); + + /** + * Get the ordered list of message classes where order determines the priority + * of message reception by the respective {@link Actor}. For instance, + * if an {@link Actor} has a message of type {@code X} and a message of type {@code Y} + * in its message buffer, and {@code X} has a higher priority, it will be fetched + * first from the buffer. If no list is provided then its FIFO. + * The default implementation returns an {@link Optional#empty()}. + * + * @return the optional ordered list of message priorities. + */ + public default Optional<List<Class>> getMessagePriorities() { + return Optional.empty(); + } + + /** + * When multiple workers on a single machine need ActorProgram instances, it is possible to use clone. + * This will provide a speedier way of generating instances, over the {@link ActorProgram#storeState} and {@link ActorProgram#loadState} model. + * The default implementation simply returns the object as it assumes that the ActorProgram instance is a stateless singleton. + * + * @return A clone of the VertexProgram object + */ + @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException") + public ActorProgram clone(); + + /** + * The Worker program is executed by a worker process in the {@link GraphActors} system. + * There are many workers and a single master. + * All workers execute the same program. + * + * @param <M> The message type accepted by the worker + */ + public static interface Worker<M> { + + /** + * This method is evaluated when the worker process is spawned. + */ + public void setup(); + + /** + * This method is evaluated when the worker receives a new message. + * + * @param message the received message + */ + public void execute(final M message); + + /** + * This method is evaluated when the worker process is destroyed. + */ + public void terminate(); + + } + + /** + * The Master program is executed by the master process in the {@link GraphActors} system. + * There are many workers and a single master. + * + * @param <M> The message type accepted by the master + */ + public static interface Master<M> { + public void setup(); + + public void execute(final M message); + + public void terminate(); + + } + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorsResult.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorsResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorsResult.java new file mode 100644 index 0000000..beb7ab9 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorsResult.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actors; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface ActorsResult<R> { + + public R getResult(); + + public void setResult(final R result); +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/243ab6a7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Address.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Address.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Address.java new file mode 100644 index 0000000..894a961 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Address.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actors; + +import java.io.Serializable; +import java.net.InetAddress; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public abstract class Address implements Serializable { + + private final String id; + private final InetAddress location; + + public Address(final String id, final InetAddress location) { + this.id = id; + this.location = location; + } + + public InetAddress getLocation() { + return this.location; + } + + public String getId() { + return this.id; + } + + @Override + public boolean equals(final Object other) { + return other instanceof Address && ((Address) other).id.equals(this.id); + } + + @Override + public int hashCode() { + return this.id.hashCode(); + } + + @Override + public String toString() { + return this.id; + } + + public static final class Master extends Address { + + public Master(final String id, final InetAddress location) { + super(id, location); + } + + } + + public static final class Worker extends Address { + + public Worker(final String id, final InetAddress location) { + super(id, location); + } + + } +}