Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java?rev=696551&r1=696550&r2=696551&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Submitter.java Wed Sep 17 20:02:51 2008
@@ -23,6 +23,8 @@
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.StringTokenizer;
 
 import org.apache.commons.cli2.CommandLine;
@@ -31,7 +33,10 @@
 import org.apache.commons.cli2.builder.DefaultOptionBuilder;
 import org.apache.commons.cli2.builder.GroupBuilder;
 import org.apache.commons.cli2.commandline.Parser;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,13 +53,25 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
 
 /**
  * The main entry point and job submitter. It may either be used as a command
  * line-based or API-based method to launch Pipes jobs.
  */
-public class Submitter {
+public class Submitter extends Configured implements Tool {
+  protected static final Log LOG = LogFactory.getLog(Submitter.class);
+
+  public Submitter() {
+    this(new Configuration());
+  }
+
+  public Submitter(Configuration conf) {
+    setConf(conf);
+  }
+
   /**
    * Get the URI of the application's executable.
    * @param conf
@@ -334,11 +351,6 @@
     // The CLI package should do this for us, but I can't figure out how
     // to make it print something reasonable.
     System.out.println("bin/hadoop pipes");
-    System.out.println("  [-conf <path>] // Configuration for job");
-    System.out.println("  [-jobconf <key=value>, <key=value>, ...]" +
-                       " // add/override configuration for job." +
-                       " (Multiple comma delimited key=value pairs" +
-                       " can be passed)");
    System.out.println("  [-input <path>] // Input directory");
    System.out.println("  [-output <path>] // Output directory");
    System.out.println("  [-jar <jar file> // jar filename");
@@ -349,6 +361,8 @@
    System.out.println("  [-writer <class>] // Java RecordWriter");
    System.out.println("  [-program <executable>] // executable URI");
    System.out.println("  [-reduces <num>] // number of reduces");
+    System.out.println();
+    GenericOptionsParser.printGenericCommandUsage(System.out);
   }
 }
 
@@ -360,19 +374,16 @@
     return conf.getClassByName((String) cl.getValue(key)).asSubclass(cls);
   }
 
-  /**
-   * Submit a pipes job based on the command line arguments.
-   * @param args
-   */
-  public static void main(String[] args) throws Exception {
+  @Override
+  public int run(String[] args) throws Exception {
     CommandLineParser cli = new CommandLineParser();
     if (args.length == 0) {
       cli.printUsage();
-      return;
+      return 1;
     }
     cli.addOption("input", false, "input path to the maps", "path");
     cli.addOption("output", false, "output path from the reduces", "path");
-    cli.addOption("conf", false, "job xml configuration file", "path");
+    cli.addOption("jar", false, "job jar file", "path");
     cli.addOption("inputformat", false,
                   "java classname of InputFormat",
                   "class");
@@ -385,79 +396,102 @@
     cli.addOption("program", false, "URI to application executable", "class");
     cli.addOption("reduces", false, "number of reduces", "num");
     cli.addOption("jobconf", false,
-        "\"n1=v1,n2=v2,..\" Optional. Add or override a JobConf property.",
+        "\"n1=v1,n2=v2,..\" (Deprecated) Optional. Add or override a JobConf property.",
         "key=val");
     Parser parser = cli.createParser();
     try {
-      CommandLine results = parser.parse(args);
-      JobConf conf = new JobConf();
-      if (results.hasOption("-conf")) {
-        conf.addResource(new Path((String) results.getValue("-conf")));
-      }
+
+      GenericOptionsParser genericParser = new GenericOptionsParser(getConf(), args);
+      CommandLine results = parser.parse(genericParser.getRemainingArgs());
+
+      JobConf job = new JobConf(getConf());
+
       if (results.hasOption("-input")) {
-        FileInputFormat.setInputPaths(conf,
+        FileInputFormat.setInputPaths(job,
                           (String) results.getValue("-input"));
       }
       if (results.hasOption("-output")) {
-        FileOutputFormat.setOutputPath(conf,
+        FileOutputFormat.setOutputPath(job,
                           new Path((String) results.getValue("-output")));
       }
       if (results.hasOption("-jar")) {
-        conf.setJar((String) results.getValue("-jar"));
+        job.setJar((String) results.getValue("-jar"));
       }
       if (results.hasOption("-inputformat")) {
-        setIsJavaRecordReader(conf, true);
-        conf.setInputFormat(getClass(results, "-inputformat", conf,
+        setIsJavaRecordReader(job, true);
+        job.setInputFormat(getClass(results, "-inputformat", job,
                              InputFormat.class));
       }
       if (results.hasOption("-javareader")) {
-        setIsJavaRecordReader(conf, true);
+        setIsJavaRecordReader(job, true);
       }
       if (results.hasOption("-map")) {
-        setIsJavaMapper(conf, true);
-        conf.setMapperClass(getClass(results, "-map", conf, Mapper.class));
+        setIsJavaMapper(job, true);
+        job.setMapperClass(getClass(results, "-map", job, Mapper.class));
       }
       if (results.hasOption("-partitioner")) {
-        conf.setPartitionerClass(getClass(results, "-partitioner", conf,
+        job.setPartitionerClass(getClass(results, "-partitioner", job,
                                  Partitioner.class));
       }
       if (results.hasOption("-reduce")) {
-        setIsJavaReducer(conf, true);
-        conf.setReducerClass(getClass(results, "-reduce", conf, Reducer.class));
+        setIsJavaReducer(job, true);
+        job.setReducerClass(getClass(results, "-reduce", job, Reducer.class));
       }
       if (results.hasOption("-reduces")) {
-        conf.setNumReduceTasks(Integer.parseInt((String)
+        job.setNumReduceTasks(Integer.parseInt((String)
                                            results.getValue("-reduces")));
       }
       if (results.hasOption("-writer")) {
-        setIsJavaRecordWriter(conf, true);
-        conf.setOutputFormat(getClass(results, "-writer", conf,
+        setIsJavaRecordWriter(job, true);
+        job.setOutputFormat(getClass(results, "-writer", job,
                               OutputFormat.class));
       }
       if (results.hasOption("-program")) {
-        setExecutable(conf, (String) results.getValue("-program"));
+        setExecutable(job, (String) results.getValue("-program"));
       }
       if (results.hasOption("-jobconf")) {
+        LOG.warn("-jobconf option is deprecated, please use -D instead.");
         String options = (String)results.getValue("-jobconf");
         StringTokenizer tokenizer = new StringTokenizer(options, ",");
         while (tokenizer.hasMoreTokens()) {
           String keyVal = tokenizer.nextToken().trim();
           String[] keyValSplit = keyVal.split("=");
-          conf.set(keyValSplit[0], keyValSplit[1]);
+          job.set(keyValSplit[0], keyValSplit[1]);
         }
       }
       // if they gave us a jar file, include it into the class path
-      String jarFile = conf.getJar();
+      String jarFile = job.getJar();
       if (jarFile != null) {
+        final URL[] urls = new URL[]{ FileSystem.getLocal(job).
+          pathToFile(new Path(jarFile)).toURL()};
+        //FindBugs complains that creating a URLClassLoader should be
+        //in a doPrivileged() block.
         ClassLoader loader =
-          new URLClassLoader(new URL[]{ FileSystem.getLocal(conf).
-                                        pathToFile(new Path(jarFile)).toURL()});
-        conf.setClassLoader(loader);
-      }
-      runJob(conf);
+          AccessController.doPrivileged(
+              new PrivilegedAction<ClassLoader>() {
+                public ClassLoader run() {
+                  return new URLClassLoader(urls);
+                }
+              }
+            );
+        job.setClassLoader(loader);
+      }
+
+      runJob(job);
+      return 0;
     } catch (OptionException oe) {
       cli.printUsage();
+      return 1;
    }
+
+  }
+
+  /**
+   * Submit a pipes job based on the command line arguments.
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    new Submitter().run(args);
  }
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/package.html
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/package.html?rev=696551&r1=696550&r2=696551&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/package.html (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/package.html Wed Sep 17 20:02:51 2008
@@ -35,7 +35,6 @@
 
 <pre>
 bin/hadoop pipes \
-  [-conf <i>path</i>] \
   [-input <i>inputDir</i>] \
   [-output <i>outputDir</i>] \
  [-jar <i>applicationJarFile</i>] \
@@ -44,9 +43,17 @@
  [-partitioner <i>class</i>] \
  [-reduce <i>class</i>] \
  [-writer <i>class</i>] \
-  [-program <i>program url</i>]
+  [-program <i>program url</i>] \
+  [-conf <i>configuration file</i>] \
+  [-D <i>property=value</i>] \
+  [-fs <i>local|namenode:port</i>] \
+  [-jt <i>local|jobtracker:port</i>] \
+  [-files <i>comma separated list of files</i>] \
+  [-libjars <i>comma separated list of jars</i>] \
+  [-archives <i>comma separated list of archives</i>]
 </pre>
+
 <p>
 The application programs link against a thin C++ wrapper library that
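
With the generic options wired in, properties that previously required the now-deprecated -jobconf flag can be passed with -D. An illustrative invocation (the paths and executable URI are placeholders; the two hadoop.pipes.* keys are the properties set by setIsJavaRecordReader/setIsJavaRecordWriter):

    bin/hadoop pipes \
      -D hadoop.pipes.java.recordreader=true \
      -D hadoop.pipes.java.recordwriter=true \
      -input /user/alice/pipes/in \
      -output /user/alice/pipes/out \
      -program hdfs:///apps/pipes/wordcount

Note that GenericOptionsParser expects the generic options to appear before the command-specific ones.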