Repository: tinkerpop
Updated Branches:
refs/heads/tp32 9d743725d -> 4a328b92c
TINKERPOP-1271: Refactor SparkContext creation and handling of external
sc.stop()
org.apache.tinkerpop.gremlin.spark.structure.Spark is a SparkContext holder
for SparkGraphComputer.
It was refactored to detect external stop calls and recreate the SparkContext
in that case.
The context creation process was reordered so that all configuration options
take effect.
Spark.create() methods return created context now.
Handling the external stop also requires the SPARK-18751 fix, which was
integrated into Spark 2.1.
Additionally, the refactoring and the configuration loading changes benefit
all Spark versions.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/6bea8a56
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/6bea8a56
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/6bea8a56
Branch: refs/heads/tp32
Commit: 6bea8a562a7d2d2a940a5cb7db3f2a4ce09f3dac
Parents: 8282a12
Author: artemaliev <artem.aliev@gmail.com>
Authored: Thu Feb 2 15:15:04 2017 +0300
Committer: artemaliev <artem.aliev@gmail.com>
Committed: Thu Feb 2 15:15:04 2017 +0300
----------------------------------------------------------------------
.../process/computer/SparkGraphComputer.java | 52 ++++++++++----------
.../gremlin/spark/structure/Spark.java | 46 ++++++++++++++---
.../spark/structure/io/InputRDDFormat.java | 3 +-
.../spark/structure/io/PersistedInputRDD.java | 2 +-
4 files changed, 66 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6bea8a56/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 40598c0..6d2742b 100644
---
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -140,27 +140,27 @@ public final class SparkGraphComputer extends
AbstractHadoopGraphComputer {
graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES,
this.persist.equals(GraphComputer.Persist.EDGES));
final Configuration hadoopConfiguration =
ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
final Storage fileSystemStorage =
FileSystemStorage.open(hadoopConfiguration);
- final Storage sparkContextStorage =
SparkContextStorage.open(graphComputerConfiguration);
final boolean inputFromHDFS =
FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER,
Object.class));
final boolean inputFromSpark =
PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER,
Object.class));
final boolean outputToHDFS =
FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER,
Object.class));
final boolean outputToSpark =
PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER,
Object.class));
final boolean skipPartitioner =
graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER,
false);
final boolean skipPersist =
graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE,
false);
- String inputLocation = null;
- if (inputFromSpark)
- inputLocation =
Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
sparkContextStorage).orElse(null);
- else if (inputFromHDFS)
- inputLocation =
Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
fileSystemStorage).orElse(null);
- if (null == inputLocation)
- inputLocation =
hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
-
- if (null != inputLocation && inputFromHDFS) {
- try {
-
graphComputerConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
FileSystem.get(hadoopConfiguration).getFileStatus(new
Path(inputLocation)).getPath().toString());
-
hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
FileSystem.get(hadoopConfiguration).getFileStatus(new
Path(inputLocation)).getPath().toString());
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
+ if (inputFromHDFS) {
+ String inputLocation = Constants
+
.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
+ fileSystemStorage).orElse(null);
+ if (null != inputLocation) {
+ try {
+
graphComputerConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
+
FileSystem.get(hadoopConfiguration).getFileStatus(new
Path(inputLocation)).getPath()
+ .toString());
+
hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR,
+
FileSystem.get(hadoopConfiguration).getFileStatus(new
Path(inputLocation)).getPath()
+ .toString());
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
}
}
final InputRDD inputRDD;
@@ -189,6 +189,10 @@ public final class SparkGraphComputer extends
AbstractHadoopGraphComputer {
throw new IllegalStateException(e.getMessage(), e);
}
+ // create the spark context from the graph computer configuration
+ final JavaSparkContext sparkContext = new
JavaSparkContext(Spark.create(hadoopConfiguration));
+ final Storage sparkContextStorage = SparkContextStorage.open();
+
SparkMemory memory = null;
// delete output location
final String outputLocation =
hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
@@ -202,15 +206,10 @@ public final class SparkGraphComputer extends
AbstractHadoopGraphComputer {
// the Spark application name will always be set by
SparkContextStorage, thus, INFO the name to make it easier to debug
logger.debug(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null ==
this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" +
this.mapReducers + "]");
- // create the spark configuration from the graph computer
configuration
- final SparkConf sparkConfiguration = new SparkConf();
- hadoopConfiguration.forEach(entry ->
sparkConfiguration.set(entry.getKey(), entry.getValue()));
// execute the vertex program and map reducers and if there is a
failure, auto-close the spark context
try {
- final JavaSparkContext sparkContext = new
JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
this.loadJars(hadoopConfiguration, sparkContext); // add the
project jars to the cluster
- Spark.create(sparkContext.sc()); // this is the context RDD
holder that prevents GC
- updateLocalConfiguration(sparkContext, sparkConfiguration);
+ updateLocalConfiguration(sparkContext, hadoopConfiguration);
// create a message-passing friendly rdd from the input rdd
boolean partitioned = false;
JavaPairRDD<Object, VertexWritable> loadedGraphRDD =
inputRDD.readGraphRDD(graphComputerConfiguration, sparkContext);
@@ -394,7 +393,7 @@ public final class SparkGraphComputer extends
AbstractHadoopGraphComputer {
* in configuration. Spark allows us to override these inherited
properties via
* SparkContext.setLocalProperty
*/
- private void updateLocalConfiguration(final JavaSparkContext sparkContext,
final SparkConf sparkConfiguration) {
+ private void updateLocalConfiguration(final JavaSparkContext sparkContext,
final Configuration configuration) {
/*
* While we could enumerate over the entire SparkConfiguration and
copy into the Thread
* Local properties of the Spark Context this could cause adverse
effects with future
@@ -410,12 +409,11 @@ public final class SparkGraphComputer extends
AbstractHadoopGraphComputer {
};
for (String propertyName : validPropertyNames) {
- if (sparkConfiguration.contains(propertyName)) {
- String propertyValue = sparkConfiguration.get(propertyName);
+ String propertyValue = configuration.get(propertyName);
+ if (propertyValue != null) {
this.logger.info("Setting Thread Local SparkContext Property -
"
+ propertyName + " : " + propertyValue);
-
- sparkContext.setLocalProperty(propertyName,
sparkConfiguration.get(propertyName));
+ sparkContext.setLocalProperty(propertyName,
configuration.get(propertyName));
}
}
}
@@ -424,4 +422,4 @@ public final class SparkGraphComputer extends
AbstractHadoopGraphComputer {
final FileConfiguration configuration = new
PropertiesConfiguration(args[0]);
new
SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration),
configuration)).submit().get();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6bea8a56/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
----------------------------------------------------------------------
diff --git
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
index 08639f4..e4e90b3 100644
---
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
+++
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
@@ -46,31 +46,62 @@ public class Spark {
private Spark() {
}
- public static void create(final Configuration configuration) {
+ public static SparkContext create(final SparkConf sparkConf) {
+ if (null == CONTEXT || CONTEXT.isStopped()) {
+ sparkConf.setAppName("Apache TinkerPop's Spark-Gremlin");
+ CONTEXT = SparkContext.getOrCreate(sparkConf);
+ }
+ return CONTEXT;
+ }
+
+ public static SparkContext create(org.apache.hadoop.conf.Configuration
hadoopConfiguration) {
+ final SparkConf sparkConfiguration = new SparkConf();
+ hadoopConfiguration.forEach(entry ->
sparkConfiguration.set(entry.getKey(), entry.getValue()));
+ return Spark.create(sparkConfiguration);
+ }
+
+ public static SparkContext create(final Configuration configuration) {
final SparkConf sparkConf = new SparkConf();
configuration.getKeys().forEachRemaining(key -> sparkConf.set(key,
configuration.getProperty(key).toString()));
- sparkConf.setAppName("Apache TinkerPop's Spark-Gremlin");
- CONTEXT = SparkContext.getOrCreate(sparkConf);
+ return Spark.create(sparkConf);
}
- public static void create(final String master) {
+ public static SparkContext create(final String master) {
final SparkConf sparkConf = new SparkConf();
- sparkConf.setAppName("Apache TinkerPop's Spark-Gremlin");
sparkConf.setMaster(master);
- CONTEXT = SparkContext.getOrCreate(sparkConf);
+ return Spark.create(sparkConf);
}
- public static void create(final SparkContext sparkContext) {
+ public static SparkContext create(final SparkContext sparkContext) {
+ if (null != CONTEXT && !CONTEXT.isStopped() && sparkContext !=CONTEXT
/*exact the same object => NOP*/
+ &&
!sparkContext.getConf().getBoolean("spark.driver.allowMultipleContexts",
false)) {
+ throw new IllegalStateException(
+ "Active Spark context exists. Call Spark.close() to close
it before creating a new one");
+ }
CONTEXT = sparkContext;
+ return CONTEXT;
}
+ public static SparkContext recreateStopped() {
+ if (null == CONTEXT)
+ throw new IllegalStateException("The Spark context has not been
created.");
+ if (!CONTEXT.isStopped())
+ throw new IllegalStateException("The Spark context is not
stopped.");
+ CONTEXT = SparkContext.getOrCreate(CONTEXT.getConf());
+ return CONTEXT;
+ }
public static SparkContext getContext() {
+ if (null != CONTEXT && CONTEXT.isStopped())
+ recreateStopped();
return CONTEXT;
}
public static void refresh() {
if (null == CONTEXT)
throw new IllegalStateException("The Spark context has not been
created.");
+ if (CONTEXT.isStopped())
+ recreateStopped();
+
final Set<String> keepNames = new HashSet<>();
for (final RDD<?> rdd :
JavaConversions.asJavaIterable(CONTEXT.persistentRdds().values())) {
if (null != rdd.name()) {
@@ -114,4 +145,5 @@ public class Spark {
CONTEXT.stop();
CONTEXT = null;
}
+
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6bea8a56/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
----------------------------------------------------------------------
diff --git
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
index d33b1dc..cdc9a2d 100644
---
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
+++
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDFormat.java
@@ -72,8 +72,7 @@ public final class InputRDDFormat extends
InputFormat<NullWritable, VertexWritab
sparkConfiguration.setAppName(UUID.randomUUID().toString());
hadoopConfiguration.forEach(entry ->
sparkConfiguration.set(entry.getKey(), entry.getValue()));
final InputRDD inputRDD = (InputRDD)
Class.forName(sparkConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_READER)).newInstance();
- final JavaSparkContext javaSparkContext = new
JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
- Spark.create(javaSparkContext.sc());
+ final JavaSparkContext javaSparkContext = new
JavaSparkContext(Spark.create(sparkConfiguration));
final Iterator<Tuple2<Object, VertexWritable>> iterator =
inputRDD.readGraphRDD(ConfUtil.makeApacheConfiguration(taskAttemptContext.getConfiguration()),
javaSparkContext).toLocalIterator();
return new RecordReader<NullWritable, VertexWritable>() {
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/6bea8a56/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
index 58d3b47..00237db 100644
---
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
+++
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
@@ -39,7 +39,7 @@ public final class PersistedInputRDD implements InputRDD {
if
(!configuration.containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
throw new IllegalArgumentException("There is no provided " +
Constants.GREMLIN_HADOOP_INPUT_LOCATION + " to read the persisted RDD from");
Spark.create(sparkContext.sc());
- final Optional<String> graphLocation =
Constants.getSearchGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
SparkContextStorage.open(sparkContext.sc()));
+ final Optional<String> graphLocation =
Constants.getSearchGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION),
SparkContextStorage.open());
return graphLocation.isPresent() ? JavaPairRDD.fromJavaRDD((JavaRDD)
Spark.getRDD(graphLocation.get()).toJavaRDD()) :
JavaPairRDD.fromJavaRDD(sparkContext.emptyRDD());
}