Author: cdouglas
Date: Tue Jan 20 15:15:31 2009
New Revision: 736162
URL: http://svn.apache.org/viewvc?rev=736162&view=rev
Log:
HADOOP-4010. Change semantics for LineRecordReader to read an additional
line per split- rather than moving back one character in the stream- to
work with splittable compression codecs. Contributed by Abdul Qadeer.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=736162&r1=736161&r2=736162&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jan 20 15:15:31 2009
@@ -16,6 +16,10 @@
HADOOP-4940. Remove a deprecated method FileSystem.delete(Path f). (Enis
Soztutar via szetszwo)
+ HADOOP-4010. Change semantics for LineRecordReader to read an additional
+ line per split- rather than moving back one character in the stream- to
+ work with splittable compression codecs. (Abdul Qadeer via cdouglas)
+
NEW FEATURES
HADOOP-4268. Change fsck to use ClientProtocol methods so that the
Modified:
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java?rev=736162&r1=736161&r2=736162&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java Tue Jan 20 15:15:31 2009
@@ -97,16 +97,16 @@
file.writeBytes(mapString2 + "\n");
file.close();
file = fileSys.create(new Path(CACHE_FILE));
- file.writeBytes(cacheString);
+ file.writeBytes(cacheString + "\n");
file.close();
file = fileSys.create(new Path(CACHE_FILE_2));
- file.writeBytes(cacheString2);
+ file.writeBytes(cacheString2 + "\n");
file.close();
job = new StreamJob(argv, mayExit);
job.go();
- fileSys = dfs.getFileSystem();
+ fileSys = dfs.getFileSystem();
String line = null;
String line2 = null;
Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java?rev=736162&r1=736161&r2=736162&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java Tue Jan 20 15:15:31 2009
@@ -79,21 +79,19 @@
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
- boolean skipFirstLine = false;
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
- if (start != 0) {
- skipFirstLine = true;
- --start;
- fileIn.seek(start);
- }
+ fileIn.seek(start);
in = new LineReader(fileIn, job);
}
- if (skipFirstLine) { // skip first line and re-establish "start".
- start += in.readLine(new Text(), 0,
- (int)Math.min((long)Integer.MAX_VALUE, end - start));
+ // If this is not the first split, we always throw away first record
+ // because we always (except the last split) read one extra line in
+ // next() method.
+ if (start != 0) {
+ start += in.readLine(new Text(), 0, (int) Math.min(
+ (long) Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
@@ -130,7 +128,9 @@
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
- while (pos < end) {
+ // We always read one extra line, which lies outside the upper
+ // split limit i.e. (end - 1)
+ while (pos <= end) {
key.set(pos);
int newSize = in.readLine(value, maxLineLength,
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java?rev=736162&r1=736161&r2=736162&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java Tue Jan 20 15:15:31 2009
@@ -97,7 +97,15 @@
numLines++;
length += num;
if (numLines == N) {
- splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+ // NLineInputFormat uses LineRecordReader, which always reads (and consumes)
+ // at least one character out of its upper split boundary. So to make sure that
+ // each mapper gets N lines, we move back the upper split limits of each split
+ // by one character here.
+ if (begin == 0) {
+ splits.add(new FileSplit(fileName, begin, length - 1, new String[] {}));
+ } else {
+ splits.add(new FileSplit(fileName, begin - 1, length, new String[] {}));
+ }
begin += length;
length = 0;
numLines = 0;