http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActors.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActors.java new file mode 100644 index 0000000..9b8fe38 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActors.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.process.actors; + +import org.apache.commons.configuration.BaseConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.tinkerpop.gremlin.process.Processor; +import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.decoration.ActorProgramStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.ProcessorTraversalStrategy; +import org.apache.tinkerpop.gremlin.structure.Graph; + +import java.util.concurrent.Future; + +/** + * GraphActors is a message-passing based graph {@link Processor} that is: + * asynchronous, distributed, and partition-centric. + * + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public interface GraphActors<R> extends Processor { + + public static final String GRAPH_ACTORS = "gremlin.graphActors"; + public static final String GRAPH_ACTORS_WORKERS = "gremlin.graphActors.workers"; + + /** + * Provide the {@link ActorProgram} that the GraphActors will execute. + * + * @param program the program to execute + * @return the updated GraphActors with newly defined program + */ + public GraphActors<R> program(final ActorProgram program); + + /** + * Specify the number of workers per {@link Graph} {@link org.apache.tinkerpop.gremlin.structure.Partition}. + * + * @param workers the number of workers per partition + * @return the updated GraphActors with newly defined workers + */ + public GraphActors<R> workers(final int workers); + + /** + * Add an arbitrary configuration to the GraphActors system. + * Typically, these configurations are provider-specific and do not generalize across all GraphActor implementations. 
+ * + * @param key the key of the configuration + * @param value the value of the configuration + * @return the updated GraphActors with newly defined configuration + */ + public GraphActors<R> configure(final String key, final Object value); + + /** + * Execute the {@link ActorProgram} on the {@link GraphActors} system against the specified {@link Graph}. + * + * @return a {@link Future} denoting a reference to the asynchronous computation's result + */ + @Override + public Future<R> submit(final Graph graph); + + /** + * Returns an {@link ActorProgramStrategy} which enables a {@link Traversal} to execute on {@link GraphActors}. + * + * @return a {@link org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy} capable of executing traversals on a GraphActors system + */ + @Override + public default ProcessorTraversalStrategy<GraphActors> getProcessorTraversalStrategy() { + return new ActorProgramStrategy(this); + } + + /** + * Create an arbitrary GraphActors system given the information contained in the provided {@link Configuration}. + * + * @param configuration the {@link Configuration} containing, at minimum, {@link GraphActors#GRAPH_ACTORS} system class name + * @param <A> the particular type of GraphActors + * @return a constructed GraphActors system + */ + public static <A extends GraphActors> A open(final Configuration configuration) { + try { + return (A) Class.forName(configuration.getString(GRAPH_ACTORS)).getMethod("open", Configuration.class).invoke(null, configuration); + } catch (final Exception e) { + throw new IllegalArgumentException(e.getMessage(), e); + } + } + + public static <A extends GraphActors> A open(final Class<A> graphActorsClass) { + final BaseConfiguration configuration = new BaseConfiguration(); + configuration.setProperty(GRAPH_ACTORS, graphActorsClass.getCanonicalName()); + return GraphActors.open(configuration); + } +}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java new file mode 100644 index 0000000..484b904 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.process.actors.traversal; + +import org.apache.commons.configuration.Configuration; +import org.apache.tinkerpop.gremlin.jsr223.JavaTranslator; +import org.apache.tinkerpop.gremlin.process.actors.Actor; +import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate; +import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.decoration.ActorProgramStrategy; +import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.verification.ActorVerificationStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.Bytecode; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.LazyBarrierStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.MatchPredicateStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.PathRetractionStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.RepeatUnrollStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy; 
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import org.apache.tinkerpop.gremlin.structure.Graph; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class TraversalActorProgram<R> implements ActorProgram { + + public static final String TRAVERSAL_ACTOR_PROGRAM_BYTECODE = "gremlin.traversalActorProgram.bytecode"; + + private static final List<Class> MESSAGE_PRIORITIES = Arrays.asList( + StartMessage.class, + Traverser.class, + SideEffectAddMessage.class, + BarrierAddMessage.class, + SideEffectSetMessage.class, + BarrierDoneMessage.class, + Terminate.class); + + private Traversal.Admin<?, R> traversal; + public TraverserSet<R> result = new TraverserSet<>(); + + public TraversalActorProgram(final Traversal.Admin<?, R> traversal) { + this.traversal = traversal; + final TraversalStrategies strategies = this.traversal.getStrategies().clone(); + strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance()); + // TODO: make TinkerGraph/etc. strategies smart about actors + new ArrayList<>(strategies.toList()).stream(). + filter(s -> s instanceof TraversalStrategy.ProviderOptimizationStrategy). + map(TraversalStrategy::getClass). 
+ forEach(strategies::removeStrategies); + strategies.removeStrategies( + ActorProgramStrategy.class, + LazyBarrierStrategy.class, + RepeatUnrollStrategy.class, + MatchPredicateStrategy.class, + InlineFilterStrategy.class, + PathRetractionStrategy.class); + this.traversal.setStrategies(strategies); + this.traversal.applyStrategies(); + } + + @Override + public void storeState(final Configuration configuration) { + configuration.setProperty(ACTOR_PROGRAM, TraversalActorProgram.class.getCanonicalName()); + configuration.setProperty(TRAVERSAL_ACTOR_PROGRAM_BYTECODE, this.traversal.getBytecode()); + } + + @Override + public void loadState(final Graph graph, final Configuration configuration) { + final Bytecode bytecode = (Bytecode) configuration.getProperty(TRAVERSAL_ACTOR_PROGRAM_BYTECODE); + this.traversal = (Traversal.Admin<?, R>) JavaTranslator.of(graph.traversal()).translate(bytecode); + } + + @Override + public TraversalActorProgram.Worker createWorkerProgram(final Actor.Worker worker) { + return new TraversalWorkerProgram(worker, this.traversal.clone()); + } + + @Override + public TraversalActorProgram.Master createMasterProgram(final Actor.Master master) { + return new TraversalMasterProgram(master, this.traversal.clone(), this.result); + } + + @Override + public Optional<List<Class>> getMessagePriorities() { + return Optional.of(MESSAGE_PRIORITIES); + } + + @Override + public TraversalActorProgram<R> clone() { + try { + final TraversalActorProgram<R> clone = (TraversalActorProgram<R>) super.clone(); + clone.traversal = this.traversal.clone(); + return clone; + } catch (final CloneNotSupportedException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java ---------------------------------------------------------------------- diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java new file mode 100644 index 0000000..e447cdb --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.process.actors.traversal; + +import org.apache.tinkerpop.gremlin.process.actors.Actor; +import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actors.Address; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate; +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing; +import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing; +import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable; +import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; +import org.apache.tinkerpop.gremlin.structure.Element; +import org.apache.tinkerpop.gremlin.structure.Partition; + +import java.util.HashMap; +import 
java.util.Map;
+
+/**
+ * The master-side half of a traversal executed on actors. It starts the workers, evaluates traversers that
+ * are sent to the master, merges barriers and side-effects reported by the workers, and collects halted
+ * traversers into the result set.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class TraversalMasterProgram implements ActorProgram.Master<Object> {
+
+    private final Actor.Master master;
+    private final Traversal.Admin<?, ?> traversal;
+    // step lookup by id for the master's traversal
+    private final TraversalMatrix<?, ?> matrix;
+    // barrier steps (keyed by step id) that have received data and await the next Terminate message
+    private Map<String, Barrier> barriers = new HashMap<>();
+    private final TraverserSet<?> results;
+    // the first worker; termination tokens from the master are sent to it
+    private Address.Worker leaderWorker;
+    // -1 until an OrderGlobalStep barrier is drained; afterwards the next order index to hand out
+    private int orderCounter = -1;
+    private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
+
+    /**
+     * @param master    the master actor used for sending messages and publishing the result
+     * @param traversal the master's traversal instance
+     * @param results   the set that halted traversers are added to
+     */
+    public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final TraverserSet<?> results) {
+        this.traversal = traversal;
+        // System.out.println("master[created]: " + master.address().getId());
+        // System.out.println(this.traversal);
+        this.matrix = new TraversalMatrix<>(this.traversal);
+        this.results = results;
+        this.master = master;
+        Distributing.configure(this.traversal, true, true);
+        Pushing.configure(this.traversal, true, false);
+    }
+
+    /**
+     * Selects the first worker as leader, associates the i-th partition with the i-th worker,
+     * tells every worker to start and hands the first termination token to the leader.
+     */
+    @Override
+    public void setup() {
+        this.leaderWorker = this.master.workers().get(0);
+        // NOTE(review): indexes workers by partition position -- presumably one worker per partition; confirm
+        for (int i = 0; i < this.master.partitioner().getPartitions().size(); i++) {
+            this.partitionToWorkerMap.put(this.master.partitioner().getPartitions().get(i), this.master.workers().get(i));
+        }
+        this.broadcast(StartMessage.instance());
+        this.master.send(this.leaderWorker, Terminate.MAYBE);
+    }
+
+    /**
+     * Dispatches on the message type: traversers are processed, barrier and side-effect additions are merged
+     * into the master's traversal, and a {@link Terminate} message either releases the pending barriers or,
+     * if there are none, finalizes the results and closes the master.
+     *
+     * @throws IllegalStateException if the message type is not recognized
+     */
+    @Override
+    public void execute(final Object message) {
+        if (message instanceof Traverser.Admin) {
+            this.processTraverser((Traverser.Admin) message);
+        } else if (message instanceof BarrierAddMessage) {
+            // merge the worker's partial barrier into the master's step and remember the step as pending
+            final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId());
+            final Step<?, ?> step = (Step) barrier;
+            barrier.addBarrier(((BarrierAddMessage) message).getBarrier());
+            this.barriers.put(step.getId(), barrier);
+        } else if (message instanceof SideEffectAddMessage) {
+            this.traversal.getSideEffects().add(((SideEffectAddMessage) message).getKey(), ((SideEffectAddMessage) message).getValue());
+        } else if (message instanceof Terminate) {
+            assert Terminate.YES == message;
+            if (!this.barriers.isEmpty()) {
+                for (final Barrier barrier : this.barriers.values()) {
+                    final Step<?, ?> step = (Step) barrier;
+                    if (!(barrier instanceof LocalBarrier)) {
+                        // non-local barrier: drain it here and route the resulting traversers
+                        this.orderBarrier(step);
+                        if (step instanceof OrderGlobalStep) this.orderCounter = 0;
+                        while (step.hasNext()) {
+                            // once ordering is active, tag each traverser with its position
+                            this.sendTraverser(-1 == this.orderCounter ?
+                                    step.next() :
+                                    new OrderedTraverser<>(step.next(), this.orderCounter++));
+                        }
+                    } else {
+                        // local barrier: publish the side-effect value (if any) and tell the workers the barrier is done
+                        if (step instanceof SideEffectCapable) {
+                            final String key = ((SideEffectCapable) step).getSideEffectKey();
+                            this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
+                        }
+                        this.broadcast(new BarrierDoneMessage(barrier));
+                        barrier.done();
+                    }
+                }
+                this.barriers.clear();
+                // releasing barriers may have produced new work, so start another termination round
+                this.master.send(this.leaderWorker, Terminate.MAYBE);
+            } else {
+                // nothing pending: collect what remains in the traversal and finish
+                while (this.traversal.hasNext()) {
+                    this.results.add((Traverser.Admin) this.traversal.nextTraverser());
+                }
+                if (this.orderCounter != -1)
+                    this.results.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order()));
+
+                this.master.close();
+            }
+        } else {
+            throw new IllegalStateException("Unknown message:" + message);
+        }
+    }
+
+    /**
+     * Publishes the collected results on the master's result object.
+     */
+    @Override
+    public void terminate() {
+        this.master.result().setResult(this.results);
+    }
+
+    /**
+     * Sends the message to every worker.
+     */
+    private void broadcast(final Object message) {
+        for (final Address.Worker worker : this.master.workers()) {
+            this.master.send(worker, message);
+        }
+    }
+
+    /**
+     * Routes halted or element-holding traversers via {@link #sendTraverser}; any other traverser is added to
+     * its current step, which is either registered as a pending barrier or drained recursively.
+     */
+    private void processTraverser(final Traverser.Admin traverser) {
+        if (traverser.isHalted() || traverser.get() instanceof Element) {
+            this.sendTraverser(traverser);
+        } else {
+            final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+            step.addStart(traverser);
+            if (step instanceof Barrier) {
+                this.barriers.put(step.getId(), (Barrier) step);
+            } else {
+                while (step.hasNext()) {
+                    this.processTraverser(step.next());
+                }
+            }
+        }
+    }
+
+    /**
+     * Halted traversers become results, traversers holding an {@link Element} go to the worker mapped to the
+     * element's partition, and everything else is sent back to the master itself.
+     */
+    private void sendTraverser(final Traverser.Admin traverser) {
+        if (traverser.isHalted())
+            this.results.add(traverser);
+        else if (traverser.get() instanceof Element)
+            this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element) traverser.get())), traverser);
+        else
+            this.master.send(this.master.address(), traverser);
+    }
+
+    /**
+     * When ordering is active and the step is a range or tail barrier, pulls the step's barrier,
+     * sorts it by traverser order and adds it back.
+     */
+    private void orderBarrier(final Step step) {
+        if (this.orderCounter != -1 && step instanceof Barrier && (step instanceof RangeGlobalStep || step instanceof TailGlobalStep)) {
+            final Barrier barrier = (Barrier) step;
+            final TraverserSet<?> rangingBarrier = (TraverserSet<?>) barrier.nextBarrier();
+            rangingBarrier.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order()));
+            barrier.addBarrier(rangingBarrier);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
new file mode 100644
index 0000000..2aaa7b5
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actors.traversal; + +import org.apache.tinkerpop.gremlin.process.actors.Actor; +import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actors.Address; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate; +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; +import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing; +import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; +import org.apache.tinkerpop.gremlin.structure.Edge; +import 
org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The worker-side half of a traversal executed on actors. Each worker evaluates traversers against its own
+ * partition, forwards traversers to the master or to other workers, and passes a termination token to its
+ * neighbor worker.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
+
+    private final Actor.Worker self;
+    // step lookup by id for this worker's traversal
+    private final TraversalMatrix<?, ?> matrix;
+    private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
+    //
+    // the next worker in the list of workers (wrapping around to the first)
+    private Address.Worker neighborWorker;
+    // true for the worker at index 0
+    private boolean isLeader;
+    // the termination token currently being handled; null between Terminate messages
+    private Terminate terminate = null;
+    // reset to false whenever this worker sends a traverser; set to true after each Terminate message
+    private boolean voteToHalt = false;
+    // barrier steps (keyed by step id) holding data that has not yet been reported to the master
+    private Map<String, Barrier> barriers = new HashMap<>();
+
+    /**
+     * Wraps the traversal's side-effects so that additions are reported to the master, and restricts the
+     * start step to the vertices/edges of this worker's partition.
+     *
+     * @param self      the worker actor executing this program
+     * @param traversal the worker's traversal instance; its start step is cast to {@link GraphStep}
+     */
+    public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal) {
+        this.self = self;
+        // System.out.println("worker[created]: " + this.self.address().getId());
+        // set up partition and traversal information
+        final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
+        TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
+        this.matrix = new TraversalMatrix<>(traversal);
+        Distributing.configure(traversal, false, true);
+        Pushing.configure(traversal, true, false);
+        //////
+        final GraphStep graphStep = (GraphStep) traversal.getStartStep();
+        // no ids: iterate everything in the local partition; with ids: look them up and keep only local elements
+        if (0 == graphStep.getIds().length)
+            ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.self.partition()::vertices : this.self.partition()::edges);
+        else {
+            if (graphStep.returnsVertex())
+                ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier(
+                        () -> IteratorUtils.filter(self.partition().vertices(graphStep.getIds()), this.self.partition()::contains));
+            else
+                ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier(
+                        () -> IteratorUtils.filter(self.partition().edges(graphStep.getIds()), this.self.partition()::contains));
+        }
+    }
+
+    /**
+     * Determines this worker's neighbor and leader status from its index in the worker list and
+     * associates the j-th partition with the j-th worker.
+     */
+    @Override
+    public void setup() {
+        // create termination ring topology
+        final int i = this.self.workers().indexOf(this.self.address());
+        this.neighborWorker = this.self.workers().get(i == this.self.workers().size() - 1 ? 0 : i + 1);
+        this.isLeader = i == 0;
+        // NOTE(review): indexes workers by partition position -- presumably one worker per partition; confirm
+        for (int j = 0; j < this.self.partitioner().getPartitions().size(); j++) {
+            this.partitionToWorkerMap.put(this.self.partitioner().getPartitions().get(j), this.self.workers().get(j));
+        }
+    }
+
+    /**
+     * Dispatches on the message type: start processing, process a traverser, set a side-effect,
+     * drain a completed barrier step, or handle the termination token.
+     *
+     * @throws IllegalArgumentException if the message type is not recognized
+     */
+    @Override
+    public void execute(final Object message) {
+        //System.out.println(message + "::" + this.isLeader);
+        if (message instanceof StartMessage) {
+            // initial message from master that says: "start processing"
+            final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
+            while (step.hasNext()) {
+                this.sendTraverser(step.next());
+            }
+        } else if (message instanceof Traverser.Admin) {
+            this.processTraverser((Traverser.Admin) message);
+        } else if (message instanceof SideEffectSetMessage) {
+            this.matrix.getTraversal().getSideEffects().set(((SideEffectSetMessage) message).getKey(), ((SideEffectSetMessage) message).getValue());
+        } else if (message instanceof BarrierDoneMessage) {
+            // the master declared the barrier complete: emit whatever the local step now yields
+            final Step<?, ?> step = (Step) this.matrix.getStepById(((BarrierDoneMessage) message).getStepId());
+            while (step.hasNext()) {
+                sendTraverser(step.next());
+            }
+        } else if (message instanceof Terminate) {
+            assert null == this.terminate;
+            this.terminate = (Terminate) message;
+            // report all locally accumulated barriers to the master before voting
+            if (!this.barriers.isEmpty()) {
+                for (final Barrier barrier : this.barriers.values()) {
+                    while (barrier.hasNextBarrier()) {
+                        this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+                    }
+                }
+                this.barriers.clear();
+            }
+            // use termination token to determine termination condition
+            if (this.isLeader) {
+                // the leader reports YES to the master only if it has sent no traverser since its last token
+                // and the token arrived as YES; otherwise it sends a fresh YES token to its neighbor
+                if (this.voteToHalt && Terminate.YES == this.terminate)
+                    this.self.send(this.self.master(), Terminate.YES);
+                else
+                    this.self.send(this.neighborWorker, Terminate.YES);
+            } else
+                // non-leaders pass the token on unchanged if idle, otherwise replace it with NO
+                this.self.send(this.neighborWorker, this.voteToHalt ? this.terminate : Terminate.NO);
+            this.terminate = null;
+            this.voteToHalt = true;
+        } else {
+            throw new IllegalArgumentException("The following message is unknown: " + message);
+        }
+    }
+
+    /**
+     * No worker-side work is performed on termination.
+     */
+    @Override
+    public void terminate() {
+
+    }
+
+    //////////////
+
+    /**
+     * Adds the traverser to its current step; a barrier step is registered as pending, any other step is
+     * drained and its output routed via {@link #sendTraverser}.
+     */
+    private void processTraverser(final Traverser.Admin traverser) {
+        assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.self.partition().contains((Element) traverser.get());
+        final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+        step.addStart(traverser);
+        if (step instanceof Barrier) {
+            this.barriers.put(step.getId(), (Barrier) step);
+        } else {
+            while (step.hasNext()) {
+                this.sendTraverser(step.next());
+            }
+        }
+    }
+
+    /**
+     * Halted traversers go to the master, traversers holding an {@link Element} outside the local partition
+     * go to the worker mapped to that element's partition, and everything else is sent back to this worker.
+     * Sending any traverser clears this worker's vote to halt.
+     */
+    private void sendTraverser(final Traverser.Admin traverser) {
+        this.voteToHalt = false;
+        if (traverser.isHalted())
+            this.self.send(this.self.master(), traverser);
+        else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get()))
+            this.self.send(this.partitionToWorkerMap.get(this.self.partitioner().getPartition((Element) traverser.get())), traverser);
+        else
+            this.self.send(this.self.address(), traverser);
+    }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff
--git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java new file mode 100644 index 0000000..b660eda --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actors.traversal; + +import org.apache.tinkerpop.gremlin.process.actors.Actor; +import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects; + +import java.util.Optional; +import java.util.Set; +import java.util.function.BinaryOperator; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com)
+ */
+// A TraversalSideEffects wrapper used by a worker: every call is delegated to the wrapped instance, and
+// add() additionally sends a SideEffectAddMessage to the master.
+public final class WorkerTraversalSideEffects implements TraversalSideEffects {
+
+    // the wrapped side-effects that all calls are delegated to
+    private TraversalSideEffects sideEffects;
+    // the worker used to send additions to the master
+    private Actor.Worker worker;
+
+
+    private WorkerTraversalSideEffects() {
+        // for serialization
+    }
+
+    /**
+     * @param sideEffects the side-effects to delegate to
+     * @param worker      the worker through which additions are sent to the master
+     */
+    public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final Actor.Worker worker) {
+        this.sideEffects = sideEffects;
+        this.worker = worker;
+    }
+
+    /**
+     * @return the wrapped side-effects
+     */
+    public TraversalSideEffects getSideEffects() {
+        return this.sideEffects;
+    }
+
+    @Override
+    public void set(final String key, final Object value) {
+        this.sideEffects.set(key, value);
+    }
+
+    @Override
+    public <V> V get(final String key) throws IllegalArgumentException {
+        return this.sideEffects.get(key);
+    }
+
+    @Override
+    public void remove(final String key) {
+        this.sideEffects.remove(key);
+    }
+
+    @Override
+    public Set<String> keys() {
+        return this.sideEffects.keys();
+    }
+
+    /**
+     * Adds the value to the wrapped side-effects and also sends the same key/value to the master
+     * as a {@link SideEffectAddMessage}.
+     */
+    @Override
+    public void add(final String key, final Object value) {
+        this.sideEffects.add(key, value);
+        this.worker.send(this.worker.master(), new SideEffectAddMessage(key, value));
+    }
+
+    @Override
+    public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+        this.sideEffects.register(key, initialValue, reducer);
+    }
+
+    @Override
+    public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+        this.sideEffects.registerIfAbsent(key, initialValue, reducer);
+    }
+
+    @Override
+    public <V> BinaryOperator<V> getReducer(final String key) {
+        return this.sideEffects.getReducer(key);
+    }
+
+    @Override
+    public <V> Supplier<V> getSupplier(final String key) {
+        return this.sideEffects.getSupplier(key);
+    }
+
+    @Override
+    @Deprecated
+    public void registerSupplier(final String key, final Supplier supplier) {
+        this.sideEffects.registerSupplier(key, supplier);
+    }
+
+    @Override
+    @Deprecated
+    public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
+        return this.sideEffects.getRegisteredSupplier(key);
+    }
+
+    @Override
+    public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
+        this.sideEffects.setSack(initialValue, splitOperator, mergeOperator);
+    }
+
+    @Override
+    public <S> Supplier<S> getSackInitialValue() {
+        return this.sideEffects.getSackInitialValue();
+    }
+
+    @Override
+    public <S> UnaryOperator<S> getSackSplitter() {
+        return this.sideEffects.getSackSplitter();
+    }
+
+    @Override
+    public <S> BinaryOperator<S> getSackMerger() {
+        return this.sideEffects.getSackMerger();
+    }
+
+    /**
+     * @return a copy that wraps a clone of the delegate side-effects; the worker field is carried over by
+     *         {@code super.clone()} unchanged
+     */
+    @Override
+    public TraversalSideEffects clone() {
+        try {
+            final WorkerTraversalSideEffects clone = (WorkerTraversalSideEffects) super.clone();
+            clone.sideEffects = this.sideEffects.clone();
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void mergeInto(final TraversalSideEffects sideEffects) {
+        this.sideEffects.mergeInto(sideEffects);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
new file mode 100644
index 0000000..ac4c61d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actors.traversal.message; + +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class BarrierAddMessage { + + private final Object barrier; + private final String stepId; + + public BarrierAddMessage(final Barrier barrier) { + this.barrier = barrier.nextBarrier(); + this.stepId = ((Step) barrier).getId(); + } + + public Object getBarrier() { + return this.barrier; + } + + public String getStepId() { + return this.stepId; + } + + +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierDoneMessage.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierDoneMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierDoneMessage.java new file mode 100644 index 0000000..7979c33 --- /dev/null +++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierDoneMessage.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actors.traversal.message; + +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public final class BarrierDoneMessage { + + private final String stepId; + + public BarrierDoneMessage(final Barrier barrier) { + this.stepId = ((Step) barrier).getId(); + + } + + public String getStepId() { + return this.stepId; + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java new file mode 100644 index 0000000..1c0a9de --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actors.traversal.message; + +/** + * @author Marko A. 
/**
 * Immutable message carrying a side-effect key and the value to be added under that key.
 * Neither the key nor the value is validated or copied; {@code null} is stored as given.
 *
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public final class SideEffectAddMessage {

    private final String key;
    private final Object value;

    /**
     * @param key   the side-effect key
     * @param value the value associated with the key
     */
    public SideEffectAddMessage(final String key, final Object value) {
        this.value = value;
        this.key = key;
    }

    /**
     * @return the side-effect key given at construction
     */
    public String getKey() {
        return this.key;
    }

    /**
     * @return the value given at construction
     */
    public Object getValue() {
        return this.value;
    }

    /**
     * Readable representation for logs and debuggers; without it a message prints
     * only as a class name and identity hash.
     *
     * @return a string of the form {@code SideEffectAddMessage[key=value]}
     */
    @Override
    public String toString() {
        return "SideEffectAddMessage[" + this.key + "=" + this.value + "]";
    }
}
/**
 * Immutable message carrying a side-effect key and the value to be set for that key.
 * Neither the key nor the value is validated or copied; {@code null} is stored as given.
 *
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public final class SideEffectSetMessage {

    private final String key;
    private final Object value;

    /**
     * @param key   the side-effect key
     * @param value the value associated with the key
     */
    public SideEffectSetMessage(final String key, final Object value) {
        this.key = key;
        this.value = value;
    }

    /**
     * @return the side-effect key given at construction
     */
    public String getKey() {
        return this.key;
    }

    /**
     * @return the value given at construction
     */
    public Object getValue() {
        return this.value;
    }

    /**
     * Readable representation for logs and debuggers; without it a message prints
     * only as a class name and identity hash.
     *
     * @return a string of the form {@code SideEffectSetMessage[key=value]}
     */
    @Override
    public String toString() {
        return "SideEffectSetMessage[" + this.key + "=" + this.value + "]";
    }
}
/**
 * Stateless marker message exposed as a single shared instance.
 *
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public final class StartMessage {

    // created eagerly when the class is initialized; the only instance that can exist
    private static final StartMessage SINGLETON = new StartMessage();

    /**
     * Private so that {@link #instance()} is the only way to obtain the message.
     */
    private StartMessage() {
    }

    /**
     * @return the shared {@code StartMessage} instance (always the same object)
     */
    public static StartMessage instance() {
        return SINGLETON;
    }
}
/**
 * Three-valued termination flag.
 * NOTE(review): how each value is interpreted is decided by the code that
 * exchanges it, which is not visible here. The declaration order is kept
 * unchanged so that ordinals stay the same.
 *
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public enum Terminate {

    /** Ordinal 0. */
    MAYBE,

    /** Ordinal 1. */
    YES,

    /** Ordinal 2. */
    NO
}
+ */ + +package org.apache.tinkerpop.gremlin.process.actors.traversal.step.map; + +import org.apache.commons.configuration.Configuration; +import org.apache.tinkerpop.gremlin.process.actors.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actors.GraphActors; +import org.apache.tinkerpop.gremlin.process.actors.traversal.TraversalActorProgram; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; +import org.apache.tinkerpop.gremlin.structure.util.StringFactory; + +import java.util.NoSuchElementException; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class TraversalActorProgramStep<S, E> extends AbstractStep<E, E> { + + + private final Traversal.Admin<S, E> actorsTraversal; + private final Configuration graphActorsConfiguration; + private boolean first = true; + + public TraversalActorProgramStep(final Traversal.Admin<?, ?> traversal, final Configuration graphActorsConfiguration) { + super(traversal); + this.graphActorsConfiguration = graphActorsConfiguration; + this.actorsTraversal = (Traversal.Admin) traversal.clone(); + this.actorsTraversal.setParent(EmptyStep.instance()); + } + + @Override + protected Traverser.Admin<E> processNextStart() throws NoSuchElementException { + if (this.first) { + this.first = false; + try { + final GraphActors<TraverserSet<E>> graphActors = GraphActors.open(this.graphActorsConfiguration); + final ActorProgram actorProgram = new TraversalActorProgram<>(this.actorsTraversal); + graphActors.program(actorProgram).submit(this.getTraversal().getGraph().get()).get().forEach(this.starts::add); + } catch (final Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + 
} + return this.starts.next(); + } + + @Override + public String toString() { + return StringFactory.stepString(this, this.actorsTraversal); + } + +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/decoration/ActorProgramStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/decoration/ActorProgramStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/decoration/ActorProgramStrategy.java new file mode 100644 index 0000000..6e4365e --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/decoration/ActorProgramStrategy.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.decoration; + +import org.apache.commons.configuration.Configuration; +import org.apache.tinkerpop.gremlin.process.actors.GraphActors; +import org.apache.tinkerpop.gremlin.process.actors.traversal.step.map.TraversalActorProgramStep; +import org.apache.tinkerpop.gremlin.process.remote.traversal.strategy.decoration.RemoteStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.ProcessorTraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; + +import java.util.Collections; +import java.util.Set; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ActorProgramStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy> + implements TraversalStrategy.DecorationStrategy, ProcessorTraversalStrategy<GraphActors> { + + private static final Set<Class<? 
extends DecorationStrategy>> PRIORS = Collections.singleton(RemoteStrategy.class); + + private final Configuration graphActorsConfiguration; + + public ActorProgramStrategy(final GraphActors graphActors) { + this.graphActorsConfiguration = graphActors.configuration(); + } + + @Override + public void apply(final Traversal.Admin<?, ?> traversal) { + ReadOnlyStrategy.instance().apply(traversal); + + if (!(traversal.getParent() instanceof EmptyStep)) + return; + + final TraversalActorProgramStep<?, ?> actorStep = new TraversalActorProgramStep<>(traversal, this.graphActorsConfiguration); + TraversalHelper.removeAllSteps(traversal); + traversal.addStep(actorStep); + + // validations + assert traversal.getStartStep().equals(actorStep); + assert traversal.getSteps().size() == 1; + assert traversal.getEndStep() == actorStep; + } + + @Override + public Set<Class<? extends DecorationStrategy>> applyPrior() { + return PRIORS; + } + + //////////////////////////////////////////////////////////// + + @Override + public Configuration getConfiguration() { + return this.graphActorsConfiguration; + } + + public static ActorProgramStrategy create(final Configuration configuration) { + try { + return new ActorProgramStrategy(GraphActors.open(configuration)); + } catch (final Exception e) { + throw new IllegalArgumentException(e.getMessage(), e); + } + } + + @Override + public GraphActors getProcessor() { + return GraphActors.open(this.graphActorsConfiguration); + } + +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java new file mode 100644 index 0000000..cdf5465 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.verification; + +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public final class ActorVerificationStrategy extends AbstractTraversalStrategy<TraversalStrategy.VerificationStrategy> implements TraversalStrategy.VerificationStrategy { + + private static final ActorVerificationStrategy INSTANCE = new ActorVerificationStrategy(); + + private ActorVerificationStrategy() { + } + + @Override + public void apply(final Traversal.Admin<?, ?> traversal) { + if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, traversal).isEmpty()) + throw new VerificationException("Inject traversal currently not supported", traversal); + } + + public static ActorVerificationStrategy instance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/DefaultActorsResult.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/DefaultActorsResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/DefaultActorsResult.java new file mode 100644 index 0000000..208a9a1 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/DefaultActorsResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actors.util; + +import org.apache.tinkerpop.gremlin.process.actors.ActorsResult; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class DefaultActorsResult<R> implements ActorsResult<R> { + + private R result; + + public DefaultActorsResult() { + + } + + public R getResult() { + return this.result; + } + + public void setResult(final R result) { + this.result = result; + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/GraphActorsHelper.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/GraphActorsHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/GraphActorsHelper.java new file mode 100644 index 0000000..2af1ecd --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/GraphActorsHelper.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.actors.util; + +import org.apache.commons.configuration.Configuration; +import org.apache.tinkerpop.gremlin.process.actors.GraphActors; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; + +import java.util.Iterator; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class GraphActorsHelper { + + private GraphActorsHelper() { + + } + + public static GraphActors configure(GraphActors actors, final Configuration configuration) { + final Iterator<String> keys = IteratorUtils.asList(configuration.getKeys()).iterator(); + while (keys.hasNext()) { + final String key = keys.next(); + if (key.equals(GraphActors.GRAPH_ACTORS_WORKERS)) + actors = actors.workers(configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS)); + else if (!key.equals(GraphActors.GRAPH_ACTORS)) + actors = actors.configure(key, configuration.getProperty(key)); + } + return actors; + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java index 96dae61..42bd864 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java +++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java @@ -18,7 +18,7 @@ */ package org.apache.tinkerpop.gremlin.process.traversal; -import org.apache.tinkerpop.gremlin.process.actor.GraphActors; +import org.apache.tinkerpop.gremlin.process.actors.GraphActors; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.GraphFilterStrategy; import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ConnectiveStrategy; http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java index f7c350d..ee7a196 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java @@ -19,7 +19,7 @@ package org.apache.tinkerpop.gremlin.structure.util; import org.apache.tinkerpop.gremlin.process.Processor; -import org.apache.tinkerpop.gremlin.process.actor.GraphActors; +import org.apache.tinkerpop.gremlin.process.actors.GraphActors; import org.apache.tinkerpop.gremlin.process.computer.ComputerResult; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.computer.MapReduce; http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java ---------------------------------------------------------------------- diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java 
b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java index 43b3608..482577f 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java @@ -19,7 +19,7 @@ package org.apache.tinkerpop.gremlin; import org.apache.commons.configuration.Configuration; -import org.apache.tinkerpop.gremlin.process.actor.GraphActors; +import org.apache.tinkerpop.gremlin.process.actors.GraphActors; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine; import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java ---------------------------------------------------------------------- diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java index 9d63b3c..d0d877d 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java @@ -19,7 +19,7 @@ package org.apache.tinkerpop.gremlin; import org.apache.commons.configuration.Configuration; -import org.apache.tinkerpop.gremlin.process.actor.GraphActors; +import org.apache.tinkerpop.gremlin.process.actors.GraphActors; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy; http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java ---------------------------------------------------------------------- diff --git 
a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java index ec3ece2..cc9d995 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java @@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.actors; import org.apache.tinkerpop.gremlin.LoadGraphWith; import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest; -import org.apache.tinkerpop.gremlin.process.actor.GraphActors; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.junit.Test;