Author: cdouglas
Date: Wed Aug 13 15:38:26 2008
New Revision: 685714
URL: http://svn.apache.org/viewvc?rev=685714&view=rev
Log:
HADOOP-3940. Fix in-memory merge condition to wait when there are no map
outputs or when the final map outputs are being fetched without contention.
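
The condition described in this log message can be read as a single predicate deciding whether the merge thread keeps waiting. The sketch below is a stand-alone model of that predicate, not the ReduceTask code itself: the class name MergeWaitCondition, the method shouldWait, and the three constant values are assumptions chosen for illustration (the real constants are defined in ReduceTask.java and may differ). The variable names mirror those in the diff.

```java
// Hypothetical stand-alone model of the wait predicate; not the actual
// ReduceTask implementation.
final class MergeWaitCondition {
    // Assumed, illustrative values; the real ones live in ReduceTask.java.
    static final float MAX_INMEM_FILESYS_USE = 0.5f;
    static final float MAX_INMEM_FILESIZE_FRACTION = 0.25f;
    static final float MAX_STALLED_SHUFFLE_THREADS_FRACTION = 0.75f;

    /** Returns true while the in-memory merge should keep waiting. */
    static boolean shouldWait(boolean closed, float percentUsed, int numClosed,
                              int mergeThreshold, int numPendingRequests,
                              int numCopiers, int numRequiredMapOutputs) {
        // Memory use is below the limit, or too few segments were fetched.
        boolean memoryBelowLimit =
            percentUsed < MAX_INMEM_FILESYS_USE
            || numClosed
               < (int) (MAX_INMEM_FILESYS_USE / MAX_INMEM_FILESIZE_FRACTION);

        // The configured count of in-memory map outputs has not been reached.
        boolean belowMergeThreshold =
            mergeThreshold <= 0 || numClosed < mergeThreshold;

        // Few copier threads are blocked, and either nothing has been
        // registered yet (numRequiredMapOutputs == 0) or the blocked threads
        // are not the last outstanding map outputs.
        boolean copiersNotStalled =
            numPendingRequests
                < numCopiers * MAX_STALLED_SHUFFLE_THREADS_FRACTION
            && (0 == numRequiredMapOutputs
                || numPendingRequests < numRequiredMapOutputs);

        return !closed && memoryBelowLimit && belowMergeThreshold
            && copiersNotStalled;
    }
}
```

With numRequiredMapOutputs and numPendingRequests both zero, the pre-patch test "numPendingRequests < numRequiredMapOutputs" evaluates to false, so the loop would exit and start a merge with nothing to merge. The added "0 == numRequiredMapOutputs" clause keeps the thread waiting in that case.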
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=685714&r1=685713&r2=685714&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Aug 13 15:38:26 2008
@@ -298,6 +298,10 @@
HADOOP-3773. Change Pipes to set the default map output key and value
types correctly. (Koji Noguchi via omalley)
+ HADOOP-3940. Fix in-memory merge condition to wait when there are no map
+ outputs or when the final map outputs are being fetched without contention.
+ (cdouglas)
+
Release 0.18.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=685714&r1=685713&r2=685714&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Wed Aug 13 15:38:26 2008
@@ -856,17 +856,31 @@
public boolean waitForDataToMerge() throws InterruptedException {
boolean done = false;
synchronized (dataAvailable) {
- while (!closed &&
+ // Start in-memory merge if manager has been closed or...
+ while (!closed
+ &&
+ // In-memory threshold exceeded and at least two segments
+ // have been fetched
(getPercentUsed() < MAX_INMEM_FILESYS_USE ||
numClosed <
(int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)
)
&&
+ // More than "mapred.inmem.merge.threshold" map outputs
+ // have been fetched into memory
(mergeThreshold <= 0 || numClosed < mergeThreshold)
&&
+ // More than MAX... threads are blocked on the RamManager
+ // or the blocked threads are the last map outputs to be
+ // fetched. If numRequiredMapOutputs is zero, either
+ // setNumCopiedMapOutputs has not been called (no map outputs
+ // have been fetched, so there is nothing to merge) or the
+ // last map outputs are being transferred without
+ // contention, so a merge would be premature.
(numPendingRequests <
numCopiers*MAX_STALLED_SHUFFLE_THREADS_FRACTION &&
- numPendingRequests < numRequiredMapOutputs)) {
+ (0 == numRequiredMapOutputs ||
+ numPendingRequests < numRequiredMapOutputs))) {
dataAvailable.wait();
}
done = closed;