Author: acmurthy
Date: Wed Jul 16 16:55:47 2008
New Revision: 677470
URL: http://svn.apache.org/viewvc?rev=677470&view=rev
Log:
HADOOP-3617. Removed redundant checks of accounting space in MapTask and made
the spill thread persistent to avoid creating a new thread for each spill.
Contributed by Chris Douglas.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=677470&r1=677469&r2=677470&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Jul 16 16:55:47 2008
@@ -90,6 +90,10 @@
randomization is by the hosts (and not the map outputs themselves).
(Jothi Padmanabhan via ddas)
+ HADOOP-3617. Removed redundant checks of accounting space in MapTask and
+ makes the spill thread persistent so as to avoid creating a new one for
+ each spill. (Chris Douglas via acmurthy)
+
OPTIMIZATIONS
HADOOP-3556. Removed lock contention in MD5Hash by changing the
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=677470&r1=677469&r2=677470&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Wed Jul 16 16:55:47 2008
@@ -32,6 +32,8 @@
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -326,8 +328,12 @@
private final int softBufferLimit;
private final int minSpillsForCombine;
private final IndexedSorter sorter;
- private final Object spillLock = new Object();
+ private final ReentrantLock spillLock = new ReentrantLock();
+ private final Condition spillDone = spillLock.newCondition();
+ private final Condition spillReady = spillLock.newCondition();
private final BlockingBuffer bb = new BlockingBuffer();
+ private volatile boolean spillThreadRunning = false;
+ private final SpillThread spillThread = new SpillThread();
private final FileSystem localFs;
@@ -403,6 +409,24 @@
? new CombineOutputCollector(combineOutputCounter)
: null;
minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
+ spillThread.setDaemon(true);
+ spillThread.setName("SpillThread");
+ spillLock.lock();
+ try {
+ spillThread.start();
+ while (!spillThreadRunning) {
+ spillDone.await();
+ }
+ } catch (InterruptedException e) {
+ throw (IOException)new IOException("Spill thread failed to initialize"
+ ).initCause(sortSpillException);
+ } finally {
+ spillLock.unlock();
+ }
+ if (sortSpillException != null) {
+ throw (IOException)new IOException("Spill thread failed to initialize"
+ ).initCause(sortSpillException);
+ }
}
@SuppressWarnings("unchecked")
@@ -419,12 +443,43 @@
+ valClass.getName() + ", recieved "
+ value.getClass().getName());
}
- if (sortSpillException != null) {
- throw (IOException)new IOException("Spill failed"
- ).initCause(sortSpillException);
+ final int kvnext = (kvindex + 1) % kvoffsets.length;
+ spillLock.lock();
+ try {
+ boolean kvfull;
+ do {
+ if (sortSpillException != null) {
+ throw (IOException)new IOException("Spill failed"
+ ).initCause(sortSpillException);
+ }
+ // sufficient acct space
+ kvfull = kvnext == kvstart;
+ final boolean kvsoftlimit = ((kvnext > kvend)
+ ? kvnext - kvend > softRecordLimit
+ : kvend - kvnext <= kvoffsets.length - softRecordLimit);
+ if (kvstart == kvend && kvsoftlimit) {
+ LOG.info("Spilling map output: record full = " + kvsoftlimit);
+ startSpill();
+ }
+ if (kvfull) {
+ try {
+ while (kvstart != kvend) {
+ reporter.progress();
+ spillDone.await();
+ }
+ } catch (InterruptedException e) {
+ throw (IOException)new IOException(
+ "Collector interrupted while waiting for the writer"
+ ).initCause(e);
+ }
+ }
+ } while (kvfull);
+ } finally {
+ spillLock.unlock();
}
+
try {
- // serialize key bytes into buffer
+ // serialize key bytes into buffer
int keystart = bufindex;
keySerializer.serialize(key);
if (bufindex < keystart) {
@@ -433,25 +488,20 @@
keystart = 0;
}
// serialize value bytes into buffer
- int valstart = bufindex;
+ final int valstart = bufindex;
valSerializer.serialize(value);
int valend = bb.markRecord();
- mapOutputByteCounter.increment(valend >= keystart
- ? valend - keystart
- : (bufvoid - keystart) + valend);
- if (keystart == bufindex) {
- // if emitted records make no writes, it's possible to wrap
- // accounting space without notice
- bb.write(new byte[0], 0, 0);
- }
-
- int partition = partitioner.getPartition(key, value, partitions);
+ final int partition = partitioner.getPartition(key, value, partitions);
if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")");
}
+
mapOutputRecordCounter.increment(1);
+ mapOutputByteCounter.increment(valend >= keystart
+ ? valend - keystart
+ : (bufvoid - keystart) + valend);
// update accounting info
int ind = kvindex * ACCTSIZE;
@@ -459,7 +509,7 @@
kvindices[ind + PARTITION] = partition;
kvindices[ind + KEYSTART] = keystart;
kvindices[ind + VALSTART] = valstart;
- kvindex = (kvindex + 1) % kvoffsets.length;
+ kvindex = kvnext;
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value);
@@ -578,19 +628,16 @@
@Override
public synchronized void write(byte b[], int off, int len)
throws IOException {
- boolean kvfull = false;
boolean buffull = false;
boolean wrap = false;
- synchronized(spillLock) {
+ spillLock.lock();
+ try {
do {
if (sortSpillException != null) {
throw (IOException)new IOException("Spill failed"
).initCause(sortSpillException);
}
- // sufficient accounting space?
- final int kvnext = (kvindex + 1) % kvoffsets.length;
- kvfull = kvnext == kvstart;
// sufficient buffer space?
if (bufstart <= bufend && bufend <= bufindex) {
buffull = bufindex + len > bufvoid;
@@ -606,26 +653,12 @@
// spill thread not running
if (kvend != kvindex) {
// we have records we can spill
- final boolean kvsoftlimit = (kvnext > kvend)
- ? kvnext - kvend > softRecordLimit
- : kvend - kvnext <= kvoffsets.length - softRecordLimit;
final boolean bufsoftlimit = (bufindex > bufend)
? bufindex - bufend > softBufferLimit
: bufend - bufindex < bufvoid - softBufferLimit;
- if (kvsoftlimit || bufsoftlimit || (buffull && !wrap)) {
- LOG.info("Spilling map output: buffer full = " +
bufsoftlimit+
- " and record full = " + kvsoftlimit);
- LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
- "; bufvoid = " + bufvoid);
- LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
- "; length = " + kvoffsets.length);
- kvend = kvindex;
- bufend = bufmark;
- // TODO No need to recreate this thread every time
- SpillThread t = new SpillThread();
- t.setDaemon(true);
- t.setName("SpillThread");
- t.start();
+ if (bufsoftlimit || (buffull && !wrap)) {
+ LOG.info("Spilling map output: buffer full= " +
bufsoftlimit);
+ startSpill();
}
} else if (buffull && !wrap) {
// We have no buffered records, and this record is too large
@@ -641,19 +674,21 @@
}
}
- if (kvfull || (buffull && !wrap)) {
- while (kvstart != kvend) {
- reporter.progress();
- try {
- spillLock.wait();
- } catch (InterruptedException e) {
+ if (buffull && !wrap) {
+ try {
+ while (kvstart != kvend) {
+ reporter.progress();
+ spillDone.await();
+ }
+ } catch (InterruptedException e) {
throw (IOException)new IOException(
"Buffer interrupted while waiting for the writer"
).initCause(e);
- }
}
}
- } while (kvfull || (buffull && !wrap));
+ } while (buffull && !wrap);
+ } finally {
+ spillLock.unlock();
}
// here, we know that we have sufficient space to write
if (buffull) {
@@ -670,30 +705,41 @@
public synchronized void flush() throws IOException {
LOG.info("Starting flush of map output");
- synchronized (spillLock) {
+ spillLock.lock();
+ try {
while (kvstart != kvend) {
- try {
- reporter.progress();
- spillLock.wait();
- } catch (InterruptedException e) {
- throw (IOException)new IOException(
- "Buffer interrupted while waiting for the writer"
- ).initCause(e);
- }
+ reporter.progress();
+ spillDone.await();
+ }
+ if (sortSpillException != null) {
+ throw (IOException)new IOException("Spill failed"
+ ).initCause(sortSpillException);
+ }
+ if (kvend != kvindex) {
+ kvend = kvindex;
+ bufend = bufmark;
+ sortAndSpill();
}
+ } catch (InterruptedException e) {
+ throw (IOException)new IOException(
+ "Buffer interrupted while waiting for the writer"
+ ).initCause(e);
+ } finally {
+ spillLock.unlock();
}
- if (sortSpillException != null) {
+ assert !spillLock.isHeldByCurrentThread();
+ // shut down spill thread and wait for it to exit. Since the preceding
+ // ensures that it is finished with its work (and sortAndSpill did not
+ // throw), we elect to use an interrupt instead of setting a flag.
+ // Spilling simultaneously from this thread while the spill thread
+ // finishes its work might be both a useful way to extend this and also
+ // sufficient motivation for the latter approach.
+ try {
+ spillThread.interrupt();
+ spillThread.join();
+ } catch (InterruptedException e) {
throw (IOException)new IOException("Spill failed"
- ).initCause(sortSpillException);
- }
- if (kvend != kvindex) {
- LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
- "; bufvoid = " + bufvoid);
- LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
- "; length = " + kvoffsets.length);
- kvend = kvindex;
- bufend = bufmark;
- sortAndSpill();
+ ).initCause(e);
}
// release sort buffer before the merge
kvbuffer = null;
@@ -706,23 +752,47 @@
@Override
public void run() {
+ spillLock.lock();
+ spillThreadRunning = true;
try {
- sortAndSpill();
- } catch (Throwable e) {
- sortSpillException = e;
- } finally {
- synchronized(spillLock) {
- if (bufend < bufindex && bufindex < bufstart) {
- bufvoid = kvbuffer.length;
+ while (true) {
+ spillDone.signal();
+ while (kvstart == kvend) {
+ spillReady.await();
+ }
+ try {
+ spillLock.unlock();
+ sortAndSpill();
+ } catch (Throwable e) {
+ sortSpillException = e;
+ } finally {
+ spillLock.lock();
+ if (bufend < bufindex && bufindex < bufstart) {
+ bufvoid = kvbuffer.length;
+ }
+ kvstart = kvend;
+ bufstart = bufend;
}
- kvstart = kvend;
- bufstart = bufend;
- spillLock.notify();
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ spillLock.unlock();
+ spillThreadRunning = false;
}
}
}
+ private synchronized void startSpill() {
+ LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+ "; bufvoid = " + bufvoid);
+ LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
+ "; length = " + kvoffsets.length);
+ kvend = kvindex;
+ bufend = bufmark;
+ spillReady.signal();
+ }
+
private void sortAndSpill() throws IOException {
//approximate the length of the output file to be the length of the
//buffer + header lengths for the partitions