Author: cdouglas
Date: Tue Jan 20 15:15:31 2009
New Revision: 736162

URL: http://svn.apache.org/viewvc?rev=736162&view=rev
Log:
HADOOP-4010. Change semantics for LineRecordReader to read an additional
line per split, rather than moving back one character in the stream, to
work with splittable compression codecs. Contributed by Abdul Qadeer.
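
For context, the new contract in brief: a reader whose split does not start at
byte 0 discards its first (possibly partial) line, and every reader may read
one line past its split's end offset. Together these rules guarantee that each
line is consumed by exactly one reader without ever seeking backwards, which a
compressed input stream generally cannot do. Below is a minimal, self-contained
sketch of the two rules in plain Java; the readSplit helper, its single-byte
'\n' assumption, and the sample offsets are illustrative only, not the Hadoop
LineRecordReader API.

    import java.io.*;
    import java.util.*;

    public class SplitSemanticsSketch {
      // Return the lines owned by the byte range [start, end) under the new
      // semantics: skip the first line when start != 0, and keep reading
      // while pos <= end, i.e. one extra line past the split boundary.
      static List<String> readSplit(byte[] data, int start, int end)
          throws IOException {
        BufferedReader in = new BufferedReader(new InputStreamReader(
            new ByteArrayInputStream(data, start, data.length - start), "UTF-8"));
        int pos = start;
        if (start != 0) {
          // The previous reader already consumed this line as its extra one.
          // (Assumes single-byte characters and '\n' endings, for brevity.)
          pos += in.readLine().length() + 1;
        }
        List<String> lines = new ArrayList<String>();
        while (pos <= end) {
          String line = in.readLine();
          if (line == null) break;
          lines.add(line);
          pos += line.length() + 1;
        }
        return lines;
      }

      public static void main(String[] args) throws IOException {
        byte[] data = "alpha\nbravo\ncharlie\ndelta\n".getBytes("UTF-8");
        // Split at byte 9, mid-"bravo": the line straddling the boundary is
        // finished by the first reader and skipped by the second.
        System.out.println(readSplit(data, 0, 9));   // [alpha, bravo]
        System.out.println(readSplit(data, 9, 26));  // [charlie, delta]
      }
    }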

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=736162&r1=736161&r2=736162&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jan 20 15:15:31 2009
@@ -16,6 +16,10 @@
     HADOOP-4940. Remove a deprecated method FileSystem.delete(Path f).  (Enis
     Soztutar via szetszwo)
 
+    HADOOP-4010. Change semantics for LineRecordReader to read an additional
+    line per split, rather than moving back one character in the stream, to
+    work with splittable compression codecs. (Abdul Qadeer via cdouglas)
+
   NEW FEATURES
 
     HADOOP-4268. Change fsck to use ClientProtocol methods so that the

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java?rev=736162&r1=736161&r2=736162&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java Tue Jan 20 15:15:31 2009
@@ -97,16 +97,16 @@
         file.writeBytes(mapString2 + "\n");
         file.close();
         file = fileSys.create(new Path(CACHE_FILE));
-        file.writeBytes(cacheString);
+        file.writeBytes(cacheString + "\n");
         file.close();
         file = fileSys.create(new Path(CACHE_FILE_2));
-        file.writeBytes(cacheString2);
+        file.writeBytes(cacheString2 + "\n");
         file.close();
           
         job = new StreamJob(argv, mayExit);     
         job.go();
 
-       fileSys = dfs.getFileSystem();
+        fileSys = dfs.getFileSystem();
         String line = null;
         String line2 = null;
         Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java?rev=736162&r1=736161&r2=736162&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java Tue Jan 20 15:15:31 2009
@@ -79,21 +79,19 @@
     // open the file and seek to the start of the split
     FileSystem fs = file.getFileSystem(job);
     FSDataInputStream fileIn = fs.open(split.getPath());
-    boolean skipFirstLine = false;
     if (codec != null) {
       in = new LineReader(codec.createInputStream(fileIn), job);
       end = Long.MAX_VALUE;
     } else {
-      if (start != 0) {
-        skipFirstLine = true;
-        --start;
-        fileIn.seek(start);
-      }
+      fileIn.seek(start);
       in = new LineReader(fileIn, job);
     }
-    if (skipFirstLine) {  // skip first line and re-establish "start".
-      start += in.readLine(new Text(), 0,
-                           (int)Math.min((long)Integer.MAX_VALUE, end - start));
+    // If this is not the first split, we always throw away the first record
+    // because we always (except for the last split) read one extra line in
+    // the next() method.
+    if (start != 0) {
+      start += in.readLine(new Text(), 0, (int) Math.min(
+          (long) Integer.MAX_VALUE, end - start));
     }
     this.pos = start;
   }
@@ -130,7 +128,9 @@
   public synchronized boolean next(LongWritable key, Text value)
     throws IOException {
 
-    while (pos < end) {
+    // We always read one extra line, which lies outside the upper
+    // split limit, i.e. (end - 1)
+    while (pos <= end) {
       key.set(pos);
 
       int newSize = in.readLine(value, maxLineLength,

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java?rev=736162&r1=736161&r2=736162&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java Tue Jan 20 15:15:31 2009
@@ -97,7 +97,15 @@
           numLines++;
           length += num;
           if (numLines == N) {
-            splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+            // NLineInputFormat uses LineRecordReader, which always reads (and
+            // consumes) at least one character out of its upper split boundary.
+            // So to make sure that each mapper gets N lines, we move back the
+            // upper split limits of each split by one character here.
+            if (begin == 0) {
+              splits.add(new FileSplit(fileName, begin, length - 1, new String[] {}));
+            } else {
+              splits.add(new FileSplit(fileName, begin - 1, length, new String[] {}));
+            }
             begin += length;
             length = 0;
             numLines = 0;
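
A note on the boundary arithmetic in the hunk above: assuming '\n' line
endings, the byte at begin - 1 is the newline that terminates the previous
group, so moving a later split's start back by one byte does not change which
lines it owns; it merely triggers LineRecordReader's skip-first-line rule on
an empty line. Shortening the first split by one byte, in turn, stops its
one-extra-line read from spilling into the second group. A small sketch of the
adjustment as a pure function (plain longs and hypothetical offsets standing
in for the FileSplit/Path API):

    public class NLineSplitSketch {
      // Mirror of the adjustment above: the first split gives up its last
      // byte; every later split starts one byte early, length unchanged.
      static long[] adjust(long begin, long length) {
        return begin == 0
            ? new long[] { begin, length - 1 }
            : new long[] { begin - 1, length };
      }

      public static void main(String[] args) {
        // Hypothetical N-line groups as (begin, length) pairs, in bytes.
        long[][] groups = { { 0, 12 }, { 12, 20 }, { 32, 15 } };
        for (long[] g : groups) {
          long[] s = adjust(g[0], g[1]);
          System.out.println("split: begin=" + s[0] + " length=" + s[1]);
        }
      }
    }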

