Author: cutting
Date: Wed Jan 31 14:26:41 2007
New Revision: 502021

URL: http://svn.apache.org/viewvc?view=rev&rev=502021
Log:
HADOOP-788.  Change contrib/streaming to subclass TextInputFormat.  Contributed by Sanjay.
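The "native compression facilities" named in the CHANGES.txt entry below are the org.apache.hadoop.io.compress codecs; before this patch, streaming decompressed its input by hand with a raw java.util.zip.GZIPInputStream. A minimal sketch of reading a gzipped file through GzipCodec, using only calls that appear elsewhere in this diff (the class name and the input path are illustrative assumptions):

import java.io.InputStream;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobConf;

public class GzipReadSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    FileSystem fs = FileSystem.get(job);
    // open the raw bytes, then let the codec supply a decompressing view,
    // the same pattern StreamLineRecordReader.createStream() uses below
    FSDataInputStream raw = fs.open(new Path("input.gz")); // placeholder path
    GzipCodec codec = new GzipCodec();
    codec.setConf(job);  // the codec is configured from the job, as in the patch
    InputStream in = codec.createInputStream(raw);
    int b;
    while ((b = in.read()) != -1) {
      System.out.write(b);
    }
    in.close();
    System.out.flush();
  }
}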
Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jan 31 14:26:41 2007
@@ -114,6 +114,10 @@
 35. HADOOP-881.  Fix JobTracker web interface to display the correct
     number of task failures.  (Sanjay Dahiya via cutting)
 
+36. HADOOP-788.  Change contrib/streaming to subclass TextInputFormat,
+    permitting it to take advantage of native compression facilities.
+    (Sanjay Dahiya via cutting)
+
 Release 0.10.1 - 2007-01-10

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/MergerInputFormat.java Wed Jan 31 14:26:41 2007
@@ -85,7 +85,7 @@
      full file at a time... )
    */
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    return ((StreamInputFormat) primary_).getFullFileSplits(job);
+    return ((StreamInputFormat) primary_).getSplits(job, numSplits);
   }
 
   /**

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java Wed Jan 31 14:26:41 2007
@@ -100,16 +100,6 @@
 
   /// StreamBaseRecordReader API
 
-  public void init() throws IOException {
-    LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + end_ + " length_="
-        + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > "
-        + in_.getPos());
-    if (start_ > in_.getPos()) {
-      in_.seek(start_);
-    }
-    seekNextRecordBoundary();
-  }
-
   /** Implementation should seek forward in_ to the first byte of the next record.
    *  The initial byte offset in the stream is arbitrary.
    */
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamInputFormat.java Wed Jan 31 14:26:41 2007
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.compress.GzipCodec;
 
 import org.apache.hadoop.mapred.*;
 
@@ -34,7 +35,7 @@
  * selects a RecordReader based on a JobConf property.
  * @author Michel Tourn
 */
-public class StreamInputFormat extends InputFormatBase {
+public class StreamInputFormat extends TextInputFormat {
 
   // an InputFormat should be public with the synthetic public default constructor
   // JobTracker's JobInProgress will instantiate with clazz.newInstance() (and a custom ClassLoader)
@@ -54,7 +55,6 @@
       return super.getSplits(job, numSplits);
     }
   }
-
   /** For the compressed-files case: override InputFormatBase to produce one split. */
   FileSplit[] getFullFileSplits(JobConf job) throws IOException {
     Path[] files = listPaths(job);
@@ -79,9 +79,8 @@
 
     final long start = split.getStart();
     final long end = start + split.getLength();
-    String splitName = split.getPath() + ":" + start + "-" + end;
-    final FSDataInputStream in = fs.open(split.getPath());
-
+    FSDataInputStream in = fs.open(split.getPath());
+    // will open the file and seek to the start of the split
 
     // Factory dispatch based on available params..
     Class readerClass;
@@ -103,15 +102,13 @@
       throw new RuntimeException(nsm);
     }
 
-    StreamBaseRecordReader reader;
+    RecordReader reader;
     try {
-      reader = (StreamBaseRecordReader) ctor.newInstance(new Object[] { in, split, reporter, job,
+      reader = (RecordReader) ctor.newInstance(new Object[] { in, split, reporter, job,
           fs });
     } catch (Exception nsm) {
      throw new RuntimeException(nsm);
    }
-
-    reader.init();
 
     if (reader instanceof StreamSequenceRecordReader) {
       // override k/v class types with types stored in SequenceFile
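The hunk above loosens the declared type from StreamBaseRecordReader to plain RecordReader, since the line reader now extends LineRecordReader instead. The factory dispatch itself is reflective; a self-contained sketch of the idiom follows, in which the helper and class names are illustrative but the five-argument constructor signature matches the constructors shown in this patch:

import java.lang.reflect.Constructor;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class ReaderFactorySketch {
  // Instantiate a streaming RecordReader by class name; every streaming
  // reader in this patch exposes this five-argument constructor.
  static RecordReader createReader(String readerClassName, FSDataInputStream in,
      FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
      throws Exception {
    Class readerClass = Class.forName(readerClassName);
    Constructor ctor = readerClass.getConstructor(new Class[] {
        FSDataInputStream.class, FileSplit.class, Reporter.class,
        JobConf.class, FileSystem.class });
    return (RecordReader) ctor.newInstance(
        new Object[] { in, split, reporter, job, fs });
  }
}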
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Wed Jan 31 14:26:41 2007
@@ -19,59 +19,65 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
-import java.nio.charset.MalformedInputException;
-import java.util.Arrays;
-import java.util.zip.GZIPInputStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.LineRecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * Similar to org.apache.hadoop.mapred.TextRecordReader,
 * but delimits key and value with a TAB.
 * @author Michel Tourn
 */
-public class StreamLineRecordReader extends StreamBaseRecordReader {
+public class StreamLineRecordReader extends LineRecordReader {
+
+  private String splitName;
+  private Reporter reporter;
+  private FileSplit split;
+  private int numRec = 0;
+  private int nextStatusRec = 1;
+  private int statusMaxRecordChars;
+  protected static final Log LOG = LogFactory.getLog(StreamLineRecordReader.class);
+  // base class uses LongWritable as key, use this.
+  private WritableComparable dummyKey = super.createKey();
+  private Text innerValue = (Text)super.createValue();
 
-  public StreamLineRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
+  public StreamLineRecordReader(FSDataInputStream in, FileSplit split,
+                                Reporter reporter,
       JobConf job, FileSystem fs) throws IOException {
-    super(in, split, reporter, job, fs);
-    gzipped_ = StreamInputFormat.isGzippedInput(job);
-    if (gzipped_) {
-      din_ = new BufferedInputStream( (new GZIPInputStream(in_) ) );
-    } else {
-      din_ = in_;
-    }
+    super(createStream(in, job), split.getStart(),
+          split.getStart() + split.getLength());
+    this.split = split ;
+    this.reporter = reporter ;
  }
-
-  public void seekNextRecordBoundary() throws IOException {
-    if (gzipped_) {
-      // no skipping: use din_ as-is
-      // assumes splitter created only one split per file
-      return;
-    } else {
-      int bytesSkipped = 0;
-      if (start_ != 0) {
-        in_.seek(start_ - 1);
-        // scan to the next newline in the file
-        while (in_.getPos() < end_) {
-          char c = (char) in_.read();
-          bytesSkipped++;
-          if (c == '\r' || c == '\n') {
-            break;
-          }
-        }
-      }
-
-      //System.out.println("getRecordReader start="+start_ + " end=" + end_ + " bytesSkipped"+bytesSkipped);
-    }
+
+  private static InputStream createStream(FSDataInputStream in, JobConf job)
+    throws IOException{
+    InputStream finalStream = in ;
+    boolean gzipped = StreamInputFormat.isGzippedInput(job);
+    if ( gzipped ) {
+      GzipCodec codec = new GzipCodec();
+      codec.setConf(job);
+      finalStream = codec.createInputStream(in);
+    }
+    return finalStream;
+  }
+
+  public WritableComparable createKey() {
+    return new Text();
+  }
+
+  public Writable createValue() {
+    return new Text();
   }
 
   public synchronized boolean next(Writable key, Writable value) throws IOException {
@@ -86,14 +92,12 @@
     Text tKey = (Text) key;
     Text tValue = (Text) value;
-    byte[] line;
-
-    if ( !gzipped_ ) {
-      long pos = in_.getPos();
-      if (pos >= end_) return false;
+    byte[] line = null ;
+    if( super.next(dummyKey, innerValue) ){
+      line = innerValue.getBytes();
+    }else{
+      return false;
     }
-
-    line = UTF8ByteArrayUtils.readLine((InputStream) din_);
     if (line == null) return false;
     int tab = UTF8ByteArrayUtils.findTab(line);
     if (tab == -1) {
@@ -105,7 +109,35 @@
     numRecStats(line, 0, line.length);
     return true;
   }
+
+  private void numRecStats(byte[] record, int start, int len) throws IOException {
+    numRec++;
+    if (numRec == nextStatusRec) {
+      String recordStr = new String(record, start, Math.min(len, statusMaxRecordChars), "UTF-8");
+      nextStatusRec += 100;//*= 10;
+      String status = getStatus(recordStr);
+      LOG.info(status);
+      reporter.setStatus(status);
+    }
+  }
 
-  boolean gzipped_;
-  InputStream din_; // GZIP or plain
+  private String getStatus(CharSequence record) {
+    long pos = -1;
+    try {
+      pos = getPos();
+    } catch (IOException io) {
+    }
+    String recStr;
+    if (record.length() > statusMaxRecordChars) {
+      recStr = record.subSequence(0, statusMaxRecordChars) + "...";
+    } else {
+      recStr = record.toString();
+    }
+    String unqualSplit = split.getFile().getName() + ":" + split.getStart() + "+"
+        + split.getLength();
+    String status = "HSTR " + StreamUtil.HOST + " " + numRec + ". pos=" + pos + " " + unqualSplit
+        + " Processing record=" + recStr;
+    status += " " + splitName;
+    return status;
+  }
 }
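The subclass above bridges two record shapes: the base LineRecordReader emits (file offset, whole line), which is why next() feeds a dummy key to super.next(), while streaming wants (text key, text value) split at the first TAB. A self-contained sketch of that TAB-splitting convention, in plain Java rather than the UTF8ByteArrayUtils helpers the class actually uses (the class name and sample record are illustrative):

import org.apache.hadoop.io.Text;

public class TabSplitSketch {
  public static void main(String[] args) throws Exception {
    byte[] line = "fruit\tapple".getBytes("UTF-8"); // hypothetical record
    Text key = new Text();
    Text value = new Text();

    // find the first TAB, as UTF8ByteArrayUtils.findTab() does
    int tab = -1;
    for (int i = 0; i < line.length; i++) {
      if (line[i] == '\t') { tab = i; break; }
    }

    if (tab == -1) {
      key.set(line);           // no TAB: the whole line is the key, value empty
      value.set("");
    } else {
      key.set(line, 0, tab);   // bytes before the TAB
      value.set(line, tab + 1, line.length - tab - 1); // bytes after it
    }
    System.out.println(key + " -> " + value);
  }
}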
pos=" + pos + " " + unqualSplit + + " Processing record=" + recStr; + status += " " + splitName; + return status; + } } Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?view=diff&rev=502021&r1=502020&r2=502021 ============================================================================== --- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original) +++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Wed Jan 31 14:26:41 2007 @@ -62,8 +62,19 @@ beginPat_ = makePatternCDataOrMark(beginMark_); endPat_ = makePatternCDataOrMark(endMark_); } + init(); } + public void init() throws IOException { + LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + end_ + " length_=" + + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > " + + in_.getPos()); + if (start_ > in_.getPos()) { + in_.seek(start_); + } + seekNextRecordBoundary(); + } + int numNext = 0; public synchronized boolean next(Writable key, Writable value) throws IOException { Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?view=auto&rev=502021 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java Wed Jan 31 14:26:41 2007 @@ -0,0 +1,127 @@ +package org.apache.hadoop.mapred; + +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Treats keys as offset in file and value as line. + * @author sanjaydahiya + * + */ +public class LineRecordReader implements RecordReader { + private long start; + private long pos; + private long end; + private BufferedInputStream in; + private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256); + /** + * Provide a bridge to get the bytes from the ByteArrayOutputStream + * without creating a new byte array. + */ + private static class TextStuffer extends OutputStream { + public Text target; + public void write(int b) { + throw new UnsupportedOperationException("write(byte) not supported"); + } + public void write(byte[] data, int offset, int len) throws IOException { + target.set(data, offset, len); + } + } + private TextStuffer bridge = new TextStuffer(); + + public LineRecordReader(InputStream in, long offset, long endOffset) + throws IOException{ + this.in = new BufferedInputStream(in); + this.start = offset; + this.pos = offset; + this.end = endOffset; +// readLine(in, null); + } + + public WritableComparable createKey() { + return new LongWritable(); + } + + public Writable createValue() { + return new Text(); + } + + /** Read a line. 
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?view=auto&rev=502021
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java Wed Jan 31 14:26:41 2007
@@ -0,0 +1,127 @@
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Treats keys as offset in file and value as line.
+ * @author sanjaydahiya
+ *
+ */
+public class LineRecordReader implements RecordReader {
+  private long start;
+  private long pos;
+  private long end;
+  private BufferedInputStream in;
+  private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
+  /**
+   * Provide a bridge to get the bytes from the ByteArrayOutputStream
+   * without creating a new byte array.
+   */
+  private static class TextStuffer extends OutputStream {
+    public Text target;
+    public void write(int b) {
+      throw new UnsupportedOperationException("write(byte) not supported");
+    }
+    public void write(byte[] data, int offset, int len) throws IOException {
+      target.set(data, offset, len);
+    }
+  }
+  private TextStuffer bridge = new TextStuffer();
+
+  public LineRecordReader(InputStream in, long offset, long endOffset)
+    throws IOException{
+    this.in = new BufferedInputStream(in);
+    this.start = offset;
+    this.pos = offset;
+    this.end = endOffset;
+//    readLine(in, null);
+  }
+
+  public WritableComparable createKey() {
+    return new LongWritable();
+  }
+
+  public Writable createValue() {
+    return new Text();
+  }
+
+  /** Read a line. */
+  public synchronized boolean next(Writable key, Writable value)
+    throws IOException {
+    if (pos >= end)
+      return false;
+
+    ((LongWritable)key).set(pos);           // key is position
+    buffer.reset();
+    long bytesRead = readLine(in, buffer);
+    if (bytesRead == 0) {
+      return false;
+    }
+    pos += bytesRead;
+    bridge.target = (Text) value;
+    buffer.writeTo(bridge);
+    return true;
+  }
+
+  public static long readLine(InputStream in,
+                              OutputStream out) throws IOException {
+    long bytes = 0;
+    while (true) {
+
+      int b = in.read();
+      if (b == -1) {
+        break;
+      }
+      bytes += 1;
+
+      byte c = (byte)b;
+      if (c == '\n') {
+        break;
+      }
+
+      if (c == '\r') {
+        in.mark(1);
+        byte nextC = (byte)in.read();
+        if (nextC != '\n') {
+          in.reset();
+        } else {
+          bytes += 1;
+        }
+        break;
+      }
+
+      if (out != null) {
+        out.write(c);
+      }
+    }
+    return bytes;
+  }
+
+  /**
+   * Get the progress within the split
+   */
+  public float getProgress() {
+    if (start == end) {
+      return 0.0f;
+    } else {
+      return (pos - start) / (end - start);
+    }
+  }
+
+  public synchronized long getPos() throws IOException {
+    return pos;
+  }
+
+  public synchronized void close() throws IOException {
+    in.close();
+  }
+}
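readLine() is now public and static, so its newline handling can be exercised directly; the small sketch below mirrors the assertions in the updated TestTextInputFormat further down (the class name and input string are illustrative). The returned count includes the line-terminator bytes, which is what lets next() advance pos correctly, while the bytes copied to the output exclude them:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

import org.apache.hadoop.mapred.LineRecordReader;

public class ReadLineSketch {
  public static void main(String[] args) throws Exception {
    InputStream in = new ByteArrayInputStream(
        "a\nbb\r\nccc\rdddd".getBytes("UTF-8"));
    ByteArrayOutputStream out = new ByteArrayOutputStream();

    long n = LineRecordReader.readLine(in, out);
    System.out.println(n + " bytes, line=\"" + out + "\""); // 2 bytes, line="a"

    out.reset();
    n = LineRecordReader.readLine(in, out);
    // "\r\n" is consumed as a single terminator: 4 bytes for "bb\r\n"
    System.out.println(n + " bytes, line=\"" + out + "\""); // 4 bytes, line="bb"

    out.reset();
    n = LineRecordReader.readLine(in, out);
    // a lone '\r' also ends a line; the byte after it is pushed back
    System.out.println(n + " bytes, line=\"" + out + "\""); // 4 bytes, line="ccc"

    out.reset();
    n = LineRecordReader.readLine(in, out);
    System.out.println(n + " bytes, line=\"" + out + "\""); // 4 bytes, line="dddd"
  }
}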
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextInputFormat.java Wed Jan 31 14:26:41 2007
@@ -21,7 +21,6 @@
 import java.io.*;
 
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.compress.*;
 
 /** An {@link InputFormat} for plain text files.  Files are broken into lines.
@@ -39,81 +38,6 @@
     return compressionCodecs.getCodec(file) == null;
   }
 
-  protected static class LineRecordReader implements RecordReader {
-    private long start;
-    private long pos;
-    private long end;
-    private BufferedInputStream in;
-    private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
-    /**
-     * Provide a bridge to get the bytes from the ByteArrayOutputStream
-     * without creating a new byte array.
-     */
-    private static class TextStuffer extends OutputStream {
-      public Text target;
-      public void write(int b) {
-        throw new UnsupportedOperationException("write(byte) not supported");
-      }
-      public void write(byte[] data, int offset, int len) throws IOException {
-        target.set(data, offset, len);
-      }
-    }
-    private TextStuffer bridge = new TextStuffer();
-
-    public LineRecordReader(InputStream in, long offset, long endOffset) {
-      this.in = new BufferedInputStream(in);
-      this.start = offset;
-      this.pos = offset;
-      this.end = endOffset;
-    }
-
-    public WritableComparable createKey() {
-      return new LongWritable();
-    }
-
-    public Writable createValue() {
-      return new Text();
-    }
-
-    /**
-     * Get the progress within the split
-     */
-    public float getProgress() {
-      if (start == end) {
-        return 0.0f;
-      } else {
-        return (pos - start) / (end - start);
-      }
-    }
-
-    /** Read a line. */
-    public synchronized boolean next(Writable key, Writable value)
-      throws IOException {
-      if (pos >= end)
-        return false;
-
-      ((LongWritable)key).set(pos);         // key is position
-      buffer.reset();
-      long bytesRead = readLine(in, buffer);
-      if (bytesRead == 0) {
-        return false;
-      }
-      pos += bytesRead;
-      bridge.target = (Text) value;
-      buffer.writeTo(bridge);
-      return true;
-    }
-
-    public synchronized long getPos() throws IOException {
-      return pos;
-    }
-
-    public synchronized void close() throws IOException {
-      in.close();
-    }
-
-  }
-
   public RecordReader getRecordReader(InputSplit genericSplit,
                                       JobConf job, Reporter reporter)
     throws IOException {
@@ -129,52 +53,16 @@
     FileSystem fs = FileSystem.get(job);
     FSDataInputStream fileIn = fs.open(split.getPath());
     InputStream in = fileIn;
-
     if (codec != null) {
       in = codec.createInputStream(fileIn);
       end = Long.MAX_VALUE;
     } else if (start != 0) {
       fileIn.seek(start-1);
-      readLine(fileIn, null);
+      LineRecordReader.readLine(fileIn, null);
       start = fileIn.getPos();
     }
 
     return new LineRecordReader(in, start, end);
   }
-
-  public static long readLine(InputStream in,
-                              OutputStream out) throws IOException {
-    long bytes = 0;
-    while (true) {
-
-      int b = in.read();
-      if (b == -1) {
-        break;
-      }
-      bytes += 1;
-
-      byte c = (byte)b;
-      if (c == '\n') {
-        break;
-      }
-
-      if (c == '\r') {
-        in.mark(1);
-        byte nextC = (byte)in.read();
-        if (nextC != '\n') {
-          in.reset();
-        } else {
-          bytes += 1;
-        }
-        break;
-      }
-
-      if (out != null) {
-        out.write(c);
-      }
-    }
-    return bytes;
-  }
-
 }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?view=diff&rev=502021&r1=502020&r2=502021
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Wed Jan 31 14:26:41 2007
@@ -130,14 +130,14 @@
   public void testUTF8() throws Exception {
     InputStream in = makeStream("abcd\u20acbdcd\u20ac");
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     Text line = new Text();
     line.set(out.toByteArray());
     assertEquals("readLine changed utf8 characters",
                  "abcd\u20acbdcd\u20ac", line.toString());
     in = makeStream("abc\u200axyz");
     out.reset();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     line.set(out.toByteArray());
     assertEquals("split on fake newline", "abc\u200axyz", line.toString());
   }
 
@@ -145,24 +145,24 @@
   public void testNewLines() throws Exception {
     InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
     ByteArrayOutputStream out = new ByteArrayOutputStream();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     assertEquals("line1 length", 1, out.size());
     out.reset();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     assertEquals("line2 length", 2, out.size());
     out.reset();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     assertEquals("line3 length", 0, out.size());
     out.reset();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     assertEquals("line4 length", 3, out.size());
     out.reset();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     assertEquals("line5 length", 4, out.size());
     out.reset();
-    TextInputFormat.readLine(in, out);
+    LineRecordReader.readLine(in, out);
     assertEquals("line5 length", 5, out.size());
-    assertEquals("end of file", 0, TextInputFormat.readLine(in, out));
+    assertEquals("end of file", 0, LineRecordReader.readLine(in, out));
   }
 
   private static void writeFile(FileSystem fs, Path name,