Author: cutting
Date: Thu May 3 12:33:27 2007
New Revision: 534970

URL: http://svn.apache.org/viewvc?view=rev&rev=534970
Log:
HADOOP-1315. Clean up contrib/streaming, switching it to use more core classes and removing unused classes. Contributed by Runping.
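[Editor's note] After this cleanup, PipeMapRed splits each line emitted by the streaming command on a configurable field separator (stream.map.output.field.separator / stream.reduce.output.field.separator, default TAB), taking the first stream.num.map.output.key.fields (or stream.num.reduce.output.key.fields) fields as the key and the remainder as the value. The following is a minimal, self-contained sketch of that splitting behaviour for illustration only; it is not part of the patch, and SplitKeyValSketch is a hypothetical class name.

// Illustration only (not code from this commit): mimics the simplified
// PipeMapRed.splitKeyVal(): the first numKeyFields occurrences of the
// separator delimit the key; the rest of the line is the value.
import java.nio.charset.StandardCharsets;

public class SplitKeyValSketch {

  // Index of the n-th occurrence of b in utf8, or -1 if there are fewer than n.
  static int findNthByte(byte[] utf8, byte b, int n) {
    int found = 0;
    for (int i = 0; i < utf8.length; i++) {
      if (utf8[i] == b && ++found == n) {
        return i;
      }
    }
    return -1;
  }

  // Splits a line into {key, value}; with no separator the whole line becomes the key.
  static String[] splitKeyVal(String line, char separator, int numKeyFields) {
    byte[] utf8 = line.getBytes(StandardCharsets.UTF_8);
    int pos = findNthByte(utf8, (byte) separator, numKeyFields);
    if (pos == -1) {
      return new String[] { line, "" };
    }
    String key = new String(utf8, 0, pos, StandardCharsets.UTF_8);
    String val = new String(utf8, pos + 1, utf8.length - pos - 1, StandardCharsets.UTF_8);
    return new String[] { key, val };
  }

  public static void main(String[] args) {
    // Defaults in the patch: separator "\t", one key field.
    String[] kv = splitKeyVal("k1\tk2\tv", '\t', 2);
    System.out.println("key=[" + kv[0] + "] value=[" + kv[1] + "]"); // key=[k1<TAB>k2] value=[v]
  }
}

In the patched code this is done with UTF8ByteArrayUtils.findNthByte() on Hadoop Text objects; the separator and key-field counts are read from the JobConf keys shown in PipeMapRed.configure() in the diff below.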
Removed: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/CompoundDirSpec.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MuxOutputFormat.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeCombiner.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=534970&r1=534969&r2=534970 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu May 3 12:33:27 2007 @@ -316,6 +316,9 @@ that killed the heartbeat monitoring thread. (Dhruba Borthakur via cutting) +94. HADOOP-1315. Clean up contrib/streaming, switching it to use core + classes more and removing unused code. (Runping Qi via cutting) + Release 0.12.3 - 2007-04-06 Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?view=diff&rev=534970&r1=534969&r2=534970 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Thu May 3 12:33:27 2007 @@ -19,8 +19,6 @@ package org.apache.hadoop.streaming; import java.io.*; -import java.net.Socket; -import java.net.URI; import java.nio.charset.CharacterCodingException; import java.io.IOException; import java.util.Date; @@ -29,14 +27,11 @@ import java.util.Arrays; import java.util.ArrayList; import java.util.Properties; -import java.util.regex.*; import org.apache.commons.logging.*; import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.PhasedFileSystem; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.util.StringUtils; @@ -45,7 +40,6 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; /** Shared functionality for PipeMapper, PipeReducer. 
@@ -60,60 +54,12 @@ */ abstract String getPipeCommand(JobConf job); - /* - */ - abstract String getKeyColPropName(); - abstract char getFieldSeparator(); abstract int getNumOfKeyFields(); - - /** Write output as side-effect files rather than as map outputs. - This is useful to do "Map" tasks rather than "MapReduce" tasks. */ - boolean getUseSideEffect() { - return false; - } abstract boolean getDoPipe(); - /** - * @returns how many TABS before the end of the key part - * usually: 1 or "ALL" - * used for tool output of both Map and Reduce - * configured via tool's argv: splitKeyVal=ALL or 1.. - * although it is interpreted here, not by tool - */ - int getKeyColsFromPipeCommand(String cmd) { - String key = getKeyColPropName(); - Pattern kcPat = Pattern.compile(".*" + key + "=([^\\s]*).*"); - Matcher match = kcPat.matcher(cmd); - String kc; - if (!match.matches()) { - kc = null; - } else { - kc = match.group(1); - } - - int cols; - if (kc == null) { - // default value is 1 and the Stream applications could instead - // add/remove the \t separator on lines to get the same effect as value 0, 1, ALL - cols = 1; - } else if (kc.equals("ALL")) { - cols = ALL_COLS; - } else { - try { - cols = Integer.parseInt(kc); - } catch (NumberFormatException nf) { - cols = Integer.MAX_VALUE; - } - } - - System.out.println("getKeyColsFromPipeCommand:" + key + " parse:" + cols + " from cmd=" + cmd); - - return cols; - } - final static int OUTSIDE = 1; final static int SINGLEQ = 2; final static int DOUBLEQ = 3; @@ -164,54 +110,15 @@ return (String[]) argList.toArray(new String[0]); } - OutputStream getURIOutputStream(URI uri, boolean allowSocket) throws IOException { - final String SOCKET = "socket"; - if (uri.getScheme().equals(SOCKET)) { - if (!allowSocket) { - throw new IOException(SOCKET + " not allowed on outputstream " + uri); - } - final Socket sock = new Socket(uri.getHost(), uri.getPort()); - OutputStream out = new FilterOutputStream(sock.getOutputStream()) { - public void close() throws IOException { - sock.close(); - super.close(); - } - }; - return out; - } else { - // a FSDataOutputStreamm, localFS or HDFS. - // localFS file may be set up as a FIFO. 
- return sideFs_.create(new Path(uri.getSchemeSpecificPart())); - } - } - - String getSideEffectFileName() { - FileSplit split = StreamUtil.getCurrentSplit(job_); - return new String(split.getPath().getName() + "-" + split.getStart() + - "-" + split.getLength()); - } - public void configure(JobConf job) { try { String argv = getPipeCommand(job); - keyCols_ = getKeyColsFromPipeCommand(argv); - - debug_ = (job.get("stream.debug") != null); - if (debug_) { - System.out.println("PipeMapRed: stream.debug=true"); - } - joinDelay_ = job.getLong("stream.joindelay.milli", 0); job_ = job; fs_ = FileSystem.get(job_); - if (job_.getBoolean("stream.sideoutput.localfs", false)) { - //sideFs_ = new LocalFileSystem(job_); - sideFs_ = FileSystem.getLocal(job_); - } else { - sideFs_ = fs_; - } + String mapOutputFieldSeparator = job_.get("stream.map.output.field.separator", "\t"); String reduceOutputFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t"); this.mapOutputFieldSeparator = mapOutputFieldSeparator.charAt(0); @@ -219,22 +126,12 @@ this.numOfMapOutputKeyFields = job_.getInt("stream.num.map.output.key.fields", 1); this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1); - if (debug_) { - System.out.println("kind :" + this.getClass()); - System.out.println("split :" + StreamUtil.getCurrentSplit(job_)); - System.out.println("fs :" + fs_.toString()); - System.out.println("sideFs :" + sideFs_.toString()); - } doPipe_ = getDoPipe(); if (!doPipe_) return; setStreamJobDetails(job); - setStreamProperties(); - - if (debugFailEarly_) { - throw new RuntimeException("debugFailEarly_"); - } + String[] argvSplit = splitArgs(argv); String prog = argvSplit[0]; File currentDir = new File(".").getAbsoluteFile(); @@ -245,39 +142,6 @@ FileUtil.chmod(new File(jobCacheDir, prog).toString(), "a+x"); } - if (job_.getInputValueClass().equals(BytesWritable.class)) { - // TODO expose as separate config: - // job or semistandard inputformat property - optUseKey_ = false; - } - - optSideEffect_ = getUseSideEffect(); - - if (optSideEffect_) { - // during work: use a completely unique filename to avoid HDFS namespace conflicts - // after work: rename to a filename that depends only on the workload (the FileSplit) - // it's a friendly name and in case of reexecution it will clobber. 
- // reexecution can be due to: other job, failed task and speculative task - // See StreamJob.setOutputSpec(): if reducerNone_ aka optSideEffect then: - // client has renamed outputPath and saved the argv's original output path as: - if (useSingleSideOutputURI_) { - finalOutputURI = new URI(sideOutputURI_); - sideEffectPathFinal_ = null; // in-place, no renaming to final - } else { - sideFs_ = new PhasedFileSystem(sideFs_, job); - String sideOutputPath = job_.get("stream.sideoutput.dir"); // was: job_.getOutputPath() - String fileName = getSideEffectFileName(); // see HADOOP-444 for rationale - sideEffectPathFinal_ = new Path(sideOutputPath, fileName); - finalOutputURI = new URI(sideEffectPathFinal_.toString()); // implicit dfs: - } - // apply default scheme - if (finalOutputURI.getScheme() == null) { - finalOutputURI = new URI("file", finalOutputURI.getSchemeSpecificPart(), null); - } - boolean allowSocket = useSingleSideOutputURI_; - sideEffectOut_ = getURIOutputStream(finalOutputURI, allowSocket); - } - // // argvSplit[0]: // An absolute path should be a preexisting valid path on all TaskTrackers @@ -295,8 +159,6 @@ f = null; } logprintln("PipeMapRed exec " + Arrays.asList(argvSplit)); - logprintln("sideEffectURI_=" + finalOutputURI); - Environment childEnv = (Environment) StreamUtil.env().clone(); addJobConfToEnvironment(job_, childEnv); addEnvironment(childEnv, job_.get("stream.addenvironment")); @@ -327,34 +189,6 @@ logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_); } taskId_ = StreamUtil.getTaskInfo(job_); - debugFailEarly_ = isDebugFail("early"); - debugFailDuring_ = isDebugFail("during"); - debugFailLate_ = isDebugFail("late"); - - sideOutputURI_ = job_.get("stream.sideoutput.uri"); - useSingleSideOutputURI_ = (sideOutputURI_ != null); - } - - boolean isDebugFail(String kind) { - String execidlist = job_.get("stream.debugfail.reexec." 
+ kind); - if (execidlist == null) { - return false; - } - String[] e = execidlist.split(","); - for (int i = 0; i < e.length; i++) { - int ei = Integer.parseInt(e[i]); - if (taskId_.execid == ei) { - return true; - } - } - return false; - } - - void setStreamProperties() { - String s = System.getProperty("stream.port"); - if (s != null) { - reportPortPlusOne_ = Integer.parseInt(s); - } } void logStackTrace(Exception e) { @@ -442,7 +276,6 @@ if (log_ != null) { StreamUtil.exec("/bin/rm " + LOGNAME, log_); } - // TODO socket-based aggregator (in JobTrackerInfoServer) } void startOutputThreads(OutputCollector output, Reporter reporter) { @@ -474,14 +307,7 @@ * @throws IOException */ void splitKeyVal(byte[] line, Text key, Text val) throws IOException { - int pos = -1; - if (keyCols_ != ALL_COLS) { - pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields()); - } - LOG.info("FieldSeparator: " + this.getFieldSeparator()); - LOG.info("NumOfKeyFields: " + this.getNumOfKeyFields()); - LOG.info("Line: " + new String (line)); - LOG.info("Pos: " + pos); + int pos = UTF8ByteArrayUtils.findNthByte(line, (byte)this.getFieldSeparator(), this.getNumOfKeyFields()); try { if (pos == -1) { key.set(line); @@ -508,15 +334,10 @@ Text val = new Text(); // 3/4 Tool to Hadoop while ((answer = UTF8ByteArrayUtils.readLine((InputStream) clientIn_)) != null) { - // 4/4 Hadoop out - if (optSideEffect_) { - sideEffectOut_.write(answer); - sideEffectOut_.write('\n'); - sideEffectOut_.flush(); - } else { - splitKeyVal(answer, key, val); - output.collect(key, val); - } + + splitKeyVal(answer, key, val); + output.collect(key, val); + numRecWritten_++; long now = System.currentTimeMillis(); if (now-lastStdoutReport > reporterOutDelay_) { @@ -584,18 +405,6 @@ } catch (IOException io) { } waitOutputThreads(); - try { - if (optSideEffect_) { - logprintln("closing " + finalOutputURI); - if (sideEffectOut_ != null) sideEffectOut_.close(); - logprintln("closed " + finalOutputURI); - if (!useSingleSideOutputURI_) { - ((PhasedFileSystem)sideFs_).commit(); - } - } - } catch (IOException io) { - io.printStackTrace(); - } if (sim != null) sim.destroy(); } catch (RuntimeException e) { logStackTrace(e); @@ -692,18 +501,11 @@ long minRecWrittenToEnableSkip_ = Long.MAX_VALUE; - int keyCols_; - final static int ALL_COLS = Integer.MAX_VALUE; - long reporterOutDelay_ = 10*1000L; long reporterErrDelay_ = 10*1000L; long joinDelay_; JobConf job_; FileSystem fs_; - FileSystem sideFs_; - - // generic MapRed parameters passed on by hadoopStreaming - int reportPortPlusOne_; boolean doPipe_; boolean debug_; @@ -723,17 +525,6 @@ String mapredKey_; int numExceptions_; StreamUtil.TaskId taskId_; - - boolean optUseKey_ = true; - - private boolean optSideEffect_; - private URI finalOutputURI; - private Path sideEffectPathFinal_; - - private boolean useSingleSideOutputURI_; - private String sideOutputURI_; - - private OutputStream sideEffectOut_; protected volatile Throwable outerrThreadsThrowable; Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?view=diff&rev=534970&r1=534969&r2=534970 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original) +++ 
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Thu May 3 12:33:27 2007 @@ -25,8 +25,10 @@ import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.Writable; @@ -36,6 +38,8 @@ */ public class PipeMapper extends PipeMapRed implements Mapper { + private boolean ignoreKey = false; + String getPipeCommand(JobConf job) { String str = job.get("stream.map.streamprocessor"); if (str == null) { @@ -50,17 +54,15 @@ } } - String getKeyColPropName() { - return "mapKeyCols"; - } - - boolean getUseSideEffect() { - return StreamUtil.getUseMapSideEffect(job_); - } - boolean getDoPipe() { return true; } + + public void configure(JobConf job) { + super.configure(job); + String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName(); + this.ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName()); + } // Do NOT declare default constructor // (MapRed creates it reflectively) @@ -86,7 +88,7 @@ // 2/4 Hadoop to Tool if (numExceptions_ == 0) { - if (optUseKey_) { + if (!this.ignoreKey) { write(key); clientOut_.write('\t'); } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?view=diff&rev=534970&r1=534969&r2=534970 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Thu May 3 12:33:27 2007 @@ -57,10 +57,6 @@ return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv); } - String getKeyColPropName() { - return "reduceKeyCols"; - } - public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=534970&r1=534969&r2=534970 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Thu May 3 12:33:27 2007 @@ -22,7 +22,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; @@ -41,7 +40,6 @@ import org.apache.commons.cli2.option.PropertyOption; import org.apache.commons.cli2.resource.ResourceConstants; import org.apache.commons.cli2.util.HelpFormatter; -import org.apache.commons.cli2.validation.FileValidator; import org.apache.commons.cli2.validation.InvalidArgumentException; import org.apache.commons.cli2.validation.Validator; import org.apache.commons.logging.*; @@ -61,6 +59,7 @@ import org.apache.hadoop.mapred.RunningJob; 
import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat; +import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.filecache.*; import org.apache.hadoop.util.*; @@ -73,8 +72,7 @@ protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName()); final static String REDUCE_NONE = "NONE"; - private boolean reducerNone_; - + /** -----------Streaming CLI Implementation **/ private DefaultOptionBuilder builder = new DefaultOptionBuilder("-","-", false); @@ -214,7 +212,6 @@ inputSpecs_.addAll(cmdLine.getValues("-input")); output_ = (String) cmdLine.getValue("-output"); - mapsideoutURI_ = (String) cmdLine.getValue("-mapsideoutput"); mapCmd_ = (String)cmdLine.getValue("-mapper"); comCmd_ = (String)cmdLine.getValue("-combiner"); @@ -450,20 +447,17 @@ System.out.println("Options:"); System.out.println(" -input <path> DFS input file(s) for the Map step"); System.out.println(" -output <path> DFS output directory for the Reduce step"); - System.out.println(" -mapper <cmd> The streaming command to run"); - System.out.println(" -combiner <cmd> The streaming command to run"); - System.out.println(" -reducer <cmd> The streaming command to run"); + System.out.println(" -mapper <cmd|JavaClassName> The streaming command to run"); + System.out.println(" -combiner <JavaClassName> Combiner has to be a Java class"); + System.out.println(" -reducer <cmd|JavaClassName> The streaming command to run"); System.out.println(" -file <file> File/dir to be shipped in the Job jar file"); - //Only advertise the standard way: [--config dir] in our launcher - //System.out.println(" -cluster <name> Default uses hadoop-default.xml and hadoop-site.xml"); - //System.out.println(" -config <file> Optional. One or more paths to xml config files"); System.out.println(" -dfs <h:p>|local Optional. Override DFS configuration"); System.out.println(" -jt <h:p>|local Optional. Override JobTracker configuration"); System.out.println(" -additionalconfspec specfile Optional."); - System.out.println(" -inputformat KeyValueTextInputFormat(default)|SequenceFileInputFormat|XmlTextInputFormat Optional."); - System.out.println(" -outputformat specfile Optional."); - System.out.println(" -partitioner specfile Optional."); - System.out.println(" -numReduceTasks specfile Optional."); + System.out.println(" -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional."); + System.out.println(" -outputformat TextOutputFormat(default)|JavaClassName Optional."); + System.out.println(" -partitioner JavaClassName Optional."); + System.out.println(" -numReduceTasks <num> Optional."); System.out.println(" -inputreader <spec> Optional."); System.out.println(" -jobconf <n>=<v> Optional. Add or override a JobConf property"); System.out.println(" -cmdenv <n>=<v> Optional. 
Pass env.var to streaming commands"); @@ -478,10 +472,7 @@ System.out.println("In -input: globbing on <path> is supported and can have multiple -input"); System.out.println("Default Map input format: a line is a record in UTF-8"); System.out.println(" the key part ends at first TAB, the rest of the line is the value"); - System.out.println("Custom Map input format: -inputreader package.MyRecordReader,n=v,n=v "); - System.out - .println(" comma-separated name-values can be specified to configure the InputFormat"); - System.out.println(" Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'"); + System.out.println("Custom input format: -inputformat package.MyInputFormat "); System.out.println("Map output format, reduce input/output format:"); System.out.println(" Format defined by what the mapper command outputs. Line-oriented"); System.out.println(); @@ -489,34 +480,21 @@ System.out.println(" working directory when the mapper and reducer are run."); System.out.println(" The location of this working directory is unspecified."); System.out.println(); - //System.out.println("Use -cluster <name> to switch between \"local\" Hadoop and one or more remote "); - //System.out.println(" Hadoop clusters. "); - //System.out.println(" The default is to use the normal hadoop-default.xml and hadoop-site.xml"); - //System.out.println(" Else configuration will use $HADOOP_HOME/conf/hadoop-<name>.xml"); - //System.out.println(); + System.out.println("To set the number of reduce tasks (num. of output files):"); + System.out.println(" -jobconf mapred.reduce.tasks=10"); System.out.println("To skip the sort/combine/shuffle/sort/reduce step:"); - System.out.println(" Use -reducer " + REDUCE_NONE); + System.out.println(" Use -numReduceTasks 0"); System.out .println(" A Task's Map output then becomes a 'side-effect output' rather than a reduce input"); System.out .println(" This speeds up processing, This also feels more like \"in-place\" processing"); System.out.println(" because the input filename and the map input order are preserved"); - System.out.println("To specify a single side-effect output file"); - System.out.println(" -mapsideoutput [file:/C:/win|file:/unix/|socket://host:port]");//-output for side-effects will be soon deprecated - System.out.println(" If the jobtracker is local this is a local file"); - System.out.println(" This currently requires -reducer NONE"); + System.out.println(" This equivalent -reducer NONE"); System.out.println(); - System.out.println("To set the number of reduce tasks (num. 
of output files):"); - System.out.println(" -jobconf mapred.reduce.tasks=10"); System.out.println("To speed up the last reduces:"); System.out.println(" -jobconf mapred.speculative.execution=true"); - System.out.println(" Do not use this along -reducer " + REDUCE_NONE); System.out.println("To name the job (appears in the JobTracker Web UI):"); System.out.println(" -jobconf mapred.job.name='My Job' "); - System.out.println("To specify that line-oriented input is in gzip format:"); - System.out - .println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)"); - System.out.println(" -jobconf stream.recordreader.compression=gzip "); System.out.println("To change the local temp directory:"); System.out.println(" -jobconf dfs.data.dir=/tmp/dfs"); System.out.println(" -jobconf stream.tmpdir=/tmp/streaming"); @@ -681,8 +659,6 @@ config_.addFinalResource(new Path(pathName)); } - testMerge_ = (-1 != userJobConfProps_.toString().indexOf("stream.testmerge")); - // general MapRed job properties jobConf_ = new JobConf(config_); @@ -695,25 +671,32 @@ // (to resolve local vs. dfs drive letter differences) // (mapred.working.dir will be lazily initialized ONCE and depends on FS) for (int i = 0; i < inputSpecs_.size(); i++) { - addInputSpec((String) inputSpecs_.get(i), i); + jobConf_.addInputPath(new Path(((String) inputSpecs_.get(i)))); } - jobConf_.setBoolean("stream.inputtagged", inputTagged_); jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size()); String defaultPackage = this.getClass().getPackage().getName(); Class c; Class fmt = null; if (inReaderSpec_ == null && inputFormatSpec_ == null) { - fmt = KeyValueTextInputFormat.class; + fmt = TextInputFormat.class; } else if (inputFormatSpec_ != null) { - if (inputFormatSpec_.equals(KeyValueTextInputFormat.class.getName()) - || inputFormatSpec_.equals(KeyValueTextInputFormat.class.getCanonicalName())) { - fmt = KeyValueTextInputFormat.class; - } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class.getName()) - || inputFormatSpec_.equals(SequenceFileInputFormat.class.getCanonicalName())) { - fmt = SequenceFileInputFormat.class; - } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getName()) - || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getCanonicalName())) { + if (inputFormatSpec_.equals(TextInputFormat.class.getName()) + || inputFormatSpec_.equals(TextInputFormat.class.getCanonicalName())) { + fmt = TextInputFormat.class; + } else if (inputFormatSpec_.equals(KeyValueTextInputFormat.class + .getName()) + || inputFormatSpec_.equals(KeyValueTextInputFormat.class + .getCanonicalName())) { + } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class + .getName()) + || inputFormatSpec_ + .equals(org.apache.hadoop.mapred.SequenceFileInputFormat.class + .getCanonicalName())) { + } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class + .getName()) + || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class + .getCanonicalName())) { fmt = SequenceFileAsTextInputFormat.class; } else { c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage); @@ -725,14 +708,7 @@ } } if (fmt == null) { - if (testMerge_ && false == hasSimpleInputSpecs_) { - // this ignores -inputreader - fmt = MergerInputFormat.class; - } else { - // need to keep this case to support custom -inputreader - // and their parameters ,n=v,n=v - fmt = StreamInputFormat.class; - } + fmt = StreamInputFormat.class; } jobConf_.setInputFormat(fmt); @@ -757,13 +733,10 @@ c = 
StreamUtil.goodClassOrNull(comCmd_, defaultPackage); if (c != null) { jobConf_.setCombinerClass(c); - } else { - jobConf_.setCombinerClass(PipeCombiner.class); - jobConf_.set("stream.combine.streamprocessor", URLEncoder.encode(comCmd_, "UTF-8")); - } + } } - reducerNone_ = false; + boolean reducerNone_ = false; if (redCmd_ != null) { reducerNone_ = redCmd_.equals(REDUCE_NONE); if (redCmd_.compareToIgnoreCase("aggregate") == 0) { @@ -801,9 +774,7 @@ } setUserJobConfProps(false); - // output setup is done late so we can customize for reducerNone_ - //jobConf_.setOutputDir(new File(output_)); - setOutputSpec(); + jobConf_.setOutputPath(new Path(output_)); fmt = null; if (outputFormatSpec_!= null) { c = StreamUtil.goodClassOrNull(outputFormatSpec_, defaultPackage); @@ -812,11 +783,7 @@ } } if (fmt == null) { - if (testMerge_) { - fmt = MuxOutputFormat.class; - } else { - fmt = TextOutputFormat.class; - } + fmt = TextOutputFormat.class; } jobConf_.setOutputFormat(fmt); @@ -831,6 +798,9 @@ int numReduceTasks = Integer.parseInt(numReduceTasksSpec_); jobConf_.setNumReduceTasks(numReduceTasks); } + if (reducerNone_) { + jobConf_.setNumReduceTasks(0); + } // last, allow user to override anything // (although typically used with properties we didn't touch) @@ -880,78 +850,6 @@ } msg("===="); } - - /** InputSpec-s encode: a glob pattern x additional column files x additional joins */ - protected void addInputSpec(String inSpec, int index) { - if (!testMerge_) { - jobConf_.addInputPath(new Path(inSpec)); - } else { - CompoundDirSpec spec = new CompoundDirSpec(inSpec, true); - msg("Parsed -input:\n" + spec.toTableString()); - if (index == 0) { - hasSimpleInputSpecs_ = (spec.paths_.length == 0); - msg("hasSimpleInputSpecs_=" + hasSimpleInputSpecs_); - } - String primary = spec.primarySpec(); - if (!seenPrimary_.add(primary)) { - // this won't detect glob overlaps and noncanonical path variations - fail("Primary used in multiple -input spec: " + primary); - } - jobConf_.addInputPath(new Path(primary)); - // during Job execution, will reparse into a CompoundDirSpec - jobConf_.set("stream.inputspecs." + index, inSpec); - } - } - - /** uses output_ and mapsideoutURI_ */ - protected void setOutputSpec() throws IOException { - CompoundDirSpec spec = new CompoundDirSpec(output_, false); - msg("Parsed -output:\n" + spec.toTableString()); - String primary = spec.primarySpec(); - String channel0; - // TODO simplify cases, encapsulate in a StreamJobConf - if (!reducerNone_) { - channel0 = primary; - } else { - if (mapsideoutURI_ != null) { - // user can override in case this is in a difft filesystem.. - try { - URI uri = new URI(mapsideoutURI_); - if (uri.getScheme() == null || uri.getScheme().equals("file")) { // || uri.getScheme().equals("hdfs") - if (!new Path(uri.getSchemeSpecificPart()).isAbsolute()) { - fail("Must be absolute: " + mapsideoutURI_); - } - } else if (uri.getScheme().equals("socket")) { - // ok - } else { - fail("Invalid scheme: " + uri.getScheme() + " for -mapsideoutput " + mapsideoutURI_); - } - } catch (URISyntaxException e) { - throw (IOException) new IOException().initCause(e); - } - } - // an empty reduce output named "part-00002" will go here and not collide. 
- channel0 = primary + ".NONE"; - // the side-effect of the first split of an input named "part-00002" - // will go in this directory - jobConf_.set("stream.sideoutput.dir", primary); - // oops if user overrides low-level this isn't set yet :-( - boolean localjt = StreamUtil.isLocalJobTracker(jobConf_); - // just a guess user may prefer remote.. - jobConf_.setBoolean("stream.sideoutput.localfs", localjt); - } - // a path in fs.name.default filesystem - System.out.println(channel0); - System.out.println(new Path(channel0)); - jobConf_.setOutputPath(new Path(channel0)); - // will reparse remotely - jobConf_.set("stream.outputspec", output_); - if (null != mapsideoutURI_) { - // a path in "jobtracker's filesystem" - // overrides sideoutput.dir - jobConf_.set("stream.sideoutput.uri", mapsideoutURI_); - } - } protected String getJobTrackerHostPort() { return jobConf_.get("mapred.job.tracker"); @@ -1099,15 +997,12 @@ // command-line arguments protected ArrayList inputSpecs_ = new ArrayList(); // <String> - protected boolean inputTagged_ = false; protected TreeSet seenPrimary_ = new TreeSet(); // <String> protected boolean hasSimpleInputSpecs_; protected ArrayList packageFiles_ = new ArrayList(); // <String> protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String> - //protected ArrayList userJobConfProps_ = new ArrayList(); // <String> name=value protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>(); protected String output_; - protected String mapsideoutURI_; protected String mapCmd_; protected String comCmd_; protected String redCmd_; @@ -1124,8 +1019,6 @@ protected String partitionerSpec_; protected String numReduceTasksSpec_; protected String additionalConfSpec_; - - protected boolean testMerge_; // Use to communicate config to the external processes (ex env.var.HADOOP_USER) // encoding "a=b c=d" Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java?view=diff&rev=534970&r1=534969&r2=534970 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java Thu May 3 12:33:27 2007 @@ -453,20 +453,6 @@ return res; } - static boolean getUseMapSideEffect(JobConf job) { - String reduce = job.get("stream.reduce.streamprocessor"); - if (reduce == null) { - return false; - } - try { - reduce = URLDecoder.decode(reduce, "UTF-8"); - } catch (UnsupportedEncodingException e) { - System.err.println("stream.reduce.streamprocessor in jobconf not found"); - return false; - } - return StreamJob.REDUCE_NONE.equals(reduce); - } - public static void touch(File file) throws IOException { file = file.getAbsoluteFile(); FileOutputStream out = new FileOutputStream(file); Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?view=diff&rev=534970&r1=534969&r2=534970 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java (original) +++ 
lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java Thu May 3 12:33:27 2007 @@ -19,10 +19,8 @@ package org.apache.hadoop.streaming; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.PushbackInputStream; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.LineRecordReader; Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?view=diff&rev=534970&r1=534969&r2=534970 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Thu May 3 12:33:27 2007 @@ -38,10 +38,11 @@ protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n"; // map behaves like "/usr/bin/tr . \\n"; (split words into lines) protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"}); - // combine, reduce behave like /usr/bin/uniq. But also prepend lines with C, R. + // reduce behave like /usr/bin/uniq. But also prepend lines with R. + // command-line combiner does not have any effect any more. protected String combine = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"C"}); protected String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"}); - protected String outputExpect = "RCare\t\nRCblue\t\nRCbunnies\t\nRCpink\t\nRCred\t\nRCroses\t\nRCviolets\t\n"; + protected String outputExpect = "Rare\t\nRblue\t\nRbunnies\t\nRpink\t\nRred\t\nRroses\t\nRviolets\t\n"; private StreamJob job; Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?view=diff&rev=534970&r1=534969&r2=534970 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Thu May 3 12:33:27 2007 @@ -41,7 +41,7 @@ // test that some JobConf properties are exposed as expected // Note the dots translated to underscore: // property names have been escaped in PipeMapRed.safeEnvVarName() - expect("mapred_input_format_class", "org.apache.hadoop.mapred.KeyValueTextInputFormat"); + expect("mapred_input_format_class", "org.apache.hadoop.mapred.TextInputFormat"); expect("mapred_job_tracker", "local"); //expect("mapred_local_dir", "build/test/mapred/local"); expectDefined("mapred_local_dir");
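[Editor's note] The updated TrApp expectation above reflects the new default input format: core TextInputFormat instead of streaming's KeyValueTextInputFormat. With TextInputFormat the map input key is the byte offset (a LongWritable), so PipeMapper.configure() now sets an ignoreKey flag and writes only the value to the streaming command. A minimal sketch of that decision follows; it is illustrative only, and IgnoreKeySketch is a hypothetical name.

// Illustration only: the decision PipeMapper.configure() now makes.
// When the input format is the core TextInputFormat (the new default),
// the LongWritable byte-offset key is not forwarded to the child process.
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class IgnoreKeySketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // "mapred.input.format.class" is unset here, so TextInputFormat is assumed by default.
    String inputFormatClassName =
        job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
    boolean ignoreKey =
        inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
    // With ignoreKey == true, PipeMapper writes only the value (the input line),
    // not "key \t value", to the streaming command's stdin.
    System.out.println("ignoreKey = " + ignoreKey);
  }
}

This matches the revised StreamJob usage text above: -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName.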