Author: cdouglas
Date: Wed Apr 15 06:13:59 2009
New Revision: 765062
URL: http://svn.apache.org/viewvc?rev=765062&view=rev
Log:
HADOOP-5494. Modify sorted map output merger to lazily read values,
rather than buffering at least one record for each segment. Contributed by
Devaraj Das.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=765062&r1=765061&r2=765062&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 15 06:13:59 2009
@@ -229,6 +229,10 @@
HADOOP-5509. PendingReplicationBlocks does not start monitor in the
constructor. (shv)
+ HADOOP-5494. Modify sorted map output merger to lazily read values,
+ rather than buffering at least one record for each segment. (Devaraj Das
+ via cdouglas)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=765062&r1=765061&r2=765062&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Wed Apr 15 06:13:59 2009
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.mapred;
+import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
@@ -250,12 +252,16 @@
final long fileLength;
boolean eof = false;
final IFileInputStream checksumIn;
+ DataInputStream dataIn;
byte[] buffer = null;
int bufferSize = DEFAULT_BUFFER_SIZE;
- DataInputBuffer dataIn = new DataInputBuffer();
int recNo = 1;
+ int currentKeyLength;
+ int currentValueLength;
+ byte keyBytes[] = new byte[0];
+
/**
* Construct an IFile Reader.
@@ -298,6 +304,7 @@
} else {
this.in = checksumIn;
}
+ this.dataIn = new DataInputStream(this.in);
this.fileLength = length;
if (conf != null) {
@@ -334,104 +341,70 @@
return len;
}
- void readNextBlock(int minSize) throws IOException {
- if (buffer == null) {
- buffer = new byte[bufferSize];
- dataIn.reset(buffer, 0, 0);
- }
- buffer =
- rejigData(buffer,
- (bufferSize < minSize) ? new byte[minSize << 1] : buffer);
- bufferSize = buffer.length;
- }
-
- private byte[] rejigData(byte[] source, byte[] destination)
- throws IOException{
- // Copy remaining data into the destination array
- int bytesRemaining = dataIn.getLength()-dataIn.getPosition();
- if (bytesRemaining > 0) {
- System.arraycopy(source, dataIn.getPosition(),
- destination, 0, bytesRemaining);
- }
-
- // Read as much data as will fit from the underlying stream
- int n = readData(destination, bytesRemaining,
- (destination.length - bytesRemaining));
- dataIn.reset(destination, 0, (bytesRemaining + n));
-
- return destination;
- }
-
- public boolean next(DataInputBuffer key, DataInputBuffer value)
- throws IOException {
+ protected boolean positionToNextRecord(DataInput dIn) throws IOException {
// Sanity check
if (eof) {
throw new EOFException("Completed reading " + bytesRead);
}
- // Check if we have enough data to read lengths
- if ((dataIn.getLength() - dataIn.getPosition()) < 2*MAX_VINT_SIZE) {
- readNextBlock(2*MAX_VINT_SIZE);
- }
-
// Read key and value lengths
- int oldPos = dataIn.getPosition();
- int keyLength = WritableUtils.readVInt(dataIn);
- int valueLength = WritableUtils.readVInt(dataIn);
- int pos = dataIn.getPosition();
- bytesRead += pos - oldPos;
+ currentKeyLength = WritableUtils.readVInt(dIn);
+ currentValueLength = WritableUtils.readVInt(dIn);
+ bytesRead += WritableUtils.getVIntSize(currentKeyLength) +
+ WritableUtils.getVIntSize(currentValueLength);
// Check for EOF
- if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
+ if (currentKeyLength == EOF_MARKER && currentValueLength == EOF_MARKER) {
eof = true;
return false;
}
// Sanity check
- if (keyLength < 0) {
+ if (currentKeyLength < 0) {
throw new IOException("Rec# " + recNo + ": Negative key-length: " +
- keyLength);
+ currentKeyLength);
}
- if (valueLength < 0) {
+ if (currentValueLength < 0) {
throw new IOException("Rec# " + recNo + ": Negative value-length: " +
- valueLength);
+ currentValueLength);
}
-
- final int recordLength = keyLength + valueLength;
-
- // Check if we have the raw key/value in the buffer
- if ((dataIn.getLength()-pos) < recordLength) {
- readNextBlock(recordLength);
-
- // Sanity check
- if ((dataIn.getLength() - dataIn.getPosition()) < recordLength) {
- throw new EOFException("Rec# " + recNo + ": Could read the next " +
- " record");
- }
+
+ return true;
+ }
+
+ public boolean nextRawKey(DataInputBuffer key) throws IOException {
+ if (!positionToNextRecord(dataIn)) {
+ return false;
}
-
- // Setup the key and value
- pos = dataIn.getPosition();
- byte[] data = dataIn.getData();
- key.reset(data, pos, keyLength);
- value.reset(data, (pos + keyLength), valueLength);
-
- // Position for the next record
- long skipped = dataIn.skip(recordLength);
- if (skipped != recordLength) {
- throw new IOException("Rec# " + recNo + ": Failed to skip past record " +
- "of length: " + recordLength);
+ if (keyBytes.length < currentKeyLength) {
+ keyBytes = new byte[currentKeyLength << 1];
}
+ int i = readData(keyBytes, 0, currentKeyLength);
+ if (i != currentKeyLength) {
+ throw new IOException ("Asked for " + currentKeyLength + " Got: " + i);
+ }
+ key.reset(keyBytes, currentKeyLength);
+ bytesRead += currentKeyLength;
+ return true;
+ }
+
+ public void nextRawValue(DataInputBuffer value) throws IOException {
+ final byte[] valBytes = (value.getData().length < currentValueLength)
+ ? new byte[currentValueLength << 1]
+ : value.getData();
+ int i = readData(valBytes, 0, currentValueLength);
+ if (i != currentValueLength) {
+ throw new IOException ("Asked for " + currentValueLength + " Got: " + i);
+ }
+ value.reset(valBytes, currentValueLength);
// Record the bytes read
- bytesRead += recordLength;
+ bytesRead += currentValueLength;
++recNo;
++numRecordsRead;
-
- return true;
}
-
+
public void close() throws IOException {
// Return the decompressor
if (decompressor != null) {
@@ -458,7 +431,7 @@
public static class InMemoryReader<K, V> extends Reader<K, V> {
RamManager ramManager;
TaskAttemptID taskAttemptId;
-
+ DataInputBuffer memDataIn = new DataInputBuffer();
public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
byte[] data, int start, int length)
throws IOException {
@@ -468,7 +441,7 @@
buffer = data;
bufferSize = (int)fileLength;
- dataIn.reset(buffer, start, length);
+ memDataIn.reset(buffer, start, length);
}
@Override
@@ -497,58 +470,49 @@
}
}
- public boolean next(DataInputBuffer key, DataInputBuffer value)
- throws IOException {
+ public boolean nextRawKey(DataInputBuffer key) throws IOException {
try {
- // Sanity check
- if (eof) {
- throw new EOFException("Completed reading " + bytesRead);
- }
-
- // Read key and value lengths
- int oldPos = dataIn.getPosition();
- int keyLength = WritableUtils.readVInt(dataIn);
- int valueLength = WritableUtils.readVInt(dataIn);
- int pos = dataIn.getPosition();
- bytesRead += pos - oldPos;
-
- // Check for EOF
- if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
- eof = true;
- return false;
- }
-
- // Sanity check
- if (keyLength < 0) {
- throw new IOException("Rec# " + recNo + ": Negative key-length: " +
- keyLength);
- }
- if (valueLength < 0) {
- throw new IOException("Rec# " + recNo + ": Negative value-length: " +
- valueLength);
- }
+ if (!positionToNextRecord(memDataIn)) {
+ return false;
+ }
+ // Setup the key
+ int pos = memDataIn.getPosition();
+ byte[] data = memDataIn.getData();
+ key.reset(data, pos, currentKeyLength);
+ // Position for the next value
+ long skipped = memDataIn.skip(currentKeyLength);
+ if (skipped != currentKeyLength) {
+ throw new IOException("Rec# " + recNo +
+ ": Failed to skip past key of length: " +
+ currentKeyLength);
+ }
- final int recordLength = keyLength + valueLength;
-
- // Setup the key and value
- pos = dataIn.getPosition();
- byte[] data = dataIn.getData();
- key.reset(data, pos, keyLength);
- value.reset(data, (pos + keyLength), valueLength);
-
- // Position for the next record
- long skipped = dataIn.skip(recordLength);
- if (skipped != recordLength) {
- throw new IOException("Rec# " + recNo + ": Failed to skip past record of length: " +
- recordLength);
+ // Record the byte
+ bytesRead += currentKeyLength;
+ return true;
+ } catch (IOException ioe) {
+ dumpOnError();
+ throw ioe;
}
-
- // Record the byte
- bytesRead += recordLength;
+ }
+
+ public void nextRawValue(DataInputBuffer value) throws IOException {
+ try {
+ int pos = memDataIn.getPosition();
+ byte[] data = memDataIn.getData();
+ value.reset(data, pos, currentValueLength);
+
+ // Position for the next record
+ long skipped = memDataIn.skip(currentValueLength);
+ if (skipped != currentValueLength) {
+ throw new IOException("Rec# " + recNo +
+ ": Failed to skip past value of length: " +
+ currentValueLength);
+ }
+ // Record the byte
+ bytesRead += currentValueLength;
- ++recNo;
-
- return true;
+ ++recNo;
} catch (IOException ioe) {
dumpOnError();
throw ioe;
@@ -557,7 +521,7 @@
public void close() {
// Release
- dataIn = null;
+ memDataIn = null;
buffer = null;
// Inform the RamManager
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=765062&r1=765061&r2=765062&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Merger.java Wed Apr 15 06:13:59 2009
@@ -128,8 +128,7 @@
public static class Segment<K extends Object, V extends Object> {
Reader<K, V> reader = null;
- DataInputBuffer key = new DataInputBuffer();
- DataInputBuffer value = new DataInputBuffer();
+ final DataInputBuffer key = new DataInputBuffer();
Configuration conf = null;
FileSystem fs = null;
@@ -172,18 +171,30 @@
}
}
+ boolean inMemory() {
+ return fs == null;
+ }
+
DataInputBuffer getKey() { return key; }
- DataInputBuffer getValue() { return value; }
+
+ DataInputBuffer getValue(DataInputBuffer value) throws IOException {
+ nextRawValue(value);
+ return value;
+ }
long getLength() {
return (reader == null) ?
segmentLength : reader.getLength();
}
- boolean next() throws IOException {
- return reader.next(key, value);
+ boolean nextRawKey() throws IOException {
+ return reader.nextRawKey(key);
}
-
+
+ void nextRawValue(DataInputBuffer value) throws IOException {
+ reader.nextRawValue(value);
+ }
+
void close() throws IOException {
reader.close();
@@ -214,7 +225,8 @@
Progressable reporter;
DataInputBuffer key;
- DataInputBuffer value;
+ final DataInputBuffer value = new DataInputBuffer();
+ final DataInputBuffer diskIFileValue = new DataInputBuffer();
Segment<K, V> minSegment;
Comparator<Segment<K, V>> segmentComparator =
@@ -284,7 +296,7 @@
private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
long startPos = reader.getPosition();
- boolean hasNext = reader.next();
+ boolean hasNext = reader.nextRawKey();
long endPos = reader.getPosition();
totalBytesProcessed += endPos - startPos;
mergeProgress.set(totalBytesProcessed * progPerByte);
@@ -311,10 +323,24 @@
}
}
minSegment = top();
-
+ if (!minSegment.inMemory()) {
+ //When we load the value from an inmemory segment, we reset
+ //the "value" DIB in this class to the inmem segment's byte[].
+ //When we load the value bytes from disk, we shouldn't use
+ //the same byte[] since it would corrupt the data in the inmem
+ //segment. So we maintain an explicit DIB for value bytes
+ //obtained from disk, and if the current segment is a disk
+ //segment, we reset the "value" DIB to the byte[] in that (so
+ //we reuse the disk segment DIB whenever we consider
+ //a disk segment).
+ value.reset(diskIFileValue.getData(), diskIFileValue.getLength());
+ }
+ long startPos = minSegment.getPosition();
key = minSegment.getKey();
- value = minSegment.getValue();
-
+ minSegment.getValue(value);
+ long endPos = minSegment.getPosition();
+ totalBytesProcessed += endPos - startPos;
+ mergeProgress.set(totalBytesProcessed * progPerByte);
return true;
}
@@ -374,7 +400,7 @@
// this helps in ensuring we don't use buffers until we need them
segment.init(readsCounter);
long startPos = segment.getPosition();
- boolean hasNext = segment.next();
+ boolean hasNext = segment.nextRawKey();
long endPos = segment.getPosition();
startBytes += endPos - startPos;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=765062&r1=765061&r2=765062&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Apr 15 06:13:59 2009
@@ -2334,24 +2334,24 @@
super(null, null, size, null, spilledRecordsCounter);
this.kvIter = kvIter;
}
-
- public boolean next(DataInputBuffer key, DataInputBuffer value)
- throws IOException {
+ public boolean nextRawKey(DataInputBuffer key) throws IOException {
if (kvIter.next()) {
final DataInputBuffer kb = kvIter.getKey();
- final DataInputBuffer vb = kvIter.getValue();
final int kp = kb.getPosition();
final int klen = kb.getLength() - kp;
key.reset(kb.getData(), kp, klen);
- final int vp = vb.getPosition();
- final int vlen = vb.getLength() - vp;
- value.reset(vb.getData(), vp, vlen);
- bytesRead += klen + vlen;
+ bytesRead += klen;
return true;
}
return false;
}
-
+ public void nextRawValue(DataInputBuffer value) throws IOException {
+ final DataInputBuffer vb = kvIter.getValue();
+ final int vp = vb.getPosition();
+ final int vlen = vb.getLength() - vp;
+ value.reset(vb.getData(), vp, vlen);
+ bytesRead += vlen;
+ }
public long getPosition() throws IOException {
return bytesRead;
}