Repository: tinkerpop
Updated Branches:
  refs/heads/master ff52eb63b -> b36b42f6c


Found a bug that was introduced during the KryoShim work earlier this week. I 
made it explicit in SparkGraphComputer which configuration belongs to the 
GraphComputer and which to the VertexProgram so that these issues don't pop up 
again. Simple fix -- CTR.
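
For readers following the change: below is a minimal, self-contained sketch (not
part of this commit) of the two configurations the new SparkExecutor signature
keeps apart. TinkerFactory, PageRankVertexProgram and the ConfigurationSplitSketch
class are stand-ins chosen purely for illustration; in the real code path the
graph side comes from the HadoopGraph/Spark settings, while the vertex-program
side is whatever VertexProgram.storeState() writes and
VertexProgram.createVertexProgram() reads back on each worker.

import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory;

public class ConfigurationSplitSketch {
    public static void main(final String[] args) {
        // GraphComputer-side configuration: graph provider and Spark settings
        // (e.g. the serializer that KryoShimServiceLoader.applyConfiguration() consumes)
        final Configuration graphComputerConfiguration = new BaseConfiguration();
        graphComputerConfiguration.setProperty("spark.serializer",
                "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");

        // VertexProgram-side configuration: written by storeState() on the driver ...
        final Graph graph = TinkerFactory.createModern(); // stand-in for HadoopGraph.open(graphComputerConfiguration)
        final Configuration vertexProgramConfiguration = new BaseConfiguration();
        final PageRankVertexProgram pageRank = PageRankVertexProgram.build().create(graph);
        pageRank.storeState(vertexProgramConfiguration);

        // ... and read back by createVertexProgram() so each worker gets its own copy,
        // never mixed into the GraphComputer-side configuration
        final PageRankVertexProgram workerCopy =
                VertexProgram.createVertexProgram(graph, vertexProgramConfiguration);
        System.out.println(workerCopy);
    }
}

In the patched SparkExecutor below, the former is what
KryoShimServiceLoader.applyConfiguration() and HadoopGraph.open() receive, and
the latter is what VertexProgram.createVertexProgram() loads its state from.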


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b36b42f6
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b36b42f6
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b36b42f6

Branch: refs/heads/master
Commit: b36b42f6cd8cf74bc6bf267e732d79d55a0cd719
Parents: ff52eb6
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Fri Jun 10 12:10:17 2016 -0600
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Fri Jun 10 12:10:34 2016 -0600

----------------------------------------------------------------------
 .../spark/process/computer/SparkExecutor.java   | 31 ++++++-------
 .../process/computer/SparkGraphComputer.java    | 46 ++++++++++----------
 2 files changed, 39 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b36b42f6/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 4db8086..8dd2381 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -78,7 +78,8 @@ public final class SparkExecutor {
             final JavaPairRDD<Object, VertexWritable> graphRDD,
             final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
             final SparkMemory memory,
-            final Configuration apacheConfiguration) {
+            final Configuration graphComputerConfiguration,    // has the Graph/GraphComputer.configuration() information
+            final Configuration vertexProgramConfiguration) { // has the VertexProgram.loadState() information
 
         boolean partitionedGraphRDD = graphRDD.partitioner().isPresent();
 
@@ -89,8 +90,8 @@ public final class SparkExecutor {
                 graphRDD.leftOuterJoin(viewIncomingRDD))                                                   // every other iteration may have views and messages
                 // for each partition of vertices emit a view and their outgoing messages
                 .mapPartitionsToPair(partitionIterator -> {
-                    KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
-                    final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
+                    KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+                    final VertexProgram<M> workerVertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(graphComputerConfiguration), vertexProgramConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
                     final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();
                     workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
@@ -132,10 +133,10 @@ public final class SparkExecutor {
         /////////////////////////////////////////////////////////////
         /////////////////////////////////////////////////////////////
         final PairFlatMapFunction<Tuple2<Object, ViewOutgoingPayload<M>>, Object, Payload> messageFunction =
-                tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
+                tuple -> () -> IteratorUtils.concat(
                         IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
                         IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))));
-        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
+        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(vertexProgramConfiguration), vertexProgramConfiguration).getMessageCombiner().orElse(null);
         final Function2<Payload, Payload, Payload> reducerFunction = (a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
             if (a instanceof ViewIncomingPayload) {
                 ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
@@ -170,7 +171,7 @@ public final class SparkExecutor {
             assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
         newViewIncomingRDD
                 .foreachPartition(partitionIterator -> {
-                    KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
+                    KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
                 }); // need to complete a task so its BSP and the memory for this iteration is updated
         return newViewIncomingRDD;
     }
@@ -203,10 +204,10 @@ public final class SparkExecutor {
 
     public static <K, V> JavaPairRDD<K, V> executeMap(
             final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce,
-            final Configuration apacheConfiguration) {
+            final Configuration graphComputerConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
-            KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
-            return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+            KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+            return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
         if (mapReduce.getMapKeySort().isPresent())
             mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get(), true, 1);
@@ -214,19 +215,19 @@ public final class SparkExecutor {
     }
 
     public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD,
-                                                                    final Configuration apacheConfiguration) {
+                                                                    final Configuration graphComputerConfiguration) {
         return mapRDD.mapPartitionsToPair(partitionIterator -> {
-            KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
-            return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+            KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+            return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
     }
 
     public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(
             final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce,
-            final Configuration apacheConfiguration) {
+            final Configuration graphComputerConfiguration) {
         JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
-            KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
-            return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+            KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+            return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
         if (mapReduce.getReduceKeySort().isPresent())
             reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b36b42f6/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 9e05e53..5178225 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -135,19 +135,19 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
         return computerService.submit(() -> {
             final long startTime = System.currentTimeMillis();
             // apache and hadoop configurations that are used throughout the graph computer computation
-            final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
-            if (!apacheConfiguration.containsKey(Constants.SPARK_SERIALIZER))
-                apacheConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
-            apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
-            final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
+            final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
+            if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
+                graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+            graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
+            final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
             final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
-            final Storage sparkContextStorage = SparkContextStorage.open(apacheConfiguration);
+            final Storage sparkContextStorage = SparkContextStorage.open(graphComputerConfiguration);
             final boolean inputFromHDFS = FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
             final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
             final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
             final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
-            final boolean skipPartitioner = apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
-            final boolean skipPersist = apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
+            final boolean skipPartitioner = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
+            final boolean skipPersist = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
             String inputLocation = null;
             if (inputFromSpark)
                 inputLocation = Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION), sparkContextStorage).orElse(null);
@@ -158,7 +158,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
             if (null != inputLocation && inputFromHDFS) {
                 try {
-                    apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
+                    graphComputerConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
                     hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
                 } catch (final IOException e) {
                     throw new IllegalStateException(e.getMessage(), e);
@@ -176,7 +176,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         OutputFormatRDD.class.newInstance();
                 // if the input class can filter on load, then set the filters
                 if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class, InputFormat.class))) {
-                    GraphFilterAware.storeGraphFilter(apacheConfiguration, hadoopConfiguration, this.graphFilter);
+                    GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
                     filtered = false;
                 } else if (inputRDD instanceof GraphFilterAware) {
                     ((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
@@ -214,7 +214,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 updateLocalConfiguration(sparkContext, sparkConfiguration);
                 // create a message-passing friendly rdd from the input rdd
                 boolean partitioned = false;
-                JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(apacheConfiguration, sparkContext);
+                JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(graphComputerConfiguration, sparkContext);
                 // if there are vertex or edge filters, filter the loaded graph rdd prior to partitioning and persisting
                 if (filtered) {
                     this.logger.debug("Filtering the loaded graphRDD: " + this.graphFilter);
@@ -255,10 +255,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                     /////////////////
                     // if there is a registered VertexProgramInterceptor, use it to bypass the GraphComputer semantics
-                    if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
+                    if (graphComputerConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
                         try {
                             final SparkVertexProgramInterceptor<VertexProgram> interceptor =
-                                    (SparkVertexProgramInterceptor) Class.forName(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
+                                    (SparkVertexProgramInterceptor) Class.forName(graphComputerConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
                             computedGraphRDD = interceptor.apply(this.vertexProgram, loadedGraphRDD, memory);
                         } catch (final ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                             throw new IllegalStateException(e.getMessage());
@@ -278,7 +278,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                                 throw new TraversalInterruptedException();
                             }
                             memory.setInExecute(true);
-                            viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
+                            viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, graphComputerConfiguration, vertexProgramConfiguration);
                             memory.setInExecute(false);
                             if (this.vertexProgram.terminate(memory))
                                 break;
@@ -301,7 +301,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     // write the computed graph to the respective output (rdd or output format)
                     if (null != outputRDD && !this.persist.equals(Persist.NOTHING)) {
                         assert null != computedGraphRDD; // the logic holds that a computeGraphRDD must be created at this point
-                        outputRDD.writeGraphRDD(apacheConfiguration, computedGraphRDD);
+                        outputRDD.writeGraphRDD(graphComputerConfiguration, computedGraphRDD);
                     }
                 }
 
@@ -330,7 +330,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
                     for (final MapReduce mapReduce : this.mapReducers) {
                         // execute the map reduce job
-                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
+                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(graphComputerConfiguration);
                         mapReduce.storeState(newApacheConfiguration);
                         // map
                         final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceRDD, mapReduce, newApacheConfiguration);
@@ -340,7 +340,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         final JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(combineRDD, mapReduce, newApacheConfiguration) : combineRDD;
                         // write the map reduce output back to disk and computer result memory
                         if (null != outputRDD)
-                            mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(apacheConfiguration, mapReduce.getMemoryKey(), reduceRDD));
+                            mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(graphComputerConfiguration, mapReduce.getMemoryKey(), reduceRDD));
                     }
                     // if the mapReduceRDD is not simply the computed graph, unpersist the mapReduceRDD
                     if (computedGraphCreated && !outputToSpark) {
@@ -370,13 +370,13 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // update runtime and return the newly computed graph
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                 // clear properties that should not be propagated in an OLAP chain
-                apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
-                apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
-                apacheConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE);
-                apacheConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
-                return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
+                graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+                graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
+                graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE);
+                graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
+                return new DefaultComputerResult(InputOutputHelper.getOutputGraph(graphComputerConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
             } finally {
-                if (!apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+                if (!graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
                     Spark.close();
             }
         });
