Repository: tinkerpop
Updated Branches:
  refs/heads/master ff52eb63b -> b36b42f6c

Found a bug that was introduced during the KryoShim work earlier this week. I made things super explicit in SparkGraphComputer about which configuration is the GraphComputer configuration and which is the VertexProgram configuration so that these issues don't pop up again. Simple fix -- CTR.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b36b42f6
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b36b42f6
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b36b42f6

Branch: refs/heads/master
Commit: b36b42f6cd8cf74bc6bf267e732d79d55a0cd719
Parents: ff52eb6
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Fri Jun 10 12:10:17 2016 -0600
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Fri Jun 10 12:10:34 2016 -0600

----------------------------------------------------------------------
 .../spark/process/computer/SparkExecutor.java   | 31 ++++++-------
 .../process/computer/SparkGraphComputer.java    | 46 ++++++++++----------
 2 files changed, 39 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
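A note on the two configurations for anyone reading along: the GraphComputer configuration is the graph/provider-wide configuration (what Graph/GraphComputer.configuration() exposes -- serializer, graph reader/writer, Spark properties), while the VertexProgram configuration carries only what VertexProgram.storeState() wrote and VertexProgram.loadState() reads back. The bug came from treating one as the other. Below is a minimal sketch of the distinction against the public TinkerPop API; the class name and main() scaffolding are illustrative only, not code from this commit.

import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;

public class ConfigurationSplitSketch {
    public static void main(final String[] args) {
        final Graph graph = TinkerFactory.createModern();

        // the GraphComputer configuration: graph/provider-wide settings
        final Configuration graphComputerConfiguration = graph.configuration();

        // the VertexProgram configuration: only what storeState() serialized
        final Configuration vertexProgramConfiguration = new BaseConfiguration();
        PageRankVertexProgram.build().create(graph).storeState(vertexProgramConfiguration);

        // the two configurations hold disjoint concerns -- printing their keys
        // makes the split visible (program state vs. provider settings)
        System.out.println("-- vertex program configuration --");
        vertexProgramConfiguration.getKeys().forEachRemaining(System.out::println);
        System.out.println("-- graph computer configuration --");
        graphComputerConfiguration.getKeys().forEachRemaining(System.out::println);
    }
}
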
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b36b42f6/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 4db8086..8dd2381 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -78,7 +78,8 @@ public final class SparkExecutor {
             final JavaPairRDD<Object, VertexWritable> graphRDD,
             final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
             final SparkMemory memory,
-            final Configuration apacheConfiguration) {
+            final Configuration graphComputerConfiguration, // has the Graph/GraphComputer.configuration() information
+            final Configuration vertexProgramConfiguration) { // has the VertexProgram.loadState() information
 
         boolean partitionedGraphRDD = graphRDD.partitioner().isPresent();
 
@@ -89,8 +90,8 @@ public final class SparkExecutor {
                 graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
                 // for each partition of vertices emit a view and their outgoing messages
                 .mapPartitionsToPair(partitionIterator -> {
-                    KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
-                    final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
+                    KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+                    final VertexProgram<M> workerVertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(graphComputerConfiguration), vertexProgramConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
                     final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();
                     workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
@@ -132,10 +133,10 @@ public final class SparkExecutor {
         /////////////////////////////////////////////////////////////
         /////////////////////////////////////////////////////////////
         final PairFlatMapFunction<Tuple2<Object, ViewOutgoingPayload<M>>, Object, Payload> messageFunction =
-                tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
+                tuple -> () -> IteratorUtils.concat(
                         IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())), // emit the view payload
                         IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))));
-        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
+        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(vertexProgramConfiguration), vertexProgramConfiguration).getMessageCombiner().orElse(null);
         final Function2<Payload, Payload, Payload> reducerFunction = (a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
             if (a instanceof ViewIncomingPayload) {
                 ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
@@ -170,7 +171,7 @@ public final class SparkExecutor {
             assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
         newViewIncomingRDD
                 .foreachPartition(partitionIterator -> {
-                    KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
+                    KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
                 }); // need to complete a task so its BSP and the memory for this iteration is updated
         return newViewIncomingRDD;
     }
@@ -203,10 +204,10 @@ public final class SparkExecutor {
     public static <K, V> JavaPairRDD<K, V> executeMap(
             final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce,
-            final Configuration apacheConfiguration) {
+            final Configuration graphComputerConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
-            KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
-            return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+            KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+            return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
         if (mapReduce.getMapKeySort().isPresent())
             mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get(), true, 1);
@@ -214,19 +215,19 @@ public final class SparkExecutor {
     }
 
     public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD,
-                                                                    final Configuration apacheConfiguration) {
+                                                                    final Configuration graphComputerConfiguration) {
         return mapRDD.mapPartitionsToPair(partitionIterator -> {
-            KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
-            return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+            KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+            return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
     }
 
     public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(
             final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce,
-            final Configuration apacheConfiguration) {
+            final Configuration graphComputerConfiguration) {
         JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
-            KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
-            return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+            KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+            return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
         if (mapReduce.getReduceKeySort().isPresent())
             reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);
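The SparkExecutor side of the fix, above, is where the split matters at runtime: every Spark partition acts as a TP3 worker, sets up graph-level services (the Kryo shim) from the graph computer configuration, and then rebuilds its own private VertexProgram copy from the vertex program configuration. A rough single-JVM sketch of that per-worker pattern, with plain threads standing in for Spark partitions (all scaffolding here is assumed for illustration, not code from this commit):

import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;

public class PerWorkerCopySketch {
    public static void main(final String[] args) throws Exception {
        final Graph graph = TinkerFactory.createModern();
        final Configuration vertexProgramConfiguration = new BaseConfiguration();
        PageRankVertexProgram.build().create(graph).storeState(vertexProgramConfiguration);

        // each "worker" (a Spark partition in SparkExecutor) deserializes its own
        // VertexProgram copy from the vertex program configuration -- instances are
        // never shared across workers
        final Runnable worker = () -> {
            final VertexProgram<?> localCopy = VertexProgram.createVertexProgram(graph, vertexProgramConfiguration);
            System.out.println(Thread.currentThread().getName() + " built " + localCopy);
        };
        final Thread worker1 = new Thread(worker, "worker-1");
        final Thread worker2 = new Thread(worker, "worker-2");
        worker1.start();
        worker2.start();
        worker1.join();
        worker2.join();
    }
}
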
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b36b42f6/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 9e05e53..5178225 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -135,19 +135,19 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
         return computerService.submit(() -> {
             final long startTime = System.currentTimeMillis();
             // apache and hadoop configurations that are used throughout the graph computer computation
-            final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
-            if (!apacheConfiguration.containsKey(Constants.SPARK_SERIALIZER))
-                apacheConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
-            apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
-            final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
+            final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
+            if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
+                graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+            graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
+            final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
             final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
-            final Storage sparkContextStorage = SparkContextStorage.open(apacheConfiguration);
+            final Storage sparkContextStorage = SparkContextStorage.open(graphComputerConfiguration);
             final boolean inputFromHDFS = FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
             final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
             final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
             final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
-            final boolean skipPartitioner = apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
-            final boolean skipPersist = apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
+            final boolean skipPartitioner = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
+            final boolean skipPersist = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
             String inputLocation = null;
             if (inputFromSpark)
                 inputLocation = Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION), sparkContextStorage).orElse(null);
@@ -158,7 +158,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
             if (null != inputLocation && inputFromHDFS) {
                 try {
-                    apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
+                    graphComputerConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
                     hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
                 } catch (final IOException e) {
                     throw new IllegalStateException(e.getMessage(), e);
@@ -176,7 +176,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     OutputFormatRDD.class.newInstance();
             // if the input class can filter on load, then set the filters
             if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class, InputFormat.class))) {
-                GraphFilterAware.storeGraphFilter(apacheConfiguration, hadoopConfiguration, this.graphFilter);
+                GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
                 filtered = false;
             } else if (inputRDD instanceof GraphFilterAware) {
                 ((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
@@ -214,7 +214,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 updateLocalConfiguration(sparkContext, sparkConfiguration);
                 // create a message-passing friendly rdd from the input rdd
                 boolean partitioned = false;
-                JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(apacheConfiguration, sparkContext);
+                JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(graphComputerConfiguration, sparkContext);
                 // if there are vertex or edge filters, filter the loaded graph rdd prior to partitioning and persisting
                 if (filtered) {
                     this.logger.debug("Filtering the loaded graphRDD: " + this.graphFilter);
@@ -255,10 +255,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                     /////////////////
                     // if there is a registered VertexProgramInterceptor, use it to bypass the GraphComputer semantics
-                    if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
+                    if (graphComputerConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
                         try {
                             final SparkVertexProgramInterceptor<VertexProgram> interceptor =
-                                    (SparkVertexProgramInterceptor) Class.forName(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
+                                    (SparkVertexProgramInterceptor) Class.forName(graphComputerConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
                             computedGraphRDD = interceptor.apply(this.vertexProgram, loadedGraphRDD, memory);
                         } catch (final ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                             throw new IllegalStateException(e.getMessage());
@@ -278,7 +278,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                                 throw new TraversalInterruptedException();
                             }
                             memory.setInExecute(true);
-                            viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
+                            viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, graphComputerConfiguration, vertexProgramConfiguration);
                             memory.setInExecute(false);
                             if (this.vertexProgram.terminate(memory))
                                 break;
@@ -301,7 +301,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // write the computed graph to the respective output (rdd or output format)
                 if (null != outputRDD && !this.persist.equals(Persist.NOTHING)) {
                     assert null != computedGraphRDD; // the logic holds that a computeGraphRDD must be created at this point
-                    outputRDD.writeGraphRDD(apacheConfiguration, computedGraphRDD);
+                    outputRDD.writeGraphRDD(graphComputerConfiguration, computedGraphRDD);
                 }
             }
@@ -330,7 +330,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
                 for (final MapReduce mapReduce : this.mapReducers) {
                     // execute the map reduce job
-                    final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
+                    final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(graphComputerConfiguration);
                     mapReduce.storeState(newApacheConfiguration);
                     // map
                     final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceRDD, mapReduce, newApacheConfiguration);
@@ -340,7 +340,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     final JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(combineRDD, mapReduce, newApacheConfiguration) : combineRDD;
                     // write the map reduce output back to disk and computer result memory
                     if (null != outputRDD)
-                        mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(apacheConfiguration, mapReduce.getMemoryKey(), reduceRDD));
+                        mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(graphComputerConfiguration, mapReduce.getMemoryKey(), reduceRDD));
                 }
                 // if the mapReduceRDD is not simply the computed graph, unpersist the mapReduceRDD
                 if (computedGraphCreated && !outputToSpark) {
@@ -370,13 +370,13 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // update runtime and return the newly computed graph
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                 // clear properties that should not be propagated in an OLAP chain
-                apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
-                apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
-                apacheConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE);
-                apacheConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
-                return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
+                graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+                graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
+                graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE);
+                graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
+                return new DefaultComputerResult(InputOutputHelper.getOutputGraph(graphComputerConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
             } finally {
-                if (!apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+                if (!graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
                     Spark.close();
             }
         });
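Finally, a user-level smoke test that exercises the fixed path end to end -- a VertexProgram submitted through SparkGraphComputer now reaches each worker with its own configuration intact. This is typical TinkerPop 3.2 OLAP usage; the properties-file path is an assumption (the stock sample config shipped with the distribution), not something taken from this commit.

import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public class PageRankSmokeTest {
    public static void main(final String[] args) throws Exception {
        // assumed path: the sample Hadoop/Spark configuration that ships with the distribution
        final Graph graph = GraphFactory.open("conf/hadoop/hadoop-gryo.properties");
        try (final ComputerResult result = graph.compute(SparkGraphComputer.class)
                .program(PageRankVertexProgram.build().create(graph))
                .submit().get()) {
            // the page rank values land as vertex properties on the result graph
            result.graph().traversal().V().valueMap(true).forEachRemaining(System.out::println);
        }
        graph.close();
    }
}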