Author: cutting
Date: Tue Feb 13 11:11:46 2007
New Revision: 507163

URL: http://svn.apache.org/viewvc?view=rev&rev=507163
Log:
HADOOP-476.  Rewrite contrib/streaming command-line processing,
improving parameter validation.  Contributed by Sanjay.
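The patch swaps StreamJob's hand-rolled while-loop parser for declarative option
definitions built on Jakarta Commons CLI 2. A minimal sketch of that pattern,
compilable against commons-cli2 (the class name CliSketch and the single -input
option are illustrative, not part of the commit):

    import java.util.List;
    import org.apache.commons.cli2.CommandLine;
    import org.apache.commons.cli2.Group;
    import org.apache.commons.cli2.Option;
    import org.apache.commons.cli2.builder.ArgumentBuilder;
    import org.apache.commons.cli2.builder.DefaultOptionBuilder;
    import org.apache.commons.cli2.builder.GroupBuilder;
    import org.apache.commons.cli2.commandline.Parser;

    public class CliSketch {
      public static void main(String[] args) throws Exception {
        // "-" as both short and long prefix, matching the StreamJob setup
        DefaultOptionBuilder builder = new DefaultOptionBuilder("-", "-", false);
        ArgumentBuilder argBuilder = new ArgumentBuilder();

        // -input takes one or more <path> values and must be present
        Option input = builder.withLongName("input")
          .withArgument(argBuilder.withName("path").withMinimum(1)
                        .withMaximum(Integer.MAX_VALUE).create())
          .withDescription("DFS input file(s) for the Map step")
          .withRequired(true)
          .create();

        Group options = new GroupBuilder().withOption(input).create();
        Parser parser = new Parser();
        parser.setGroup(options);

        // parse() rejects unknown flags, missing required options and bad
        // cardinality up front -- the "parameter validation" in the log message
        CommandLine cmdLine = parser.parse(args);
        List values = cmdLine.getValues("-input");
        System.out.println(values);
      }
    }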
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=507163&r1=507162&r2=507163
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Feb 13 11:11:46 2007
@@ -46,6 +46,9 @@
     separate thread, to improve heartbeat processing time.
     (Dhruba Borthakur via cutting)
 
+14. HADOOP-476.  Rewrite contrib/streaming command-line processing,
+    improving parameter validation.  (Sanjay Dahiya via cutting)
+
 Release 0.11.1 - 2007-02-09

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=507163&r1=507162&r2=507163
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Tue Feb 13 11:11:46 2007
@@ -26,16 +26,33 @@
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.commons.cli2.*;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.commons.cli2.option.PropertyOption;
+import org.apache.commons.cli2.resource.ResourceConstants;
+import org.apache.commons.cli2.util.HelpFormatter;
+import org.apache.commons.cli2.validation.FileValidator;
+import org.apache.commons.cli2.validation.InvalidArgumentException;
+import org.apache.commons.cli2.validation.Validator;
+
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
@@ -45,6 +62,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
+import org.apache.log4j.helpers.OptionConverter;
 
 /** All the client-side work happens here.
  * (Jar packaging, MapRed job submission and monitoring)
  * @author Michel Tourn
@@ -55,7 +73,22 @@
   final static String REDUCE_NONE = "NONE";
   private boolean reducerNone_;
 
+  /** -----------Streaming CLI Implementation **/
+  private DefaultOptionBuilder builder =
+    new DefaultOptionBuilder("-", "-", false);
+  private ArgumentBuilder argBuilder = new ArgumentBuilder();
+  private Parser parser = new Parser();
+  private Group allOptions;
+  HelpFormatter helpFormatter = new HelpFormatter(" ", " ", " ", 900);
+  // need these two at class level to extract values later from
+  // commons-cli command line
+  private MultiPropertyOption jobconf = new MultiPropertyOption(
+    "-jobconf", "(n=v) Optional. Add or override a JobConf property.", 'D');
+  private MultiPropertyOption cmdenv = new MultiPropertyOption(
+    "-cmdenv", "(n=v) Pass env.var to streaming commands.", 'E');
+
   public StreamJob(String[] argv, boolean mayExit) {
+    setupOptions();
     argv_ = argv;
     mayExit_ = mayExit;
   }
@@ -119,15 +152,6 @@
     redCmd_ = unqualifyIfLocalPath(redCmd_);
   }
 
-  String[] parseNameEqValue(String neqv) {
-    String[] nv = neqv.split("=", 2);
-    if (nv.length < 2) {
-      fail("Invalid name=value spec: " + neqv);
-    }
-    msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]);
-    return nv;
-  }
-
   String unqualifyIfLocalPath(String cmd) throws IOException {
     if (cmd == null) {
       //
@@ -168,109 +192,85 @@
     return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
   }
 
-  /**
-   * This method parses the command line args
-   * to a hadoop streaming job
-   */
-  void parseArgv() {
-    if (argv_.length == 0) {
-      exitUsage(false);
-    }
-    int i = 0;
-    while (i < argv_.length) {
-      String s;
-      if (argv_[i].equals("-verbose")) {
-        verbose_ = true;
-      } else if (argv_[i].equals("-info")) {
-        detailedUsage_ = true;
-      } else if (argv_[i].equals("-debug")) {
-        debug_++;
-      } else if ((s = optionArg(argv_, i, "-input", false)) != null) {
-        i++;
-        inputSpecs_.add(s);
-      } else if (argv_[i].equals("-inputtagged")) {
-        inputTagged_ = true;
-      } else if ((s = optionArg(argv_, i, "-output", output_ != null)) != null) {
-        i++;
-        output_ = s;
-      } else if ((s = optionArg(argv_, i, "-mapsideoutput", mapsideoutURI_ != null)) != null) {
-        i++;
-        mapsideoutURI_ = s;
-      } else if ((s = optionArg(argv_, i, "-mapper", mapCmd_ != null)) != null) {
-        i++;
-        mapCmd_ = s;
-      } else if ((s = optionArg(argv_, i, "-combiner", comCmd_ != null)) != null) {
-        i++;
-        comCmd_ = s;
-      } else if ((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) {
-        i++;
-        redCmd_ = s;
-      } else if ((s = optionArg(argv_, i, "-file", false)) != null) {
-        i++;
-        packageFiles_.add(s);
-      } else if ((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) {
-        i++;
-        cluster_ = s;
-      } else if ((s = optionArg(argv_, i, "-config", false)) != null) {
-        i++;
-        configPath_.add(s);
-      } else if ((s = optionArg(argv_, i, "-dfs", false)) != null) {
-        i++;
-        userJobConfProps_.put("fs.default.name", s);
-      } else if ((s = optionArg(argv_, i, "-jt", false)) != null) {
-        i++;
-        userJobConfProps_.put("mapred.job.tracker", s);
-      } else if ((s = optionArg(argv_, i, "-jobconf", false)) != null) {
-        i++;
-        String[] nv = parseNameEqValue(s);
-        userJobConfProps_.put(nv[0], nv[1]);
-      } else if ((s = optionArg(argv_, i, "-cmdenv", false)) != null) {
-        i++;
-        parseNameEqValue(s);
-        if (addTaskEnvironment_.length() > 0) {
-          addTaskEnvironment_ += " ";
-        }
-        addTaskEnvironment_ += s;
-      } else if ((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
-        i++;
-        inReaderSpec_ = s;
-      } else if ((s = optionArg(argv_, i, "-cacheArchive", false)) != null) {
-        i++;
-        if (cacheArchives == null)
-          cacheArchives = s;
-        else
-          cacheArchives = cacheArchives + "," + s;
-      } else if ((s = optionArg(argv_, i, "-cacheFile", false)) != null) {
-        i++;
-        System.out.println(" the val of s is " + s);
-        if (cacheFiles == null)
-          cacheFiles = s;
-        else
-          cacheFiles = cacheFiles + "," + s;
-        System.out.println(" the val of cachefiles is " + cacheFiles);
-      } else {
-        System.err.println("Unexpected argument: " + argv_[i]);
-        exitUsage(false);
-      }
-      i++;
-    }
-    if (detailedUsage_) {
-      exitUsage(true);
-    }
-  }
+  void parseArgv(){
+    CommandLine cmdLine = null;
+    try{
+      cmdLine = parser.parse(argv_);
+    }catch(Exception oe){
+      LOG.error(oe.getMessage());
+      if (detailedUsage_) {
+        exitUsage(true);
+      } else {
+        exitUsage(false);
+      }
+    }
+
+    if( cmdLine != null ){
+      verbose_ = cmdLine.hasOption("-verbose");
+      detailedUsage_ = cmdLine.hasOption("-info");
+      debug_ = cmdLine.hasOption("-debug") ? debug_ + 1 : debug_;
+      inputTagged_ = cmdLine.hasOption("-inputtagged");
+
+      inputSpecs_.addAll(cmdLine.getValues("-input"));
+      output_ = (String) cmdLine.getValue("-output");
+      mapsideoutURI_ = (String) cmdLine.getValue("-mapsideoutput");
+
+      mapCmd_ = (String) cmdLine.getValue("-mapper");
+      comCmd_ = (String) cmdLine.getValue("-combiner");
+      redCmd_ = (String) cmdLine.getValue("-reducer");
+
+      packageFiles_.addAll(cmdLine.getValues("-file"));
+
+      cluster_ = (String) cmdLine.getValue("-cluster");
+
+      configPath_.addAll(cmdLine.getValues("-config"));
+
+      String fsName = (String) cmdLine.getValue("-dfs");
+      if( null != fsName ){
+        userJobConfProps_.put("fs.default.name", fsName);
+      }
+
+      String jt = (String) cmdLine.getValue("-jt");
+      if( null != jt ){
+        userJobConfProps_.put("mapred.job.tracker", jt);
+      }
+
+      inReaderSpec_ = (String) cmdLine.getValue("-inputreader");
+
+      List<String> car = cmdLine.getValues("-cacheArchive");
+      if( null != car ){
+        for( String s : car ){
+          cacheArchives = (cacheArchives == null) ? s : cacheArchives + "," + s;
+        }
+      }
+
+      List<String> caf = cmdLine.getValues("-cacheFile");
+      if( null != caf ){
+        for( String s : caf ){
+          cacheFiles = (cacheFiles == null) ? s : cacheFiles + "," + s;
+        }
+      }
+
+      List<String> jobConfArgs = (List<String>) cmdLine.getValue(jobconf);
+      List<String> envArgs = (List<String>) cmdLine.getValue(cmdenv);
+
+      if( null != jobConfArgs ){
+        for( String s : jobConfArgs ){
+          String[] parts = s.split("=", 2);
+          userJobConfProps_.put(parts[0], parts[1]);
+        }
+      }
+      if( null != envArgs ){
+        for( String s : envArgs ){
+          if (addTaskEnvironment_.length() > 0) {
+            addTaskEnvironment_ += " ";
+          }
+          addTaskEnvironment_ += s;
+        }
+      }
+    }else if (detailedUsage_) {
+      exitUsage(true);
+    }
+  }
 
-  String optionArg(String[] args, int index, String arg, boolean argSet) {
-    if (index >= args.length || !args[index].equals(arg)) {
-      return null;
-    }
-    if (argSet) {
-      throw new IllegalArgumentException("Can only have one " + arg + " option");
-    }
-    if (index >= args.length - 1) {
-      throw new IllegalArgumentException("Expected argument after option " + args[index]);
-    }
-    return args[index + 1];
-  }
 
   protected void msg(String msg) {
@@ -278,32 +278,173 @@
       System.out.println("STREAM: " + msg);
     }
   }
+
+  private Option createOption(String name, String desc,
+      String argName, int max, boolean required){
+    Argument argument = argBuilder.
+      withName(argName).
+      withMinimum(1).
+      withMaximum(max).
+      create();
+    return builder.
+      withLongName(name).
+      withArgument(argument).
+      withDescription(desc).
+      withRequired(required).
+      create();
+  }
+
+  private Option createOption(String name, String desc,
+      String argName, int max, boolean required, Validator validator){
+
+    Argument argument = argBuilder.
+      withName(argName).
+      withMinimum(1).
+      withMaximum(max).
+      withValidator(validator).
+      create();
+
+    return builder.
+      withLongName(name).
+      withArgument(argument).
+      withDescription(desc).
+      withRequired(required).
+      create();
+  }
+
+  private Option createBoolOption(String name, String desc){
+    return builder.withLongName(name).withDescription(desc).create();
+  }
+
+  private void setupOptions(){
+
+    final Validator fileValidator = new Validator(){
+      public void validate(final List values) throws InvalidArgumentException {
+        // Note: this code doesn't belong here; it should be changed to
+        // a canExec check in Java 6
+        for (String file : (List<String>) values) {
+          File f = new File(file);
+          if ( ! f.exists() ) {
+            throw new InvalidArgumentException("Argument : " +
+              f.getAbsolutePath() + " doesn't exist.");
+          }
+          if ( ! f.isFile() ) {
+            throw new InvalidArgumentException("Argument : " +
+              f.getAbsolutePath() + " is not a file.");
+          }
+          if ( ! f.canRead() ) {
+            throw new InvalidArgumentException("Argument : " +
+              f.getAbsolutePath() + " is not accessible");
+          }
+        }
+      }
+    };
+
+    // Note: not extending CLI2's FileValidator; that overwrites
+    // the String arg into File and causes ClassCastException
+    // in inheritance tree.
+    final Validator execValidator = new Validator(){
+      public void validate(final List values) throws InvalidArgumentException {
+        // Note: this code doesn't belong here; it should be changed to
+        // a canExec check in Java 6
+        for (String file : (List<String>) values) {
+          try{
+            Runtime.getRuntime().exec("chmod 0777 " + (new File(file)).getAbsolutePath());
+          }catch(IOException ioe){
+            // ignore
+          }
+        }
+        fileValidator.validate(values);
+      }
+    };
+
+    Option input = createOption("input",
+      "DFS input file(s) for the Map step",
+      "path",
+      Integer.MAX_VALUE,
+      true);
+
+    Option output = createOption("output",
+      "DFS output directory for the Reduce step",
+      "path", 1, true);
+    Option mapper = createOption("mapper",
+      "The streaming command to run", "cmd", 1, true);
+    Option combiner = createOption("combiner",
+      "The streaming command to run", "cmd", 1, false);
+    // reducer could be NONE
+    Option reducer = createOption("reducer",
+      "The streaming command to run", "cmd", 1, true);
+    Option file = createOption("file",
+      "File/dir to be shipped in the Job jar file",
+      "file", Integer.MAX_VALUE, false, execValidator);
+    Option dfs = createOption("dfs",
+      "Optional. Override DFS configuration", "<h:p>|local", 1, false);
+    Option jt = createOption("jt",
+      "Optional. Override JobTracker configuration", "<h:p>|local", 1, false);
+    Option inputreader = createOption("inputreader",
+      "Optional.", "spec", 1, false);
+    Option cacheFile = createOption("cacheFile",
+      "File name URI", "fileNameURI", 1, false);
+    Option cacheArchive = createOption("cacheArchive",
+      "File name URI", "fileNameURI", 1, false);
+
+    // boolean properties
+
+    Option verbose = createBoolOption("verbose", "print verbose output");
+    Option info = createBoolOption("info", "print verbose output");
+    Option help = createBoolOption("help", "print this help message");
+    Option debug = createBoolOption("debug", "print debug output");
+    Option inputtagged = createBoolOption("inputtagged", "inputtagged");
+
+    allOptions = new GroupBuilder().
+      withOption(input).
+      withOption(output).
+      withOption(mapper).
+      withOption(combiner).
+      withOption(reducer).
+      withOption(file).
+      withOption(dfs).
+      withOption(jt).
+      withOption(inputreader).
+      withOption(jobconf).
+      withOption(cmdenv).
+      withOption(cacheFile).
+      withOption(cacheArchive).
+      withOption(verbose).
+      withOption(info).
+      withOption(debug).
+      withOption(inputtagged).
+      withOption(help).
+      create();
+    parser.setGroup(allOptions);
+  }
 
   public void exitUsage(boolean detailed) {
     //         1         2         3         4         5         6         7
     //1234567890123456789012345678901234567890123456789012345678901234567890123456789
-    System.out.println("Usage: $HADOOP_HOME/bin/hadoop [--config dir] jar \\");
-    System.out.println("          $HADOOP_HOME/hadoop-streaming.jar [options]");
-    System.out.println("Options:");
-    System.out.println("  -input    <path>     DFS input file(s) for the Map step");
-    System.out.println("  -output   <path>     DFS output directory for the Reduce step");
-    System.out.println("  -mapper   <cmd>      The streaming command to run");
-    System.out.println("  -combiner <cmd>      The streaming command to run");
-    System.out.println("  -reducer  <cmd>      The streaming command to run");
-    System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
-    //Only advertise the standard way: [--config dir] in our launcher
-    //System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
-    //System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
-    System.out.println("  -dfs    <h:p>|local  Optional. Override DFS configuration");
-    System.out.println("  -jt     <h:p>|local  Optional. Override JobTracker configuration");
-    System.out.println("  -inputreader <spec>  Optional.");
-    System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
-    System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
-    System.out.println("  -cacheFile fileNameURI");
-    System.out.println("  -cacheArchive fileNameURI");
-    System.out.println("  -verbose");
-    System.out.println();
     if (!detailed) {
+      System.out.println("Usage: $HADOOP_HOME/bin/hadoop [--config dir] jar \\");
+      System.out.println("          $HADOOP_HOME/hadoop-streaming.jar [options]");
+      System.out.println("Options:");
+      System.out.println("  -input    <path>     DFS input file(s) for the Map step");
+      System.out.println("  -output   <path>     DFS output directory for the Reduce step");
+      System.out.println("  -mapper   <cmd>      The streaming command to run");
+      System.out.println("  -combiner <cmd>      The streaming command to run");
+      System.out.println("  -reducer  <cmd>      The streaming command to run");
+      System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
+      //Only advertise the standard way: [--config dir] in our launcher
+      //System.out.println("  -cluster  <name>     Default uses hadoop-default.xml and hadoop-site.xml");
+      //System.out.println("  -config   <file>     Optional. One or more paths to xml config files");
+      System.out.println("  -dfs    <h:p>|local  Optional. Override DFS configuration");
+      System.out.println("  -jt     <h:p>|local  Optional. Override JobTracker configuration");
+      System.out.println("  -inputreader <spec>  Optional.");
+      System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
+      System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
+      System.out.println("  -cacheFile fileNameURI");
+      System.out.println("  -cacheArchive fileNameURI");
+      System.out.println("  -verbose");
+      System.out.println();
       System.out.println("For more details about these options:");
       System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
       fail("");
@@ -808,6 +949,56 @@
       running_.killJob();
     }
     jc_.close();
+  }
+}
+
+/** Support -jobconf x=y x1=y1 type options **/
+class MultiPropertyOption extends PropertyOption{
+  private String optionString;
+
+  MultiPropertyOption(){
+    super();
+  }
+
+  MultiPropertyOption(final String optionString,
+                      final String description,
+                      final int id){
+    super(optionString, description, id);
+    this.optionString = optionString;
+  }
+
+  public boolean canProcess(final WriteableCommandLine commandLine,
+                            final String argument) {
+    return (argument != null) && argument.startsWith(optionString);
+  }
+
+  public void process(final WriteableCommandLine commandLine,
+                      final ListIterator arguments) throws OptionException {
+    final String arg = (String) arguments.next();
+
+    if (!canProcess(commandLine, arg)) {
+      throw new OptionException(this,
+        ResourceConstants.UNEXPECTED_TOKEN, arg);
+    }
+
+    // gather every following token up to the next "-" option as an n=v value
+    ArrayList properties = new ArrayList();
+    String next = "";
+    while( arguments.hasNext() ){
+      next = (String) arguments.next();
+      if( ! next.startsWith("-") ){
+        properties.add(next);
+      }else{
+        arguments.previous();
+        break;
+      }
+    }
+
+    // add to any existing values (support specifying args multiple times)
+    List<String> oldVal = (List<String>) commandLine.getValue(this);
+    if( oldVal == null ){
+      commandLine.addValue(this, properties);
+    }else{
+      oldVal.addAll(properties);
+    }
+  }
+}
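
For reference, an illustrative invocation exercising the new multi-value options
(the paths, commands, and property values here are made up, not from the commit):

    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
        -input /user/me/in -output /user/me/out \
        -mapper /bin/cat -reducer /bin/wc \
        -jobconf mapred.reduce.tasks=2 mapred.job.name=demo \
        -cmdenv LANG=C TZ=UTC

Because MultiPropertyOption.process() keeps consuming tokens until the next
"-"-prefixed argument and appends to any previously collected values, several
n=v pairs may follow a single -jobconf or -cmdenv flag, and either flag may
also be repeated on the same command line.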