Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1599 5fd4dfa8d -> 44a20fdcf (forced update)


TINKERPOP-1271: Refactor SparkContext creation and handling of external 
sc.stop()
    org.apache.tinkerpop.gremlin.spark.structure.Spark is a SparkContext holder 
for SparkGraphComputer.
    It was refactored to detect external stop calls and recreate the SparkContext 
in that case.
    The context creation process was reordered so that all configuration options 
take effect.
    The Spark.create() methods now return the created context.
    Handling the external stop also requires the SPARK-18751 fix, which was 
integrated into Spark 2.1.
    The refactoring and configuration loading changes, however, take effect on 
all versions.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/6bea8a56
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/6bea8a56
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/6bea8a56

Branch: refs/heads/TINKERPOP-1599
Commit: 6bea8a562a7d2d2a940a5cb7db3f2a4ce09f3dac
Parents: 8282a12
Author: artemaliev <artem.aliev@gmail,com>
Authored: Thu Feb 2 15:15:04 2017 +0300
Committer: artemaliev <artem.aliev@gmail,com>
Committed: Thu Feb 2 15:15:04 2017 +0300

----------------------------------------------------------------------
 .../process/computer/SparkGraphComputer.java    | 52 ++++++++++----------
 .../gremlin/spark/structure/Spark.java          | 46 ++++++++++++++---
 .../spark/structure/io/InputRDDFormat.java      |  3 +-
 .../spark/structure/io/PersistedInputRDD.java   |  2 +-
 4 files changed, 66 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6bea8a56/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 40598c0..6d2742b 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -140,27 +140,27 @@ public final class SparkGraphComputer extends 
AbstractHadoopGraphComputer {
             
graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES,
 this.persist.equals(GraphComputer.Persist.EDGES));
             final Configuration hadoopConfiguration = 
ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
             final Storage fileSystemStorage = 
FileSystemStorage.open(hadoopConfiguration);
-            final Storage sparkContextStorage = 
SparkContextStorage.open(graphComputerConfiguration);
             final boolean inputFromHDFS = 
FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER,
 Object.class));
             final boolean inputFromSpark = 
PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER,
 Object.class));
             final boolean outputToHDFS = 
FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER,
 Object.class));
             final boolean outputToSpark = 
PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER,
 Object.class));
             final boolean skipPartitioner = 
graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, 
false);
             final boolean skipPersist = 
graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, 
false);
-            String inputLocation = null;
-            if (inputFromSpark)
-                inputLocation = 
Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
 sparkContextStorage).orElse(null);
-            else if (inputFromHDFS)
-                inputLocation = 
Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
 fileSystemStorage).orElse(null);
-            if (null == inputLocation)
-                inputLocation = 
hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
-
-            if (null != inputLocation && inputFromHDFS) {
-                try {
-                    
graphComputerConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
 FileSystem.get(hadoopConfiguration).getFileStatus(new 
Path(inputLocation)).getPath().toString());
-                    
hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, 
FileSystem.get(hadoopConfiguration).getFileStatus(new 
Path(inputLocation)).getPath().toString());
-                } catch (final IOException e) {
-                    throw new IllegalStateException(e.getMessage(), e);
+            if (inputFromHDFS) {
+                String inputLocation = Constants
+                        
.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
+                                fileSystemStorage).orElse(null);
+                if (null != inputLocation) {
+                    try {
+                        
graphComputerConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
+                                
FileSystem.get(hadoopConfiguration).getFileStatus(new 
Path(inputLocation)).getPath()
+                                        .toString());
+                        
hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
+                                
FileSystem.get(hadoopConfiguration).getFileStatus(new 
Path(inputLocation)).getPath()
+                                        .toString());
+                    } catch (final IOException e) {
+                        throw new IllegalStateException(e.getMessage(), e);
+                    }
                 }
             }
             final InputRDD inputRDD;
@@ -189,6 +189,10 @@ public final class SparkGraphComputer extends 
AbstractHadoopGraphComputer {
                 throw new IllegalStateException(e.getMessage(), e);
             }
 
+            // create the spark context from the graph computer configuration
+            final JavaSparkContext sparkContext = new 
JavaSparkContext(Spark.create(hadoopConfiguration));
+            final Storage sparkContextStorage = SparkContextStorage.open();
+
             SparkMemory memory = null;
             // delete output location
             final String outputLocation = 
hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
@@ -202,15 +206,10 @@ public final class SparkGraphComputer extends 
AbstractHadoopGraphComputer {
             // the Spark application name will always be set by 
SparkContextStorage, thus, INFO the name to make it easier to debug
             logger.debug(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == 
this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + 
this.mapReducers + "]");
 
-            // create the spark configuration from the graph computer 
configuration
-            final SparkConf sparkConfiguration = new SparkConf();
-            hadoopConfiguration.forEach(entry -> 
sparkConfiguration.set(entry.getKey(), entry.getValue()));
             // execute the vertex program and map reducers and if there is a 
failure, auto-close the spark context
             try {
-                final JavaSparkContext sparkContext = new 
JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
                 this.loadJars(hadoopConfiguration, sparkContext); // add the 
project jars to the cluster
-                Spark.create(sparkContext.sc()); // this is the context RDD 
holder that prevents GC
-                updateLocalConfiguration(sparkContext, sparkConfiguration);
+                updateLocalConfiguration(sparkContext, hadoopConfiguration);
                 // create a message-passing friendly rdd from the input rdd
                 boolean partitioned = false;
                 JavaPairRDD<Object, VertexWritable> loadedGraphRDD = 
inputRDD.readGraphRDD(graphComputerConfiguration, sparkContext);
@@ -394,7 +393,7 @@ public final class SparkGraphComputer extends 
AbstractHadoopGraphComputer {
      * in configuration. Spark allows us to override these inherited 
properties via
      * SparkContext.setLocalProperty
      */
-    private void updateLocalConfiguration(final JavaSparkContext sparkContext, 
final SparkConf sparkConfiguration) {
+    private void updateLocalConfiguration(final JavaSparkContext sparkContext, 
final Configuration configuration) {
         /*
          * While we could enumerate over the entire SparkConfiguration and 
copy into the Thread
          * Local properties of the Spark Context this could cause adverse 
effects with future
@@ -410,12 +409,11 @@ public final class SparkGraphComputer extends 
AbstractHadoopGraphComputer {
         };
 
         for (String propertyName : validPropertyNames) {
-            if (sparkConfiguration.contains(propertyName)) {
-                String propertyValue = sparkConfiguration.get(propertyName);
+            String propertyValue = configuration.get(propertyName);
+            if (propertyValue != null) {
                 this.logger.info("Setting Thread Local SparkContext Property - 
"
                         + propertyName + " : " + propertyValue);
-
-                sparkContext.setLocalProperty(propertyName, 
sparkConfiguration.get(propertyName));
+                sparkContext.setLocalProperty(propertyName, 
configuration.get(propertyName));
             }
         }
     }
@@ -424,4 +422,4 @@ public final class SparkGraphComputer extends 
AbstractHadoopGraphComputer {
         final FileConfiguration configuration = new 
PropertiesConfiguration(args[0]);
         new 
SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration),
 configuration)).submit().get();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6bea8a56/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
index 08639f4..e4e90b3 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
@@ -46,31 +46,62 @@ public class Spark {
     private Spark() {
     }
 
-    public static void create(final Configuration configuration) {
+    public static SparkContext create(final SparkConf sparkConf) {
+        if (null == CONTEXT || CONTEXT.isStopped()) {
+            sparkConf.setAppName("Apache TinkerPop's Spark-Gremlin");
+            CONTEXT = SparkContext.getOrCreate(sparkConf);
+        }
+        return CONTEXT;
+    }
+
+    public static SparkContext create(org.apache.hadoop.conf.Configuration 
hadoopConfiguration) {
+        final SparkConf sparkConfiguration = new SparkConf();
+        hadoopConfiguration.forEach(entry -> 
sparkConfiguration.set(entry.getKey(), entry.getValue()));
+        return Spark.create(sparkConfiguration);
+    }
+
+    public static SparkContext create(final Configuration configuration) {
         final SparkConf sparkConf = new SparkConf();
         configuration.getKeys().forEachRemaining(key -> sparkConf.set(key, 
configuration.getProperty(key).toString()));
-        sparkConf.setAppName("Apache TinkerPop's Spark-Gremlin");
-        CONTEXT = SparkContext.getOrCreate(sparkConf);
+        return Spark.create(sparkConf);
     }
 
-    public static void create(final String master) {
+    public static SparkContext create(final String master) {
         final SparkConf sparkConf = new SparkConf();
-        sparkConf.setAppName("Apache TinkerPop's Spark-Gremlin");
         sparkConf.setMaster(master);
-        CONTEXT = SparkContext.getOrCreate(sparkConf);
+        return Spark.create(sparkConf);
     }
 
-    public static void create(final SparkContext sparkContext) {
+    public static SparkContext create(final SparkContext sparkContext) {
+        if (null != CONTEXT && !CONTEXT.isStopped() && sparkContext !=CONTEXT 
/*exact the same object => NOP*/
+                && 
!sparkContext.getConf().getBoolean("spark.driver.allowMultipleContexts", 
false)) {
+            throw new IllegalStateException(
+                    "Active Spark context exists. Call Spark.close() to close 
it before creating a new one");
+        }
         CONTEXT = sparkContext;
+        return CONTEXT;
     }
 
+    public static SparkContext recreateStopped() {
+        if (null == CONTEXT)
+            throw new IllegalStateException("The Spark context has not been 
created.");
+        if (!CONTEXT.isStopped())
+            throw new IllegalStateException("The Spark context is not 
stopped.");
+        CONTEXT = SparkContext.getOrCreate(CONTEXT.getConf());
+        return CONTEXT;
+    }
     public static SparkContext getContext() {
+        if (null != CONTEXT && CONTEXT.isStopped())
+            recreateStopped();
         return CONTEXT;
     }
 
     public static void refresh() {
         if (null == CONTEXT)
             throw new IllegalStateException("The Spark context has not been 
created.");
+        if (CONTEXT.isStopped())
+            recreateStopped();
+
         final Set<String> keepNames = new HashSet<>();
         for (final RDD<?> rdd : 
JavaConversions.asJavaIterable(CONTEXT.persistentRdds().values())) {
             if (null != rdd.name()) {
@@ -114,4 +145,5 @@ public class Spark {
             CONTEXT.stop();
         CONTEXT = null;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6bea8a56/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
index d33b1dc..cdc9a2d 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
@@ -72,8 +72,7 @@ public final class InputRDDFormat extends 
InputFormat<NullWritable, VertexWritab
             sparkConfiguration.setAppName(UUID.randomUUID().toString());
             hadoopConfiguration.forEach(entry -> 
sparkConfiguration.set(entry.getKey(), entry.getValue()));
             final InputRDD inputRDD = (InputRDD) 
Class.forName(sparkConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_READER)).newInstance();
-            final JavaSparkContext javaSparkContext = new 
JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
-            Spark.create(javaSparkContext.sc());
+            final JavaSparkContext javaSparkContext = new 
JavaSparkContext(Spark.create(sparkConfiguration));
             final Iterator<Tuple2<Object, VertexWritable>> iterator = 
inputRDD.readGraphRDD(ConfUtil.makeApacheConfiguration(taskAttemptContext.getConfiguration()),
 javaSparkContext).toLocalIterator();
             return new RecordReader<NullWritable, VertexWritable>() {
                 @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6bea8a56/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
index 58d3b47..00237db 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
@@ -39,7 +39,7 @@ public final class PersistedInputRDD implements InputRDD {
         if 
(!configuration.containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
             throw new IllegalArgumentException("There is no provided " + 
Constants.GREMLIN_HADOOP_INPUT_LOCATION + " to read the persisted RDD from");
         Spark.create(sparkContext.sc());
-        final Optional<String> graphLocation = 
Constants.getSearchGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
 SparkContextStorage.open(sparkContext.sc()));
+        final Optional<String> graphLocation = 
Constants.getSearchGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
 SparkContextStorage.open());
         return graphLocation.isPresent() ? JavaPairRDD.fromJavaRDD((JavaRDD) 
Spark.getRDD(graphLocation.get()).toJavaRDD()) : 
JavaPairRDD.fromJavaRDD(sparkContext.emptyRDD());
     }
 

Reply via email to