mr-runner: fail early in the runner when a MapReduce job fails.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/989d7d8e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/989d7d8e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/989d7d8e Branch: refs/heads/mr-runner Commit: 989d7d8e4eabe6aafd2008eed16f533ec75a43d1 Parents: 9f312c5 Author: Pei He <p...@apache.org> Authored: Fri Sep 1 13:04:54 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Fri Sep 1 17:13:52 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/beam/runners/mapreduce/MapReduceRunner.java | 4 ++++ 1 file changed, 4 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/989d7d8e/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java index 71edf1a..8198848 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobStatus; import org.apache.log4j.BasicConfigurator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +96,9 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> { try { Job job = jobPrototype.build(options.getJarClass(), config); job.waitForCompletion(true); + if (!job.getStatus().getState().equals(JobStatus.State.SUCCEEDED)) { + throw new 
RuntimeException("MapReduce job failed: " + job.getJobID()); + } jobs.add(job); } catch (Exception e) { Throwables.throwIfUnchecked(e);