Author: ddas
Date: Thu Feb 5 05:35:09 2009
New Revision: 741009
URL: http://svn.apache.org/viewvc?rev=741009&view=rev
Log:
HADOOP-3327. Improves handling of READ_TIMEOUT during map output copying.
Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=741009&r1=741008&r2=741009&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Feb 5 05:35:09 2009
@@ -68,6 +68,9 @@
HADOOP-5097. Remove static variable JspHelper.fsn, a static reference to
a non-singleton FSNamesystem object. (szetszwo)
+ HADOOP-3327. Improves handling of READ_TIMEOUT during map output copying.
+ (Amareshwari Sriramadasu via ddas)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=741009&r1=741008&r2=741009&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Feb 5 05:35:09 2009
@@ -565,6 +565,12 @@
}
}
+ private static enum CopyOutputErrorType {
+ NO_ERROR,
+ READ_ERROR,
+ OTHER_ERROR
+ };
+
class ReduceCopier<K, V> implements MRConstants {
/** Reference to the umbilical object */
@@ -746,6 +752,11 @@
private static final int MIN_FETCH_RETRIES_PER_MAP = 2;
/**
+ * The percentage of maps yet to be copied at or below which
+ * the shuffle is considered to be near its end
+ */
+ private static final float MIN_PENDING_MAPS_PERCENT = 0.25f;
+ /**
* Maximum no. of unique maps from which we failed to fetch map-outputs
* even after {@link #maxFetchRetriesPerMap} retries; after this the
* reduce task is failed.
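
To make the new threshold concrete, here is a minimal, self-contained sketch of the end-of-shuffle check this patch introduces; MIN_PENDING_MAPS_PERCENT is taken from the patch, while the class name, method and the sample numbers are hypothetical:

    // Minimal sketch of the end-of-shuffle check introduced by this patch.
    // Only MIN_PENDING_MAPS_PERCENT is taken verbatim from the patch.
    public class EndOfShufflePolicy {
      private static final float MIN_PENDING_MAPS_PERCENT = 0.25f;

      // Halve the per-map retry budget once 75% or more of the map
      // outputs have been copied, i.e. the shuffle is near its end.
      static int effectiveRetries(int numMaps, int numCopied,
                                  int maxFetchRetriesPerMap) {
        int pendingCopies = numMaps - numCopied;
        if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT) {
          return maxFetchRetriesPerMap >> 1;
        }
        return maxFetchRetriesPerMap;
      }

      public static void main(String[] args) {
        // With 200 maps and 160 already copied, 40 <= 200 * 0.25f = 50,
        // so a retry budget of 6 drops to 3.
        System.out.println(effectiveRetries(200, 160, 6)); // prints 3
      }
    }
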
@@ -858,11 +869,18 @@
//a flag signifying whether a copy result is obsolete
private static final int OBSOLETE = -2;
+ private CopyOutputErrorType error = CopyOutputErrorType.NO_ERROR;
CopyResult(MapOutputLocation loc, long size) {
this.loc = loc;
this.size = size;
}
-
+
+ CopyResult(MapOutputLocation loc, long size, CopyOutputErrorType error) {
+ this.loc = loc;
+ this.size = size;
+ this.error = error;
+ }
+
public boolean getSuccess() { return size >= 0; }
public boolean isObsolete() {
return size == OBSOLETE;
@@ -870,6 +888,7 @@
public long getSize() { return size; }
public String getHost() { return loc.getHost(); }
public MapOutputLocation getLocation() { return loc; }
+ public CopyOutputErrorType getError() { return error; }
}
private int nextMapOutputCopierId = 0;
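
A stripped-down sketch of the CopyResult change above, assuming a hypothetical Result class (MapOutputLocation omitted for brevity): success stays encoded as size >= 0, and the new three-argument constructor additionally records why a failed copy failed:

    // Sketch only; Result stands in for CopyResult.
    class CopyResultExample {
      enum CopyOutputErrorType { NO_ERROR, READ_ERROR, OTHER_ERROR }

      static final class Result {
        final long size;
        final CopyOutputErrorType error;
        Result(long size, CopyOutputErrorType error) {
          this.size = size;
          this.error = error;
        }
        boolean getSuccess() { return size >= 0; }
      }

      public static void main(String[] args) {
        Result ok = new Result(1024, CopyOutputErrorType.NO_ERROR);
        Result failed = new Result(-1, CopyOutputErrorType.READ_ERROR);
        System.out.println(ok.getSuccess());     // true
        System.out.println(failed.getSuccess()); // false, tagged READ_ERROR
      }
    }
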
@@ -1120,6 +1139,7 @@
private MapOutputLocation currentLocation = null;
private int id = nextMapOutputCopierId++;
private Reporter reporter;
+ private boolean readError = false;
// Decompression of map-outputs
private CompressionCodec codec = null;
@@ -1144,7 +1164,7 @@
*/
public synchronized boolean fail() {
if (currentLocation != null) {
- finish(-1);
+ finish(-1, CopyOutputErrorType.OTHER_ERROR);
return true;
} else {
return false;
@@ -1162,11 +1182,11 @@
currentLocation = loc;
}
- private synchronized void finish(long size) {
+ private synchronized void finish(long size, CopyOutputErrorType error) {
if (currentLocation != null) {
LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
synchronized (copyResults) {
- copyResults.add(new CopyResult(currentLocation, size));
+ copyResults.add(new CopyResult(currentLocation, size, error));
copyResults.notify();
}
currentLocation = null;
@@ -1189,23 +1209,27 @@
}
loc = scheduledCopies.remove(0);
}
-
+ CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
+ readError = false;
try {
shuffleClientMetrics.threadBusy();
start(loc);
size = copyOutput(loc);
shuffleClientMetrics.successFetch();
+ error = CopyOutputErrorType.NO_ERROR;
} catch (IOException e) {
LOG.warn(reduceTask.getTaskID() + " copy failed: " +
loc.getTaskAttemptId() + " from " + loc.getHost());
LOG.warn(StringUtils.stringifyException(e));
shuffleClientMetrics.failedFetch();
-
+ if (readError) {
+ error = CopyOutputErrorType.READ_ERROR;
+ }
// Reset
size = -1;
} finally {
shuffleClientMetrics.threadFree();
- finish(size);
+ finish(size, error);
}
} catch (InterruptedException e) {
return; // ALL DONE
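
The run() loop above classifies every copy attempt: success yields NO_ERROR, an IOException with the readError flag set yields READ_ERROR, and any other IOException yields OTHER_ERROR. A condensed sketch of that classification, with fetch() as a hypothetical stand-in for copyOutput():

    import java.io.IOException;

    class CopyAttempt {
      enum CopyOutputErrorType { NO_ERROR, READ_ERROR, OTHER_ERROR }

      private boolean readError = false;

      // Hypothetical stand-in for copyOutput(); sets readError before
      // throwing when the failure happens while reading the stream.
      private long fetch() throws IOException {
        readError = true; // simulate a mid-read failure
        throw new IOException("read timed out");
      }

      CopyOutputErrorType attempt() {
        CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
        readError = false;
        try {
          fetch();
          error = CopyOutputErrorType.NO_ERROR;
        } catch (IOException e) {
          if (readError) {
            error = CopyOutputErrorType.READ_ERROR;
          }
        }
        return error;
      }

      public static void main(String[] args) {
        System.out.println(new CopyAttempt().attempt()); // READ_ERROR
      }
    }
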
@@ -1402,7 +1426,8 @@
connection.setConnectTimeout(unit);
while (true) {
try {
- return connection.getInputStream();
+ connection.connect();
+ break;
} catch (IOException ioe) {
// update the total remaining connect-timeout
connectionTimeout -= unit;
@@ -1421,6 +1446,12 @@
}
}
}
+ try {
+ return connection.getInputStream();
+ } catch (IOException ioe) {
+ readError = true;
+ throw ioe;
+ }
}
private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
@@ -1506,6 +1537,7 @@
IOUtils.cleanup(LOG, input);
// Re-throw
+ readError = true;
throw ioe;
}
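
The two hunks above separate connection establishment from opening the input stream, so a failure while opening or reading the stream can be flagged as a read error instead of a connect failure (the same flag is set in the shuffleToDisk read loop below). A minimal sketch of the pattern against the standard java.net API; the method name, parameters and timeout values are illustrative:

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    class TimedConnect {
      // Connect within a bounded total budget, retrying in fixed units;
      // only a failure in getInputStream() counts as a read error.
      static InputStream open(URL url, int connectBudgetMs, int unitMs,
                              int readTimeoutMs) throws IOException {
        HttpURLConnection connection =
            (HttpURLConnection) url.openConnection();
        connection.setConnectTimeout(unitMs);
        connection.setReadTimeout(readTimeoutMs);
        int remaining = connectBudgetMs;
        while (true) {
          try {
            connection.connect();
            break;                  // connected; stop retrying
          } catch (IOException ioe) {
            remaining -= unitMs;    // spend one unit of the budget
            if (remaining <= 0) {
              throw ioe;            // budget exhausted: a connect error
            }
          }
        }
        try {
          return connection.getInputStream();
        } catch (IOException ioe) {
          // In the patch this sets the copier's readError flag before
          // re-throwing; here we just re-throw.
          throw ioe;
        }
      }
    }
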
@@ -1571,7 +1603,13 @@
output = rfs.create(localFilename);
byte[] buf = new byte[64 * 1024];
- int n = input.read(buf, 0, buf.length);
+ int n = -1;
+ try {
+ n = input.read(buf, 0, buf.length);
+ } catch (IOException ioe) {
+ readError = true;
+ throw ioe;
+ }
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);
@@ -1579,7 +1617,12 @@
// indicate we're making progress
reporter.progress();
- n = input.read(buf, 0, buf.length);
+ try {
+ n = input.read(buf, 0, buf.length);
+ } catch (IOException ioe) {
+ readError = true;
+ throw ioe;
+ }
}
LOG.info("Read " + bytesRead + " bytes from map-output for " +
@@ -1985,17 +2028,38 @@
mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
LOG.info("Task " + getTaskID() + ": Failed fetch #" +
noFailedFetches + " from " + mapTaskId);
+
+ // halve the max number of fetch retries per map during
+ // the end of shuffle
+ int fetchRetriesPerMap = maxFetchRetriesPerMap;
+ int pendingCopies = numMaps - numCopied;
+
+ // The check noFailedFetches != maxFetchRetriesPerMap is
+ // required to make sure the notification is still sent in a
+ // corner case:
+ // when noFailedFetches has reached maxFetchRetriesPerMap and
+ // the reducer has reached the end of shuffle, we may miss
+ // sending a notification if the difference between
+ // noFailedFetches and fetchRetriesPerMap is not divisible by 2
+ if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT &&
+ noFailedFetches != maxFetchRetriesPerMap) {
+ fetchRetriesPerMap = fetchRetriesPerMap >> 1;
+ }
// did the fetch fail too many times?
// using a hybrid technique for notifying the jobtracker.
// a. the first notification is sent after max-retries
// b. subsequent notifications are sent after 2 retries.
- if ((noFailedFetches >= maxFetchRetriesPerMap)
- && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
+ // c. send notification immediately if it is a read error.
+ if (cr.getError().equals(CopyOutputErrorType.READ_ERROR) ||
+ ((noFailedFetches >= fetchRetriesPerMap)
+ && ((noFailedFetches - fetchRetriesPerMap) % 2) == 0)) {
synchronized (ReduceTask.this) {
taskStatus.addFetchFailedMap(mapTaskId);
+ reporter.progress();
LOG.info("Failed to fetch map-output from " + mapTaskId +
" even after MAX_FETCH_RETRIES_PER_MAP retries... "
+ + " or it is a read error, "
+ " reporting to the JobTracker");
}
}
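
To make the hybrid notification schedule concrete: with a budget of 6 retries, the JobTracker is notified at failed fetches 6, 8, 10, and so on; with the halved end-of-shuffle budget of 3, at 3, 5, 7, and so on; and a read error notifies immediately. A small sketch of just that predicate (the constant 6 and the driver loop are illustrative):

    class NotificationPolicy {
      // Mirror of the predicate above: notify on any read error, or
      // once the retry budget is spent and then on every second failure.
      static boolean shouldNotify(boolean readError, int noFailedFetches,
                                  int fetchRetriesPerMap) {
        return readError ||
            (noFailedFetches >= fetchRetriesPerMap &&
             (noFailedFetches - fetchRetriesPerMap) % 2 == 0);
      }

      public static void main(String[] args) {
        // Full budget of 6: notifications at failures 6, 8, 10.
        for (int n = 1; n <= 10; n++) {
          if (shouldNotify(false, n, 6)) {
            System.out.println("notify at " + n);
          }
        }
        // A read error notifies regardless of the count.
        System.out.println(shouldNotify(true, 1, 6)); // true
      }
    }
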
@@ -2051,10 +2115,22 @@
// back off exponentially until num_retries <= max_retries
// back off by max_backoff/2 on subsequent failed attempts
currentTime = System.currentTimeMillis();
- int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
+ int currentBackOff = noFailedFetches <= fetchRetriesPerMap
? BACKOFF_INIT
* (1 << (noFailedFetches - 1))
: (this.maxBackoff * 1000 / 2);
+ // If it is a read error, back off for maxMapRuntime/2;
+ // during the end of shuffle, back off for
+ // min(maxMapRuntime/2, currentBackOff)
+ if (cr.getError().equals(CopyOutputErrorType.READ_ERROR)) {
+ int backOff = maxMapRuntime >> 1;
+ if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT) {
+ backOff = Math.min(backOff, currentBackOff);
+ }
+ currentBackOff = backOff;
+ }
+
penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
LOG.warn(reduceTask.getTaskID() + " adding host " +
cr.getHost() + " to penalty box, next contact in " +
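
A worked example of the backoff arithmetic above, assuming illustrative values BACKOFF_INIT = 4000 ms, maxBackoff = 60 seconds and maxMapRuntime = 120000 ms:

    class BackoffExample {
      public static void main(String[] args) {
        final int BACKOFF_INIT = 4000;    // illustrative, in ms
        final int maxBackoff = 60;        // illustrative, in seconds
        final int maxMapRuntime = 120000; // illustrative, in ms

        int fetchRetriesPerMap = 3;
        boolean readError = true;
        boolean endOfShuffle = true;
        int noFailedFetches = 2;

        // Exponential until the retry budget is spent, then max_backoff/2.
        int currentBackOff = noFailedFetches <= fetchRetriesPerMap
            ? BACKOFF_INIT * (1 << (noFailedFetches - 1)) // 4000 * 2 = 8000
            : (maxBackoff * 1000 / 2);                    // 30000

        // Read errors back off for maxMapRuntime/2; near the end of the
        // shuffle the smaller of the two penalties wins.
        if (readError) {
          int backOff = maxMapRuntime >> 1;               // 60000
          if (endOfShuffle) {
            backOff = Math.min(backOff, currentBackOff);  // min(60000, 8000)
          }
          currentBackOff = backOff;
        }
        System.out.println(currentBackOff + " ms");       // 8000 ms
      }
    }
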