Author: cutting Date: Thu Feb 22 11:23:35 2007 New Revision: 510630 URL: http://svn.apache.org/viewvc?view=rev&rev=510630 Log: HADOOP-654. Stop assigning a job's tasks to a tasktracker once a specified number of that job's tasks have failed on it. Contributed by Arun.
Added: lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/conf/hadoop-default.xml lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=510630&r1=510629&r2=510630 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Thu Feb 22 11:23:35 2007 @@ -109,6 +109,10 @@ Also increase the value used from one to two seconds, in hopes of making tests complete more reliably. (cutting) +33. HADOOP-654. Stop assigning a job's tasks to a tasktracker once + a specified number of that job's tasks have failed on it. + (Arun C Murthy via cutting) + Release 0.11.2 - 2007-02-16 Modified: lucene/hadoop/trunk/conf/hadoop-default.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=510630&r1=510629&r2=510630 ============================================================================== --- lucene/hadoop/trunk/conf/hadoop-default.xml (original) +++ lucene/hadoop/trunk/conf/hadoop-default.xml Thu Feb 22 11:23:35 2007 @@ -748,6 +748,14 @@ </property> <property> + <name>mapred.max.tracker.failures</name> + <value>4</value> + <description>The number of task-failures of a given job on a tasktracker + after which new tasks of that job aren't assigned to it.
+ </description> +</property> + +<property> <name>jobclient.output.filter</name> <value>FAILED</value> <description>The filter for controlling the output of the task's userlogs sent Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=510630&r1=510629&r2=510630 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu Feb 22 11:23:35 2007 @@ -532,6 +532,24 @@ set("mapred.job.name", name); } + /** + * Set the maximum no. of failures of a given job per tasktracker. + * + * @param noFailures maximum no. of failures of a given job per tasktracker. + */ + public void setMaxTaskFailuresPerTracker(int noFailures) { + setInt("mapred.max.tracker.failures", noFailures); + } + + /** + * Get the maximum no. of failures of a given job per tasktracker. + * + * @return the maximum no. of failures of a given job per tasktracker. + */ + public int getMaxTaskFailuresPerTracker() { + return getInt("mapred.max.tracker.failures", 4); + } + /** Find a jar that contains a class of the same name, if any. * It will return a jar file, even if that is not the first thing * on the class path that has a class with the same name. 
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=510630&r1=510629&r2=510630 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Feb 22 11:23:35 2007 @@ -38,7 +38,7 @@ /////////////////////////////////////////////////////// class JobInProgress { private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobInProgress"); - + JobProfile profile; JobStatus status; Path localJobFile = null; @@ -57,8 +57,15 @@ JobTracker jobtracker = null; Map<String,List<TaskInProgress>> hostToMaps = new HashMap(); private int taskCompletionEventTracker = 0 ; - List<TaskCompletionEvent> taskCompletionEvents ; - + List<TaskCompletionEvent> taskCompletionEvents ; + + // The no. of tasktrackers where >= conf.getMaxTaskFailuresPerTracker() + // tasks have failed + private volatile int flakyTaskTrackers = 0; + // Map of trackerHostName -> no. of task failures + private Map<String, Integer> trackerToFailuresMap = + new TreeMap<String, Integer>(); + long startTime; long finishTime; @@ -102,6 +109,7 @@ this.numReduceTasks = conf.getNumReduceTasks(); this.taskCompletionEvents = new ArrayList( numMapTasks + numReduceTasks + 10); + JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), System.currentTimeMillis(), jobFile); @@ -373,6 +381,61 @@ return result; } + private String convertTrackerNameToHostName(String trackerName) { + // Ugly! + // Convert the trackerName to it's host name + int indexOfColon = trackerName.indexOf(":"); + String trackerHostName = (indexOfColon == -1) ? 
+ trackerName : + trackerName.substring(0, indexOfColon); + return trackerHostName; + } + + private void addTrackerTaskFailure(String trackerName) { + String trackerHostName = convertTrackerNameToHostName(trackerName); + + Integer trackerFailures = trackerToFailuresMap.get(trackerHostName); + if (trackerFailures == null) { + trackerFailures = new Integer(0); + } + trackerToFailuresMap.put(trackerHostName, ++trackerFailures); + + // Check if this tasktracker has turned 'flaky' + if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) { + ++flakyTaskTrackers; + LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'"); + } + } + + private int getTrackerTaskFailures(String trackerName) { + String trackerHostName = convertTrackerNameToHostName(trackerName); + Integer failedTasks = trackerToFailuresMap.get(trackerHostName); + return (failedTasks != null) ? failedTasks.intValue() : 0; + } + + /** + * Get the no. of 'flaky' tasktrackers for a given job. + * + * @return the no. of 'flaky' tasktrackers for a given job. + */ + int getNoOfBlackListedTrackers() { + return flakyTaskTrackers; + } + + /** + * Get the information on tasktrackers and no. of errors which occurred + * on them for a given job. + * + * @return the map of tasktrackers and no. of errors which occurred + * on them for a given job. + */ + synchronized Map<String, Integer> getTaskTrackerErrors() { + // Clone the 'trackerToFailuresMap' and return the copy + Map<String, Integer> trackerErrors = + new TreeMap<String, Integer>(trackerToFailuresMap); + return trackerErrors; + } + /** * Find a new task to run. * @param tts The task tracker that is asking for a task @@ -389,6 +452,25 @@ TaskInProgress[] tasks, List cachedTasks) { String taskTracker = tts.getTrackerName(); + + // + // Check if too many tasks of this job have failed on this + // tasktracker prior to assigning it a new one. 
+ // + int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker); + if (taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) { + String flakyTracker = convertTrackerNameToHostName(taskTracker); + if (flakyTaskTrackers < clusterSize) { + LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker + + "' for assigning a new task"); + return -1; + } else { + LOG.warn("Trying to assign a new task for black-listed tracker " + + flakyTracker + " since all task-trackers in the cluster are " + + "'flaky' !"); + } + } + // // See if there is a split over a block that is stored on // the TaskTracker checking in. That means the block @@ -647,6 +729,11 @@ failedReduceTasks++; } + // + // Note down that a task has failed on this tasktracker + // + addTrackerTaskFailure(trackerName); + // // Let the JobTracker know that this task has failed // Added: lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp?view=auto&rev=510630 ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp (added) +++ lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp Thu Feb 22 11:23:35 2007 @@ -0,0 +1,60 @@ +<%@ page + contentType="text/html; charset=UTF-8" + import="javax.servlet.*" + import="javax.servlet.http.*" + import="java.io.*" + import="java.util.*" + import="org.apache.hadoop.mapred.*" + import="org.apache.hadoop.util.*" +%> + +<%! + JobTracker tracker = JobTracker.getTracker(); + String trackerName = + StringUtils.simpleHostname(tracker.getJobTrackerMachine()); + + private void printBlackListedTrackers(JspWriter out, + JobInProgress job) throws IOException { + Map<String, Integer> trackerErrors = job.getTaskTrackerErrors(); + out.print("<table border=2 cellpadding=\"5\" cellspacing=\"2\">"); + out.print("<tr><th>TaskTracker</th><th>No. 
of Failures</th></tr>\n"); + int maxErrorsPerTracker = job.getJobConf().getMaxTaskFailuresPerTracker(); + for (Map.Entry<String,Integer> e : trackerErrors.entrySet()) { + if (e.getValue().intValue() >= maxErrorsPerTracker) { + out.print("<tr><td>" + e.getKey() + "</td><td>" + e.getValue() + + "</td></tr>\n"); + } + } + out.print("</table>\n"); + } +%> + +<% + String jobId = request.getParameter("jobid"); + if (jobId == null) { + out.println("<h2>Missing 'jobid' for fetching black-listed tasktrackers!</h2>"); + return; + } + + JobInProgress job = (JobInProgress) tracker.getJob(jobId); + if (job == null) { + out.print("<b>Job " + jobId + " not found.</b><br>\n"); + return; + } +%> + +<html> +<title>Hadoop <%=jobId%>'s black-listed tasktrackers</title> +<body> +<h1>Hadoop <a href="/jobdetails.jsp?jobid=<%=jobId%>"><%=jobId%></a> - +Black-listed task-trackers</h1> + +<% + printBlackListedTrackers(out, job); +%> + +<hr> +<a href="/jobdetails.jsp?jobid=<%=jobId%>">Go back to <%=jobId%></a><br> +<a href="http://lucene.apache.org/hadoop">Hadoop</a>, 2006.<br> +</body> +</html> Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?view=diff&rev=510630&r1=510629&r2=510630 ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Thu Feb 22 11:23:35 2007 @@ -60,6 +60,7 @@ JobProfile profile = job.getProfile(); JobStatus status = job.getStatus(); int runState = status.getRunState(); + int flakyTaskTrackers = job.getNoOfBlackListedTrackers(); out.print("<b>User:</b> " + profile.getUser() + "<br>\n"); out.print("<b>Job Name:</b> " + profile.getJobName() + "<br>\n"); if (runState == JobStatus.RUNNING) { @@ -79,6 +80,11 @@ } out.print("<b>Finished at:</b> " + new Date(job.getFinishTime()) + "<br>\n"); + } + if (flakyTaskTrackers > 0) { + 
out.print("<b>Black-listed TaskTrackers:</b> " + + "<a href=\"/jobblacklistedtrackers.jsp?jobid=" + jobId + "\">" + + flakyTaskTrackers + "</a><br>\n"); } out.print("<hr>\n"); out.print("<table border=2 cellpadding=\"5\" cellspacing=\"2\">");