Author: cdouglas
Date: Mon Jun 2 21:19:25 2008
New Revision: 662634
URL: http://svn.apache.org/viewvc?rev=662634&view=rev
Log:
HADOOP-3443. Avoid copying map output across partitions when renaming a
single spill. Contributed by Owen O'Malley.
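For context: mergeParts already short-circuited the single-spill case with a rename, but it renamed the spill into a freshly allocated output path, which could land on a different local disk partition than the spill itself and silently degrade the rename into a full copy of the map output. The new code instead renames the spill file and its index to file.out / file.out.index inside the spill's own parent directory, so the operation stays on one partition. Below is a minimal, hedged Java sketch of that fast path, not the actual MapTask code; the class and method names are hypothetical, while the FileSystem and Path calls are standard Hadoop APIs.

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SingleSpillRenameSketch {
  // Promote a lone spill to the final map output by renaming it within its
  // own directory, so the local filesystem never copies bytes across disks.
  static void finalizeSingleSpill(FileSystem localFs, Path spillFile,
                                  Path spillIndexFile) throws IOException {
    // Same parent directory as the spill => a pure rename, never a copy.
    localFs.rename(spillFile, new Path(spillFile.getParent(), "file.out"));
    localFs.rename(spillIndexFile,
        new Path(spillIndexFile.getParent(), "file.out.index"));
  }
}

With more than one spill, mergeParts still falls through to the regular merge path; that is why, in the diff below, the numSpills == 1 check and its early return are moved ahead of the header-size correction and the allocation of the final output paths.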
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=662634&r1=662633&r2=662634&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jun 2 21:19:25 2008
@@ -391,6 +391,9 @@
HADOOP-3475. Fix MapTask to correctly size the accounting allocation of
io.sort.mb. (cdouglas)
+ HADOOP-3443. Avoid copying map output across partitions when renaming a
+ single spill. (omalley via cdouglas)
+
Release 0.17.0 - 2008-05-18
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=662634&r1=662633&r2=662634&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Jun 2 21:19:25 2008
@@ -35,7 +35,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -58,7 +57,6 @@
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.QuickSort;
@@ -174,7 +172,7 @@
public float getProgress() throws IOException {
return rawIn.getProgress();
}
- };
+ }
@Override
@SuppressWarnings("unchecked")
@@ -366,6 +364,7 @@
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
}
+ LOG.info("io.sort.mb = " + sortmb);
// buffers and accounting
int maxMemUsage = sortmb << 20;
int recordCapacity = (int)(maxMemUsage * recper);
@@ -377,6 +376,8 @@
kvindices = new int[recordCapacity * ACCTSIZE];
softBufferLimit = (int)(kvbuffer.length * spillper);
softRecordLimit = (int)(kvoffsets.length * spillper);
+ LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
+ LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
// k/v serialization
comparator = job.getOutputKeyComparator();
keyClass = job.getMapOutputKeyClass();
@@ -643,6 +644,12 @@
? bufindex - bufend > softBufferLimit
: bufend - bufindex < bufvoid - softBufferLimit;
if (kvsoftlimit || bufsoftlimit || (buffull && !wrap)) {
+ LOG.info("Spilling map output: buffer full = " +
bufsoftlimit+
+ " and record full = " + kvsoftlimit);
+ LOG.info("bufindex = " + bufindex + "; bufend = " + bufend +
+ "; bufvoid = " + bufvoid);
+ LOG.info("kvindex = " + kvindex + "; kvend = " + kvend +
+ "; length = " + kvoffsets.length);
kvend = kvindex;
bufend = bufmark;
// TODO No need to recreate this thread every time
@@ -693,6 +700,7 @@
}
public synchronized void flush() throws IOException {
+ LOG.info("Starting flush of map output");
synchronized (spillLock) {
while (kvstart != kvend) {
try {
@@ -815,6 +823,7 @@
}
}
++numSpills;
+ LOG.info("Finished spill " + numSpills);
} finally {
if (out != null) out.close();
if (indexOut != null) indexOut.close();
@@ -976,7 +985,15 @@
for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskID(), i);
- finalOutFileSize += localFs.getLength(filename[i]);
+ finalOutFileSize += localFs.getFileStatus(filename[i]).getLen();
+ }
+
+ if (numSpills == 1) { //the spill is the final output
+ localFs.rename(filename[0],
+ new Path(filename[0].getParent(), "file.out"));
+ localFs.rename(indexFileName[0],
+ new Path(indexFileName[0].getParent(),"file.out.index"));
+ return;
}
//make correction in the length to include the sequence file header
//lengths for each partition
@@ -989,12 +1006,6 @@
Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
getTaskID(), finalIndexFileSize);
- if (numSpills == 1) { //the spill is the final output
- localFs.rename(filename[0], finalOutputFile);
- localFs.rename(indexFileName[0], finalIndexFile);
- return;
- }
-
//The output stream for the final single output file
FSDataOutputStream finalOut = localFs.create(finalOutputFile, true,
4096);