http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6353595e/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java ---------------------------------------------------------------------- diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java index 58fbd9e..9cedf7f 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java @@ -203,6 +203,13 @@ public class GraphComputerTest extends AbstractGremlinProcessTest { } @Override + public Configuration configuration() { + final BaseConfiguration configuration = new BaseConfiguration(); + configuration.setProperty(GRAPH_COMPUTER, BadGraphComputer.class.getCanonicalName()); + return configuration; + } + + @Override public Future<ComputerResult> submit() { return null; } @@ -2346,7 +2353,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest { /////////////////////////////////// - + @Test @LoadGraphWith(MODERN) public void shouldSucceedWithProperTraverserRequirements() throws Exception { @@ -2355,7 +2362,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest { final Map<String, Object> idsByName = new HashMap<>(); final VertexProgramQ vp = VertexProgramQ.build().from("a").property("coworkers").create(); - g.V().hasLabel("person").filter(outE("created")).valueMap(true, "name").forEachRemaining((Map<Object,Object> map) -> + g.V().hasLabel("person").filter(outE("created")).valueMap(true, "name").forEachRemaining((Map<Object, Object> map) -> idsByName.put((String) ((List) map.get("name")).get(0), map.get(id))); try { @@ -2389,7 +2396,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest { final VertexProgramQ vp = VertexProgramQ.build().from("a").property("coworkers"). useTraverserRequirements(false).create(); - g.V().hasLabel("person").filter(outE("created")).valueMap(true, "name").forEachRemaining((Map<Object,Object> map) -> + g.V().hasLabel("person").filter(outE("created")).valueMap(true, "name").forEachRemaining((Map<Object, Object> map) -> idsByName.put((String) ((List) map.get("name")).get(0), map.get(id.getAccessor()))); try {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6353595e/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java ---------------------------------------------------------------------- diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java index 996be6d..6ca4e3c 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java @@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.Pee import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.junit.Ignore; import org.junit.Test; import java.util.HashMap; @@ -79,6 +80,7 @@ public abstract class PeerPressureTest extends AbstractGremlinProcessTest { } @Test + @Ignore @LoadGraphWith(MODERN) public void g_V_hasXname_rippleX_inXcreatedX_peerPressure_byXoutEX_byXclusterX_repeatXunionXidentity__bothX_timesX2X_dedup_valueMapXname_clusterX() { TestHelper.assumeNonDeterministic(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6353595e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopRemoteAcceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopRemoteAcceptor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopRemoteAcceptor.java index acae442..9a8b52c 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopRemoteAcceptor.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopRemoteAcceptor.java @@ -23,12 +23,13 @@ import org.apache.tinkerpop.gremlin.groovy.plugin.RemoteAcceptor; import org.apache.tinkerpop.gremlin.groovy.plugin.RemoteException; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.process.computer.ComputerResult; +import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram; import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; -import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.ProcessorTraversalStrategy; import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal; import org.codehaus.groovy.tools.shell.Groovysh; @@ -98,7 +99,7 @@ public final class HadoopRemoteAcceptor implements RemoteAcceptor { if (this.useSugar) script = SugarLoader.class.getCanonicalName() + ".load()\n" + script; final TraversalVertexProgram program = TraversalVertexProgram.build().traversal(this.traversalSource, "gremlin-groovy", script).create(this.hadoopGraph); - final ComputerResult computerResult = VertexProgramStrategy.getComputer(this.traversalSource.getStrategies()).get().apply(this.hadoopGraph).program(program).submit().get(); + final ComputerResult computerResult = ProcessorTraversalStrategy.<GraphComputer>getProcessor(this.traversalSource.getStrategies()).get().program(program).submit(this.hadoopGraph).get(); this.shell.getInterp().getContext().setVariable(RESULT, computerResult); /// final Traversal.Admin<ComputerResult, ?> traversal = new DefaultTraversal<>(computerResult.graph()); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6353595e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/jsr223/HadoopRemoteAcceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/jsr223/HadoopRemoteAcceptor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/jsr223/HadoopRemoteAcceptor.java index 1fcdab1..4494abc 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/jsr223/HadoopRemoteAcceptor.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/jsr223/HadoopRemoteAcceptor.java @@ -24,12 +24,13 @@ import org.apache.tinkerpop.gremlin.jsr223.console.GremlinShellEnvironment; import org.apache.tinkerpop.gremlin.jsr223.console.RemoteAcceptor; import org.apache.tinkerpop.gremlin.jsr223.console.RemoteException; import org.apache.tinkerpop.gremlin.process.computer.ComputerResult; +import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram; import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep; -import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep; +import org.apache.tinkerpop.gremlin.process.traversal.strategy.ProcessorTraversalStrategy; import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal; import java.io.IOException; @@ -96,7 +97,7 @@ public final class HadoopRemoteAcceptor implements RemoteAcceptor { if (this.useSugar) script = SugarLoader.class.getCanonicalName() + ".load()\n" + script; final TraversalVertexProgram program = TraversalVertexProgram.build().traversal(this.traversalSource, "gremlin-groovy", script).create(this.hadoopGraph); - final ComputerResult computerResult = VertexProgramStrategy.getComputer(this.traversalSource.getStrategies()).get().apply(this.hadoopGraph).program(program).submit().get(); + final ComputerResult computerResult = ((GraphComputer) ProcessorTraversalStrategy.getProcessor(this.traversalSource.getStrategies()).get()).program(program).submit(this.hadoopGraph).get(); this.shellEnvironment.setVariable(RESULT, computerResult); /// final Traversal.Admin<ComputerResult, ?> traversal = new DefaultTraversal<>(computerResult.graph()); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6353595e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java index 6a68046..93f3de6 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; +import org.apache.tinkerpop.gremlin.process.computer.ComputerResult; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.computer.GraphFilter; import org.apache.tinkerpop.gremlin.process.computer.MapReduce; @@ -34,6 +35,7 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.util.Gremlin; @@ -47,6 +49,7 @@ import java.net.URISyntaxException; import java.util.HashSet; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Future; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -59,7 +62,7 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer { Pattern.compile(File.pathSeparator.equals(":") ? "([^:]|://)+" : ("[^" + File.pathSeparator + "]")); protected final Logger logger; - protected final HadoopGraph hadoopGraph; + protected HadoopGraph hadoopGraph; protected boolean executed = false; protected final Set<MapReduce> mapReducers = new HashSet<>(); protected VertexProgram<Object> vertexProgram; @@ -118,6 +121,12 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer { } @Override + public Future<ComputerResult> submit(final Graph graph) { + this.hadoopGraph = (HadoopGraph) graph; + return this.submit(); + } + + @Override public String toString() { return StringFactory.graphComputerString(this); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6353595e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java index 2403f46..61c9663 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java @@ -72,6 +72,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService; import org.apache.tinkerpop.gremlin.structure.Direction; +import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; import org.apache.tinkerpop.gremlin.structure.io.Storage; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; @@ -140,6 +141,22 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "SparkSubmitter"); } + @Override + public Future<ComputerResult> submit(final Graph graph) { + this.hadoopGraph = (HadoopGraph) graph; + ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration); + return this.submit(); + } + + @Override + public org.apache.commons.configuration.Configuration configuration() { + return new HadoopConfiguration(this.sparkConfiguration); + } + + public static SparkGraphComputer open(final org.apache.commons.configuration.Configuration configuration) { + return HadoopGraph.open(configuration).compute(SparkGraphComputer.class); + } + private Future<ComputerResult> submitWithExecutor(Executor exec) { // create the completable future return computerService.submit(() -> { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6353595e/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java index 2abce9a..82cb934 100644 --- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java +++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java @@ -18,6 +18,9 @@ */ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer; +import org.apache.commons.configuration.BaseConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.tinkerpop.gremlin.process.computer.ComputerResult; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; @@ -74,6 +77,8 @@ public final class TinkerGraphComputer implements GraphComputer { private int workers = Runtime.getRuntime().availableProcessors(); private final GraphFilter graphFilter = new GraphFilter(); + private final Configuration configuration; + private final ThreadFactory threadFactoryBoss = new BasicThreadFactory.Builder().namingPattern(TinkerGraphComputer.class.getSimpleName() + "-boss").build(); /** @@ -84,23 +89,45 @@ public final class TinkerGraphComputer implements GraphComputer { public TinkerGraphComputer(final TinkerGraph graph) { this.graph = graph; + this.configuration = new BaseConfiguration(); + this.configuration.setProperty(GRAPH_COMPUTER, TinkerGraphComputer.class.getCanonicalName()); + } + + private TinkerGraphComputer(final Configuration configuration) { + this.graph = null; + this.configuration = configuration; + this.configuration.setProperty(GRAPH_COMPUTER, TinkerGraphComputer.class.getCanonicalName()); + } + + public static TinkerGraphComputer open(final Configuration configuration) { + return new TinkerGraphComputer(ConfigurationUtils.cloneConfiguration(configuration)); + } + + public static TinkerGraphComputer open() { + final BaseConfiguration configuration = new BaseConfiguration(); + configuration.setDelimiterParsingDisabled(true); + configuration.setProperty(GRAPH_COMPUTER, TinkerGraphComputer.class.getCanonicalName()); + return new TinkerGraphComputer(configuration); } @Override public GraphComputer result(final ResultGraph resultGraph) { this.resultGraph = resultGraph; + this.configuration.setProperty(RESULT, resultGraph.name()); return this; } @Override public GraphComputer persist(final Persist persist) { this.persist = persist; + this.configuration.setProperty(PERSIST, persist.name()); return this; } @Override public GraphComputer program(final VertexProgram vertexProgram) { this.vertexProgram = vertexProgram; + this.vertexProgram.storeState(this.configuration); return this; } @@ -113,22 +140,30 @@ public final class TinkerGraphComputer implements GraphComputer { @Override public GraphComputer workers(final int workers) { this.workers = workers; + this.configuration.setProperty(WORKERS, workers); return this; } @Override public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter) { this.graphFilter.setVertexFilter(vertexFilter); + this.configuration.setProperty(VERTICES, vertexFilter); return this; } @Override public GraphComputer edges(final Traversal<Vertex, Edge> edgeFilter) { this.graphFilter.setEdgeFilter(edgeFilter); + this.configuration.setProperty(EDGES, edgeFilter); return this; } @Override + public Configuration configuration() { + return this.configuration; + } + + @Override public Future<ComputerResult> submit() { // a graph computer can only be executed once if (this.executed) http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6353595e/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/TinkerGraphComputerProvider.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/TinkerGraphComputerProvider.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/TinkerGraphComputerProvider.java index 8902b1f..6b9db8b 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/TinkerGraphComputerProvider.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/TinkerGraphComputerProvider.java @@ -20,7 +20,6 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process; import org.apache.commons.configuration.MapConfiguration; import org.apache.tinkerpop.gremlin.GraphProvider; -import org.apache.tinkerpop.gremlin.process.computer.Computer; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; @@ -41,13 +40,22 @@ public class TinkerGraphComputerProvider extends TinkerGraphProvider { @Override public GraphTraversalSource traversal(final Graph graph) { - return RANDOM.nextBoolean() ? - graph.traversal().withStrategies(VertexProgramStrategy.create(new MapConfiguration(new HashMap<String, Object>() {{ - put(VertexProgramStrategy.WORKERS, RANDOM.nextInt(Runtime.getRuntime().availableProcessors()) + 1); - put(VertexProgramStrategy.GRAPH_COMPUTER, RANDOM.nextBoolean() ? - GraphComputer.class.getCanonicalName() : - TinkerGraphComputer.class.getCanonicalName()); - }}))) : - graph.traversal(GraphTraversalSource.computer()); + final int pick = RANDOM.nextInt(4); + if (pick == 0) { + return graph.traversal().withStrategies(VertexProgramStrategy.create(new MapConfiguration(new HashMap<String, Object>() {{ + put(GraphComputer.WORKERS, RANDOM.nextInt(Runtime.getRuntime().availableProcessors()) + 1); + put(GraphComputer.GRAPH_COMPUTER, RANDOM.nextBoolean() ? + GraphComputer.class.getCanonicalName() : + TinkerGraphComputer.class.getCanonicalName()); + }}))); + } else if (pick == 1) { + return graph.traversal(GraphTraversalSource.computer()); + } else if (pick == 2) { + return graph.traversal().withProcessor(TinkerGraphComputer.open().workers(RANDOM.nextInt(Runtime.getRuntime().availableProcessors()) + 1)); + } else if (pick == 3) { + return graph.traversal().withProcessor(TinkerGraphComputer.open()); + } else { + throw new IllegalStateException("The random pick generator is bad: " + pick); + } } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6353595e/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/jsr223/TinkerGraphGroovyTranslatorComputerProvider.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/jsr223/TinkerGraphGroovyTranslatorComputerProvider.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/jsr223/TinkerGraphGroovyTranslatorComputerProvider.java index ca0e58f..fb64d76 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/jsr223/TinkerGraphGroovyTranslatorComputerProvider.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/jsr223/TinkerGraphGroovyTranslatorComputerProvider.java @@ -32,6 +32,6 @@ public class TinkerGraphGroovyTranslatorComputerProvider extends TinkerGraphGroo @Override public GraphTraversalSource traversal(final Graph graph) { - return super.traversal(graph).withComputer(); + return super.traversal(graph).withProcessor(TinkerGraphComputer.open()); } }