Author: ddas
Date: Fri Jul 25 05:05:51 2008
New Revision: 679769
URL: http://svn.apache.org/viewvc?rev=679769&view=rev
Log:
HADOOP-3327. Treat connection and read timeouts differently in the shuffle,
so that the backoff logic depends on the type of timeout. Contributed by Jothi
Padmanabhan.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=679769&r1=679768&r2=679769&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jul 25 05:05:51 2008
@@ -103,6 +103,10 @@
HADOOP-3756. Minor. Remove unused dfs.client.buffer.dir from
hadoop-default.xml. (rangadi)
+ HADOOP-3327. Treats connection and read timeouts differently in the
+ shuffle and the backoff logic is dependent on the type of timeout.
+ (Jothi Padmanabhan via ddas)
+
OPTIMIZATIONS
HADOOP-3556. Removed lock contention in MD5Hash by changing the
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=679769&r1=679768&r2=679769&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Jul 25 05:05:51 2008
@@ -71,6 +71,8 @@
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.net.ConnTimeoutException;
+import org.apache.hadoop.net.ReadTimeoutException;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
@@ -91,6 +93,12 @@
private int numMaps;
private ReduceCopier reduceCopier;
+ private static enum CopyOutputErrorType {
+ NO_ERROR,
+ CONNECTION_ERROR,
+ READ_ERROR
+ }
+
private CompressionCodec codec;
@@ -99,6 +107,7 @@
setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
}
+
private Progress copyPhase = getProgress().addPhase("copy");
private Progress sortPhase = getProgress().addPhase("sort");
private Progress reducePhase = getProgress().addPhase("reduce");
@@ -519,9 +528,15 @@
Set<TaskID> fetchFailedMaps = new TreeSet<TaskID>();
/**
- * A map of taskId -> no. of failed fetches
+ * A map of taskId -> no. of failed fetches in connect
+ */
+ Map<TaskAttemptID, Integer> mapTaskToConnectFailedFetchesMap =
+ new HashMap<TaskAttemptID, Integer>();
+
+ /**
+ * A map of taskId -> no. of failed fetches in read
*/
- Map<TaskAttemptID, Integer> mapTaskToFailedFetchesMap =
+ Map<TaskAttemptID, Integer> mapTaskToReadFailedFetchesMap =
new HashMap<TaskAttemptID, Integer>();
/**
@@ -541,6 +556,7 @@
Collections.synchronizedList(new LinkedList<MapOutput>());
+
/**
* This class contains the methods that should be used for metrics-reporting
* the specific metrics for shuffle. This class actually reports the
@@ -603,7 +619,8 @@
/** Represents the result of an attempt to copy a map output */
private class CopyResult {
- // the map output location against which a copy attempt was made
+
+ // the map output location against which a copy attempt was made
private final MapOutputLocation loc;
// the size of the file copied, -1 if the transfer failed
@@ -611,10 +628,14 @@
//a flag signifying whether a copy result is obsolete
private static final int OBSOLETE = -2;
-
- CopyResult(MapOutputLocation loc, long size) {
+
+ CopyOutputErrorType errorType;
+
+ CopyResult(MapOutputLocation loc, long size,
+ CopyOutputErrorType errorType) {
this.loc = loc;
this.size = size;
+ this.errorType = errorType;
}
public boolean getSuccess() { return size >= 0; }
@@ -623,6 +644,9 @@
}
public long getSize() { return size; }
public String getHost() { return loc.getHost(); }
+ public CopyOutputErrorType getErrorType() {
+ return ((size < 0) ? errorType: CopyOutputErrorType.NO_ERROR);
+ }
public MapOutputLocation getLocation() { return loc; }
}
@@ -875,19 +899,6 @@
}
/**
- * Fail the current file that we are fetching
- * @return were we currently fetching?
- */
- public synchronized boolean fail() {
- if (currentLocation != null) {
- finish(-1);
- return true;
- } else {
- return false;
- }
- }
-
- /**
* Get the current map output location.
*/
public synchronized MapOutputLocation getLocation() {
@@ -898,11 +909,12 @@
currentLocation = loc;
}
- private synchronized void finish(long size) {
+ private synchronized void finish(long size,
+ CopyOutputErrorType errorType) {
if (currentLocation != null) {
LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
synchronized (copyResults) {
- copyResults.add(new CopyResult(currentLocation, size));
+ copyResults.add(new CopyResult(currentLocation, size, errorType));
copyResults.notify();
}
currentLocation = null;
@@ -918,6 +930,7 @@
try {
MapOutputLocation loc = null;
long size = -1;
+ CopyOutputErrorType errorType = CopyOutputErrorType.NO_ERROR;
synchronized (scheduledCopies) {
while (scheduledCopies.isEmpty()) {
@@ -939,9 +952,18 @@
// Reset
size = -1;
+
+ // Identify the error type
+ if (e.getClass() == ConnTimeoutException.class) {
+ errorType = CopyOutputErrorType.CONNECTION_ERROR;
+ }
+ else if (e.getClass() == ReadTimeoutException.class) {
+ errorType = CopyOutputErrorType.READ_ERROR;
+ }
+
} finally {
shuffleClientMetrics.threadFree();
- finish(size);
+ finish(size, errorType);
}
} catch (InterruptedException e) {
return; // ALL DONE
@@ -1124,26 +1146,17 @@
connection.setReadTimeout(readTimeout);
// set the connect timeout to the unit-connect-timeout
connection.setConnectTimeout(unit);
- while (true) {
- try {
- return connection.getInputStream();
- } catch (IOException ioe) {
- // update the total remaining connect-timeout
- connectionTimeout -= unit;
- // throw an exception if we have waited for timeout amount of time
- // note that the updated value if timeout is used here
- if (connectionTimeout == 0) {
- throw ioe;
- }
+ try {
+ connection.connect();
+ } catch (IOException ioe) {
+ throw new ConnTimeoutException("Connection Timed out");
+ }
- // reset the connect timeout for the last try
- if (connectionTimeout < unit) {
- unit = connectionTimeout;
- // reset the connect time out for the final connect
- connection.setConnectTimeout(unit);
- }
- }
+ try {
+ return connection.getInputStream();
+ } catch (IOException ioe) {
+ throw new ReadTimeoutException("Read Timed out");
}
}
@@ -1699,33 +1712,61 @@
cr.getHost());
} else {
retryFetches.add(cr.getLocation());
+
+ CopyOutputErrorType errorType = cr.getErrorType();
// note the failed-fetch
TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
TaskID mapId = cr.getLocation().getTaskId();
totalFailures++;
- Integer noFailedFetches =
- mapTaskToFailedFetchesMap.get(mapTaskId);
- noFailedFetches =
- (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
- mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
- LOG.info("Task " + getTaskID() + ": Failed fetch #" +
- noFailedFetches + " from " + mapTaskId);
-
- // did the fetch fail too many times?
- // using a hybrid technique for notifying the jobtracker.
- // a. the first notification is sent after max-retries
- // b. subsequent notifications are sent after 2 retries.
- if ((noFailedFetches >= maxFetchRetriesPerMap)
- && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
+
+ Integer noFailedFetches = 0;
+
+ Integer noReadFailedFetches =
+ mapTaskToReadFailedFetchesMap.get(mapTaskId);
+
+ if (noReadFailedFetches == null) noReadFailedFetches = 0;
+
+ Integer noConnectFailedFetches =
+ mapTaskToConnectFailedFetchesMap.get(mapTaskId);
+
+ if (noConnectFailedFetches == null) noConnectFailedFetches = 0;
+
+ if (errorType == CopyOutputErrorType.READ_ERROR) {
+ noReadFailedFetches ++;
+ mapTaskToReadFailedFetchesMap.put (mapTaskId,
+ noReadFailedFetches);
synchronized (ReduceTask.this) {
taskStatus.addFetchFailedMap(mapTaskId);
LOG.info("Failed to fetch map-output from " + mapTaskId +
- " even after MAX_FETCH_RETRIES_PER_MAP retries... "
- + " reporting to the JobTracker");
+ " Got a Read Time out," +
+ " reporting to the JobTracker");
+ }
+ } else if (errorType == CopyOutputErrorType.CONNECTION_ERROR) {
+ noConnectFailedFetches ++;
+ mapTaskToConnectFailedFetchesMap.put (
+ mapTaskId, noConnectFailedFetches);
+
+ LOG.info("Task " + getTaskID() + ": Failed fetch #"
+ + noConnectFailedFetches + " from " + mapTaskId);
+
+ if ((noConnectFailedFetches >= maxFetchRetriesPerMap) &&
+ ((noConnectFailedFetches - maxFetchRetriesPerMap) % 2)
+ == 0) {
+ synchronized (ReduceTask.this) {
+ taskStatus.addFetchFailedMap(mapTaskId);
+ LOG.info("Failed to fetch map-output from " + mapTaskId
+ + " even after MAX_FETCH_RETRIES_PER_MAP"
+ + " (connect) retries... "
+ + " reporting to the JobTracker");
+ }
}
}
+
+ noFailedFetches = noConnectFailedFetches +
+ noReadFailedFetches;
+
// note unique failed-fetch maps
if (noFailedFetches == maxFetchRetriesPerMap) {
fetchFailedMaps.add(mapId);
@@ -1774,22 +1815,32 @@
}
}
- // back off exponentially until num_retries <= max_retries
- // back off by max_backoff/2 on subsequent failed attempts
- currentTime = System.currentTimeMillis();
- int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
+ if (errorType == CopyOutputErrorType.CONNECTION_ERROR) {
+ // back off exponentially until num_retries <= max_retries
+ // back off by max_backoff/2 on subsequent failed attempts
+ currentTime = System.currentTimeMillis();
+ int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
? BACKOFF_INIT
* (1 << (noFailedFetches - 1))
: (this.maxBackoff * 1000 / 2);
- penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
- LOG.warn(reduceTask.getTaskID() + " adding host " +
+ penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
+ LOG.warn(reduceTask.getTaskID() + " adding host " +
cr.getHost() + " to penalty box, next contact in " +
(currentBackOff/1000) + " seconds");
- }
+ } else if (errorType == CopyOutputErrorType.READ_ERROR) {
+ int backOff = Math.max(maxMapRuntime/2,
+ (this.maxBackoff * 1000));
+ penaltyBox.put(cr.getHost(), currentTime + backOff);
+ LOG.warn(reduceTask.getTaskID() + " adding host " +
+ cr.getHost() + " to penalty box, next contact in " +
+ (backOff/1000) + " seconds");
+ }
+
+ } // Fetch Failure
uniqueHosts.remove(cr.getHost());
numInFlight--;
- }
- }
+ } // while (numInFlight > 0)
+ } // while (copiedMaps < numMaps)
// all done, inform the copiers to exit
synchronized (copiers) {