Repository: crunch
Updated Branches:
  refs/heads/master ebe306126 -> 8434dcaab
CRUNCH-435: Cleaner controls for configuring and initializing the SparkContext and runtime Configuration.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8434dcaa
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8434dcaa
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8434dcaa

Branch: refs/heads/master
Commit: 8434dcaab5504e69c42b914551634b2a9ac838b2
Parents: ebe3061
Author: Josh Wills <[email protected]>
Authored: Tue Jul 1 17:07:06 2014 -0700
Committer: Josh Wills <[email protected]>
Committed: Sun Jul 6 17:57:00 2014 -0700

----------------------------------------------------------------------
 .../apache/crunch/impl/spark/SparkPipeline.java | 30 ++++++++++++++++----
 .../crunch/impl/spark/SparkRuntimeContext.java  |  8 ++----
 2 files changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/8434dcaa/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
index 1076c33..7a69707 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
@@ -36,6 +36,7 @@ import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.storage.StorageLevel;
 
@@ -55,13 +56,17 @@ public class SparkPipeline extends DistributedPipeline {
   }
 
   public SparkPipeline(String sparkConnect, String appName, Class<?> jarClass) {
-    super(appName, new Configuration(), new SparkCollectFactory());
+    this(sparkConnect, appName, jarClass, new Configuration());
+  }
+
+  public SparkPipeline(String sparkConnect, String appName, Class<?> jarClass, Configuration conf) {
+    super(appName, conf, new SparkCollectFactory());
     this.sparkConnect = Preconditions.checkNotNull(sparkConnect);
     this.jarClass = jarClass;
   }
 
   public SparkPipeline(JavaSparkContext sparkContext, String appName) {
-    super(appName, new Configuration(), new SparkCollectFactory());
+    super(appName, sparkContext.hadoopConfiguration(), new SparkCollectFactory());
     this.sparkContext = Preconditions.checkNotNull(sparkContext);
     this.sparkConnect = sparkContext.getSparkHome().orNull();
   }
@@ -120,8 +125,16 @@ public class SparkPipeline extends DistributedPipeline {
         outputTargetsToMaterialize.remove(c);
       }
     }
+
+    Configuration conf = getConfiguration();
     if (sparkContext == null) {
-      this.sparkContext = new JavaSparkContext(sparkConnect, getName());
+      SparkConf sparkConf = new SparkConf();
+      for (Map.Entry<String, String> e : conf) {
+        if (e.getKey().startsWith("spark.")) {
+          sparkConf.set(e.getKey(), e.getValue());
+        }
+      }
+      this.sparkContext = new JavaSparkContext(sparkConnect, getName(), sparkConf);
       if (jarClass != null) {
         String[] jars = JavaSparkContext.jarOfClass(jarClass);
         if (jars != null && jars.length > 0) {
@@ -131,8 +144,9 @@
         }
       }
     }
-    SparkRuntime runtime = new SparkRuntime(this, sparkContext, getConfiguration(), outputTargets, toMaterialize,
-        cachedCollections);
+
+    copyConfiguration(conf, sparkContext.hadoopConfiguration());
+    SparkRuntime runtime = new SparkRuntime(this, sparkContext, conf, outputTargets, toMaterialize, cachedCollections);
     runtime.execute();
     outputTargets.clear();
     return runtime;
@@ -147,4 +161,10 @@
     }
     return res;
   }
+
+  private static void copyConfiguration(Configuration from, Configuration to) {
+    for (Map.Entry<String, String> e : from) {
+      to.set(e.getKey(), e.getValue());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8434dcaa/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
index cea317c..ca52c29 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
@@ -20,6 +20,7 @@ package org.apache.crunch.impl.spark;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.io.ByteStreams;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.TaskInputOutputContextFactory;
@@ -34,8 +35,6 @@ import org.apache.spark.Accumulator;
 import org.apache.spark.SparkFiles;
 import org.apache.spark.broadcast.Broadcast;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -59,6 +58,7 @@ public class SparkRuntimeContext implements Serializable {
 
   public void setConf(Broadcast<byte[]> broadConf) {
     this.broadConf = broadConf;
+    this.conf = null;
   }
 
   public void initialize(DoFn<?, ?> fn) {
@@ -94,9 +94,7 @@
     if (conf == null) {
       conf = new Configuration();
       try {
-        ByteArrayInputStream bais = new ByteArrayInputStream(broadConf.value());
-        conf.readFields(new DataInputStream(bais));
-        bais.close();
+        conf.readFields(ByteStreams.newDataInput(broadConf.value()));
       } catch (Exception e) {
         throw new RuntimeException("Error reading broadcast configuration", e);
       }
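
For context, the net effect of the SparkPipeline changes: any "spark."-prefixed keys in the pipeline's Hadoop Configuration are copied into the SparkConf backing the lazily created JavaSparkContext, and the full Configuration is copied into sparkContext.hadoopConfiguration() before the SparkRuntime executes. A minimal usage sketch of the new four-argument constructor (the "local" master string, app name, and property values below are illustrative placeholders, not part of the commit):

import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.hadoop.conf.Configuration;

public class SparkPipelineConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Keys with the "spark." prefix flow into the SparkConf used to
    // construct the JavaSparkContext (see the loop in the diff above).
    conf.set("spark.executor.memory", "2g");
    // All keys, spark-prefixed or not, remain visible in the Hadoop
    // Configuration that the runtime hands to DoFns.
    conf.set("example.custom.key", "value");

    // New in this commit: supply the Configuration at construction time
    // instead of mutating the pipeline's default Configuration afterwards.
    SparkPipeline pipeline = new SparkPipeline(
        "local", "conf-example", SparkPipelineConfExample.class, conf);

    // ... define the pipeline's reads, DoFns, and writes here ...

    pipeline.done();  // runs any outstanding work and cleans up
  }
}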

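The SparkRuntimeContext changes are narrower: setConf now drops the cached Configuration so a newly broadcast value is actually re-read, and the deserialization collapses to a single call to Guava's ByteStreams.newDataInput. A standalone sketch of the Writable round trip involved (a plain byte[] stands in for Broadcast<byte[]>.value(); the key and value are made up for illustration):

import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;

public class BroadcastConfRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration original = new Configuration();
    original.set("example.custom.key", "value");

    // Driver side: Configuration implements Writable, so it can be
    // serialized into a byte[] suitable for a Broadcast<byte[]> like the
    // one setConf receives.
    ByteArrayDataOutput out = ByteStreams.newDataOutput();
    original.write(out);
    byte[] payload = out.toByteArray();

    // Executor side: the one-liner the commit switches to, replacing the
    // ByteArrayInputStream/DataInputStream pair.
    Configuration restored = new Configuration();
    restored.readFields(ByteStreams.newDataInput(payload));

    System.out.println(restored.get("example.custom.key"));  // prints "value"
  }
}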