committed to g.withProcessor(Processor.Description). Actors and Computer implement Processor.Description. Following the VertexProgramStrategy model, this makes it easy for language variant providers to support any arbitrary Processor down the line.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/2beb380f Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/2beb380f Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/2beb380f Branch: refs/heads/TINKERPOP-1564 Commit: 2beb380f59b5d70c9df254014b3ca3e9ddd0dfe3 Parents: 3a4bda9 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Thu Dec 15 11:33:28 2016 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Thu Jan 19 13:01:41 2017 -0700 ---------------------------------------------------------------------- .../akka/process/actor/AkkaGraphActors.java | 3 +- .../akka/process/AkkaActorsProvider.java | 2 +- .../gremlin/akka/process/AkkaPlayTest.java | 2 +- .../tinkerpop/gremlin/process/Processor.java | 2 +- .../tinkerpop/gremlin/process/actor/Actors.java | 22 +++-- .../decoration/ActorProgramStrategy.java | 87 +++++++++++++++++--- .../gremlin/process/computer/Computer.java | 3 +- .../decoration/VertexProgramStrategy.java | 2 +- .../process/traversal/TraversalSource.java | 5 +- .../gremlin/process/traversal/BytecodeTest.java | 8 +- 10 files changed, 104 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2beb380f/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java index 2f62beb..51747ac 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java @@ -34,6 +34,7 @@ import 
org.apache.tinkerpop.gremlin.structure.util.StringFactory; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -51,7 +52,7 @@ public final class AkkaGraphActors<R> implements GraphActors<R> { final Config config = ConfigFactory.defaultApplication(). withValue("message-priorities", ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities().get().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString())); - this.system = ActorSystem.create("traversal-" + actorProgram.hashCode(), config); + this.system = ActorSystem.create("traversal-" + UUID.randomUUID(), config); try { this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost()); } catch (final UnknownHostException e) { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2beb380f/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java index b36e3b5..3e9f5df 100644 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java @@ -147,7 +147,7 @@ public class AkkaActorsProvider extends AbstractGraphProvider { else { final GraphTraversalSource g = graph.traversal(); return RANDOM.nextBoolean() ? 
- g.withProcessor(Actors.of(AkkaGraphActors.class).partitioner(new HashPartitioner(graph.partitioner(), new Random().nextInt(15) + 1))) : + g.withProcessor(Actors.of(AkkaGraphActors.class).workers(new Random().nextInt(15) + 1)) : g.withProcessor(Actors.of(AkkaGraphActors.class)); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2beb380f/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java index 93fac3d..7de8304 100644 --- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java +++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java @@ -42,7 +42,7 @@ public class AkkaPlayTest { public void testPlay1() throws Exception { final Graph graph = TinkerGraph.open(); graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo"); - GraphTraversalSource g = graph.traversal().withProcessor(Actors.of(AkkaGraphActors.class).partitioner(new HashPartitioner(graph.partitioner(), 3))); + GraphTraversalSource g = graph.traversal().withProcessor(Actors.of(AkkaGraphActors.class).workers(3)); // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList()); for (int i = 0; i < 1000; i++) { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2beb380f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java index b55415c..bffda58 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java +++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java @@ -46,7 +46,7 @@ public interface Processor { * * @param traversalSource the traversal source to add processor-specific strategies to */ - public void addTraversalStrategies(final TraversalSource traversalSource); + public TraversalSource addTraversalStrategies(final TraversalSource traversalSource); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2beb380f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java index 0822017..d3b5d17 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java @@ -22,15 +22,14 @@ package org.apache.tinkerpop.gremlin.process.actor; import org.apache.tinkerpop.gremlin.process.Processor; import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorProgramStrategy; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; -import org.apache.tinkerpop.gremlin.structure.Partitioner; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ public final class Actors implements Processor.Description<GraphActors> { - private final Class<? extends GraphActors> graphActorsClass; - private Partitioner partitioner = null; + private Class<? extends GraphActors> graphActorsClass; + private int workers = 1; private Actors(final Class<? 
extends GraphActors> graphActorsClass) { this.graphActorsClass = graphActorsClass; @@ -40,9 +39,15 @@ public final class Actors implements Processor.Description<GraphActors> { return new Actors(graphActorsClass); } - public Actors partitioner(final Partitioner partitioner) { + public Actors graphActors(final Class<? extends GraphActors> graphActorsClass) { final Actors clone = this.clone(); - clone.partitioner = partitioner; + clone.graphActorsClass = graphActorsClass; + return clone; + } + + public Actors workers(final int workers) { + final Actors clone = this.clone(); + clone.workers = workers; return clone; } @@ -50,8 +55,8 @@ public final class Actors implements Processor.Description<GraphActors> { return this.graphActorsClass; } - public Partitioner getPartitioner() { - return this.partitioner; + public int getWorkers() { + return this.workers; } @@ -69,8 +74,9 @@ public final class Actors implements Processor.Description<GraphActors> { } @Override - public void addTraversalStrategies(final TraversalSource traversalSource) { + public TraversalSource addTraversalStrategies(final TraversalSource traversalSource) { final ActorProgramStrategy actorProgramStrategy = new ActorProgramStrategy(this); traversalSource.getStrategies().addStrategies(actorProgramStrategy); + return traversalSource; } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2beb380f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java index 26d3eec..81bcda6 100644 --- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java @@ -19,6 +19,8 @@ package org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.MapConfiguration; import org.apache.tinkerpop.gremlin.process.actor.Actors; import org.apache.tinkerpop.gremlin.process.actor.GraphActors; import org.apache.tinkerpop.gremlin.process.actor.traversal.step.map.TraversalActorProgramStep; @@ -29,10 +31,14 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy; import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; -import org.apache.tinkerpop.gremlin.structure.Partitioner; import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph; +import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -43,16 +49,14 @@ public final class ActorProgramStrategy extends AbstractTraversalStrategy<Traver private static final Set<Class<? extends DecorationStrategy>> PRIORS = Collections.singleton(RemoteStrategy.class); - private final Partitioner partitioner; - private final Class<? extends GraphActors> actors; + private final Actors actors; - private ActorProgramStrategy(final Class<? 
extends GraphActors> actors, final Partitioner partitioner) { - this.actors = actors; - this.partitioner = partitioner; + private ActorProgramStrategy() { + this(null); } public ActorProgramStrategy(final Actors actors) { - this(actors.getGraphActorsClass(), actors.getPartitioner()); + this.actors = actors; } @Override @@ -62,10 +66,10 @@ public final class ActorProgramStrategy extends AbstractTraversalStrategy<Traver if (!(traversal.getParent() instanceof EmptyStep)) return; - final TraversalActorProgramStep<?, ?> actorStep = new TraversalActorProgramStep<>(traversal, this.actors, - null == this.partitioner ? + final TraversalActorProgramStep<?, ?> actorStep = new TraversalActorProgramStep<>(traversal, this.actors.getGraphActorsClass(), + 1 == this.actors.getWorkers() ? traversal.getGraph().orElse(EmptyGraph.instance()).partitioner() : - this.partitioner); + new HashPartitioner(traversal.getGraph().orElse(EmptyGraph.instance()).partitioner(), this.actors.getWorkers())); TraversalHelper.removeAllSteps(traversal); traversal.addStep(actorStep); @@ -79,5 +83,68 @@ public final class ActorProgramStrategy extends AbstractTraversalStrategy<Traver public Set<Class<? 
extends DecorationStrategy>> applyPrior() { return PRIORS; } + + //////////////////////////////////////////////////////////// + + public static final String GRAPH_ACTORS = "graphActors"; + public static final String WORKERS = "workers"; + + @Override + public Configuration getConfiguration() { + final Map<String, Object> map = new HashMap<>(); + map.put(GRAPH_ACTORS, this.actors.getGraphActorsClass().getCanonicalName()); + map.put(WORKERS, this.actors.getWorkers()); + return new MapConfiguration(map); + } + + public static ActorProgramStrategy create(final Configuration configuration) { + try { + final ActorProgramStrategy.Builder builder = ActorProgramStrategy.build(); + for (final String key : (List<String>) IteratorUtils.asList(configuration.getKeys())) { + if (key.equals(GRAPH_ACTORS)) + builder.graphComputer((Class) Class.forName(configuration.getString(key))); + else if (key.equals(WORKERS)) + builder.workers(configuration.getInt(key)); + else + throw new IllegalArgumentException("The provided key is unknown: " + key); + } + return builder.create(); + } catch (final ClassNotFoundException e) { + throw new IllegalArgumentException(e.getMessage(), e); + } + } + + public static ActorProgramStrategy.Builder build() { + return new ActorProgramStrategy.Builder(); + } + + public final static class Builder { + + private Actors actors = Actors.of(GraphActors.class); + + private Builder() { + } + + public Builder computer(final Actors actors) { + this.actors = actors; + return this; + } + + public Builder graphComputer(final Class<? 
extends GraphActors> graphActorsClass) { + this.actors = this.actors.graphActors(graphActorsClass); + return this; + } + + + public Builder workers(final int workers) { + this.actors = this.actors.workers(workers); + return this; + } + + public ActorProgramStrategy create() { + return new ActorProgramStrategy(this.actors); + } + } + } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2beb380f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java index 9214a5e..8691a41 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java @@ -168,7 +168,7 @@ public final class Computer implements Processor.Description<GraphComputer>, Fun } @Override - public void addTraversalStrategies(final TraversalSource traversalSource) { + public TraversalSource addTraversalStrategies(final TraversalSource traversalSource) { Class<? 
extends GraphComputer> graphComputerClass; if (this.getGraphComputerClass().equals(GraphComputer.class)) { try { @@ -181,6 +181,7 @@ public final class Computer implements Processor.Description<GraphComputer>, Fun final List<TraversalStrategy<?>> graphComputerStrategies = TraversalStrategies.GlobalCache.getStrategies(graphComputerClass).toList(); traversalSource.getStrategies().addStrategies(graphComputerStrategies.toArray(new TraversalStrategy[graphComputerStrategies.size()])); traversalSource.getStrategies().addStrategies(new VertexProgramStrategy(this)); + return traversalSource; } ///////////////// http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2beb380f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java index 89e40cb..ac2e75a 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java @@ -55,7 +55,7 @@ import java.util.Set; */ public final class VertexProgramStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy> implements TraversalStrategy.DecorationStrategy { - private static final VertexProgramStrategy INSTANCE = new VertexProgramStrategy(Computer.compute()); + private static final VertexProgramStrategy INSTANCE = new VertexProgramStrategy(Computer.of()); private final Computer computer; 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2beb380f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java index b6d948d..20926fc 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java @@ -137,10 +137,7 @@ public interface TraversalSource extends Cloneable, AutoCloseable { * @return a new traversal source with updated strategies */ public default TraversalSource withProcessor(final Processor.Description processor) { - final TraversalSource clone = this.clone(); - processor.addTraversalStrategies(clone); - clone.getBytecode().addSource(Symbols.withProcessor, processor); - return clone; + return processor.addTraversalStrategies(this.clone()); } /** http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2beb380f/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java index 7b1d810..62ef89b 100644 --- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java +++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java @@ -112,16 +112,16 @@ public class BytecodeTest { assertEquals(P.gt(32), bytecode.getBindings().get("c")); assertEquals("name", bytecode.getBindings().get("d")); // - Bytecode.Binding binding = 
(Bytecode.Binding)((List<Bytecode.Instruction>)bytecode.getStepInstructions()).get(1).getArguments()[0]; + Bytecode.Binding binding = (Bytecode.Binding)(bytecode.getStepInstructions()).get(1).getArguments()[0]; assertEquals("a", binding.variable()); assertEquals("created", binding.value()); - binding = (Bytecode.Binding) ((List<Bytecode.Instruction>)((Bytecode)((List<Bytecode.Instruction>)bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(0).getArguments()[0]; + binding = (Bytecode.Binding) (((Bytecode)(bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(0).getArguments()[0]; assertEquals("b", binding.variable()); assertEquals("knows", binding.value()); - binding = (Bytecode.Binding) ((List<Bytecode.Instruction>)((Bytecode)((List<Bytecode.Instruction>)bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(1).getArguments()[1]; + binding = (Bytecode.Binding) (((Bytecode)(bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(1).getArguments()[1]; assertEquals("c", binding.variable()); assertEquals(P.gt(32), binding.value()); - binding = (Bytecode.Binding) ((List<Bytecode.Instruction>)((Bytecode)((List<Bytecode.Instruction>)((Bytecode)((List<Bytecode.Instruction>)bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(0).getArguments()[0]; + binding = (Bytecode.Binding) (((Bytecode)(((Bytecode)(bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(0).getArguments()[0]; assertEquals("d", binding.variable()); assertEquals("name", binding.value()); }