Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1389 49924cc66 -> 576dd8ec8
Using Java System properties (System.setProperty / -D JVM options) to propagate the shim class to workers. This is identical to the previous META-INF service model, except that no META-INF service is used. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/576dd8ec Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/576dd8ec Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/576dd8ec Branch: refs/heads/TINKERPOP-1389 Commit: 576dd8ec8ce91aef1b20e8f820f2688e3a37eb88 Parents: 49924cc Author: Marko A. Rodriguez <[email protected]> Authored: Tue Oct 25 10:09:07 2016 -0600 Committer: Marko A. Rodriguez <[email protected]> Committed: Tue Oct 25 10:09:07 2016 -0600 ---------------------------------------------------------------------- .../process/computer/GiraphGraphComputer.java | 3 --- .../io/gryo/kryoshim/KryoShimServiceLoader.java | 12 ++++++++++-- .../structure/io/RecordReaderWriterTest.java | 2 ++ .../process/computer/SparkGraphComputer.java | 18 ++++++++++++++++-- 4 files changed, 28 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java ---------------------------------------------------------------------- diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java index 6ffd5ea..b06b40a 100644 --- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java +++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java @@ -44,7 +44,6 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper import 
org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage; import org.apache.tinkerpop.gremlin.hadoop.structure.io.GraphFilterAware; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService; import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper; import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable; import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator; @@ -58,7 +57,6 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult; import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory; import org.apache.tinkerpop.gremlin.structure.io.Storage; -import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.apache.tinkerpop.gremlin.util.Gremlin; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; @@ -84,7 +82,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple public GiraphGraphComputer(final HadoopGraph hadoopGraph) { super(hadoopGraph); - System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()); // HadoopPools only with Giraph final Configuration configuration = hadoopGraph.configuration(); configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString())); this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java index ac815b1..2edbc78 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim; +import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,8 +62,15 @@ public class KryoShimServiceLoader { public static KryoShimService load(final boolean forceReload) { if (null != cachedShimService && !forceReload) return cachedShimService; - if (!configuration.containsKey(KRYO_SHIM_SERVICE)) - throw new IllegalArgumentException("The provided configuration does not contain a " + KRYO_SHIM_SERVICE + " property"); + if (null == configuration) + configuration = new BaseConfiguration(); + if (!configuration.containsKey(KRYO_SHIM_SERVICE)) { + final String systemShimService = System.getProperty(KRYO_SHIM_SERVICE, null); + if (null == systemShimService) + throw new IllegalStateException("There is no configured shim, nor shim specified in the System properties"); + log.info("Using the KryoShimService registered with the System properties: " + systemShimService); + configuration.setProperty(KRYO_SHIM_SERVICE, systemShimService); + } try { cachedShimService = ((Class<? 
extends KryoShimService>) Class.forName(configuration.getString(KRYO_SHIM_SERVICE))).newInstance(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java index f3c079b..ea5686a 100644 --- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java +++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/RecordReaderWriterTest.java @@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.TestHelper; import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import org.junit.Test; import org.slf4j.Logger; @@ -83,6 +84,7 @@ public abstract class RecordReaderWriterTest { configuration.set("fs.file.impl", LocalFileSystem.class.getName()); configuration.set("fs.defaultFS", "file:///"); configuration.set("mapreduce.output.fileoutputformat.outputdir", "file:///" + outputDirectory.getAbsolutePath()); + configuration.set(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()); return configuration; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/576dd8ec/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java ---------------------------------------------------------------------- diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java index 42f2493..80e7785 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java @@ -69,6 +69,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.io.Storage; @@ -108,12 +109,21 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { super(hadoopGraph); this.sparkConfiguration = new HadoopConfiguration(); ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration); + /////////////////////////////////////////////////////// + // Handle the KryoShimService for data serialization // + /////////////////////////////////////////////////////// if (HadoopPoolShimService.class.getCanonicalName().equals(this.sparkConfiguration.getString(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()))) { this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) ? 
UnshadedKryoShimService.class.getCanonicalName() : HadoopPoolShimService.class.getCanonicalName()); } + final String shimService = this.sparkConfiguration.getString(KryoShimServiceLoader.KRYO_SHIM_SERVICE); + this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, + (this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim()); + this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, + (this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim()); + System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, shimService); } @Override @@ -144,8 +154,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { final long startTime = System.currentTimeMillis(); // apache and hadoop configurations that are used throughout the graph computer computation final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration); - //if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) - // graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName()); + // if no serializer is provided then use the default of KryoSerializer+GryoRegistrator + if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) { + graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName()); + if (!graphComputerConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR)) + graphComputerConfiguration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName()); + } graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES)); final Configuration 
hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration); final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
