Changed some spark-gremlin tests to integration tests. This change roughly halves the build time on my system. Discussed here https://lists.apache.org/thread.html/7a37a3967ad15e8cd30a2bc29e15bf266f44df040f482f94e09b1e8e@%3Cdev.tinkerpop.apache.org%3E CTR
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/5040b544 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/5040b544 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/5040b544 Branch: refs/heads/TINKERPOP-1502 Commit: 5040b54449d0786f2e6a3127851fc42252162257 Parents: 64609a7 Author: Stephen Mallette <sp...@genoprime.com> Authored: Tue Nov 15 10:55:06 2016 -0500 Committer: Stephen Mallette <sp...@genoprime.com> Committed: Tue Nov 15 10:55:06 2016 -0500 ---------------------------------------------------------------------- CHANGELOG.asciidoc | 1 + .../SparkGraphComputerProcessIntegrateTest.java | 32 ++ .../computer/SparkGraphComputerProcessTest.java | 32 -- ...GraphComputerGroovyProcessIntegrateTest.java | 33 ++ .../SparkGraphComputerGroovyProcessTest.java | 33 -- .../PersistedInputOutputRDDIntegrateTest.java | 358 +++++++++++++++++++ .../io/PersistedInputOutputRDDTest.java | 358 ------------------- 7 files changed, 424 insertions(+), 423 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5040b544/CHANGELOG.asciidoc ---------------------------------------------------------------------- diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 3d963c9..7c4d41f 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +* Converted Spark process suite tests to "integration" tests. * Added another optimization in `RangeByIsCountStrategy`, that removes `count().is()` altogether if it's not needed. * Fixed a OLAP `MatchStep.clone()`-bug that occurs when the `match()` is in a local child. 
* Added another optimization in `RangeByIsCountStrategy`, that removes `count().is()` altogether if it's not needed. http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5040b544/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputerProcessIntegrateTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputerProcessIntegrateTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputerProcessIntegrateTest.java new file mode 100644 index 0000000..f38dcf5 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputerProcessIntegrateTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.spark.process.computer; + +import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; +import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite; +import org.junit.runner.RunWith; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +@RunWith(ProcessComputerSuite.class) +@GraphProviderClass(provider = SparkHadoopGraphProvider.class, graph = HadoopGraph.class) +public class SparkGraphComputerProcessIntegrateTest { +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5040b544/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputerProcessTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputerProcessTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputerProcessTest.java deleted file mode 100644 index 055c9bd..0000000 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputerProcessTest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.tinkerpop.gremlin.spark.process.computer; - -import org.apache.tinkerpop.gremlin.GraphProviderClass; -import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; -import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite; -import org.junit.runner.RunWith; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -@RunWith(ProcessComputerSuite.class) -@GraphProviderClass(provider = SparkHadoopGraphProvider.class, graph = HadoopGraph.class) -public class SparkGraphComputerProcessTest { -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5040b544/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java new file mode 100644 index 0000000..6f259dd --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.spark.process.computer.groovy; + +import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; +import org.apache.tinkerpop.gremlin.process.GroovyProcessComputerSuite; +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; +import org.junit.runner.RunWith; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +@RunWith(GroovyProcessComputerSuite.class) +@GraphProviderClass(provider = SparkHadoopGraphProvider.class, graph = HadoopGraph.class) +public class SparkGraphComputerGroovyProcessIntegrateTest { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5040b544/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkGraphComputerGroovyProcessTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkGraphComputerGroovyProcessTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkGraphComputerGroovyProcessTest.java deleted file mode 100644 index 65ebd75..0000000 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/groovy/SparkGraphComputerGroovyProcessTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.tinkerpop.gremlin.spark.process.computer.groovy; - -import org.apache.tinkerpop.gremlin.GraphProviderClass; -import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; -import org.apache.tinkerpop.gremlin.process.GroovyProcessComputerSuite; -import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; -import org.junit.runner.RunWith; - -/** - * @author Marko A. 
Rodriguez (http://markorodriguez.com) - */ -@RunWith(GroovyProcessComputerSuite.class) -@GraphProviderClass(provider = SparkHadoopGraphProvider.class, graph = HadoopGraph.class) -public class SparkGraphComputerGroovyProcessTest { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5040b544/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDIntegrateTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDIntegrateTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDIntegrateTest.java new file mode 100644 index 0000000..249d2f2 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDIntegrateTest.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.spark.structure.io; + +import org.apache.commons.configuration.BaseConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.spark.storage.StorageLevel; +import org.apache.tinkerpop.gremlin.TestHelper; +import org.apache.tinkerpop.gremlin.hadoop.Constants; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat; +import org.apache.tinkerpop.gremlin.process.computer.Computer; +import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; +import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram; +import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankMapReduce; +import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram; +import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; +import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest; +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; +import org.apache.tinkerpop.gremlin.spark.structure.Spark; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.io.IoCore; +import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; +import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; +import org.junit.Test; + +import java.util.Arrays; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public class PersistedInputOutputRDDIntegrateTest extends AbstractSparkTest { + + @Test + public void shouldNotHaveDanglingPersistedComputeRDDs() throws Exception { + Spark.create("local[4]"); + final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class, UUID.randomUUID().toString()); + final Configuration configuration = super.getBaseConfiguration(); + configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); + configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); + Graph graph = GraphFactory.open(configuration); + /// + assertEquals(6, graph.traversal().withComputer(Computer.compute(SparkGraphComputer.class)).V().out().count().next().longValue()); + assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertEquals(0, Spark.getContext().getPersistentRDDs().size()); + // + assertEquals(2, graph.traversal().withComputer(Computer.compute(SparkGraphComputer.class)).V().out().out().count().next().longValue()); + assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertEquals(0, Spark.getContext().getPersistentRDDs().size()); + /////// + Spark.close(); + } + + @Test + public void shouldPersistRDDBasedOnStorageLevel() throws Exception { + Spark.create("local[4]"); + int counter = 0; + for (final String storageLevel : Arrays.asList("MEMORY_ONLY", "DISK_ONLY", "MEMORY_ONLY_SER", "MEMORY_AND_DISK_SER")) { + assertEquals(counter, Spark.getRDDs().size()); + assertEquals(counter, Spark.getContext().getPersistentRDDs().size()); + counter++; + final String rddName = 
TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class, UUID.randomUUID().toString()); + final Configuration configuration = super.getBaseConfiguration(); + configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, storageLevel); + configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); + configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); + Graph graph = GraphFactory.open(configuration); + graph.compute(SparkGraphComputer.class) + .result(GraphComputer.ResultGraph.NEW) + .persist(GraphComputer.Persist.EDGES) + .program(TraversalVertexProgram.build() + .traversal(graph.traversal().withComputer(SparkGraphComputer.class), + "gremlin-groovy", + "g.V().groupCount('m').by('name').out()").create(graph)).submit().get(); + //////// + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getGraphLocation(rddName)).getStorageLevel()); + assertEquals(counter, Spark.getRDDs().size()); + assertEquals(counter, Spark.getContext().getPersistentRDDs().size()); + } + Spark.close(); + } + + @Test + public void shouldNotPersistRDDAcrossJobs() throws Exception { + Spark.create("local[4]"); + final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class, UUID.randomUUID().toString()); + final Configuration configuration = super.getBaseConfiguration(); + configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); + 
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); + configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false); // because the spark context is NOT persisted, neither is the RDD + Graph graph = GraphFactory.open(configuration); + graph.compute(SparkGraphComputer.class) + .result(GraphComputer.ResultGraph.NEW) + .persist(GraphComputer.Persist.EDGES) + .program(TraversalVertexProgram.build() + .traversal(graph.traversal().withComputer(SparkGraphComputer.class), + "gremlin-groovy", + "g.V()").create(graph)).submit().get(); + //////// + Spark.create("local[4]"); + assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertEquals(0, Spark.getContext().getPersistentRDDs().size()); + Spark.close(); + } + + @Test + public void shouldPersistRDDAcrossJobs() throws Exception { + Spark.create("local[4]"); + final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class, UUID.randomUUID().toString()); + final String rddName2 = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class, UUID.randomUUID().toString()); + final Configuration configuration = super.getBaseConfiguration(); + configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); + configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); + Graph graph = GraphFactory.open(configuration); + 
graph.compute(SparkGraphComputer.class) + .result(GraphComputer.ResultGraph.NEW) + .persist(GraphComputer.Persist.EDGES) + .program(TraversalVertexProgram.build() + .traversal(graph.traversal().withComputer(SparkGraphComputer.class), + "gremlin-groovy", + "g.V().count()").create(graph)).submit().get(); + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertEquals(1, Spark.getContext().getPersistentRDDs().size()); + /////// + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, PersistedInputRDD.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName2); + graph = GraphFactory.open(configuration); + assertEquals(6, graph.traversal().withComputer(SparkGraphComputer.class).V().out().count().next().longValue()); + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertEquals(1, Spark.getContext().getPersistentRDDs().size()); + /////// + graph = GraphFactory.open(configuration); + graph.compute(SparkGraphComputer.class) + .result(GraphComputer.ResultGraph.NEW) + .persist(GraphComputer.Persist.EDGES) + .program(TraversalVertexProgram.build() + .traversal(graph.traversal().withComputer(SparkGraphComputer.class), + "gremlin-groovy", + "g.V().count()").create(graph)).submit().get(); + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2))); + assertEquals(2, Spark.getContext().getPersistentRDDs().size()); + /////// + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, PersistedInputRDD.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); + 
configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName2); + graph = GraphFactory.open(configuration); + assertEquals(6, graph.traversal().withComputer(SparkGraphComputer.class).V().out().count().next().longValue()); + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertEquals(1, Spark.getContext().getPersistentRDDs().size()); + /////// + graph = GraphFactory.open(configuration); + graph.compute(SparkGraphComputer.class) + .result(GraphComputer.ResultGraph.NEW) + .persist(GraphComputer.Persist.EDGES) + .program(TraversalVertexProgram.build() + .traversal(graph.traversal().withComputer(SparkGraphComputer.class), + "gremlin-groovy", + "g.V().count()").create(graph)).submit().get(); + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2))); + assertEquals(2, Spark.getContext().getPersistentRDDs().size()); + /////// + graph = GraphFactory.open(configuration); + assertEquals(6, graph.traversal().withComputer(SparkGraphComputer.class).V().out().count().next().longValue()); + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertEquals(1, Spark.getContext().getPersistentRDDs().size()); + Spark.close(); + } + + @Test + public void testBulkLoaderVertexProgramChain() throws Exception { + Spark.create("local[4]"); + final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class, UUID.randomUUID().toString()); + final Configuration readConfiguration = super.getBaseConfiguration(); + readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); + readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); + readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); + readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, 
rddName); + readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); + Graph pageRankGraph = GraphFactory.open(readConfiguration); + /////////////// + final Configuration writeConfiguration = new BaseConfiguration(); + writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName()); + writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo"); + writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class) + "testBulkLoaderVertexProgramChain.kryo"); + final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph(); + bulkLoaderGraph.compute(SparkGraphComputer.class) + .persist(GraphComputer.Persist.NOTHING) + .workers(1) + .configure(Constants.GREMLIN_HADOOP_GRAPH_READER, PersistedInputRDD.class.getCanonicalName()) + .configure(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName) + .configure(Constants.GREMLIN_HADOOP_GRAPH_WRITER, null) + .configure(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null) + .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph)) + .submit().get(); + //// + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertEquals(1, Spark.getContext().getPersistentRDDs().size()); + //// + final Graph graph = TinkerGraph.open(); + final GraphTraversalSource g = graph.traversal(); + graph.io(IoCore.gryo()).readGraph(TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class) + "testBulkLoaderVertexProgramChain.kryo"); + assertEquals(6l, g.V().count().next().longValue()); + assertEquals(0l, g.E().count().next().longValue()); + assertEquals("marko", g.V().has("name", "marko").values("name").next()); + assertEquals(6l, 
g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue()); + //// + Spark.close(); + } + + @Test + public void testBulkLoaderVertexProgramChainWithInputOutputHelperMapping() throws Exception { + Spark.create("local[4]"); + + final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class, UUID.randomUUID().toString()); + final Configuration readConfiguration = super.getBaseConfiguration(); + readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); + readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); + readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); + readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); + readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); + Graph pageRankGraph = GraphFactory.open(readConfiguration); + /////////////// + final Configuration writeConfiguration = new BaseConfiguration(); + writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName()); + writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo"); + writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class) + "testBulkLoaderVertexProgramChainWithInputOutputHelperMapping.kryo"); + final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph(); + bulkLoaderGraph.compute(SparkGraphComputer.class) + .persist(GraphComputer.Persist.NOTHING) + .workers(1) + .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph)) + .submit().get(); + //// + 
Spark.create(readConfiguration); + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertEquals(1, Spark.getContext().getPersistentRDDs().size()); + //// + final Graph graph = TinkerGraph.open(); + final GraphTraversalSource g = graph.traversal(); + graph.io(IoCore.gryo()).readGraph(TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class) + "testBulkLoaderVertexProgramChainWithInputOutputHelperMapping.kryo"); + assertEquals(6l, g.V().count().next().longValue()); + assertEquals(6l, g.E().count().next().longValue()); + assertEquals("marko", g.V().has("name", "marko").values("name").next()); + assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue()); + //// + Spark.close(); + } + + @Test + public void testComplexChain() throws Exception { + Spark.create("local[4]"); + + final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class, "testComplexChain", "graphRDD"); + final String rddName2 = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDIntegrateTest.class, "testComplexChain", "graphRDD2"); + final Configuration configuration = super.getBaseConfiguration(); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); + configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); + + assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertEquals(0, Spark.getContext().getPersistentRDDs().size()); + Graph graph = GraphFactory.open(configuration); + graph = 
graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph(); + GraphTraversalSource g = graph.traversal(); + assertEquals(6l, g.V().count().next().longValue()); + assertEquals(6l, g.E().count().next().longValue()); + assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue()); + //// + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertEquals(1, Spark.getContext().getPersistentRDDs().size()); + //// + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, PersistedInputRDD.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName2); + //// + graph = GraphFactory.open(configuration); + graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).mapReduce(PageRankMapReduce.build().create()).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph(); + g = graph.traversal(); + assertEquals(6l, g.V().count().next().longValue()); + assertEquals(6l, g.E().count().next().longValue()); + assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue()); + //// + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2))); + assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName2, PageRankMapReduce.DEFAULT_MEMORY_KEY))); + assertEquals(3, Spark.getContext().getPersistentRDDs().size()); + //// + graph = GraphFactory.open(configuration); + graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph(); + 
g = graph.traversal(); + assertEquals(6l, g.V().count().next().longValue()); + assertEquals(0l, g.E().count().next().longValue()); + assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue()); + //// + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2))); + assertFalse(Spark.hasRDD(Constants.getMemoryLocation(rddName2, PageRankMapReduce.DEFAULT_MEMORY_KEY))); + assertEquals(2, Spark.getContext().getPersistentRDDs().size()); + //// + graph = GraphFactory.open(configuration); + graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph(); + assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName2))); + g = graph.traversal(); + assertEquals(0l, g.V().count().next().longValue()); + assertEquals(0l, g.E().count().next().longValue()); + assertEquals(0l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue()); + //// + assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); + assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName2))); + assertFalse(Spark.hasRDD(Constants.getMemoryLocation(rddName2, PageRankMapReduce.DEFAULT_MEMORY_KEY))); + assertEquals(1, Spark.getContext().getPersistentRDDs().size()); + Spark.close(); + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5040b544/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java deleted file mode 100644 index 7a089b6..0000000 --- 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.spark.structure.io; - -import org.apache.commons.configuration.BaseConfiguration; -import org.apache.commons.configuration.Configuration; -import org.apache.spark.storage.StorageLevel; -import org.apache.tinkerpop.gremlin.TestHelper; -import org.apache.tinkerpop.gremlin.hadoop.Constants; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat; -import org.apache.tinkerpop.gremlin.process.computer.Computer; -import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; -import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram; -import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankMapReduce; -import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram; -import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram; -import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; -import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest; -import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; -import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; -import org.apache.tinkerpop.gremlin.spark.structure.Spark; -import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.io.IoCore; -import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; -import org.junit.Test; - -import java.util.Arrays; -import java.util.UUID; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public class PersistedInputOutputRDDTest extends AbstractSparkTest { - - @Test - public void shouldNotHaveDanglingPersistedComputeRDDs() throws Exception { - Spark.create("local[4]"); - final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString()); - final Configuration configuration = super.getBaseConfiguration(); - configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); - configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); - Graph graph = GraphFactory.open(configuration); - /// - assertEquals(6, graph.traversal().withComputer(Computer.compute(SparkGraphComputer.class)).V().out().count().next().longValue()); - 
assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertEquals(0, Spark.getContext().getPersistentRDDs().size()); - // - assertEquals(2, graph.traversal().withComputer(Computer.compute(SparkGraphComputer.class)).V().out().out().count().next().longValue()); - assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertEquals(0, Spark.getContext().getPersistentRDDs().size()); - /////// - Spark.close(); - } - - @Test - public void shouldPersistRDDBasedOnStorageLevel() throws Exception { - Spark.create("local[4]"); - int counter = 0; - for (final String storageLevel : Arrays.asList("MEMORY_ONLY", "DISK_ONLY", "MEMORY_ONLY_SER", "MEMORY_AND_DISK_SER")) { - assertEquals(counter, Spark.getRDDs().size()); - assertEquals(counter, Spark.getContext().getPersistentRDDs().size()); - counter++; - final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString()); - final Configuration configuration = super.getBaseConfiguration(); - configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, storageLevel); - configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); - configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); - Graph graph = GraphFactory.open(configuration); - graph.compute(SparkGraphComputer.class) - .result(GraphComputer.ResultGraph.NEW) - .persist(GraphComputer.Persist.EDGES) - .program(TraversalVertexProgram.build() - .traversal(graph.traversal().withComputer(SparkGraphComputer.class), - "gremlin-groovy", - "g.V().groupCount('m').by('name').out()").create(graph)).submit().get(); - 
//////// - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertEquals(StorageLevel.fromString(storageLevel), Spark.getRDD(Constants.getGraphLocation(rddName)).getStorageLevel()); - assertEquals(counter, Spark.getRDDs().size()); - assertEquals(counter, Spark.getContext().getPersistentRDDs().size()); - } - Spark.close(); - } - - @Test - public void shouldNotPersistRDDAcrossJobs() throws Exception { - Spark.create("local[4]"); - final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString()); - final Configuration configuration = super.getBaseConfiguration(); - configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); - configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false); // because the spark context is NOT persisted, neither is the RDD - Graph graph = GraphFactory.open(configuration); - graph.compute(SparkGraphComputer.class) - .result(GraphComputer.ResultGraph.NEW) - .persist(GraphComputer.Persist.EDGES) - .program(TraversalVertexProgram.build() - .traversal(graph.traversal().withComputer(SparkGraphComputer.class), - "gremlin-groovy", - "g.V()").create(graph)).submit().get(); - //////// - Spark.create("local[4]"); - assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertEquals(0, Spark.getContext().getPersistentRDDs().size()); - Spark.close(); - } - - @Test - public void shouldPersistRDDAcrossJobs() throws Exception { - Spark.create("local[4]"); - final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString()); - final String 
rddName2 = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString()); - final Configuration configuration = super.getBaseConfiguration(); - configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); - configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); - Graph graph = GraphFactory.open(configuration); - graph.compute(SparkGraphComputer.class) - .result(GraphComputer.ResultGraph.NEW) - .persist(GraphComputer.Persist.EDGES) - .program(TraversalVertexProgram.build() - .traversal(graph.traversal().withComputer(SparkGraphComputer.class), - "gremlin-groovy", - "g.V().count()").create(graph)).submit().get(); - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertEquals(1, Spark.getContext().getPersistentRDDs().size()); - /////// - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, PersistedInputRDD.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName2); - graph = GraphFactory.open(configuration); - assertEquals(6, graph.traversal().withComputer(SparkGraphComputer.class).V().out().count().next().longValue()); - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertEquals(1, Spark.getContext().getPersistentRDDs().size()); - /////// - graph = GraphFactory.open(configuration); - graph.compute(SparkGraphComputer.class) - .result(GraphComputer.ResultGraph.NEW) - 
.persist(GraphComputer.Persist.EDGES) - .program(TraversalVertexProgram.build() - .traversal(graph.traversal().withComputer(SparkGraphComputer.class), - "gremlin-groovy", - "g.V().count()").create(graph)).submit().get(); - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2))); - assertEquals(2, Spark.getContext().getPersistentRDDs().size()); - /////// - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, PersistedInputRDD.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName2); - graph = GraphFactory.open(configuration); - assertEquals(6, graph.traversal().withComputer(SparkGraphComputer.class).V().out().count().next().longValue()); - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertEquals(1, Spark.getContext().getPersistentRDDs().size()); - /////// - graph = GraphFactory.open(configuration); - graph.compute(SparkGraphComputer.class) - .result(GraphComputer.ResultGraph.NEW) - .persist(GraphComputer.Persist.EDGES) - .program(TraversalVertexProgram.build() - .traversal(graph.traversal().withComputer(SparkGraphComputer.class), - "gremlin-groovy", - "g.V().count()").create(graph)).submit().get(); - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2))); - assertEquals(2, Spark.getContext().getPersistentRDDs().size()); - /////// - graph = GraphFactory.open(configuration); - assertEquals(6, graph.traversal().withComputer(SparkGraphComputer.class).V().out().count().next().longValue()); - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertEquals(1, Spark.getContext().getPersistentRDDs().size()); - Spark.close(); - } - - @Test - public 
void testBulkLoaderVertexProgramChain() throws Exception { - Spark.create("local[4]"); - final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString()); - final Configuration readConfiguration = super.getBaseConfiguration(); - readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); - readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); - readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); - readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); - readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); - Graph pageRankGraph = GraphFactory.open(readConfiguration); - /////////////// - final Configuration writeConfiguration = new BaseConfiguration(); - writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName()); - writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo"); - writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class) + "testBulkLoaderVertexProgramChain.kryo"); - final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph(); - bulkLoaderGraph.compute(SparkGraphComputer.class) - .persist(GraphComputer.Persist.NOTHING) - .workers(1) - .configure(Constants.GREMLIN_HADOOP_GRAPH_READER, PersistedInputRDD.class.getCanonicalName()) - .configure(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName) - .configure(Constants.GREMLIN_HADOOP_GRAPH_WRITER, null) - .configure(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null) - 
.program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph)) - .submit().get(); - //// - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertEquals(1, Spark.getContext().getPersistentRDDs().size()); - //// - final Graph graph = TinkerGraph.open(); - final GraphTraversalSource g = graph.traversal(); - graph.io(IoCore.gryo()).readGraph(TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class) + "testBulkLoaderVertexProgramChain.kryo"); - assertEquals(6l, g.V().count().next().longValue()); - assertEquals(0l, g.E().count().next().longValue()); - assertEquals("marko", g.V().has("name", "marko").values("name").next()); - assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue()); - //// - Spark.close(); - } - - @Test - public void testBulkLoaderVertexProgramChainWithInputOutputHelperMapping() throws Exception { - Spark.create("local[4]"); - - final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, UUID.randomUUID().toString()); - final Configuration readConfiguration = super.getBaseConfiguration(); - readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); - readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); - readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); - readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); - readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); - Graph pageRankGraph = GraphFactory.open(readConfiguration); - /////////////// - final Configuration writeConfiguration = new BaseConfiguration(); - writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName()); - 
writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo"); - writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class) + "testBulkLoaderVertexProgramChainWithInputOutputHelperMapping.kryo"); - final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph(); - bulkLoaderGraph.compute(SparkGraphComputer.class) - .persist(GraphComputer.Persist.NOTHING) - .workers(1) - .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph)) - .submit().get(); - //// - Spark.create(readConfiguration); - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertEquals(1, Spark.getContext().getPersistentRDDs().size()); - //// - final Graph graph = TinkerGraph.open(); - final GraphTraversalSource g = graph.traversal(); - graph.io(IoCore.gryo()).readGraph(TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class) + "testBulkLoaderVertexProgramChainWithInputOutputHelperMapping.kryo"); - assertEquals(6l, g.V().count().next().longValue()); - assertEquals(6l, g.E().count().next().longValue()); - assertEquals("marko", g.V().has("name", "marko").values("name").next()); - assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue()); - //// - Spark.close(); - } - - @Test - public void testComplexChain() throws Exception { - Spark.create("local[4]"); - - final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, "testComplexChain", "graphRDD"); - final String rddName2 = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, "testComplexChain", "graphRDD2"); - final Configuration configuration = super.getBaseConfiguration(); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, 
GryoInputFormat.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo")); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName); - configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); - - assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertEquals(0, Spark.getContext().getPersistentRDDs().size()); - Graph graph = GraphFactory.open(configuration); - graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph(); - GraphTraversalSource g = graph.traversal(); - assertEquals(6l, g.V().count().next().longValue()); - assertEquals(6l, g.E().count().next().longValue()); - assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue()); - //// - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertEquals(1, Spark.getContext().getPersistentRDDs().size()); - //// - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, PersistedInputRDD.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName); - configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, PersistedOutputRDD.class.getCanonicalName()); - configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName2); - //// - graph = GraphFactory.open(configuration); - graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).mapReduce(PageRankMapReduce.build().create()).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph(); - g = graph.traversal(); - assertEquals(6l, g.V().count().next().longValue()); - assertEquals(6l, g.E().count().next().longValue()); - 
assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue()); - //// - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2))); - assertTrue(Spark.hasRDD(Constants.getMemoryLocation(rddName2, PageRankMapReduce.DEFAULT_MEMORY_KEY))); - assertEquals(3, Spark.getContext().getPersistentRDDs().size()); - //// - graph = GraphFactory.open(configuration); - graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph(); - g = graph.traversal(); - assertEquals(6l, g.V().count().next().longValue()); - assertEquals(0l, g.E().count().next().longValue()); - assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue()); - //// - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2))); - assertFalse(Spark.hasRDD(Constants.getMemoryLocation(rddName2, PageRankMapReduce.DEFAULT_MEMORY_KEY))); - assertEquals(2, Spark.getContext().getPersistentRDDs().size()); - //// - graph = GraphFactory.open(configuration); - graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph(); - assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName2))); - g = graph.traversal(); - assertEquals(0l, g.V().count().next().longValue()); - assertEquals(0l, g.E().count().next().longValue()); - assertEquals(0l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue()); - //// - assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName))); - assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName2))); - assertFalse(Spark.hasRDD(Constants.getMemoryLocation(rddName2, PageRankMapReduce.DEFAULT_MEMORY_KEY))); - assertEquals(1, 
Spark.getContext().getPersistentRDDs().size()); - Spark.close(); - } -}