mr-runner: fail early in the runner when a MapReduce job fails.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/989d7d8e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/989d7d8e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/989d7d8e

Branch: refs/heads/mr-runner
Commit: 989d7d8e4eabe6aafd2008eed16f533ec75a43d1
Parents: 9f312c5
Author: Pei He <p...@apache.org>
Authored: Fri Sep 1 13:04:54 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Fri Sep 1 17:13:52 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/mapreduce/MapReduceRunner.java  | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/989d7d8e/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 71edf1a..8198848 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.log4j.BasicConfigurator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,6 +96,9 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> {
       try {
         Job job = jobPrototype.build(options.getJarClass(), config);
         job.waitForCompletion(true);
+        if (!job.getStatus().getState().equals(JobStatus.State.SUCCEEDED)) {
+          throw new RuntimeException("MapReduce job failed: " + job.getJobID());
+        }
         jobs.add(job);
       } catch (Exception e) {
         Throwables.throwIfUnchecked(e);

Reply via email to