Author: cutting
Date: Mon Jun 5 16:41:05 2006
New Revision: 411950

URL: http://svn.apache.org/viewvc?rev=411950&view=rev
Log:
HADOOP-275.  Update the streaming contrib module to use log4j for logging.
Contributed by Michel Tourn.
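The patch below swaps the java.util.logging-based LogFormatter for the commons-logging facade (which Hadoop routes to log4j, per the log message). A minimal sketch of the idiom the streaming classes adopt; the class name here is only illustrative, and fine()/finer() calls are mapped to info() because commons-logging exposes no FINE level:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class LoggingIdiomExample {
      // same pattern as PipeMapRed, StreamBaseRecordReader, StreamInputFormat, StreamJob
      protected static final Log LOG =
          LogFactory.getLog(LoggingIdiomExample.class.getName());

      void run() {
        LOG.info("getRecordReader start.....");
      }
    }
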
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=411950&r1=411949&r2=411950&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Mon Jun 5 16:41:05 2006 @@ -3,7 +3,8 @@ Trunk (unreleased changes) - 1. + 1. HADOOP-275. Update the streaming contrib module to use log4j for + its logging. (Michel Tourn via cutting) Release 0.3.1 - 2006-06-05 Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java?rev=411950&r1=411949&r2=411950&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java Mon Jun 5 16:41:05 2006 @@ -17,8 +17,12 @@ package org.apache.hadoop.streaming; import java.io.*; +import java.net.InetAddress; import java.util.*; +/* + * If we move to Java 1.5, we can get rid of this class and just use System.getenv + */ public class Environment extends Properties { public Environment() @@ -26,13 +30,15 @@ { // Extend this code to fit all operating // environments that you expect to run in - String command = null; String OS = System.getProperty("os.name"); + String lowerOs = OS.toLowerCase(); if (OS.equals("Windows NT")) { command = "cmd /C set"; } else if (OS.indexOf("ix") > -1 || OS.indexOf("inux") > -1) { command = "env"; + } else if(lowerOs.startsWith("mac os x")) { + command = "env"; } else { // Add others here } @@ -83,4 +89,19 @@ } return arr; } -} \ No newline at end of file + + public String getHost() + { + String host = getProperty("HOST"); + if(host == null) { + // HOST isn't always in the environment + try { + host = InetAddress.getLocalHost().getHostName(); + } catch(IOException io) { + io.printStackTrace(); + } + } + return host; + } + +} Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=411950&r1=411949&r2=411950&view=diff 
============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Mon Jun 5 16:41:05 2006 @@ -22,9 +22,12 @@ import java.util.Date; import java.util.Map; import java.util.Arrays; +import java.util.ArrayList; import java.util.Properties; import java.util.regex.*; +import org.apache.commons.logging.*; + import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.Reducer; @@ -43,6 +46,8 @@ */ public abstract class PipeMapRed { + protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName()); + /** The command to be spawned as a subprocess. * Mapper/Reducer operations will delegate to it */ @@ -53,9 +58,9 @@ /** - * @returns ow many TABS before the end of the key part + * @returns how many TABS before the end of the key part * usually: 1 or "ALL" - * used both for tool output of both Map and Reduce + * used for tool output of both Map and Reduce * configured via tool's argv: splitKeyVal=ALL or 1.. * although it is interpreted here, not by tool */ @@ -91,20 +96,57 @@ return cols; } - String[] splitArgs(String args) - { - String regex = "\\s(?=(?:[^\"]*\"[^\"]*\")*[^\"]*\\z)"; - String[] split = args.split(regex); - // remove outer quotes - for(int i=0; i<split.length; i++) { - String si = split[i].trim(); - if(si.charAt(0)=='"' && si.charAt(si.length()-1)=='"') { - si = si.substring(1, si.length()-1); - split[i] = si; + final static int OUTSIDE = 1; + final static int SINGLEQ = 2; + final static int DOUBLEQ = 3; + + static String[] splitArgs(String args) + { + ArrayList argList = new ArrayList(); + char[] ch = args.toCharArray(); + int clen = ch.length; + int state = OUTSIDE; + int argstart = 0; + for(int c=0; c<=clen; c++) { + boolean last = (c==clen); + int lastState = state; + boolean endToken = false; + if(!last) { + if(ch[c]=='\'') { + if(state == OUTSIDE) { + state = SINGLEQ; + } else if(state == SINGLEQ) { + state = OUTSIDE; + } + endToken = (state != lastState); + } else if(ch[c]=='"') { + if(state == OUTSIDE) { + state = DOUBLEQ; + } else if(state == DOUBLEQ) { + state = OUTSIDE; + } + endToken = (state != lastState); + } else if(ch[c]==' ') { + if(state == OUTSIDE) { + endToken = true; + } + } + } + if(last || endToken) { + if(c == argstart) { + // unquoted space + } else { + String a; + a = args.substring(argstart, c); + argList.add(a); + } + argstart = c+1; + lastState = state; } } - return split; + return (String[])argList.toArray(new String[0]); } + public void configure(JobConf job) { @@ -132,7 +174,7 @@ // A relative path should match in the unjarred Job data // In this case, force an absolute path to make sure exec finds it. 
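    // Hedged illustration (not part of the change set): what the new
    // state-machine splitArgs() above should produce.  The demo class name is
    // made up; the expected outputs are inferred from the OUTSIDE/SINGLEQ/
    // DOUBLEQ transitions rather than taken from a test run.
    package org.apache.hadoop.streaming;   // splitArgs has package visibility

    public class SplitArgsDemo {
      public static void main(String[] ignored) {
        String[] a = PipeMapRed.splitArgs(
            "/usr/local/bin/perl5 filter.pl 'two words'");
        for (int i = 0; i < a.length; i++) {
          System.out.println(i + ": " + a[i]);
        }
        // expected output:
        // 0: /usr/local/bin/perl5
        // 1: filter.pl
        // 2: two words      <- quotes stripped, embedded blank preserved
      }
    }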
argvSplit[0] = new File(argvSplit[0]).getAbsolutePath(); - log_.println("PipeMapRed exec " + Arrays.toString(argvSplit)); + log_.println("PipeMapRed exec " + Arrays.asList(argvSplit)); Environment childEnv = (Environment)StreamUtil.env().clone(); @@ -440,4 +482,5 @@ } } } + } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=411950&r1=411949&r2=411950&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Mon Jun 5 16:41:05 2006 @@ -25,7 +25,6 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableComparable; Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=411950&r1=411949&r2=411950&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Mon Jun 5 16:41:05 2006 @@ -25,7 +25,6 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableComparable; Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?rev=411950&r1=411949&r2=411950&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Mon Jun 5 16:41:05 2006 @@ -20,14 +20,14 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.UTF8; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.util.LogFormatter; +import org.apache.commons.logging.*; + /** * Shared functionality for hadoopStreaming formats. 
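    // Side note on the PipeMapRed log-statement change above
    // (Arrays.toString -> Arrays.asList): Arrays.toString(Object[]) only exists
    // from JDK 1.5 on, while List.toString() gives an equivalent readable form
    // on 1.4 (compare the Java 1.5 remark added to Environment.java).
    // Tiny self-contained illustration with made-up values:
    import java.util.Arrays;

    public class ArrayLogDemo {
      public static void main(String[] ignored) {
        String[] argvSplit = { "/bin/cat", "-n" };
        // prints: PipeMapRed exec [/bin/cat, -n]
        System.out.println("PipeMapRed exec " + Arrays.asList(argvSplit));
      }
    }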
@@ -40,7 +40,10 @@ public abstract class StreamBaseRecordReader implements RecordReader { - protected static final Logger LOG = LogFormatter.getLogger(StreamBaseRecordReader.class.getName()); + protected static final Log LOG = LogFactory.getLog(StreamBaseRecordReader.class.getName()); + + // custom JobConf properties for this class are prefixed with this namespace + final String CONF_NS = "stream.recordreader."; public StreamBaseRecordReader( FSDataInputStream in, long start, long end, @@ -49,15 +52,45 @@ { in_ = in; start_ = start; - splitName_ = splitName; end_ = end; + length_ = end_ - start_; + splitName_ = splitName; reporter_ = reporter; job_ = job; + + statusMaxRecordChars_ = job_.getInt(CONF_NS + "statuschars", 200); + } + + /// RecordReader API + + /** Read a record. Implementation should call numRecStats at the end + */ + public abstract boolean next(Writable key, Writable value) throws IOException; + + /** Returns the current position in the input. */ + public synchronized long getPos() throws IOException + { + return in_.getPos(); + } + + /** Close this to future operations.*/ + public synchronized void close() throws IOException + { + in_.close(); } + + /// StreamBaseRecordReader API - /** Called once before the first call to next */ public void init() throws IOException { + LOG.info("StreamBaseRecordReader.init: " + + " start_=" + start_ + " end_=" + end_ + " length_=" + length_ + + " start_ > in_.getPos() =" + + (start_ > in_.getPos()) + " " + start_ + + " > " + in_.getPos() ); + if (start_ > in_.getPos()) { + in_.seek(start_); + } seekNextRecordBoundary(); } @@ -66,17 +99,12 @@ */ public abstract void seekNextRecordBoundary() throws IOException; - - /** Read a record. Implementation should call numRecStats at the end - */ - public abstract boolean next(Writable key, Writable value) throws IOException; - - + void numRecStats(CharSequence record) throws IOException { numRec_++; if(numRec_ == nextStatusRec_) { - nextStatusRec_ +=100000;//*= 10; + nextStatusRec_ +=100;//*= 10; String status = getStatus(record); LOG.info(status); reporter_.setStatus(status); @@ -91,10 +119,9 @@ pos = getPos(); } catch(IOException io) { } - final int M = 2000; String recStr; - if(record.length() > M) { - recStr = record.subSequence(0, M) + "..."; + if(record.length() > statusMaxRecordChars_) { + recStr = record.subSequence(0, statusMaxRecordChars_) + "..."; } else { recStr = record.toString(); } @@ -103,25 +130,15 @@ return status; } - /** Returns the current position in the input. 
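    // Hedged aside (not part of the patch): reader-specific settings now live
    // under the "stream.recordreader." namespace (CONF_NS above), e.g. the new
    // statuschars limit read by the StreamBaseRecordReader constructor.
    // A driver could tune it as sketched here -- the class and method names
    // are made up for illustration:
    import org.apache.hadoop.mapred.JobConf;

    public class ReaderConfExample {
      static void tuneStatusSnippet(JobConf job) {
        // equivalent to passing: -jobconf stream.recordreader.statuschars=500
        // (getInt() falls back to 200 when the property is absent)
        job.set("stream.recordreader.statuschars", "500");
      }
    }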
*/ - public synchronized long getPos() throws IOException - { - return in_.getPos(); - } - - /** Close this to future operations.*/ - public synchronized void close() throws IOException - { - in_.close(); - } - FSDataInputStream in_; long start_; long end_; + long length_; String splitName_; Reporter reporter_; JobConf job_; int numRec_ = 0; int nextStatusRec_ = 1; + int statusMaxRecordChars_; } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?rev=411950&r1=411949&r2=411950&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Mon Jun 5 16:41:05 2006 @@ -23,6 +23,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.logging.*; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.PathFilter; @@ -30,11 +32,8 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.UTF8; import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.LogFormatter; - /** An input format that performs globbing on DFS paths and * selects a RecordReader based on a JobConf property. @@ -46,7 +45,8 @@ // an InputFormat should be public with the synthetic public default constructor // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader) - protected static final Logger LOG = LogFormatter.getLogger(StreamInputFormat.class.getName()); + protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName()); + static { //LOG.setLevel(Level.FINE); } @@ -59,7 +59,7 @@ int dsup = globs.length; for(int d=0; d<dsup; d++) { String leafName = globs[d].getName(); - LOG.fine("StreamInputFormat: globs[" + d + "] leafName = " + leafName); + LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName); Path[] paths; Path dir; PathFilter filter = new GlobFilter(fs, leafName); dir = new Path(globs[d].getParent().toString()); @@ -79,7 +79,13 @@ } String globToRegexp(String glob) { - return glob.replaceAll("\\*", ".*"); + String re = glob; + re = re.replaceAll("\\.", "\\\\."); + re = re.replaceAll("\\+", "\\\\+"); + re = re.replaceAll("\\*", ".*"); + re = re.replaceAll("\\?", "."); + LOG.info("globToRegexp: |" + glob + "| -> |" + re + "|"); + return re; } public boolean accept(Path pathname) @@ -88,7 +94,7 @@ if(acc) { acc = pat_.matcher(pathname.getName()).matches(); } - LOG.finer("matches " + pat_ + ", " + pathname + " = " + acc); + LOG.info("matches " + pat_ + ", " + pathname + " = " + acc); return acc; } @@ -99,7 +105,7 @@ public RecordReader getRecordReader(FileSystem fs, final FileSplit split, JobConf job, Reporter reporter) throws IOException { - LOG.finer("getRecordReader start....."); + LOG.info("getRecordReader start....."); reporter.setStatus(split.toString()); final long start = split.getStart(); @@ -143,5 +149,5 @@ return reader; } - + } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=411950&r1=411949&r2=411950&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Jun 5 16:41:05 2006 @@ -23,6 +23,8 @@ import java.util.Arrays; import java.util.Iterator; +import org.apache.commons.logging.*; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.UTF8; @@ -32,16 +34,14 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.util.LogFormatter; - /** All the client-side work happens here. * (Jar packaging, MapRed job submission and monitoring) * @author Michel Tourn */ public class StreamJob { - protected static final Logger LOG = LogFormatter.getLogger(StreamJob.class.getName()); - + protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName()); + public StreamJob(String[] argv, boolean mayExit) { argv_ = argv; @@ -72,9 +72,10 @@ void preProcessArgs() { verbose_ = false; + addTaskEnvironment_ = ""; } - void postProcessArgs() + void postProcessArgs() throws IOException { if(cluster_ == null) { // hadoop-default.xml is standard, hadoop-local.xml is not. @@ -87,22 +88,35 @@ if(output_ == null) { fail("Required argument: -output "); } - // careful with class names.. - mapCmd_ = packageOrTrimNoShip(mapCmd_); - redCmd_ = packageOrTrimNoShip(redCmd_); + msg("addTaskEnvironment=" + addTaskEnvironment_); + + Iterator it = packageFiles_.iterator(); + while(it.hasNext()) { + File f = new File((String)it.next()); + if(f.isFile()) { + shippedCanonFiles_.add(f.getCanonicalPath()); + } + } + msg("shippedCanonFiles_=" + shippedCanonFiles_); - // TBD -D format or sthg on cmdline. - // Plus maybe a standard list originating on client or server - addTaskEnvironment_ = ""; + // careful with class names.. + mapCmd_ = unqualifyIfLocalPath(mapCmd_); + redCmd_ = unqualifyIfLocalPath(redCmd_); + } + + void validateNameEqValue(String neqv) + { + String[] nv = neqv.split("=", 2); + if(nv.length < 2) { + fail("Invalid name=value spec: " + neqv); + } + msg("Recording name=value: name=" + nv[0] + " value=" + nv[1]); } - String packageOrTrimNoShip(String cmd) + String unqualifyIfLocalPath(String cmd) throws IOException { if(cmd == null) { // - } else if(cmd.startsWith(NOSHIP)) { - // don't package the file, but keep the abolute path - cmd = cmd.substring(NOSHIP.length()); } else { String prog = cmd; String args = ""; @@ -111,18 +125,23 @@ prog = cmd.substring(0, s); args = cmd.substring(s+1); } - packageFiles_.add(new File(prog).getAbsolutePath()); - // Change path to simple filename. - // That way when PipeMapRed calls Runtime.exec(), - // it will look for the excutable in Task's working dir. - // And this is where TaskRunner unjars our job jar. - prog = new File(prog).getName(); - if(args.length() > 0) { - cmd = prog + " " + args; - } else { - cmd = prog; + String progCanon = new File(prog).getCanonicalPath(); + boolean shipped = shippedCanonFiles_.contains(progCanon); + msg("shipped: " + shipped + " " + progCanon); + if(shipped) { + // Change path to simple filename. + // That way when PipeMapRed calls Runtime.exec(), + // it will look for the excutable in Task's working dir. + // And this is where TaskRunner unjars our job jar. 
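    // Hedged stand-alone restatement (not part of the patch) of the decision the
    // surrounding code makes: a mapper/reducer command is reduced to its bare
    // file name only when the same file was also shipped with -file, because
    // shipped files are unjarred into the task's working directory.  The class
    // name and paths below are made up.
    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;

    public class ShippedCmdSketch {
      public static void main(String[] ignored) throws IOException {
        ArrayList shippedCanonFiles = new ArrayList();            // from -file args
        shippedCanonFiles.add(new File("/local/filter.pl").getCanonicalPath());

        String cmd = "/local/filter.pl -v";                       // from -mapper
        String prog = cmd, args = "";
        int s = cmd.indexOf(' ');
        if (s != -1) { prog = cmd.substring(0, s); args = cmd.substring(s + 1); }

        if (shippedCanonFiles.contains(new File(prog).getCanonicalPath())) {
          prog = new File(prog).getName();
          cmd = (args.length() > 0) ? prog + " " + args : prog;
        }
        System.out.println(cmd);   // "filter.pl -v" when the file was shipped
      }
    }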
+ prog = new File(prog).getName(); + if(args.length() > 0) { + cmd = prog + " " + args; + } else { + cmd = prog; + } } } + msg("cmd=" + cmd); return cmd; } @@ -130,17 +149,20 @@ { return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath(); } + void parseArgv() { if(argv_.length==0) { - exitUsage(); + exitUsage(false); } int i=0; while(i < argv_.length) { String s; if(argv_[i].equals("-verbose")) { verbose_ = true; + } else if(argv_[i].equals("-info")) { + detailedUsage_ = true; } else if(argv_[i].equals("-debug")) { debug_++; } else if((s = optionArg(argv_, i, "-input", false)) != null) { @@ -155,7 +177,7 @@ } else if((s = optionArg(argv_, i, "-reducer", redCmd_ != null)) != null) { i++; redCmd_ = s; - } else if((s = optionArg(argv_, i, "-files", false)) != null) { + } else if((s = optionArg(argv_, i, "-file", false)) != null) { i++; packageFiles_.add(s); } else if((s = optionArg(argv_, i, "-cluster", cluster_ != null)) != null) { @@ -164,15 +186,35 @@ } else if((s = optionArg(argv_, i, "-config", false)) != null) { i++; configPath_.add(s); + } else if((s = optionArg(argv_, i, "-dfs", false)) != null) { + i++; + userJobConfProps_.add("fs.default.name="+s); + } else if((s = optionArg(argv_, i, "-jt", false)) != null) { + i++; + userJobConfProps_.add("mapred.job.tracker="+s); + } else if((s = optionArg(argv_, i, "-jobconf", false)) != null) { + i++; + validateNameEqValue(s); + userJobConfProps_.add(s); + } else if((s = optionArg(argv_, i, "-cmdenv", false)) != null) { + i++; + validateNameEqValue(s); + if(addTaskEnvironment_.length() > 0) { + addTaskEnvironment_ += " "; + } + addTaskEnvironment_ += s; } else if((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) { i++; inReaderSpec_ = s; } else { System.err.println("Unexpected argument: " + argv_[i]); - exitUsage(); + exitUsage(false); } i++; } + if(detailedUsage_) { + exitUsage(true); + } } String optionArg(String[] args, int index, String arg, boolean argSet) @@ -196,22 +238,32 @@ } } - public void exitUsage() + public void exitUsage(boolean detailed) { // 1 2 3 4 5 6 7 //1234567890123456789012345678901234567890123456789012345678901234567890123456789 - System.out.println("Usage: bin/hadoop jar build/hadoop-streaming.jar [options]"); + System.out.println("Usage: $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar [options]"); System.out.println("Options:"); - System.out.println(" -input <path> DFS input file(s) for the Map step"); - System.out.println(" -output <path> DFS output directory for the Reduce step"); - System.out.println(" -mapper <cmd> The streaming command to run"); - System.out.println(" -reducer <cmd> The streaming command to run"); - System.out.println(" -files <file> Additional files to be shipped in the Job jar file"); - System.out.println(" -cluster <name> Default uses hadoop-default.xml and hadoop-site.xml"); - System.out.println(" -config <file> Optional. One or more paths to xml config files"); - System.out.println(" -inputreader <spec> Optional. See below"); + System.out.println(" -input <path> DFS input file(s) for the Map step"); + System.out.println(" -output <path> DFS output directory for the Reduce step"); + System.out.println(" -mapper <cmd> The streaming command to run"); + System.out.println(" -combiner <cmd> Not implemented. 
But you can pipe the mapper output"); + System.out.println(" -reducer <cmd> The streaming command to run"); + System.out.println(" -file <file> File/dir to be shipped in the Job jar file"); + System.out.println(" -cluster <name> Default uses hadoop-default.xml and hadoop-site.xml"); + System.out.println(" -config <file> Optional. One or more paths to xml config files"); + System.out.println(" -dfs <h:p> Optional. Override DFS configuration"); + System.out.println(" -jt <h:p> Optional. Override JobTracker configuration"); + System.out.println(" -inputreader <spec> Optional."); + System.out.println(" -jobconf <n>=<v> Optional."); + System.out.println(" -cmdenv <n>=<v> Optional. Pass env.var to streaming commands"); System.out.println(" -verbose"); System.out.println(); + if(!detailed) { + System.out.println("For more details about these options:"); + System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info"); + fail(""); + } System.out.println("In -input: globbing on <path> is supported and can have multiple -input"); System.out.println("Default Map input format: a line is a record in UTF-8"); System.out.println(" the key part ends at first TAB, the rest of the line is the value"); @@ -220,21 +272,34 @@ System.out.println(" Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'"); System.out.println("Map output format, reduce input/output format:"); System.out.println(" Format defined by what mapper command outputs. Line-oriented"); - System.out.println("Mapper and Reducer <cmd> syntax: "); - System.out.println(" If the mapper or reducer programs are prefixed with " + NOSHIP + " then "); - System.out.println(" the paths are assumed to be valid absolute paths on the task tracker machines"); - System.out.println(" and are NOT packaged with the Job jar file."); + System.out.println(); System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote "); System.out.println(" Hadoop clusters. "); System.out.println(" The default is to use the normal hadoop-default.xml and hadoop-site.xml"); System.out.println(" Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml"); System.out.println(); - System.out.println("Example: hadoopStreaming -mapper \"noship:/usr/local/bin/perl5 filter.pl\""); - System.out.println(" -files /local/filter.pl -input \"/logs/0604*/*\" [...]"); + System.out.println("To set the number of reduce tasks (num. 
of output files):"); + System.out.println(" -jobconf mapred.reduce.tasks=10"); + System.out.println("To change the local temp directory:"); + System.out.println(" -jobconf dfs.data.dir=/tmp"); + System.out.println("Additional local temp directories with -cluster local:"); + System.out.println(" -jobconf mapred.local.dir=/tmp/local"); + System.out.println(" -jobconf mapred.system.dir=/tmp/system"); + System.out.println(" -jobconf mapred.temp.dir=/tmp/temp"); + System.out.println("For more details about jobconf parameters see:"); + System.out.println(" http://wiki.apache.org/lucene-hadoop/JobConfFile"); + System.out.println("To set an environement variable in a streaming command:"); + System.out.println(" -cmdenv EXAMPLE_DIR=/home/example/dictionaries/"); + System.out.println(); + System.out.println("Shortcut to run from any directory:"); + System.out.println(" setenv HSTREAMING \"$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/build/hadoop-streaming.jar\""); + System.out.println(); + System.out.println("Example: $HSTREAMING -mapper \"/usr/local/bin/perl5 filter.pl\""); + System.out.println(" -file /local/filter.pl -input \"/logs/0604*/*\" [...]"); System.out.println(" Ships a script, invokes the non-shipped perl interpreter"); System.out.println(" Shipped files go to the working directory so filter.pl is found by perl"); System.out.println(" Input files are all the daily logs for days in month 2006-04"); - fail(""); + fail(""); } public void fail(String message) @@ -291,7 +356,7 @@ msg("Found runtime classes in: " + runtimeClasses); } if(isLocalHadoop()) { - // don't package class files (they might get unpackaged in . and then + // don't package class files (they might get unpackaged in "." and then // hide the intended CLASSPATH entry) // we still package everything else (so that scripts and executable are found in // Task workdir like distributed Hadoop) @@ -393,7 +458,17 @@ if(jar_ != null) { jobConf_.setJar(jar_); } - //jobConf_.mtdump();System.exit(1); + + // last, allow user to override anything + // (although typically used with properties we didn't touch) + it = userJobConfProps_.iterator(); + while(it.hasNext()) { + String prop = (String)it.next(); + String[] nv = prop.split("=", 2); + msg("JobConf: set(" + nv[0] + ", " + nv[1]+")"); + jobConf_.set(nv[0], nv[1]); + } + } protected String getJobTrackerHostPort() @@ -432,7 +507,7 @@ running_ = jc_.submitJob(jobConf_); jobId_ = running_.getJobID(); - LOG.info("getLocalDirs(): " + Arrays.toString(jobConf_.getLocalDirs())); + LOG.info("getLocalDirs(): " + Arrays.asList(jobConf_.getLocalDirs())); LOG.info("Running job: " + jobId_); jobInfo(); @@ -467,11 +542,10 @@ } - public final static String NOSHIP = "noship:"; - protected boolean mayExit_; protected String[] argv_; protected boolean verbose_; + protected boolean detailedUsage_; protected int debug_; protected Environment env_; @@ -483,8 +557,10 @@ protected JobClient jc_; // command-line arguments - protected ArrayList inputGlobs_ = new ArrayList(); // <String> - protected ArrayList packageFiles_ = new ArrayList(); // <String> + protected ArrayList inputGlobs_ = new ArrayList(); // <String> + protected ArrayList packageFiles_ = new ArrayList(); // <String> + protected ArrayList shippedCanonFiles_= new ArrayList(); // <String> + protected ArrayList userJobConfProps_ = new ArrayList(); // <String> protected String output_; protected String mapCmd_; protected String redCmd_; Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java 
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?rev=411950&r1=411949&r2=411950&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Mon Jun 5 16:41:05 2006 @@ -69,7 +69,7 @@ return false; //((LongWritable)key).set(pos); // key is position - //((UTF8)value).set(readLine(in)); // value is line + //((UTF8)value).set(readLine(in)); // value is line String line = readLine(in_); // key is line up to TAB, value is rest Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java?rev=411950&r1=411949&r2=411950&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java Mon Jun 5 16:41:05 2006 @@ -198,7 +198,7 @@ static { try { env = new Environment(); - HOST = env.get("HOST").toString(); + HOST = env.getHost(); } catch(IOException io) { io.printStackTrace(); } @@ -275,6 +275,22 @@ } } + static final String regexpSpecials = "[]()?*+|.!^-\\~@"; + + public static String regexpEscape(String plain) + { + StringBuffer buf = new StringBuffer(); + char[] ch = plain.toCharArray(); + int csup = ch.length; + for(int c=0; c<csup; c++) { + if(regexpSpecials.indexOf(ch[c]) != -1) { + buf.append("\\"); + } + buf.append(ch[c]); + } + return buf.toString(); + } + static String slurp(File f) throws IOException { FileInputStream in = new FileInputStream(f); @@ -298,5 +314,5 @@ } return env_; } - + } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?rev=411950&r1=411949&r2=411950&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Mon Jun 5 16:41:05 2006 @@ -17,10 +17,12 @@ package org.apache.hadoop.streaming; import java.io.*; +import java.util.regex.*; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.mapred.Reporter; @@ -32,6 +34,14 @@ * Values are XML subtrees delimited by configurable tags. * Keys could be the value of a certain attribute in the XML subtree, * but this is left to the stream processor application. 
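    // Hedged illustration (not part of the patch) of the new
    // StreamUtil.regexpEscape() helper above: each character found in
    // "[]()?*+|.!^-\~@" gets a leading backslash so literal record delimiters
    // can be embedded in a java.util.regex pattern.  The expected results in
    // the comments are derived from the code, not from a test run; the class
    // name is made up.
    import java.util.regex.Pattern;

    public class RegexpEscapeDemo {
      public static void main(String[] ignored) {
        String begin = org.apache.hadoop.streaming.StreamUtil.regexpEscape("CDATA[");
        String end   = org.apache.hadoop.streaming.StreamUtil.regexpEscape("]]>");
        System.out.println(begin);   // expected: CDATA\[
        System.out.println(end);     // expected: \]\]>
        // usable as literal alternatives, e.g. in StreamXmlRecordReader's patterns
        Pattern p = Pattern.compile("(" + begin + ")|(" + end + ")");
        System.out.println(p.matcher("<![CDATA[").find());   // expected: true
      }
    }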
+ * + * The name-value properties that StreamXmlRecordReader understands are: + * String begin (chars marking beginning of record) + * String end (chars marking end of record) + * int maxrec (maximum record size) + * int lookahead(maximum lookahead to sync CDATA) + * boolean slowmatch + * * @author Michel Tourn */ public class StreamXmlRecordReader extends StreamBaseRecordReader @@ -42,67 +52,278 @@ throws IOException { super(in, start, end, splitName, reporter, job); - beginMark_ = checkJobGet("stream.recordreader.begin"); - endMark_ = checkJobGet("stream.recordreader.end"); - } + + beginMark_ = checkJobGet(CONF_NS + "begin"); + endMark_ = checkJobGet(CONF_NS + "end"); - String checkJobGet(String prop) throws IOException - { - String val = job_.get(prop); - if(val == null) { - throw new IOException("JobConf: missing required property: " + prop); - } - return val; + maxRecSize_= job_.getInt(CONF_NS + "maxrec", 50*1000); + lookAhead_ = job_.getInt(CONF_NS + "lookahead", 2*maxRecSize_); + synched_ = false; + + slowMatch_ = job_.getBoolean(CONF_NS + "slowmatch", false); + if(slowMatch_) { + beginPat_ = makePatternCDataOrMark(beginMark_); + endPat_ = makePatternCDataOrMark(endMark_); + } } - public void seekNextRecordBoundary() throws IOException - { - System.out.println("@@@start seekNext " + in_.getPos()); - readUntilMatch(beginMark_, null); - System.out.println("@@@end seekNext " + in_.getPos()); - } - + int numNext = 0; public synchronized boolean next(Writable key, Writable value) throws IOException { long pos = in_.getPos(); - if (pos >= end_) + numNext++; + if (pos >= end_) { return false; + } StringBuffer buf = new StringBuffer(); - readUntilMatch(endMark_, buf); + if(!readUntilMatchBegin()) { + return false; + } + if(!readUntilMatchEnd(buf)) { + return false; + } numRecStats(buf); + + // There is only one elem..key/value splitting is not done here. + ((UTF8)key).set(buf.toString()); + ((UTF8)value).set(""); + + /*if(numNext < 5) { + System.out.println("@@@ " + numNext + ". true next k=|" + key.toString().replaceAll("[\\r\\n]", " ") + + "|, len=" + buf.length() + " v=|" + value.toString().replaceAll("[\\r\\n]", " ") + "|"); + }*/ + return true; } + + public void seekNextRecordBoundary() throws IOException + { + readUntilMatchBegin(); + } + + boolean readUntilMatchBegin() throws IOException + { + if(slowMatch_) { + return slowReadUntilMatch(beginPat_, false, null); + } else { + return fastReadUntilMatch(beginMark_, false, null); + } + } + + boolean readUntilMatchEnd(StringBuffer buf) throws IOException + { + if(slowMatch_) { + return slowReadUntilMatch(endPat_, true, buf); + } else { + return fastReadUntilMatch(endMark_, true, buf); + } + } + + + boolean slowReadUntilMatch(Pattern markPattern, boolean includePat, StringBuffer outBufOrNull) + throws IOException + { + try { + long inStart = in_.getPos(); + byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)]; + int read = 0; + boolean success = true; + in_.mark(lookAhead_ + 2); + read = in_.read(buf); + String sbuf = new String(buf); + Matcher match = markPattern.matcher(sbuf); - void readUntilMatch(String pat, StringBuffer outBuf) throws IOException + firstMatchStart_ = NA; + firstMatchEnd_ = NA; + int bufPos = 0; + int state = synched_ ? 
CDATA_OUT : CDATA_UNK; + int s=0; + int matchLen = 0; + while(match.find(bufPos)) { + int input; + matchLen = match.group(0).length(); + if(match.group(1) != null) { + input = CDATA_BEGIN; + } else if(match.group(2) != null) { + input = CDATA_END; + firstMatchStart_ = NA; // |<DOC CDATA[ </DOC> ]]> should keep it + } else { + input = RECORD_MAYBE; + } + if(input == RECORD_MAYBE) { + if(firstMatchStart_ == NA) { + firstMatchStart_ = match.start(); + firstMatchEnd_ = match.end(); + } + } + state = nextState(state, input, match.start()); + /*System.out.println("@@@" + + s + ". Match " + match.start() + " " + match.groupCount() + + " state=" + state + " input=" + input + + " firstMatchStart_=" + firstMatchStart_ + " startinstream=" + (inStart+firstMatchStart_) + + " match=" + match.group(0) + " in=" + in_.getPos());*/ + if(state == RECORD_ACCEPT) { + break; + } + bufPos = match.end(); + s++; + } + if(state != CDATA_UNK) { + synched_ = true; + } + boolean matched = (firstMatchStart_ != NA) && (state == RECORD_ACCEPT || state == CDATA_UNK); + if(matched) { + int endPos = includePat ? firstMatchEnd_ : firstMatchStart_; + //System.out.println("firstMatchStart_=" + firstMatchStart_ + " firstMatchEnd_=" + firstMatchEnd_); + String snip = sbuf.substring(firstMatchStart_, firstMatchEnd_); + //System.out.println(" match snip=|" + snip + "| markPattern=" + markPattern); + if(outBufOrNull != null) { + buf = new byte[endPos]; + in_.reset(); + read = in_.read(buf); + if(read != endPos) { + //System.out.println("@@@ BAD re-read less: " + read + " < " + endPos); + } + outBufOrNull.append(new String(buf)); + } else { + //System.out.println("Skip to " + (inStart + endPos)); + in_.seek(inStart + endPos); + } + } + return matched; + } catch(Exception e) { + e.printStackTrace(); + } finally { + // in_ ? + } + return false; + } + + // states + final static int CDATA_IN = 10; + final static int CDATA_OUT = 11; + final static int CDATA_UNK = 12; + final static int RECORD_ACCEPT = 13; + // inputs + final static int CDATA_BEGIN = 20; + final static int CDATA_END = 21; + final static int RECORD_MAYBE= 22; + + /* also updates firstMatchStart_;*/ + int nextState(int state, int input, int bufPos) { + switch(state) { + case CDATA_UNK: + case CDATA_OUT: + switch(input) { + case CDATA_BEGIN: + return CDATA_IN; + case CDATA_END: + if(state==CDATA_OUT) { + //System.out.println("buggy XML " + bufPos); + } + return CDATA_OUT; + case RECORD_MAYBE: + return (state==CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT; + } + break; + case CDATA_IN: + return (input==CDATA_END) ? 
CDATA_OUT : CDATA_IN; + } + throw new IllegalStateException(state + " " + input + " " + bufPos + " " + splitName_); + } + - char[] cpat = pat.toCharArray(); + Pattern makePatternCDataOrMark(String escapedMark) + { + StringBuffer pat = new StringBuffer(); + addGroup(pat, StreamUtil.regexpEscape("CDATA[")); // CDATA_BEGIN + addGroup(pat, StreamUtil.regexpEscape("]]>")); // CDATA_END + addGroup(pat, escapedMark); // RECORD_MAYBE + return Pattern.compile(pat.toString()); + } + void addGroup(StringBuffer pat, String escapedGroup) + { + if(pat.length() > 0) { + pat.append("|"); + } + pat.append("("); + pat.append(escapedGroup); + pat.append(")"); + } + + + + boolean fastReadUntilMatch(String textPat, boolean includePat, StringBuffer outBufOrNull) throws IOException + { + //System.out.println("@@@BEGIN readUntilMatch inPos=" + in_.getPos()); + char[] cpat = textPat.toCharArray(); int m = 0; + boolean match = false; + long markPos = -1; int msup = cpat.length; + if(!includePat) { + int LL = 120000 * 10; + markPos = in_.getPos(); + in_.mark(LL); // lookAhead_ + } while (true) { int b = in_.read(); if (b == -1) break; char c = (char)b; // this assumes eight-bit matching. OK with UTF-8 + if(outBufOrNull != null) { + outBufOrNull.append(c); + } if (c == cpat[m]) { m++; - if(m==msup-1) { + if(m==msup) { + match = true; break; } } else { m = 0; } - if(outBuf != null) { - outBuf.append(c); + } + if(!includePat && match) { + if(outBufOrNull != null) { + outBufOrNull.setLength(outBufOrNull.length() - textPat.length()); } + long pos = in_.getPos() - textPat.length(); + in_.reset(); + in_.seek(pos); + } + //System.out.println("@@@DONE readUntilMatch inPos=" + in_.getPos() + " includePat=" + includePat + " pat=" + textPat + ", buf=|" + outBufOrNull + "|"); + return match; + } + + String checkJobGet(String prop) throws IOException + { + String val = job_.get(prop); + if(val == null) { + throw new IOException("JobConf: missing required property: " + prop); } -System.out.println("@@@START readUntilMatch(" + pat + ", " + outBuf + "\n@@@END readUntilMatch"); + return val; } String beginMark_; String endMark_; + + Pattern beginPat_; + Pattern endPat_; + + boolean slowMatch_; + int lookAhead_; // bytes to read to try to synch CDATA/non-CDATA. Should be more than max record size + int maxRecSize_; + + final static int NA = -1; + int firstMatchStart_ = 0; // candidate record boundary. Might just be CDATA. + int firstMatchEnd_ = 0; + + boolean isRecordMatch_; + boolean synched_; }
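
For reference, the StreamXmlRecordReader constructor above reads all of its tuning knobs from the job configuration under the "stream.recordreader." prefix (CONF_NS). A hedged driver-side sketch follows; the class and method names are made up, the values are examples only, and in practice these properties normally arrive via the -inputreader and -jobconf options shown in StreamJob's usage text:

    import org.apache.hadoop.mapred.JobConf;

    public class XmlReaderConfSketch {
      static void configureXmlReader(JobConf job) {
        job.set("stream.recordreader.begin", "<doc>");       // required
        job.set("stream.recordreader.end", "</doc>");        // required
        job.set("stream.recordreader.maxrec", "50000");      // default 50*1000
        job.set("stream.recordreader.lookahead", "100000");  // default 2*maxrec
        job.set("stream.recordreader.slowmatch", "true");    // CDATA-aware matching
      }
    }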