Author: cdouglas
Date: Tue Jan  8 15:29:55 2008
New Revision: 610227

URL: http://svn.apache.org/viewvc?rev=610227&view=rev
Log:
HADOOP-2285. Speeds up TextInputFormat. Also includes updates to the
Text API. Contributed by Owen O'Malley



Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=610227&r1=610226&r2=610227&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jan  8 15:29:55 2008
@@ -66,6 +66,9 @@
 
   IMPROVEMENTS
 
+    HADOOP-2285. Speeds up TextInputFormat. Also includes updates to the
+    Text API. (Owen O'Malley via cdouglas)
+
     HADOOP-2045.  Change committer list on website to a table, so that
     folks can list their organization, timezone, etc.  (cutting)
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java?rev=610227&r1=610226&r2=610227&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/Text.java Tue Jan  8 15:29:55 2008
@@ -188,23 +188,48 @@
    * @param len the number of bytes of the new string
    */
   public void set(byte[] utf8, int start, int len) {
-    setCapacity(len);
+    setCapacity(len, false);
     System.arraycopy(utf8, start, bytes, 0, len);
     this.length = len;
   }
 
+  /**
+   * Append a range of bytes to the end of the given text
+   * @param utf8 the data to copy from
+   * @param start the first position to append from utf8
+   * @param len the number of bytes to append
+   */
+  public void append(byte[] utf8, int start, int len) {
+    setCapacity(length + len, true);
+    System.arraycopy(utf8, start, bytes, length, len);
+    length += len;
+  }
+
+  /**
+   * Clear the string to empty.
+   */
+  public void clear() {
+    length = 0;
+  }
+
   /*
    * Sets the capacity of this Text object to <em>at least</em>
    * <code>len</code> bytes. If the current buffer is longer,
    * then the capacity and existing content of the buffer are
    * unchanged. If <code>len</code> is larger
    * than the current capacity, the Text object's capacity is
-   * increased to match. The existing contents of the buffer
-   * (if any) are deleted.
+   * increased to match.
+   * @param len the number of bytes we need
+   * @param keepData should the old data be kept
    */
-  private void setCapacity(int len) {
-    if (bytes == null || bytes.length < len)
-      bytes = new byte[len];      
+  private void setCapacity(int len, boolean keepData) {
+    if (bytes == null || bytes.length < len) {
+      byte[] newBytes = new byte[len];
+      if (bytes != null && keepData) {
+        System.arraycopy(bytes, 0, newBytes, 0, length);
+      }
+      bytes = newBytes;
+    }
   }
    
   /** 
@@ -222,9 +247,10 @@
   /** deserialize 
    */
   public void readFields(DataInput in) throws IOException {
-    length = WritableUtils.readVInt(in);
-    setCapacity(length);
-    in.readFully(bytes, 0, length);
+    int newLength = WritableUtils.readVInt(in);
+    setCapacity(newLength, false);
+    in.readFully(bytes, 0, newLength);
+    length = newLength;
   }
 
   /** Skips over one Text in the input. */
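
A minimal sketch of the new incremental Text API introduced above (not part
of the commit; the class and variable names are illustrative): append() copies
bytes onto the end, growing the backing array only when needed and preserving
its contents, and clear() resets the logical length without shrinking the
buffer, so one Text instance can be reused across records.

  import org.apache.hadoop.io.Text;

  public class TextAppendSketch {
    public static void main(String[] args) throws Exception {
      Text t = new Text();
      byte[] utf8 = "hello, world".getBytes("UTF-8");
      t.append(utf8, 0, 5);              // t now holds "hello"
      t.append(utf8, 5, 7);              // t now holds "hello, world"
      System.out.println(t.toString());  // prints: hello, world
      t.clear();                         // logical length becomes 0;
                                         // the allocated buffer is kept
      System.out.println(t.getLength()); // prints: 0
    }
  }

This is the pattern the new LineRecordReader below relies on: readLine()
clears the caller's Text and appends one buffered chunk at a time, so no
intermediate byte arrays are created per line.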

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=610227&r1=610226&r2=610227&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java Tue Jan  8 15:29:55 2008
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -38,30 +36,126 @@
  */
 public class LineRecordReader implements RecordReader<LongWritable, Text> {
   private CompressionCodecFactory compressionCodecs = null;
-  private long start; 
+  private long start;
   private long pos;
   private long end;
-  private BufferedInputStream in;
-  private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
+  private LineReader in;
+
   /**
-   * Provide a bridge to get the bytes from the ByteArrayOutputStream
-   * without creating a new byte array.
+   * A class that provides a line reader from an input stream.
    */
-  private static class TextStuffer extends OutputStream {
-    public Text target;
-    public void write(int b) {
-      throw new UnsupportedOperationException("write(byte) not supported");
-    }
-    public void write(byte[] data, int offset, int len) throws IOException {
-      target.set(data, offset, len);
-    }      
+  public static class LineReader {
+    private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+    private int bufferSize = DEFAULT_BUFFER_SIZE;
+    private InputStream in;
+    private byte[] buffer;
+    // the number of bytes of real data in the buffer
+    private int bufferLength = 0;
+    // the current position in the buffer
+    private int bufferPosn = 0;
+
+    /**
+     * Create a line reader that reads from the given stream using the 
+     * given buffer-size.
+     * @param in
+     * @throws IOException
+     */
+    LineReader(InputStream in, int bufferSize) {
+      this.in = in;
+      this.bufferSize = bufferSize;
+      this.buffer = new byte[this.bufferSize];
+    }
+
+    /**
+     * Create a line reader that reads from the given stream using the
+     * <code>io.file.buffer.size</code> specified in the given
+     * <code>Configuration</code>.
+     * @param in input stream
+     * @param conf configuration
+     * @throws IOException
+     */
+    LineReader(InputStream in, Configuration conf) throws IOException {
+      this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
+    }
+
+    /**
+     * Fill the buffer with more data.
+     * @return was there more data?
+     * @throws IOException
+     */
+    boolean backfill() throws IOException {
+      bufferPosn = 0;
+      bufferLength = in.read(buffer);
+      return bufferLength > 0;
+    }
+    
+    /**
+     * Close the underlying stream.
+     * @throws IOException
+     */
+    public void close() throws IOException {
+      in.close();
+    }
+    
+    /**
+     * Read from the InputStream into the given Text.
+     * @param str the object to store the given line
+     * @return the number of bytes read including the newline
+     * @throws IOException if the underlying stream throws
+     */
+    public int readLine(Text str) throws IOException {
+      str.clear();
+      boolean hadFinalNewline = false;
+      boolean hadFinalReturn = false;
+      boolean hitEndOfFile = false;
+      int startPosn = bufferPosn;
+      outerLoop: while (true) {
+        if (bufferPosn >= bufferLength) {
+          if (!backfill()) {
+            hitEndOfFile = true;
+            break;
+          }
+        }
+        startPosn = bufferPosn;
+        for(; bufferPosn < bufferLength; ++bufferPosn) {
+          switch (buffer[bufferPosn]) {
+          case '\n':
+            hadFinalNewline = true;
+            bufferPosn += 1;
+            break outerLoop;
+          case '\r':
+            if (hadFinalReturn) {
+              // leave this \n in the stream, so we'll get it next time
+              break outerLoop;
+            }
+            hadFinalReturn = true;
+            break;
+          default:
+            if (hadFinalReturn) {
+              break outerLoop;
+            }
+          }        
+        }
+        int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0);
+        if (length >= 0) {
+          str.append(buffer, startPosn, length);
+        }
+      }
+      int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
+      if (!hitEndOfFile) {
+        int length = bufferPosn - startPosn - newlineLength;
+        if (length > 0) {
+          str.append(buffer, startPosn, length);
+        }
+      }
+      return str.getLength() + newlineLength;
+    }
   }
-  private TextStuffer bridge = new TextStuffer();
 
-  public LineRecordReader(Configuration job, FileSplit split)
-    throws IOException {
-    long start = split.getStart();
-    long end = start + split.getLength();
+  public LineRecordReader(Configuration job, 
+                          FileSplit split) throws IOException {
+    start = split.getStart();
+    end = start + split.getLength();
     final Path file = split.getPath();
     compressionCodecs = new CompressionCodecFactory(job);
     final CompressionCodec codec = compressionCodecs.getCodec(file);
@@ -69,33 +163,38 @@
     // open the file and seek to the start of the split
     FileSystem fs = file.getFileSystem(job);
     FSDataInputStream fileIn = fs.open(split.getPath());
-    InputStream in = fileIn;
     boolean skipFirstLine = false;
     if (codec != null) {
-      in = codec.createInputStream(fileIn);
+      in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
-    } else if (start != 0) {
-      skipFirstLine = true;  // wait till BufferedInputStream to skip
-      --start;
-      fileIn.seek(start);
+    } else {
+      if (start != 0) {
+        skipFirstLine = true;
+        --start;
+        fileIn.seek(start);
+      }
+      in = new LineReader(fileIn, job);
     }
-
-    this.in = new BufferedInputStream(in);
     if (skipFirstLine) {  // skip first line and re-establish "start".
-      start += LineRecordReader.readLine(this.in, null);
+      start += in.readLine(new Text());
     }
-    this.start = start;
     this.pos = start;
-    this.end = end;
   }
   
-  public LineRecordReader(InputStream in, long offset, long endOffset) 
+  public LineRecordReader(InputStream in, long offset, long endOffset) {
+    this.in = new LineReader(in, LineReader.DEFAULT_BUFFER_SIZE);
+    this.start = offset;
+    this.pos = offset;
+    this.end = endOffset;    
+  }
+
+  public LineRecordReader(InputStream in, long offset, long endOffset, 
+                          Configuration job) 
     throws IOException{
-    this.in = new BufferedInputStream(in);
+    this.in = new LineReader(in, job);
     this.start = offset;
     this.pos = offset;
     this.end = endOffset;    
-    //    readLine(in, null); 
   }
   
   public LongWritable createKey() {
@@ -113,21 +212,17 @@
       return false;
 
     key.set(pos);           // key is position
-    buffer.reset();
-    long bytesRead = readLine();
-    if (bytesRead == 0) {
-      return false;
+    int newSize = in.readLine(value);
+    if (newSize > 0) {
+      pos += newSize;
+      return true;
     }
-    pos += bytesRead;
-    bridge.target = value;
-    buffer.writeTo(bridge);
-    return true;
-  }
-  
-  protected long readLine() throws IOException {
-    return LineRecordReader.readLine(in, buffer);
+    return false;
   }
 
+  /**
+   * @deprecated
+   */
   public static long readLine(InputStream in, 
                               OutputStream out) throws IOException {
     long bytes = 0;
@@ -177,7 +272,9 @@
     return pos;
   }
 
-  public synchronized void close() throws IOException { 
-    in.close(); 
+  public synchronized void close() throws IOException {
+    if (in != null) {
+      in.close(); 
+    }
   }
 }
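
A minimal sketch of driving the new LineReader directly (not part of the
commit; the sample input and buffer size are assumptions). Note that its
constructors are package-private in this revision, so a caller like this
would have to live in org.apache.hadoop.mapred, as the updated test below
does:

  package org.apache.hadoop.mapred;

  import java.io.ByteArrayInputStream;
  import org.apache.hadoop.io.Text;

  public class LineReaderSketch {
    public static void main(String[] args) throws Exception {
      byte[] data = "first\r\nsecond\nthird".getBytes("UTF-8");
      LineRecordReader.LineReader reader =
        new LineRecordReader.LineReader(new ByteArrayInputStream(data), 4096);
      Text line = new Text();
      int consumed;
      while ((consumed = reader.readLine(line)) > 0) {
        // The terminator ("\n", "\r", or "\r\n") is stripped from the Text,
        // but it is counted in the return value, which is how
        // LineRecordReader.next() advances its byte position.
        System.out.println(line + " [" + consumed + " bytes]");
      }
      reader.close();
    }
  }

With the input above this prints "first [7 bytes]", "second [7 bytes]" and
"third [5 bytes]", then readLine() returns 0 at end of file.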

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java?rev=610227&r1=610226&r2=610227&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/io/TestText.java Tue Jan  8 15:29:55 2008
@@ -24,12 +24,8 @@
 import java.nio.charset.CharacterCodingException;
 import java.util.Random;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /** Unit tests for LargeUTF8. */
 public class TestText extends TestCase {
-  private static final Log LOG= LogFactory.getLog("org.apache.hadoop.io.TestText");
   private static final int NUM_ITERATIONS = 100;
   public TestText(String name) { super(name); }
 
@@ -208,6 +204,9 @@
     Text b=new Text("a");
     b.set(a);
     assertEquals("abc", b.toString());
+    a.append("xdefgxxx".getBytes(), 1, 4);
+    assertEquals("modified aliased string", "abc", b.toString());
+    assertEquals("appended string incorrectly", "abcdefg", a.toString());
   }
 
   public static void main(String[] args)  throws Exception

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=610227&r1=610226&r2=610227&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java Tue Jan  8 15:29:55 2008
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
 public class TestTextInputFormat extends TestCase {
@@ -126,47 +127,39 @@
     }
   }
 
-  private InputStream makeStream(String str) throws IOException {
-    Text text = new Text(str);
-    return new ByteArrayInputStream(text.getBytes(), 0, text.getLength());
+  private static LineReader makeStream(String str) throws IOException {
+    return new LineRecordReader.LineReader(new ByteArrayInputStream
+                                             (str.getBytes("UTF-8")), 
+                                           defaultConf);
   }
   
   public void testUTF8() throws Exception {
-    InputStream in = makeStream("abcd\u20acbdcd\u20ac");
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    LineRecordReader.readLine(in, out);
+    LineReader in = makeStream("abcd\u20acbdcd\u20ac");
     Text line = new Text();
-    line.set(out.toByteArray());
+    in.readLine(line);
     assertEquals("readLine changed utf8 characters", 
                  "abcd\u20acbdcd\u20ac", line.toString());
     in = makeStream("abc\u200axyz");
-    out.reset();
-    LineRecordReader.readLine(in, out);
-    line.set(out.toByteArray());
+    in.readLine(line);
     assertEquals("split on fake newline", "abc\u200axyz", line.toString());
   }
 
   public void testNewLines() throws Exception {
-    InputStream in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    LineRecordReader.readLine(in, out);
-    assertEquals("line1 length", 1, out.size());
-    out.reset();
-    LineRecordReader.readLine(in, out);
-    assertEquals("line2 length", 2, out.size());
-    out.reset();
-    LineRecordReader.readLine(in, out);
-    assertEquals("line3 length", 0, out.size());
-    out.reset();
-    LineRecordReader.readLine(in, out);
-    assertEquals("line4 length", 3, out.size());
-    out.reset();
-    LineRecordReader.readLine(in, out);
-    assertEquals("line5 length", 4, out.size());
-    out.reset();
-    LineRecordReader.readLine(in, out);
-    assertEquals("line5 length", 5, out.size());
-    assertEquals("end of file", 0, LineRecordReader.readLine(in, out));
+    LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+    Text out = new Text();
+    in.readLine(out);
+    assertEquals("line1 length", 1, out.getLength());
+    in.readLine(out);
+    assertEquals("line2 length", 2, out.getLength());
+    in.readLine(out);
+    assertEquals("line3 length", 0, out.getLength());
+    in.readLine(out);
+    assertEquals("line4 length", 3, out.getLength());
+    in.readLine(out);
+    assertEquals("line5 length", 4, out.getLength());
+    in.readLine(out);
+    assertEquals("line5 length", 5, out.getLength());
+    assertEquals("end of file", 0, in.readLine(out));
   }
   
   private static void writeFile(FileSystem fs, Path name, 
@@ -252,7 +245,46 @@
     assertEquals("Compressed empty file length == 0", 0, results.size());
   }
   
+  private static String unquote(String in) {
+    StringBuffer result = new StringBuffer();
+    for(int i=0; i < in.length(); ++i) {
+      char ch = in.charAt(i);
+      if (ch == '\\') {
+        ch = in.charAt(++i);
+        switch (ch) {
+        case 'n':
+          result.append('\n');
+          break;
+        case 'r':
+          result.append('\r');
+          break;
+        default:
+          result.append(ch);
+          break;
+        }
+      } else {
+        result.append(ch);
+      }
+    }
+    return result.toString();
+  }
+
+  /**
+   * Parse the command line arguments into lines and display the result.
+   * @param args
+   * @throws Exception
+   */
   public static void main(String[] args) throws Exception {
-    new TestTextInputFormat().testFormat();
+    for(String arg: args) {
+      System.out.println("Working on " + arg);
+      LineReader reader = makeStream(unquote(arg));
+      Text line = new Text();
+      int size = reader.readLine(line);
+      while (size > 0) {
+        System.out.println("Got: " + line.toString());
+        size = reader.readLine(line);
+      }
+      reader.close();
+    }
   }
 }
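
For reference, a sketch of what the new command-line driver above produces
(not part of the commit; the classpath, wrapper class name, and sample
argument are assumptions):

  // Roughly: java org.apache.hadoop.mapred.TestTextInputFormat 'a\nbb\r\nccc'
  public class LineDumpSketch {
    public static void main(String[] ignored) throws Exception {
      org.apache.hadoop.mapred.TestTextInputFormat.main(
          new String[] { "a\\nbb\\r\\nccc" });
      // Expected output:
      //   Working on a\nbb\r\nccc
      //   Got: a
      //   Got: bb
      //   Got: ccc
    }
  }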

