Author: omalley Date: Tue Aug 14 15:14:06 2007 New Revision: 565946 URL: http://svn.apache.org/viewvc?view=rev&rev=565946 Log: HADOOP-1663. Fix streaming to return a non-zero exit code when it fails. Contributed by Lohit Renu.
Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=565946&r1=565945&r2=565946 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Aug 14 15:14:06 2007 @@ -529,6 +529,8 @@ 155. HADOOP-1714. Fix TestDFSUpgradeFromImage to work on Windows. (Raghu Angadi via nigel) +156. HADOOP-1663. Return a non-zero exit code if streaming fails. (Lohit Renu + via omalley) Release 0.13.0 - 2007-06-08 Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java?view=diff&rev=565946&r1=565945&r2=565946 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java Tue Aug 14 15:14:06 2007 @@ -28,7 +28,12 @@ public static void main(String[] args) throws IOException { boolean mayExit = true; + int returnStatus = 0; StreamJob job = new StreamJob(args, mayExit); - job.go(); + returnStatus = job.go(); + if (returnStatus != 0) { + System.err.println("Streaming Job Failed!"); + System.exit(returnStatus); + } } } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=565946&r1=565945&r2=565946 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Aug 14 15:14:06 2007 @@ -98,7 +98,7 @@ * to the jobtracker * @throws IOException */ - public void go() throws IOException { + public int go() throws IOException { init(); preProcessArgs(); @@ -106,7 +106,7 @@ postProcessArgs(); setJobConf(); - submitAndMonitorJob(); + return submitAndMonitorJob(); } protected void init() { @@ -868,7 +868,7 @@ } // Based on JobClient - public void submitAndMonitorJob() throws IOException { + public int submitAndMonitorJob() throws IOException { if (jar_ != null && isLocalHadoop()) { // getAbs became required when shell and subvm have different working dirs... @@ -906,28 +906,33 @@ } if (!running_.isSuccessful()) { jobInfo(); - throw new IOException("Job not Successful!"); + LOG.error("Job not Successful!"); + return 1; } LOG.info("Job complete: " + jobId_); LOG.info("Output: " + output_); error = false; - } catch(FileNotFoundException fe){ + } catch(FileNotFoundException fe) { LOG.error("Error launching job , bad input path : " + fe.getMessage()); - }catch(InvalidJobConfException je){ + return 2; + } catch(InvalidJobConfException je) { LOG.error("Error launching job , Invalid job conf : " + je.getMessage()); - }catch(FileAlreadyExistsException fae){ + return 3; + } catch(FileAlreadyExistsException fae) { LOG.error("Error launching job , Output path already exists : " + fae.getMessage()); - }catch(IOException ioe){ + return 4; + } catch(IOException ioe) { LOG.error("Error Launching job : " + ioe.getMessage()); - } - finally { + return 5; + } finally { if (error && (running_ != null)) { LOG.info("killJob..."); running_.killJob(); } jc_.close(); } + return 0; } /** Support -jobconf x=y x1=y1 type options **/ class MultiPropertyOption extends PropertyOption{ Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java?view=auto&rev=565946 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java (added) +++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingFailure.java Tue Aug 14 15:14:06 2007 @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.streaming; + +import junit.framework.TestCase; +import java.io.*; +import java.util.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * This class tests if hadoopStreaming returns Exception + * on failure when submitted an invalid/failed job + * The test case provides an invalid input file for map/reduce job as + * a unit test case + */ +public class TestStreamingFailure extends TestStreaming +{ + + protected File INVALID_INPUT_FILE;// = new File("invalid_input.txt"); + private StreamJob job; + + public TestStreamingFailure() throws IOException + { + INVALID_INPUT_FILE = new File("invalid_input.txt"); + } + + protected String[] genArgs() { + return new String[] { + "-input", INVALID_INPUT_FILE.getAbsolutePath(), + "-output", OUTPUT_DIR.getAbsolutePath(), + "-mapper", map, + "-combiner", combine, + "-reducer", reduce, + //"-verbose", + //"-jobconf", "stream.debug=set" + "-jobconf", "keep.failed.task.files=true", + "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp") + }; + } + + public void testCommandLine() + { + try { + try { + OUTPUT_DIR.getAbsoluteFile().delete(); + } catch (Exception e) { + } + + boolean mayExit = false; + int returnStatus = 0; + + // During tests, the default Configuration will use a local mapred + // So don't specify -config or -cluster + job = new StreamJob(genArgs(), mayExit); + returnStatus = job.go(); + assertEquals("Streaming Job Failure code expected", 5, returnStatus); + } catch(Exception e) { + // Expecting an exception + } finally { + OUTPUT_DIR.getAbsoluteFile().delete(); + } + } + + public static void main(String[]args) throws Exception + { + new TestStreamingFailure().testCommandLine(); + } +}