Got the new GraphComputer.configuration() model working for SparkGraphComputer and GiraphGraphComputer.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/843ad33d Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/843ad33d Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/843ad33d Branch: refs/heads/TINKERPOP-1564 Commit: 843ad33d8a19b748f3718652bfba173485f70947 Parents: 6a7c906 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Mon Dec 19 12:51:41 2016 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Tue Jan 10 08:25:26 2017 -0700 ---------------------------------------------------------------------- .../process/computer/GiraphGraphComputer.java | 37 ++++-------- .../computer/AbstractHadoopGraphComputer.java | 33 ++++++++-- .../process/computer/SparkGraphComputer.java | 63 ++++++++------------ 3 files changed, 63 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/843ad33d/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java ---------------------------------------------------------------------- diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java index 3047ee4..db4d6da 100644 --- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java +++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java @@ -85,9 +85,11 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple private Set<String> vertexProgramConfigurationKeys = new HashSet<>(); public GiraphGraphComputer(final HadoopGraph hadoopGraph) { - super(hadoopGraph); - final Configuration configuration = hadoopGraph.configuration(); - 
configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString())); + this(hadoopGraph.configuration()); + } + + private GiraphGraphComputer(final Configuration configuration) { + super(configuration); this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class); this.giraphConfiguration.setVertexClass(GiraphVertex.class); this.giraphConfiguration.setComputationClass(GiraphComputation.class); @@ -102,31 +104,18 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple } public static GiraphGraphComputer open(final org.apache.commons.configuration.Configuration configuration) { - return HadoopGraph.open(configuration).compute(GiraphGraphComputer.class); + return new GiraphGraphComputer(configuration); } @Override public Future<ComputerResult> submit(final Graph graph) { - this.hadoopGraph = (HadoopGraph)graph; - final Configuration configuration = this.hadoopGraph.configuration(); + final Configuration configuration = graph.configuration(); + this.configuration.copy(configuration); configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString())); return this.submit(); } @Override - public GraphComputer workers(final int workers) { - this.useWorkerThreadsInConfiguration = false; - return super.workers(workers); - } - - @Override - public GraphComputer configure(final String key, final Object value) { - this.giraphConfiguration.set(key, value.toString()); - this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666; - return this; - } - - @Override public GraphComputer program(final VertexProgram vertexProgram) { super.program(vertexProgram); this.memory.addVertexProgramMemoryComputeKeys(this.vertexProgram); @@ -145,14 +134,12 @@ public final class GiraphGraphComputer 
extends AbstractHadoopGraphComputer imple return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "GiraphSubmitter"); } - @Override - public org.apache.commons.configuration.Configuration configuration() { - return ConfUtil.makeApacheConfiguration(this.giraphConfiguration); - } - private Future<ComputerResult> submitWithExecutor(final Executor exec) { final long startTime = System.currentTimeMillis(); + this.configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, this.configuration.getProperty(key).toString())); + this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666; final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration); + ConfigurationUtils.copy(this.configuration, apacheConfiguration); return CompletableFuture.<ComputerResult>supplyAsync(() -> { try { this.loadJars(giraphConfiguration); @@ -185,7 +172,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple if (null != this.vertexProgram) { // a way to verify in Giraph whether the traversal will go over the wire or not try { - VertexProgram.createVertexProgram(this.hadoopGraph, ConfUtil.makeApacheConfiguration(this.giraphConfiguration)); + VertexProgram.createVertexProgram(HadoopGraph.open(this.configuration), ConfUtil.makeApacheConfiguration(this.giraphConfiguration)); } catch (final IllegalStateException e) { if (e.getCause() instanceof NumberFormatException) throw new NotSerializableException("The provided traversal is not serializable and thus, can not be distributed across the cluster"); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/843ad33d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java 
---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java index 344f04e..b95fb7e 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.util.ReflectionUtils; import org.apache.tinkerpop.gremlin.hadoop.Constants; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; import org.apache.tinkerpop.gremlin.process.computer.ComputerResult; @@ -63,7 +64,7 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer { Pattern.compile(File.pathSeparator.equals(":") ? 
"([^:]|://)+" : ("[^" + File.pathSeparator + "]")); protected final Logger logger; - protected HadoopGraph hadoopGraph; + protected HadoopConfiguration configuration; protected boolean executed = false; protected final Set<MapReduce> mapReducers = new HashSet<>(); protected VertexProgram<Object> vertexProgram; @@ -75,32 +76,40 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer { protected GraphFilter graphFilter = new GraphFilter(); public AbstractHadoopGraphComputer(final HadoopGraph hadoopGraph) { - this.hadoopGraph = hadoopGraph; + this(hadoopGraph.configuration()); + } + + protected AbstractHadoopGraphComputer(final org.apache.commons.configuration.Configuration configuration) { + this.configuration = new HadoopConfiguration(configuration); this.logger = LoggerFactory.getLogger(this.getClass()); - //GraphComputerHelper.configure(this, this.hadoopGraph.configuration()); + GraphComputerHelper.configure(this, this.configuration); } @Override public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter) { this.graphFilter.setVertexFilter(vertexFilter); + this.configuration.setProperty(VERTICES, vertexFilter); return this; } @Override public GraphComputer edges(final Traversal<Vertex, Edge> edgeFilter) { this.graphFilter.setEdgeFilter(edgeFilter); + this.configuration.setProperty(EDGES, edgeFilter); return this; } @Override public GraphComputer result(final ResultGraph resultGraph) { this.resultGraph = resultGraph; + this.configuration.setProperty(RESULT, resultGraph.name()); return this; } @Override public GraphComputer persist(final Persist persist) { this.persist = persist; + this.configuration.setProperty(PERSIST, persist.name()); return this; } @@ -119,16 +128,28 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer { @Override public GraphComputer workers(final int workers) { this.workers = workers; + this.configuration.setProperty(WORKERS, workers); return this; } @Override public 
Future<ComputerResult> submit(final Graph graph) { - this.hadoopGraph = (HadoopGraph) graph; + ConfigurationUtils.copy(graph.configuration(), this.configuration); return this.submit(); } @Override + public GraphComputer configure(final String key, final Object value) { + this.configuration.setProperty(key,value); + return this; + } + + @Override + public org.apache.commons.configuration.Configuration configuration() { + return this.configuration; + } + + @Override public String toString() { return StringFactory.graphComputerString(this); } @@ -239,8 +260,8 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer { @Override public boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) { - if (hadoopGraph.configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_WRITER)) { - final Object writer = ReflectionUtils.newInstance(hadoopGraph.configuration().getGraphWriter(), ConfUtil.makeHadoopConfiguration(hadoopGraph.configuration())); + if (configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_WRITER)) { + final Object writer = ReflectionUtils.newInstance(configuration.getGraphWriter(), ConfUtil.makeHadoopConfiguration(configuration)); if (writer instanceof PersistResultGraphAware) return ((PersistResultGraphAware) writer).supportsResultGraphPersistCombination(resultGraph, persist); else { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/843ad33d/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java index cbcdfe7..8e62c62 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java +++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java @@ -18,7 +18,6 @@ */ package org.apache.tinkerpop.gremlin.spark.process.computer; -import org.apache.commons.configuration.ConfigurationUtils; import org.apache.commons.configuration.FileConfiguration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.lang3.concurrent.BasicThreadFactory; @@ -53,7 +52,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce; import org.apache.tinkerpop.gremlin.process.computer.Memory; import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult; -import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper; import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory; import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException; @@ -73,7 +71,6 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService; import org.apache.tinkerpop.gremlin.structure.Direction; -import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; import org.apache.tinkerpop.gremlin.structure.io.Storage; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; @@ -94,7 +91,6 @@ import java.util.concurrent.ThreadFactory; */ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { - private final org.apache.commons.configuration.Configuration sparkConfiguration; private boolean workersSet = false; private final ThreadFactory threadFactoryBoss = new 
BasicThreadFactory.Builder().namingPattern(SparkGraphComputer.class.getSimpleName() + "-boss").build(); @@ -117,14 +113,17 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { public SparkGraphComputer(final HadoopGraph hadoopGraph) { super(hadoopGraph); - this.sparkConfiguration = new HadoopConfiguration(); + } + + private SparkGraphComputer(final org.apache.commons.configuration.Configuration configuration) { + super(configuration); } @Override public GraphComputer workers(final int workers) { super.workers(workers); - if (this.sparkConfiguration.containsKey(SparkLauncher.SPARK_MASTER) && this.sparkConfiguration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) { - this.sparkConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]"); + if (this.configuration.containsKey(SparkLauncher.SPARK_MASTER) && this.configuration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) { + this.configuration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]"); } this.workersSet = true; return this; @@ -132,7 +131,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { @Override public GraphComputer configure(final String key, final Object value) { - this.sparkConfiguration.setProperty(key, value); + this.configuration.setProperty(key, value); return this; } @@ -142,19 +141,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "SparkSubmitter"); } - @Override - public Future<ComputerResult> submit(final Graph graph) { - ConfigurationUtils.copy(graph.configuration(), this.sparkConfiguration); - return this.submit(); - } - - @Override - public org.apache.commons.configuration.Configuration configuration() { - return new HadoopConfiguration(this.sparkConfiguration); - } - public static SparkGraphComputer open(final org.apache.commons.configuration.Configuration 
configuration) { - return new SparkGraphComputer(HadoopGraph.open(configuration)); + return new SparkGraphComputer(configuration); } private Future<ComputerResult> submitWithExecutor(Executor exec) { @@ -164,36 +152,33 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { ////////////////////////////////////////////////// /////// PROCESS SHIM AND SYSTEM PROPERTIES /////// ////////////////////////////////////////////////// - ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration); - final String shimService = KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) ? + final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.configuration); + if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) { + graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName()); + if (!graphComputerConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR)) + graphComputerConfiguration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName()); + } + final String shimService = KryoSerializer.class.getCanonicalName().equals(graphComputerConfiguration.getString(Constants.SPARK_SERIALIZER, null)) ? 
UnshadedKryoShimService.class.getCanonicalName() : HadoopPoolShimService.class.getCanonicalName(); - this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, shimService); - /////////// + graphComputerConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, shimService); final StringBuilder params = new StringBuilder(); - this.sparkConfiguration.getKeys().forEachRemaining(key -> { + graphComputerConfiguration.getKeys().forEachRemaining(key -> { if (KEYS_PASSED_IN_JVM_SYSTEM_PROPERTIES.contains(key)) { - params.append(" -D").append("tinkerpop.").append(key).append("=").append(this.sparkConfiguration.getProperty(key)); - System.setProperty("tinkerpop." + key, this.sparkConfiguration.getProperty(key).toString()); + params.append(" -D").append("tinkerpop.").append(key).append("=").append(graphComputerConfiguration.getProperty(key)); + System.setProperty("tinkerpop." + key, graphComputerConfiguration.getProperty(key).toString()); } }); if (params.length() > 0) { - this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, - (this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + params.toString()).trim()); - this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, - (this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + params.toString()).trim()); + graphComputerConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, + (graphComputerConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + params.toString()).trim()); + graphComputerConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, + (graphComputerConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + params.toString()).trim()); } - KryoShimServiceLoader.applyConfiguration(this.sparkConfiguration); + KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration); ////////////////////////////////////////////////// 
////////////////////////////////////////////////// ////////////////////////////////////////////////// - // apache and hadoop configurations that are used throughout the graph computer computation - final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration); - if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) { - graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName()); - if (!graphComputerConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR)) - graphComputerConfiguration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName()); - } graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES)); final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration); final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);