Author: cdouglas
Date: Tue May 6 13:21:56 2008
New Revision: 653906
URL: http://svn.apache.org/viewvc?rev=653906&view=rev
Log:
HADOOP-3144. Improve robustness of LineRecordReader by defining a maximum
line length (mapred.linerecordreader.maxlength), thereby avoiding reading
too far into the following split. Contributed by Zheng Shao.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=653906&r1=653905&r2=653906&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 6 13:21:56 2008
@@ -85,6 +85,10 @@
HADOOP-3345. Enhance the hudson-test-patch target to cleanup messages,
fix minor defects, and add eclipse plugin and python unit tests. (nigel)
+ HADOOP-3144. Improve robustness of LineRecordReader by defining a maximum
+ line length (mapred.linerecordreader.maxlength), thereby avoiding reading
+ too far into the following split. (Zheng Shao via cdouglas)
+
OPTIMIZATIONS
HADOOP-3274. The default constructor of BytesWritable creates empty
Modified:
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=653906&r1=653905&r2=653906&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
(original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
Tue May 6 13:21:56 2008
@@ -30,16 +30,22 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
/**
* Treats keys as offset in file and value as line.
*/
public class LineRecordReader implements RecordReader<LongWritable, Text> {
+ private static final Log LOG
+ = LogFactory.getLog(LineRecordReader.class.getName());
+
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
+ int maxLineLength;
/**
* A class that provides a line reader from an input stream.
@@ -100,15 +106,19 @@
/**
* Read from the InputStream into the given Text.
* @param str the object to store the given line
+ * @param maxLineLength the maximum number of bytes to store into str.
+ * @param maxBytesToConsume the maximum number of bytes to consume in this
call.
* @return the number of bytes read including the newline
* @throws IOException if the underlying stream throws
*/
- public int readLine(Text str) throws IOException {
+ public int readLine(Text str, int maxLineLength,
+ int maxBytesToConsume) throws IOException {
str.clear();
boolean hadFinalNewline = false;
boolean hadFinalReturn = false;
boolean hitEndOfFile = false;
int startPosn = bufferPosn;
+ long bytesConsumed = 0;
outerLoop: while (true) {
if (bufferPosn >= bufferLength) {
if (!backfill()) {
@@ -125,7 +135,7 @@
break outerLoop;
case '\r':
if (hadFinalReturn) {
- // leave this \n in the stream, so we'll get it next time
+ // leave this \r in the stream, so we'll get it next time
break outerLoop;
}
hadFinalReturn = true;
@@ -136,24 +146,55 @@
}
}
}
+ bytesConsumed += bufferPosn - startPosn;
int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0);
+ length = (int)Math.min(length, maxLineLength - str.getLength());
if (length >= 0) {
str.append(buffer, startPosn, length);
}
+ if (bytesConsumed >= maxBytesToConsume) {
+ return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+ }
}
int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
if (!hitEndOfFile) {
+ bytesConsumed += bufferPosn - startPosn;
int length = bufferPosn - startPosn - newlineLength;
+ length = (int)Math.min(length, maxLineLength - str.getLength());
if (length > 0) {
str.append(buffer, startPosn, length);
}
}
- return str.getLength() + newlineLength;
+ return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+ }
+
+ /**
+ * Read one line from the InputStream into the given Text; no cap on bytes consumed.
+ * @param str the object to store the given line
+ * @param maxLineLength the maximum number of bytes to store into str; the rest of the line is read and dropped.
+ * @return the number of bytes read including the newline
+ * @throws IOException if the underlying stream throws
+ */
+ public int readLine(Text str, int maxLineLength) throws IOException {
+ return readLine(str, maxLineLength, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Read one whole line into the given Text; both caps are Integer.MAX_VALUE.
+ * @param str the object to store the given line
+ * @return the number of bytes read including the newline
+ * @throws IOException if the underlying stream throws
+ */
+ public int readLine(Text str) throws IOException {
+ return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ }
+
}
public LineRecordReader(Configuration job,
FileSplit split) throws IOException {
+ this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
+ Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
@@ -176,12 +217,15 @@
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
- start += in.readLine(new Text());
+ start += in.readLine(new Text(), 0,
+ (int)Math.min((long)Integer.MAX_VALUE, end -
start));
}
this.pos = start;
}
- public LineRecordReader(InputStream in, long offset, long endOffset) {
+ public LineRecordReader(InputStream in, long offset, long endOffset,
+ int maxLineLength) {
+ this.maxLineLength = maxLineLength;
this.in = new LineReader(in, LineReader.DEFAULT_BUFFER_SIZE);
this.start = offset;
this.pos = offset;
@@ -191,6 +235,8 @@
public LineRecordReader(InputStream in, long offset, long endOffset,
Configuration job)
throws IOException{
+ this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
+ Integer.MAX_VALUE);
this.in = new LineReader(in, job);
this.start = offset;
this.pos = offset;
@@ -208,15 +254,25 @@
/** Read a line. */
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
- if (pos >= end)
- return false;
- key.set(pos); // key is position
- int newSize = in.readLine(value);
- if (newSize > 0) {
+ while (pos < end) { // one line per pass; over-long lines are logged and skipped
+ key.set(pos);
+
+ int newSize = in.readLine(value, maxLineLength, // newSize = bytes consumed, including the terminator
+ Math.max((int)Math.min(Integer.MAX_VALUE,
end-pos),
+ maxLineLength)); // consume budget: rest of the split, but at least maxLineLength
+ if (newSize == 0) { // no bytes consumed: end of stream
+ return false;
+ }
pos += newSize;
- return true;
+ if (newSize < maxLineLength) { // NOTE(review): newSize counts the terminator, so lines within 2 bytes of maxLineLength are skipped too; confirm intended
+ return true;
+ }
+
+ // line too long. try again
+ LOG.info("Skipped line of size " + newSize + " at pos " + (pos -
newSize));
}
+
return false;
}
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java?rev=653906&r1=653905&r2=653906&view=diff
==============================================================================
---
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
(original)
+++
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
Tue May 6 13:21:56 2008
@@ -162,6 +162,24 @@
assertEquals("end of file", 0, in.readLine(out));
}
+ public void testMaxLineLength() throws Exception { // over-long lines are truncated, never split
+ LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+ Text out = new Text();
+ assertEquals("line1 bytes", 2, in.readLine(out, 1)); // "a\n" fits exactly
+ assertEquals("line1 length", 1, out.getLength());
+ assertEquals("line2 bytes", 3, in.readLine(out, 1)); // "bb\n" stored as "b", still fully consumed
+ assertEquals("line2 length", 1, out.getLength());
+ assertEquals("line3 bytes", 1, in.readLine(out, 1)); // empty line "\n"
+ assertEquals("line3 length", 0, out.getLength());
+ assertEquals("line4 bytes", 4, in.readLine(out, 3)); // "ccc\r" (bare CR terminator)
+ assertEquals("line4 length", 3, out.getLength());
+ assertEquals("line5 bytes", 6, in.readLine(out, 10)); // "dddd\r\n" (CRLF terminator)
+ assertEquals("line5 length", 4, out.getLength());
+ assertEquals("line6 bytes", 5, in.readLine(out, 8)); // "eeeee" ends at EOF, no terminator
+ assertEquals("line6 length", 5, out.getLength());
+ assertEquals("end of file", 0, in.readLine(out));
+ }
+
private static void writeFile(FileSystem fs, Path name,
CompressionCodec codec,
String contents) throws IOException {