Updated Branches: refs/heads/master a6df0ccac -> b8ae33ddb
CRUNCH-217: Ensure PipelineResult captures pipeline failures. Contributed by Joe Adler.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b8ae33dd
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b8ae33dd
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b8ae33dd

Branch: refs/heads/master
Commit: b8ae33ddb7eae55626e718f5f2902565a2104e8c
Parents: a6df0cc
Author: Josh Wills <[email protected]>
Authored: Mon Jun 10 09:32:32 2013 -0700
Committer: Josh Wills <[email protected]>
Committed: Mon Jun 10 09:32:32 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/PipelineResult.java  | 15 +++++++-----
 .../org/apache/crunch/impl/mem/MemPipeline.java | 24 +++++++++----------
 .../apache/crunch/impl/mr/exec/MRExecutor.java  | 25 ++++++++++----------
 3 files changed, 34 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b8ae33dd/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
index 90b1067..71a05e2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
@@ -17,12 +17,11 @@
  */
 package org.apache.crunch;
 
-import java.util.List;
-
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Counters;
 
-import com.google.common.collect.ImmutableList;
+import java.util.List;
 
 /**
  * Container for the results of a call to {@code run} or {@code done} on the
@@ -58,16 +57,20 @@ public class PipelineResult {
     }
   }
 
-  public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult> of());
+  public static final PipelineResult EMPTY = new PipelineResult(ImmutableList.<StageResult> of(), PipelineExecution.Status.READY);
 
   private final List<StageResult> stageResults;
 
-  public PipelineResult(List<StageResult> stageResults) {
+  public PipelineExecution.Status status;
+
+  public PipelineResult(List<StageResult> stageResults, PipelineExecution.Status status) {
     this.stageResults = ImmutableList.copyOf(stageResults);
+    this.status = status;
   }
 
   public boolean succeeded() {
-    return !stageResults.isEmpty();
+    // return !stageResults.isEmpty();
+    return this.status.equals(PipelineExecution.Status.SUCCEEDED);
   }
 
   public List<StageResult> getStageResults() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/b8ae33dd/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 80b0543..e2a2529 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -17,11 +17,9 @@
  */
 package org.apache.crunch.impl.mem;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericContainer;
@@ -38,7 +36,6 @@ import org.apache.crunch.PipelineResult;
 import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
-import org.apache.crunch.Target.WriteMode;
 import org.apache.crunch.impl.mem.collect.MemCollection;
 import org.apache.crunch.impl.mem.collect.MemTable;
 import org.apache.crunch.io.At;
@@ -54,9 +51,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Counters;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 public class MemPipeline implements Pipeline {
 
@@ -175,7 +173,7 @@ public class MemPipeline implements Pipeline {
   public void write(PCollection<?> collection, Target target,
       Target.WriteMode writeMode) {
     target.handleExisting(writeMode, getConfiguration());
-    if (writeMode != WriteMode.APPEND && activeTargets.contains(target)) {
+    if (writeMode != Target.WriteMode.APPEND && activeTargets.contains(target)) {
       throw new CrunchRuntimeException("Target " + target + " is already written in the current run."
           + " Use WriteMode.APPEND in order to write additional data to it.");
     }
@@ -277,7 +275,8 @@ public class MemPipeline implements Pipeline {
 
       @Override
       public PipelineResult getResult() {
-        return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)));
+        return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)),
+            Status.SUCCEEDED);
       }
 
       @Override
@@ -289,7 +288,8 @@ public class MemPipeline implements Pipeline {
   @Override
   public PipelineResult run() {
     activeTargets.clear();
-    return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)));
+    return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)),
+        PipelineExecution.Status.SUCCEEDED);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/b8ae33dd/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 9318271..1e03ff2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -17,14 +17,8 @@
  */
 package org.apache.crunch.impl.mr.exec;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.PipelineResult;
@@ -38,8 +32,13 @@ import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.hadoop.conf.Configuration;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Provides APIs for job control at runtime to clients.
@@ -143,12 +142,14 @@ public class MRExecutor implements MRPipelineExecution {
       }
 
       synchronized (this) {
-        result = new PipelineResult(stages);
         if (killSignal.getCount() == 0) {
           status.set(Status.KILLED);
+        } else if (!failures.isEmpty()) {
+          status.set(Status.FAILED);
         } else {
-          status.set(result.succeeded() ? Status.SUCCEEDED : Status.FAILED);
+          status.set(Status.SUCCEEDED);
         }
+        result = new PipelineResult(stages, status.get());
       }
     } catch (InterruptedException e) {
       throw new AssertionError(e); // Nobody should interrupt us.
