Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1389 49924cc66 -> 576dd8ec8


Using System.properties() to propagate the shim class to workers. This is identical 
to the previous META-INF service model, save that no META-INF service file is used.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/576dd8ec
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/576dd8ec
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/576dd8ec

Branch: refs/heads/TINKERPOP-1389
Commit: 576dd8ec8ce91aef1b20e8f820f2688e3a37eb88
Parents: 49924cc
Author: Marko A. Rodriguez <[email protected]>
Authored: Tue Oct 25 10:09:07 2016 -0600
Committer: Marko A. Rodriguez <[email protected]>
Committed: Tue Oct 25 10:09:07 2016 -0600

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java     |  3 ---
 .../io/gryo/kryoshim/KryoShimServiceLoader.java   | 12 ++++++++++--
 .../structure/io/RecordReaderWriterTest.java      |  2 ++
 .../process/computer/SparkGraphComputer.java      | 18 ++++++++++++++++--
 4 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git 
a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
 
b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 6ffd5ea..b06b40a 100644
--- 
a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ 
b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -44,7 +44,6 @@ import 
org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
@@ -58,7 +57,6 @@ import 
org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import 
org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
-import 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.gremlin.util.Gremlin;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
@@ -84,7 +82,6 @@ public final class GiraphGraphComputer extends 
AbstractHadoopGraphComputer imple
 
     public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
         super(hadoopGraph);
-        System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, 
HadoopPoolShimService.class.getCanonicalName()); // HadoopPools only with Giraph
         final Configuration configuration = hadoopGraph.configuration();
         configuration.getKeys().forEachRemaining(key -> 
this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
         this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index ac815b1..2edbc78 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim;
 
+import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,8 +62,15 @@ public class KryoShimServiceLoader {
     public static KryoShimService load(final boolean forceReload) {
         if (null != cachedShimService && !forceReload)
             return cachedShimService;
-        if (!configuration.containsKey(KRYO_SHIM_SERVICE))
-            throw new IllegalArgumentException("The provided configuration 
does not contain a " + KRYO_SHIM_SERVICE + " property");
+        if (null == configuration)
+            configuration = new BaseConfiguration();
+        if (!configuration.containsKey(KRYO_SHIM_SERVICE)) {
+            final String systemShimService = 
System.getProperty(KRYO_SHIM_SERVICE, null);
+            if (null == systemShimService)
+                throw new IllegalStateException("There is no configured shim, 
nor shim specified in the System properties");
+            log.info("Using the KryoShimService registered with the System 
properties: " + systemShimService);
+            configuration.setProperty(KRYO_SHIM_SERVICE, systemShimService);
+        }
 
         try {
             cachedShimService = ((Class<? extends KryoShimService>) 
Class.forName(configuration.getString(KRYO_SHIM_SERVICE))).newInstance();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
 
b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
index f3c079b..ea5686a 100644
--- 
a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
+++ 
b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java
@@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.TestHelper;
 import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -83,6 +84,7 @@ public abstract class RecordReaderWriterTest {
         configuration.set("fs.file.impl", LocalFileSystem.class.getName());
         configuration.set("fs.defaultFS", "file:///");
         configuration.set("mapreduce.output.fileoutputformat.outputdir", 
"file:///" + outputDirectory.getAbsolutePath());
+        configuration.set(KryoShimServiceLoader.KRYO_SHIM_SERVICE, 
HadoopPoolShimService.class.getCanonicalName());
         return configuration;
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 42f2493..80e7785 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -69,6 +69,7 @@ import 
org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
 import 
org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
@@ -108,12 +109,21 @@ public final class SparkGraphComputer extends 
AbstractHadoopGraphComputer {
         super(hadoopGraph);
         this.sparkConfiguration = new HadoopConfiguration();
         ConfigurationUtils.copy(this.hadoopGraph.configuration(), 
this.sparkConfiguration);
+        ///////////////////////////////////////////////////////
+        // Handle the KryoShimService for data serialization //
+        ///////////////////////////////////////////////////////
         if 
(HadoopPoolShimService.class.getCanonicalName().equals(this.sparkConfiguration.getString(KryoShimServiceLoader.KRYO_SHIM_SERVICE,
 HadoopPoolShimService.class.getCanonicalName()))) {
             
this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE,
                     
KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER,
 null)) ?
                             UnshadedKryoShimService.class.getCanonicalName() :
                             HadoopPoolShimService.class.getCanonicalName());
         }
+        final String shimService = 
this.sparkConfiguration.getString(KryoShimServiceLoader.KRYO_SHIM_SERVICE);
+        
this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
+                
(this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, 
"") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + 
shimService).trim());
+        
this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+                
(this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") 
+ " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim());
+        System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, 
shimService);
     }
 
     @Override
@@ -144,8 +154,12 @@ public final class SparkGraphComputer extends 
AbstractHadoopGraphComputer {
             final long startTime = System.currentTimeMillis();
             // apache and hadoop configurations that are used throughout the 
graph computer computation
             final org.apache.commons.configuration.Configuration 
graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
-            //if 
(!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
-            //    
graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, 
GryoSerializer.class.getCanonicalName());
+            // if no serializer is provided then use the default of 
KryoSerializer+GryoRegistrator
+            if 
(!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) {
+                
graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, 
KryoSerializer.class.getCanonicalName());
+                if 
(!graphComputerConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR))
+                    
graphComputerConfiguration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, 
GryoRegistrator.class.getCanonicalName());
+            }
             
graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES,
 this.persist.equals(GraphComputer.Persist.EDGES));
             final Configuration hadoopConfiguration = 
ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
             final Storage fileSystemStorage = 
FileSystemStorage.open(hadoopConfiguration);

Reply via email to