Author: cdouglas
Date: Tue Jan 8 15:29:55 2008
New Revision: 610227
URL: http://svn.apache.org/viewvc?rev=610227&view=rev
Log: HADOOP-2285. Speeds up TextInputFormat. Also includes updates to the Text API. Contributed by Owen O'Malley
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=610227&r1=610226&r2=610227&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Jan 8 15:29:55 2008 @@ -66,6 +66,9 @@ IMPROVEMENTS + HADOOP-2285. Speeds up TextInputFormat. Also includes updates to the + Text API. (Owen O'Malley via cdouglas) + HADOOP-2045. Change committer list on website to a table, so that folks can list their organization, timezone, etc. (cutting) Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java?rev=610227&r1=610226&r2=610227&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java Tue Jan 8 15:29:55 2008 @@ -188,23 +188,48 @@ * @param len the number of bytes of the new string */ public void set(byte[] utf8, int start, int len) { - setCapacity(len); + setCapacity(len, false); System.arraycopy(utf8, start, bytes, 0, len); this.length = len; } + /** + * Append a range of bytes to the end of the given text + * @param utf8 the data to copy from + * @param start the first position to append from utf8 + * @param len the number of bytes to append + */ + public void append(byte[] utf8, int start, int len) { + setCapacity(length + len, true); + System.arraycopy(utf8, start, bytes, length, len); + length += len; + } + + /** + * 
Clear the string to empty. + */ + public void clear() { + length = 0; + } + /* * Sets the capacity of this Text object to <em>at least</em> * <code>len</code> bytes. If the current buffer is longer, * then the capacity and existing content of the buffer are * unchanged. If <code>len</code> is larger * than the current capacity, the Text object's capacity is - * increased to match. The existing contents of the buffer - * (if any) are deleted. + * increased to match. + * @param len the number of bytes we need + * @param keepData should the old data be kept */ - private void setCapacity(int len) { - if (bytes == null || bytes.length < len) - bytes = new byte[len]; + private void setCapacity(int len, boolean keepData) { + if (bytes == null || bytes.length < len) { + byte[] newBytes = new byte[len]; + if (bytes != null && keepData) { + System.arraycopy(bytes, 0, newBytes, 0, length); + } + bytes = newBytes; + } } /** @@ -222,9 +247,10 @@ /** deserialize */ public void readFields(DataInput in) throws IOException { - length = WritableUtils.readVInt(in); - setCapacity(length); - in.readFully(bytes, 0, length); + int newLength = WritableUtils.readVInt(in); + setCapacity(newLength, false); + in.readFully(bytes, 0, newLength); + length = newLength; } /** Skips over one Text in the input. 
*/ Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=610227&r1=610226&r2=610227&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java Tue Jan 8 15:29:55 2008 @@ -18,8 +18,6 @@ package org.apache.hadoop.mapred; -import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -38,30 +36,126 @@ */ public class LineRecordReader implements RecordReader<LongWritable, Text> { private CompressionCodecFactory compressionCodecs = null; - private long start; + private long start; private long pos; private long end; - private BufferedInputStream in; - private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256); + private LineReader in; + /** - * Provide a bridge to get the bytes from the ByteArrayOutputStream - * without creating a new byte array. + * A class that provides a line reader from an input stream. 
*/ - private static class TextStuffer extends OutputStream { - public Text target; - public void write(int b) { - throw new UnsupportedOperationException("write(byte) not supported"); - } - public void write(byte[] data, int offset, int len) throws IOException { - target.set(data, offset, len); - } + public static class LineReader { + private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; + private int bufferSize = DEFAULT_BUFFER_SIZE; + private InputStream in; + private byte[] buffer; + // the number of bytes of real data in the buffer + private int bufferLength = 0; + // the current position in the buffer + private int bufferPosn = 0; + + /** + * Create a line reader that reads from the given stream using the + * given buffer-size. + * @param in + * @throws IOException + */ + LineReader(InputStream in, int bufferSize) { + this.in = in; + this.bufferSize = bufferSize; + this.buffer = new byte[this.bufferSize]; + } + + /** + * Create a line reader that reads from the given stream using the + * <code>io.file.buffer.size</code> specified in the given + * <code>Configuration</code>. + * @param in input stream + * @param conf configuration + * @throws IOException + */ + LineReader(InputStream in, Configuration conf) throws IOException { + this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE)); + } + + /** + * Fill the buffer with more data. + * @return was there more data? + * @throws IOException + */ + boolean backfill() throws IOException { + bufferPosn = 0; + bufferLength = in.read(buffer); + return bufferLength > 0; + } + + /** + * Close the underlying stream. + * @throws IOException + */ + public void close() throws IOException { + in.close(); + } + + /** + * Read from the InputStream into the given Text. 
+ * @param str the object to store the given line + * @return the number of bytes read including the newline + * @throws IOException if the underlying stream throws + */ + public int readLine(Text str) throws IOException { + str.clear(); + boolean hadFinalNewline = false; + boolean hadFinalReturn = false; + boolean hitEndOfFile = false; + int startPosn = bufferPosn; + outerLoop: while (true) { + if (bufferPosn >= bufferLength) { + if (!backfill()) { + hitEndOfFile = true; + break; + } + } + startPosn = bufferPosn; + for(; bufferPosn < bufferLength; ++bufferPosn) { + switch (buffer[bufferPosn]) { + case '\n': + hadFinalNewline = true; + bufferPosn += 1; + break outerLoop; + case '\r': + if (hadFinalReturn) { + // leave this \n in the stream, so we'll get it next time + break outerLoop; + } + hadFinalReturn = true; + break; + default: + if (hadFinalReturn) { + break outerLoop; + } + } + } + int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0); + if (length >= 0) { + str.append(buffer, startPosn, length); + } + } + int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 
1 : 0); + if (!hitEndOfFile) { + int length = bufferPosn - startPosn - newlineLength; + if (length > 0) { + str.append(buffer, startPosn, length); + } + } + return str.getLength() + newlineLength; + } } - private TextStuffer bridge = new TextStuffer(); - public LineRecordReader(Configuration job, FileSplit split) - throws IOException { - long start = split.getStart(); - long end = start + split.getLength(); + public LineRecordReader(Configuration job, + FileSplit split) throws IOException { + start = split.getStart(); + end = start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); @@ -69,33 +163,38 @@ // open the file and seek to the start of the split FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); - InputStream in = fileIn; boolean skipFirstLine = false; if (codec != null) { - in = codec.createInputStream(fileIn); + in = new LineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; - } else if (start != 0) { - skipFirstLine = true; // wait till BufferedInputStream to skip - --start; - fileIn.seek(start); + } else { + if (start != 0) { + skipFirstLine = true; + --start; + fileIn.seek(start); + } + in = new LineReader(fileIn, job); } - - this.in = new BufferedInputStream(in); if (skipFirstLine) { // skip first line and re-establish "start". 
- start += LineRecordReader.readLine(this.in, null); + start += in.readLine(new Text()); } - this.start = start; this.pos = start; - this.end = end; } - public LineRecordReader(InputStream in, long offset, long endOffset) + public LineRecordReader(InputStream in, long offset, long endOffset) { + this.in = new LineReader(in, LineReader.DEFAULT_BUFFER_SIZE); + this.start = offset; + this.pos = offset; + this.end = endOffset; + } + + public LineRecordReader(InputStream in, long offset, long endOffset, + Configuration job) throws IOException{ - this.in = new BufferedInputStream(in); + this.in = new LineReader(in, job); this.start = offset; this.pos = offset; this.end = endOffset; - // readLine(in, null); } public LongWritable createKey() { @@ -113,21 +212,17 @@ return false; key.set(pos); // key is position - buffer.reset(); - long bytesRead = readLine(); - if (bytesRead == 0) { - return false; + int newSize = in.readLine(value); + if (newSize > 0) { + pos += newSize; + return true; } - pos += bytesRead; - bridge.target = value; - buffer.writeTo(bridge); - return true; - } - - protected long readLine() throws IOException { - return LineRecordReader.readLine(in, buffer); + return false; } + /** + * @deprecated + */ public static long readLine(InputStream in, OutputStream out) throws IOException { long bytes = 0; @@ -177,7 +272,9 @@ return pos; } - public synchronized void close() throws IOException { - in.close(); + public synchronized void close() throws IOException { + if (in != null) { + in.close(); + } } } Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java?rev=610227&r1=610226&r2=610227&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java Tue Jan 8 15:29:55 2008 @@ 
-24,12 +24,8 @@ import java.nio.charset.CharacterCodingException; import java.util.Random; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** Unit tests for LargeUTF8. */ public class TestText extends TestCase { - private static final Log LOG= LogFactory.getLog("org.apache.hadoop.io.TestText"); private static final int NUM_ITERATIONS = 100; public TestText(String name) { super(name); } @@ -208,6 +204,9 @@ Text b=new Text("a"); b.set(a); assertEquals("abc", b.toString()); + a.append("xdefgxxx".getBytes(), 1, 4); + assertEquals("modified aliased string", "abc", b.toString()); + assertEquals("appended string incorrectly", "abcdefg", a.toString()); } public static void main(String[] args) throws Exception Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=610227&r1=610226&r2=610227&view=diff ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Tue Jan 8 15:29:55 2008 @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.mapred.LineRecordReader.LineReader; import org.apache.hadoop.util.ReflectionUtils; public class TestTextInputFormat extends TestCase { @@ -126,47 +127,39 @@ } } - private InputStream makeStream(String str) throws IOException { - Text text = new Text(str); - return new ByteArrayInputStream(text.getBytes(), 0, text.getLength()); + private static LineReader makeStream(String str) throws IOException { + return new LineRecordReader.LineReader(new ByteArrayInputStream + (str.getBytes("UTF-8")), + defaultConf); } public void testUTF8() throws Exception { - InputStream in = 
makeStream("abcd\u20acbdcd\u20ac"); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - LineRecordReader.readLine(in, out); + LineReader in = makeStream("abcd\u20acbdcd\u20ac"); Text line = new Text(); - line.set(out.toByteArray()); + in.readLine(line); assertEquals("readLine changed utf8 characters", "abcd\u20acbdcd\u20ac", line.toString()); in = makeStream("abc\u200axyz"); - out.reset(); - LineRecordReader.readLine(in, out); - line.set(out.toByteArray()); + in.readLine(line); assertEquals("split on fake newline", "abc\u200axyz", line.toString()); } public void testNewLines() throws Exception { - InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee"); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - LineRecordReader.readLine(in, out); - assertEquals("line1 length", 1, out.size()); - out.reset(); - LineRecordReader.readLine(in, out); - assertEquals("line2 length", 2, out.size()); - out.reset(); - LineRecordReader.readLine(in, out); - assertEquals("line3 length", 0, out.size()); - out.reset(); - LineRecordReader.readLine(in, out); - assertEquals("line4 length", 3, out.size()); - out.reset(); - LineRecordReader.readLine(in, out); - assertEquals("line5 length", 4, out.size()); - out.reset(); - LineRecordReader.readLine(in, out); - assertEquals("line5 length", 5, out.size()); - assertEquals("end of file", 0, LineRecordReader.readLine(in, out)); + LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee"); + Text out = new Text(); + in.readLine(out); + assertEquals("line1 length", 1, out.getLength()); + in.readLine(out); + assertEquals("line2 length", 2, out.getLength()); + in.readLine(out); + assertEquals("line3 length", 0, out.getLength()); + in.readLine(out); + assertEquals("line4 length", 3, out.getLength()); + in.readLine(out); + assertEquals("line5 length", 4, out.getLength()); + in.readLine(out); + assertEquals("line5 length", 5, out.getLength()); + assertEquals("end of file", 0, in.readLine(out)); } private static void 
writeFile(FileSystem fs, Path name, @@ -252,7 +245,46 @@ assertEquals("Compressed empty file length == 0", 0, results.size()); } + private static String unquote(String in) { + StringBuffer result = new StringBuffer(); + for(int i=0; i < in.length(); ++i) { + char ch = in.charAt(i); + if (ch == '\\') { + ch = in.charAt(++i); + switch (ch) { + case 'n': + result.append('\n'); + break; + case 'r': + result.append('\r'); + break; + default: + result.append(ch); + break; + } + } else { + result.append(ch); + } + } + return result.toString(); + } + + /** + * Parse the command line arguments into lines and display the result. + * @param args + * @throws Exception + */ public static void main(String[] args) throws Exception { - new TestTextInputFormat().testFormat(); + for(String arg: args) { + System.out.println("Working on " + arg); + LineReader reader = makeStream(unquote(arg)); + Text line = new Text(); + int size = reader.readLine(line); + while (size > 0) { + System.out.println("Got: " + line.toString()); + size = reader.readLine(line); + } + reader.close(); + } } }