Repository: crunch
Updated Branches:
  refs/heads/master 25c22e58d -> f2cf619ca

CRUNCH-394: Ensure Broadcast variable w/serialized Configuration is never null in SparkRuntimeContext

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f2cf619c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f2cf619c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f2cf619c

Branch: refs/heads/master
Commit: f2cf619ca35307f39b86fc348aba8ff633ee0c5c
Parents: 25c22e5
Author: Josh Wills <[email protected]>
Authored: Wed May 14 17:11:56 2014 -0700
Committer: Josh Wills <[email protected]>
Committed: Wed May 14 17:11:56 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/impl/spark/SparkRuntime.java | 2 +-
 .../java/org/apache/crunch/impl/spark/SparkRuntimeContext.java | 5 ++++-
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/f2cf619c/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 2cb2fb3..22375ee 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -110,7 +110,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
     this.conf = conf;
     this.counters = sparkContext.accumulator(Maps.<String, Map<String, Long>>newHashMap(),
         new CounterAccumulatorParam());
-    this.ctxt = new SparkRuntimeContext(counters);
+    this.ctxt = new SparkRuntimeContext(counters, sparkContext.broadcast(WritableUtils.toByteArray(conf)));
     this.outputTargets = Maps.newTreeMap(DEPTH_COMPARATOR);
     this.outputTargets.putAll(outputTargets);
     this.toMaterialize = toMaterialize;

http://git-wip-us.apache.org/repos/asf/crunch/blob/f2cf619c/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
index 102ad4a..cea317c 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
@@ -50,8 +50,11 @@ public class SparkRuntimeContext implements Serializable {
   private transient Configuration conf;
   private transient TaskInputOutputContext context;
 
-  public SparkRuntimeContext(Accumulator<Map<String, Map<String, Long>>> counters) {
+  public SparkRuntimeContext(
+      Accumulator<Map<String, Map<String, Long>>> counters,
+      Broadcast<byte[]> broadConf) {
     this.counters = counters;
+    this.broadConf = broadConf;
   }
 
   public void setConf(Broadcast<byte[]> broadConf) {
