Merge branch 'tp32' into tp33

Conflicts:
        spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
        spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3891777e
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3891777e
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3891777e

Branch: refs/heads/tp33
Commit: 3891777e4b30665bd47a5ead9e50871f37f7e9d8
Parents: a708cc3 bd85e5f
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Tue May 22 07:08:22 2018 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Tue May 22 07:08:22 2018 -0400

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   1 +
 .../process/computer/SparkGraphComputer.java    | 104 ++++++++++++++++---
 2 files changed, 93 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3891777e/CHANGELOG.asciidoc
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3891777e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --cc spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index dafe613,4c896cd..5184db6
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@@ -33,9 -33,7 +33,9 @@@ import org.apache.spark.Partitioner
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.launcher.SparkLauncher;
++import org.apache.spark.serializer.KryoRegistrator;
 +import org.apache.spark.serializer.KryoSerializer;
+ import org.apache.spark.serializer.Serializer;
  import org.apache.spark.storage.StorageLevel;
  import org.apache.tinkerpop.gremlin.hadoop.Constants;
  import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
@@@ -87,7 -78,16 +87,17 @@@ import java.util.concurrent.Executors
  import java.util.concurrent.Future;
  import java.util.concurrent.ThreadFactory;
  
+ import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL;
+ import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_PERSIST_CONTEXT;
+ import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL;
+ import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE;
+ import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_SKIP_PARTITIONER;
++import static org.apache.tinkerpop.gremlin.hadoop.Constants.SPARK_KRYO_REGISTRATION_REQUIRED;
+ import static org.apache.tinkerpop.gremlin.hadoop.Constants.SPARK_SERIALIZER;
+ 
  /**
+  * {@link GraphComputer} implementation for Apache Spark.
+  *
   * @author Marko A. Rodriguez (http://markorodriguez.com)
   */
  public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
@@@ -116,10 -112,15 +126,14 @@@
      public SparkGraphComputer(final HadoopGraph hadoopGraph) {
          super(hadoopGraph);
          this.sparkConfiguration = new HadoopConfiguration();
 -        ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration);
      }
  
+     /**
+      * Sets the number of workers. If the {@code spark.master} configuration is configured with "local" then it will
+      * change that configuration to use the specified number of worker threads.
+      */
      @Override
-     public GraphComputer workers(final int workers) {
+     public SparkGraphComputer workers(final int workers) {
          super.workers(workers);
          if (this.sparkConfiguration.containsKey(SparkLauncher.SPARK_MASTER) && this.sparkConfiguration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) {
              this.sparkConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]");
@@@ -134,6 -135,56 +148,72 @@@
          return this;
      }
  
+     /**
+      * Sets the configuration option for {@code spark.master} which is the cluster manager to connect to which may be
+      * one of the <a href="https://spark.apache.org/docs/latest/submitting-applications.html#master-urls">allowed master URLs</a>.
+      */
+     public SparkGraphComputer master(final String clusterManager) {
+         return configure(SparkLauncher.SPARK_MASTER, clusterManager);
+     }
+ 
+     /**
+      * Determines if the Spark context should be left open preventing Spark from garbage collecting unreferenced RDDs.
+      */
+     public SparkGraphComputer persistContext(final boolean persist) {
+         return configure(GREMLIN_SPARK_PERSIST_CONTEXT, persist);
+     }
+ 
+     /**
+      * Specifies the method by which the {@link VertexProgram} created graph is persisted. By default, it is configured
+      * to use {@code StorageLevel#MEMORY_ONLY()}.
+      */
+     public SparkGraphComputer graphStorageLevel(final StorageLevel storageLevel) {
+         return configure(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, storageLevel.description());
+     }
+ 
+     public SparkGraphComputer persistStorageLevel(final StorageLevel storageLevel) {
+         return configure(GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, storageLevel.description());
+     }
+ 
+     /**
+      * Determines if the graph RDD should be partitioned or not. By default, this value is {@code false}.
+      */
+     public SparkGraphComputer skipPartitioner(final boolean skip) {
+         return configure(GREMLIN_SPARK_SKIP_PARTITIONER, skip);
+     }
+ 
+     /**
+      * Determines if the graph RDD should be cached or not. If {@code true} then
+      * {@link #graphStorageLevel(StorageLevel)} is ignored. By default, this value is {@code false}.
+      */
+     public SparkGraphComputer skipGraphCache(final boolean skip) {
+         return configure(GREMLIN_SPARK_SKIP_GRAPH_CACHE, skip);
+     }
+ 
+     /**
+      * Specifies the {@code org.apache.spark.serializer.Serializer} implementation to use. By default, this value is
 -     * set to {@link GryoSerializer}.
++     * set to {@code org.apache.spark.serializer.KryoSerializer}.
+      */
+     public SparkGraphComputer serializer(final Class<? extends Serializer> serializer) {
+         return configure(SPARK_SERIALIZER, serializer.getCanonicalName());
+     }
+ 
++    /**
++     * Specifies the {@code org.apache.spark.serializer.KryoRegistrator} to use to install additional types. By
++     * default this value is set to TinkerPop's {@link GryoRegistrator}.
++     */
++    public SparkGraphComputer sparkKryoRegistrator(final Class<? extends KryoRegistrator> registrator) {
++        return configure(Constants.SPARK_KRYO_REGISTRATOR, registrator.getCanonicalName());
++    }
++
++    /**
++     * Determines if kryo registration is required such that attempts to serialize classes that are not registered
++     * will result in an error. By default this value is {@code false}.
++     */
++    public SparkGraphComputer kryoRegistrationRequired(final boolean required) {
++        return configure(SPARK_KRYO_REGISTRATION_REQUIRED, required);
++    }
++
      @Override
      public Future<ComputerResult> submit() {
          this.validateStatePriorToExecution();
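
----------------------------------------------------------------------

For reference, the configuration methods added in this commit compose fluently, since each returns the SparkGraphComputer itself. Below is a minimal usage sketch; the properties file path, the local master URL, and the choice of PageRankVertexProgram are illustrative assumptions, not part of this commit.

import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public class SparkGraphComputerUsage {
    public static void main(final String[] args) throws Exception {
        // "conf/hadoop-gryo.properties" is a hypothetical HadoopGraph configuration file.
        final HadoopGraph graph = (HadoopGraph) GraphFactory.open("conf/hadoop-gryo.properties");

        final ComputerResult result = graph.compute(SparkGraphComputer.class)
                .master("local[4]")                           // spark.master
                .workers(2)                                   // rewrites the local master to local[2]
                .serializer(KryoSerializer.class)             // spark.serializer
                .sparkKryoRegistrator(GryoRegistrator.class)  // registers TinkerPop types with Kryo
                // .kryoRegistrationRequired(true)            // optionally fail fast on unregistered classes
                .graphStorageLevel(StorageLevel.MEMORY_ONLY())
                .persistContext(true)                         // keep the SparkContext open between jobs
                .program(PageRankVertexProgram.build().create(graph))
                .submit()
                .get();

        System.out.println("runtime (ms): " + result.memory().getRuntime());
        graph.close();
    }
}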
