Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 ad2532703 -> 9ff4f850b
CRUNCH-388: Fix memory and spark issues discovered testing w/Oryx

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9ff4f850
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9ff4f850
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9ff4f850

Branch: refs/heads/apache-crunch-0.8
Commit: 9ff4f850be25c4f02671eda341cf904a32eea118
Parents: ad25327
Author: Josh Wills <[email protected]>
Authored: Sun May 4 22:32:11 2014 -0700
Committer: Josh Wills <[email protected]>
Committed: Mon May 5 08:22:59 2014 -0700

----------------------------------------------------------------------
 .../apache/crunch/impl/mem/collect/MemCollection.java |  4 +++-
 .../org/apache/crunch/impl/spark/SparkPipeline.java   | 14 +++++++++++---
 .../org/apache/crunch/impl/spark/SparkRuntime.java    |  2 +-
 3 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/9ff4f850/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 8e509bc..240de1c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -297,7 +297,7 @@ public class MemCollection<S> implements PCollection<S> {
     }
     final Set<String> handledMethods = ImmutableSet.of("getConfiguration", "getCounter",
-        "progress", "getTaskAttemptID");
+        "progress", "getNumReduceTasks", "getTaskAttemptID");
     factory.setFilter(new MethodFilter() {
       @Override
       public boolean isHandled(Method m) {
@@ -315,6 +315,8 @@ public class MemCollection<S> implements PCollection<S> {
           return null;
         } else if ("getTaskAttemptID".equals(name)) {
           return taskAttemptId;
+        } else if ("getNumReduceTasks".equals(name)) {
+          return 1;
         } else if ("getCounter".equals(name)){ // getCounter
           if (args.length == 1) {
             return MemPipeline.getCounters().findCounter((Enum<?>) args[0]);

http://git-wip-us.apache.org/repos/asf/crunch/blob/9ff4f850/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
index 05e6e0c..1076c33 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
@@ -19,6 +19,8 @@ package org.apache.crunch.impl.spark;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
@@ -41,6 +43,8 @@ import java.util.Map;
 public class SparkPipeline extends DistributedPipeline {
+  private static final Log LOG = LogFactory.getLog(SparkPipeline.class);
+
   private final String sparkConnect;
   private JavaSparkContext sparkContext;
   private Class<?> jarClass;
@@ -102,8 +106,7 @@ public class SparkPipeline extends DistributedPipeline {
       exec.waitUntilDone();
       return exec.getResult();
     } catch (Exception e) {
-      // TODO: How to handle this without changing signature?
-      // LOG.error("Exception running pipeline", e);
+      LOG.error("Exception running pipeline", e);
       return PipelineResult.EMPTY;
     }
   }
@@ -120,7 +123,12 @@ public class SparkPipeline extends DistributedPipeline {
     if (sparkContext == null) {
       this.sparkContext = new JavaSparkContext(sparkConnect, getName());
       if (jarClass != null) {
-        sparkContext.addJar(JavaSparkContext.jarOfClass(jarClass)[0]);
+        String[] jars = JavaSparkContext.jarOfClass(jarClass);
+        if (jars != null && jars.length > 0) {
+          for (String jar : jars) {
+            sparkContext.addJar(jar);
+          }
+        }
       }
     }
     SparkRuntime runtime = new SparkRuntime(this, sparkContext, getConfiguration(), outputTargets, toMaterialize,

http://git-wip-us.apache.org/repos/asf/crunch/blob/9ff4f850/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 2016c50..2cb2fb3 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -264,7 +264,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
             throw new IllegalArgumentException("Spark execution cannot handle non-MapReduceTarget: " + t);
           }
         } catch (Exception et) {
-          et.printStackTrace();
+          LOG.error("Spark Exception", et);
           status.set(Status.FAILED);
           set(PipelineResult.EMPTY);
           doneSignal.countDown();
