Author: acmurthy Date: Fri Sep 7 14:08:36 2007 New Revision: 573708 URL: http://svn.apache.org/viewvc?rev=573708&view=rev Log: HADOOP-1351. Add "bin/hadoop job [-fail-task|-kill-task]" sub-commands to terminate a particular task-attempt. Contributed by Enis Soztutar.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=573708&r1=573707&r2=573708&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Sep 7 14:08:36 2007 @@ -39,6 +39,9 @@ HADOOP-1767. Add "bin/hadoop job -list" sub-command. (taton via cutting) + HADOOP-1351. Add "bin/hadoop job [-fail-task|-kill-task]" sub-commands + to terminate a particular task-attempt. (Enis Soztutar via acmurthy) + OPTIMIZATIONS HADOOP-1565. 
Reduce memory usage of NameNode by replacing Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=573708&r1=573707&r2=573708&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Fri Sep 7 14:08:36 2007 @@ -187,6 +187,17 @@ public synchronized void killJob() throws IOException { jobSubmitClient.killJob(getJobID()); } + + /** + * Kill indicated task attempt. + * @param taskId the id of the task to kill. + * @param shouldFail if true the task is failed and added to failed tasks list, otherwise + * it is just killed, w/o affecting job failure status. + */ + public synchronized void killTask(String taskId, boolean shouldFail) throws IOException { + jobSubmitClient.killTask(taskId, shouldFail); + } + /** * Fetch task completion events from jobtracker for this job. 
*/ @@ -767,7 +778,9 @@ System.out.printf("\t-status\t<job-id>\n"); System.out.printf("\t-kill\t<job-id>\n"); System.out.printf("\t-events\t<job-id> <from-event-#> <#-of-events>\n"); - System.out.printf("\t-list\n\n"); + System.out.printf("\t-list\n"); + System.out.printf("\t-kill-task <task-id>\n"); + System.out.printf("\t-fail-task <task-id>\n\n"); ToolRunner.printGenericCommandUsage(System.out); throw new RuntimeException("JobClient: bad command-line arguments"); } @@ -776,13 +789,16 @@ // process arguments String submitJobFile = null; String jobid = null; + String taskid = null; int fromEvent = 0; int nEvents = 0; boolean getStatus = false; boolean killJob = false; boolean listEvents = false; boolean listJobs = false; - + boolean killTask = false; + boolean failTask = false; + if (argv.length < 1) displayUsage(); @@ -809,6 +825,16 @@ listEvents = true; } else if ("-list".equals(argv[0])) { listJobs = true; + } else if("-kill-task".equals(argv[0])) { + if(argv.length != 2) + displayUsage(); + killTask = true; + taskid = argv[1]; + } else if("-fail-task".equals(argv[0])) { + if(argv.length != 2) + displayUsage(); + failTask = true; + taskid = argv[1]; } else { displayUsage(); } @@ -853,6 +879,22 @@ } else if (listJobs) { listJobs(); exitCode = 0; + } else if(killTask) { + if(jobSubmitClient.killTask(taskid, false)) { + System.out.println("Killed task " + taskid); + exitCode = 0; + } else { + System.out.println("Could not kill task " + taskid); + exitCode = -1; + } + } else if(failTask) { + if(jobSubmitClient.killTask(taskid, true)) { + System.out.println("Killed task " + taskid + " by failing it"); + exitCode = 0; + } else { + System.out.println("Could not fail task " + taskid); + exitCode = -1; + } } } finally { close(); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=573708&r1=573707&r2=573708&view=diff 
============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java Fri Sep 7 14:08:36 2007 @@ -584,7 +584,7 @@ } } /** - * Log task attempt failed event. + * Log task attempt killed event. * @param jobId jobid * @param taskId taskid * @param taskAttemptId task attempt id @@ -679,7 +679,7 @@ } } /** - * Log failed reduce task attempt. + * Log killed reduce task attempt. * @param jobId job id * @param taskId task id * @param taskAttemptId task attempt id Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=573708&r1=573707&r2=573708&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Sep 7 14:08:36 2007 @@ -39,12 +39,13 @@ import org.apache.hadoop.metrics.MetricsUtil; import org.apache.hadoop.util.StringUtils; -/////////////////////////////////////////////////////// -// JobInProgress maintains all the info for keeping -// a Job on the straight and narrow. It keeps its JobProfile -// and its latest JobStatus, plus a set of tables for -// doing bookkeeping of its Tasks. -/////////////////////////////////////////////////////// +/************************************************************* + * JobInProgress maintains all the info for keeping + * a Job on the straight and narrow. It keeps its JobProfile + * and its latest JobStatus, plus a set of tables for + * doing bookkeeping of its Tasks. 
+ * *********************************************************** + */ class JobInProgress { private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobInProgress"); @@ -188,8 +189,7 @@ } } } - - + /** * Called when the job is complete */ @@ -455,8 +455,10 @@ // // Update JobInProgress status // - LOG.debug("Taking progress for " + tip.getTIPId() + " from " + - oldProgress + " to " + tip.getProgress()); + if(LOG.isDebugEnabled()) { + LOG.debug("Taking progress for " + tip.getTIPId() + " from " + + oldProgress + " to " + tip.getProgress()); + } double progressDelta = tip.getProgress() - oldProgress; if (tip.isMapTask()) { if (maps.length == 0) { @@ -916,9 +918,14 @@ TaskStatus status, String trackerName, boolean wasRunning, boolean wasComplete, JobTrackerMetrics metrics) { - // Mark the taskid as a 'failure' - tip.incompleteSubTask(taskid, trackerName, this.status); - + if(status.getRunState() == TaskStatus.State.KILLED ) { + tip.taskKilled(taskid, trackerName, this.status); + } + else { + // Mark the taskid as a 'failure' + tip.incompleteSubTask(taskid, trackerName, this.status); + } + boolean isRunning = tip.isRunning(); boolean isComplete = tip.isComplete(); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=573708&r1=573707&r2=573708&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Sep 7 14:08:36 2007 @@ -30,9 +30,10 @@ public interface JobSubmissionProtocol extends VersionedProtocol { /* *Changing the versionID to 2L since the getTaskCompletionEvents method has - *changed + *changed. 
+ *Changed to 4 since killTask(String,boolean) is added */ - public static final long versionID = 3L; + public static final long versionID = 4L; /** * Allocate a name for the job. @@ -59,6 +60,14 @@ */ public void killJob(String jobid) throws IOException; + /** + * Kill indicated task attempt. + * @param taskId the id of the task to kill. + * @param shouldFail if true the task is failed and added to failed tasks list, otherwise + * it is just killed, w/o affecting job failure status. + */ + public boolean killTask(String taskId, boolean shouldFail) throws IOException; + /** * Grab a handle to a job that is already known to the JobTracker */ Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=573708&r1=573707&r2=573708&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Sep 7 14:08:36 2007 @@ -1404,7 +1404,7 @@ Set<String> killJobIds = new TreeSet<String>(); for (String killTaskId : taskIds) { TaskInProgress tip = taskidToTIPMap.get(killTaskId); - if (tip.shouldCloseForClosedJob(killTaskId)) { + if (tip.shouldClose(killTaskId)) { // // This is how the JobTracker ends a task at the TaskTracker. // It may be successfully completed, or may be killed in @@ -1668,6 +1668,18 @@ : (TaskInProgress) job.getTaskInProgress(tipid)); } + /** Mark a Task to be killed */ + public synchronized boolean killTask(String taskid, boolean shouldFail) throws IOException{ + TaskInProgress tip = taskidToTIPMap.get(taskid); + if(tip != null) { + return tip.killTask(taskid, shouldFail); + } + else { + LOG.info("Kill task attempt failed since task " + taskid + " was not found"); + return false; + } + } + /** * Get tracker name for a given task id. 
* @param taskId the name of the task Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=573708&r1=573707&r2=573708&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Sep 7 14:08:36 2007 @@ -283,6 +283,12 @@ jobs.get(id).stop(); } + /** Throws {@link UnsupportedOperationException} */ + public boolean killTask(String taskId, boolean shouldFail) throws IOException { + throw new UnsupportedOperationException("Killing tasks in " + + "LocalJobRunner is not supported"); + } + public JobProfile getJobProfile(String id) { Job job = jobs.get(id); return job.getProfile(); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?rev=573708&r1=573707&r2=573708&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Fri Sep 7 14:08:36 2007 @@ -82,6 +82,14 @@ public TaskCompletionEvent[] getTaskCompletionEvents( int startFrom) throws IOException; + + /** + * Kill indicated task attempt. + * @param taskId the id of the task to kill. + * @param shouldFail if true the task is failed and added to failed tasks list, otherwise + * it is just killed, w/o affecting job failure status. + */ + public void killTask(String taskId, boolean shouldFail) throws IOException; /** * Gets the counters for this job. 
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=573708&r1=573707&r2=573708&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Fri Sep 7 14:08:36 2007 @@ -33,20 +33,20 @@ import org.apache.hadoop.util.StringUtils; -//////////////////////////////////////////////////////// -// TaskInProgress maintains all the info needed for a -// Task in the lifetime of its owning Job. A given Task -// might be speculatively executed or reexecuted, so we -// need a level of indirection above the running-id itself. -// -// A given TaskInProgress contains multiple taskids, -// 0 or more of which might be executing at any one time. -// (That's what allows speculative execution.) A taskid -// is now *never* recycled. A TIP allocates enough taskids -// to account for all the speculation and failures it will -// ever have to handle. Once those are up, the TIP is dead. -// -//////////////////////////////////////////////////////// +/************************************************************* + * TaskInProgress maintains all the info needed for a + * Task in the lifetime of its owning Job. A given Task + * might be speculatively executed or reexecuted, so we + * need a level of indirection above the running-id itself. + * <br> + * A given TaskInProgress contains multiple taskids, + * 0 or more of which might be executing at any one time. + * (That's what allows speculative execution.) A taskid + * is now *never* recycled. A TIP allocates enough taskids + * to account for all the speculation and failures it will + * ever have to handle. Once those are up, the TIP is dead. 
+ * ************************************************************** + */ class TaskInProgress { static final int MAX_TASK_EXECS = 1; int maxTaskAttempts = 4; @@ -108,7 +108,10 @@ private TreeSet<String> machinesWhereFailed = new TreeSet<String>(); private TreeSet<String> tasksReportedClosed = new TreeSet<String>(); - + + //list of tasks to kill, <taskid> -> <shouldFail> + private TreeMap<String, Boolean> tasksToKill = new TreeMap<String, Boolean>(); + private Counters counters = new Counters(); /** @@ -162,7 +165,7 @@ * @return The unique string for this tip */ private String makeUniqueString(String uniqueBase) { - StringBuffer result = new StringBuffer(); + StringBuilder result = new StringBuilder(); result.append(uniqueBase); if (isMapTask()) { result.append("_m_"); @@ -174,8 +177,8 @@ } /** - * Return the index of the tip within the job, so "tip_0002_m_012345" - * would return 12345; + * Return the index of the tip within the job, so + * "tip_200707121733_1313_0002_m_012345" would return 12345; * @return int the tip index */ public int idWithinJob() { @@ -239,7 +242,7 @@ * @return <code>true</code> if taskid is complete, else <code>false</code> */ public boolean isComplete(String taskid) { - TaskStatus status = (TaskStatus) taskStatuses.get(taskid); + TaskStatus status = taskStatuses.get(taskid); if (status == null) { return false; } @@ -285,14 +288,15 @@ } /** * Returns whether a component task-thread should be - * closed because the containing JobInProgress has completed. 
+ * closed because the containing JobInProgress has completed + * or the task is killed by the user */ - public boolean shouldCloseForClosedJob(String taskid) { + public boolean shouldClose(String taskid) { // If the thing has never been closed, // and it belongs to this TIP, // and this TIP is somehow FINISHED, // then true - TaskStatus ts = (TaskStatus) taskStatuses.get(taskid); + TaskStatus ts = taskStatuses.get(taskid); if ((ts != null) && (!tasksReportedClosed.contains(taskid)) && (job.getStatus().getRunState() != JobStatus.RUNNING)) { @@ -303,7 +307,7 @@ tasksReportedClosed.add(taskid); return true; } else { - return false; + return tasksToKill.keySet().contains(taskid); } } @@ -364,7 +368,7 @@ synchronized boolean updateStatus(TaskStatus status) { String taskid = status.getTaskId(); String diagInfo = status.getDiagnosticInfo(); - TaskStatus oldStatus = (TaskStatus) taskStatuses.get(taskid); + TaskStatus oldStatus = taskStatuses.get(taskid); boolean changed = true; if (diagInfo != null && diagInfo.length() > 0) { LOG.info("Error from "+taskid+": "+diagInfo); @@ -578,7 +582,49 @@ public boolean wasKilled() { return killed; } - + + /** + * Kill the given task + */ + boolean killTask(String taskId, boolean shouldFail) { + TaskStatus st = taskStatuses.get(taskId); + if(st != null && st.getRunState() == TaskStatus.State.RUNNING + && tasksToKill.put(taskId, shouldFail) == null ) { + String logStr = "Request received to " + (shouldFail ? 
"fail" : "kill") + + " task '" + taskId + "' by user"; + addDiagnosticInfo(taskId, logStr); + LOG.info(logStr); + return true; + } + return false; + } + + /** Notification that a task with the given id has been killed */ + void taskKilled(String taskId, String trackerName, JobStatus jobStatus) { + Boolean shouldFail = tasksToKill.remove(taskId); + if(shouldFail != null && !shouldFail) { + LOG.info("Task '" + taskId + "' has been killed"); + this.activeTasks.remove(taskId); + taskStatuses.get(taskId).setRunState(TaskStatus.State.KILLED ); + addDiagnosticInfo(taskId, "Task has been killed" ); + // Discard task output + Task t = tasks.get(taskId); + try { + t.discardTaskOutput(); + } catch (IOException ioe) { + LOG.info("Failed to discard output of task '" + taskId + "' with " + + StringUtils.stringifyException(ioe)); + } + numKilledTasks++; + + } + else { + //set the task status as failed. + taskStatuses.get(taskId).setRunState(TaskStatus.State.FAILED); + incompleteSubTask(taskId, trackerName, jobStatus); + } + } + /** * This method is called whenever there's a status change * for one of the TIP's sub-tasks. 
It recomputes the overall @@ -596,8 +642,8 @@ double bestProgress = 0; String bestState = ""; Counters bestCounters = new Counters(); - for (Iterator it = taskStatuses.keySet().iterator(); it.hasNext();) { - String taskid = (String) it.next(); + for (Iterator<String> it = taskStatuses.keySet().iterator(); it.hasNext();) { + String taskid = it.next(); TaskStatus status = taskStatuses.get(taskid); if (status.getRunState() == TaskStatus.State.SUCCEEDED) { bestProgress = 1; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=573708&r1=573707&r2=573708&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Sep 7 14:08:36 2007 @@ -825,10 +825,11 @@ // Send the heartbeat and process the jobtracker's directives HeartbeatResponse heartbeatResponse = transmitHeartBeat(); TaskTrackerAction[] actions = heartbeatResponse.getActions(); - LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + - heartbeatResponse.getResponseId() + " and " + - ((actions != null) ? actions.length : 0) + " actions"); - + if(LOG.isDebugEnabled()) { + LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + + heartbeatResponse.getResponseId() + " and " + + ((actions != null) ? 
actions.length : 0) + " actions"); + } if (reinitTaskTracker(actions)) { return State.STALE; } Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?rev=573708&r1=573707&r2=573708&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Fri Sep 7 14:08:36 2007 @@ -83,7 +83,7 @@ + " ?<h3><br><table border=\"0\"><tr><td width=\"100\">" + "<a href=\"" + url + "&action=kill" + "\">Kill</a></td><td width=\"100\"><a href=\"" + url - + "\">Don't Kill</a></td></tr></table></body></html>"); + + "\">Cancel </a></td></tr></table></body></html>"); } %> @@ -245,7 +245,7 @@ JobPriority jobPrio = job.getPriority(); for (JobPriority prio : JobPriority.values()) { if(jobPrio != prio) { - %><a href="jobdetails.jsp?action=changeprio&jobid=<%=jobId%>&prio=<%=prio%>"> <%=prio%> </a><% + %> <a href="jobdetails.jsp?action=changeprio&jobid=<%=jobId%>&prio=<%=prio%>"> <%=prio%> </a> <% } } %> Modified: lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp?rev=573708&r1=573707&r2=573708&view=diff ============================================================================== --- lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp Fri Sep 7 14:08:36 2007 @@ -86,6 +86,7 @@ r<%= VersionInfo.getRevision()%><br> <b>Compiled:</b> <%= VersionInfo.getDate()%> by <%= VersionInfo.getUser()%><br> +<b>Identifier:</b> <%= tracker.getTrackerIdentifier()%><br> <hr> <h2>Cluster Summary</h2> Modified: lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp?rev=573708&r1=573707&r2=573708&view=diff 
============================================================================== --- lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp (original) +++ lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Fri Sep 7 14:08:36 2007 @@ -9,42 +9,85 @@ import="org.apache.hadoop.util.*" import="java.text.SimpleDateFormat" import="org.apache.hadoop.util.*" + import="org.apache.hadoop.dfs.JspHelper" %> -<%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ; %> +<%!static SimpleDateFormat dateFormat = new SimpleDateFormat( + "d-MMM-yyyy HH:mm:ss"); + + private static final String PRIVATE_ACTIONS_KEY = "webinterface.private.actions";%> +<%!private void printConfirm(JspWriter out, String jobid, String tipid, + String taskid, String action) throws IOException { + String url = "taskdetails.jsp?jobid=" + jobid + "&tipid=" + tipid + + "&taskid=" + taskid; + out.print("<html><head><META http-equiv=\"refresh\" content=\"15;URL=" + + url + "\"></head>" + "<body><h3> Are you sure you want to kill/fail " + + taskid + " ?<h3><br><table border=\"0\"><tr><td width=\"100\">" + + "<a href=\"" + url + "&action=" + action + + "\">Kill / Fail</a></td><td width=\"100\"><a href=\"" + url + + "\">Cancel</a></td></tr></table></body></html>"); + }%> <% - String jobid = request.getParameter("jobid"); - JobTracker tracker = JobTracker.getTracker(); - JobInProgress job = (JobInProgress) tracker.getJob(jobid); - String tipid = request.getParameter("tipid"); - TaskStatus[] ts = (job != null) ? 
tracker.getTaskStatuses(jobid, tipid): null; + JobTracker tracker = JobTracker.getTracker(); + String jobid = request.getParameter("jobid"); + String tipid = request.getParameter("tipid"); + String taskid = request.getParameter("taskid"); + + boolean privateActions = JspHelper.conf.getBoolean(PRIVATE_ACTIONS_KEY, + false); + if (privateActions) { + String action = request.getParameter("action"); + if (action != null) { + if (action.equalsIgnoreCase("confirm")) { + String subAction = request.getParameter("subaction"); + if (subAction == null) + subAction = "fail-task"; + printConfirm(out, jobid, tipid, taskid, subAction); + return; + } + else if (action.equalsIgnoreCase("kill-task")) { + tracker.killTask(taskid, false); + //redirect again so that refreshing the page will not attempt to rekill the task + response.sendRedirect("/taskdetails.jsp?" + "&subaction=kill-task" + + "&jobid=" + jobid + "&tipid=" + tipid); + } + else if (action.equalsIgnoreCase("fail-task")) { + tracker.killTask(taskid, true); + response.sendRedirect("/taskdetails.jsp?" + "&subaction=fail-task" + + "&jobid=" + jobid + "&tipid=" + tipid); + } + } + } + JobInProgress job = (JobInProgress) tracker.getJob(jobid); + TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(jobid, tipid) + : null; %> <html> <title>Hadoop Task Details</title> <body> -<h1>Job <%=jobid%></h1> +<h1>Job <a href="/jobdetails.jsp?jobid=<%=jobid%>"><%=jobid%></a></h1> <hr> <h2>All Task Attempts</h2> <center> <% - if( ts == null || ts.length == 0) { + if (ts == null || ts.length == 0) { %> <h3>No Task Attempts found</h3> <% - }else{ + } else { %> <table border=2 cellpadding="5" cellspacing="2"> <tr><td align="center">Task Attempts</td><td>Machine</td><td>Status</td><td>Progress</td><td>Start Time</td> <% - if( ! 
ts[0].getIsMap() ) { - %> + if (!ts[0].getIsMap()) { + %> <td>Shuffle Finished</td><td>Sort Finished</td> <% - } + } %> -<td>Finish Time</td><td>Errors</td><td>Task Logs</td><td>Counters</td></tr> +<td>Finish Time</td><td>Errors</td><td>Task Logs</td><td>Counters</td><td>Actions</td></tr> <% for (int i = 0; i < ts.length; i++) { TaskStatus status = ts[i]; @@ -55,59 +98,75 @@ if (taskTracker == null) { out.print("<td>" + taskTrackerName + "</td>"); } else { - taskAttemptTracker = "http://" + taskTracker.getHost() + ":" + - taskTracker.getHttpPort(); - out.print("<td><a href=\"" + taskAttemptTracker + "\">" + - taskTracker.getHost() + "</a></td>"); - } - out.print("<td>" + status.getRunState() + "</td>"); - out.print("<td>"+ StringUtils.formatPercent(status.getProgress(),2) + - "</td>"); - out.print("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, - status.getStartTime(), 0) + "</td>"); - if( ! ts[i].getIsMap() ) { - out.print("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, - status.getShuffleFinishTime(), status.getStartTime()) + "</td>"); - out.println("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, - status.getSortFinishTime(), status.getShuffleFinishTime()) + "</td>"); - } - out.println("<td>"+ StringUtils.getFormattedTimeWithDiff(dateFormat, - status.getFinishTime(), status.getStartTime()) + "</td>"); - - out.print("<td><pre>"); - List<String> failures = tracker.getTaskDiagnostics(jobid, tipid, - status.getTaskId()); - if (failures == null) { - out.print(" "); - } else { - for(Iterator<String> itr = failures.iterator(); itr.hasNext(); ) { - out.print(itr.next()); - if (itr.hasNext()) { - out.print("\n-------\n"); + taskAttemptTracker = "http://" + taskTracker.getHost() + ":" + + taskTracker.getHttpPort(); + out.print("<td><a href=\"" + taskAttemptTracker + "\">" + + taskTracker.getHost() + "</a></td>"); + } + out.print("<td>" + status.getRunState() + "</td>"); + out.print("<td>" + StringUtils.formatPercent(status.getProgress(), 2) + 
+ "</td>"); + out.print("<td>" + + StringUtils.getFormattedTimeWithDiff(dateFormat, status + .getStartTime(), 0) + "</td>"); + if (!ts[i].getIsMap()) { + out.print("<td>" + + StringUtils.getFormattedTimeWithDiff(dateFormat, status + .getShuffleFinishTime(), status.getStartTime()) + "</td>"); + out.println("<td>" + + StringUtils.getFormattedTimeWithDiff(dateFormat, status + .getSortFinishTime(), status.getShuffleFinishTime()) + + "</td>"); + } + out.println("<td>" + + StringUtils.getFormattedTimeWithDiff(dateFormat, status + .getFinishTime(), status.getStartTime()) + "</td>"); + + out.print("<td><pre>"); + List<String> failures = tracker.getTaskDiagnostics(jobid, tipid, + status.getTaskId()); + if (failures == null) { + out.print(" "); + } else { + for (Iterator<String> itr = failures.iterator(); itr.hasNext();) { + out.print(itr.next()); + if (itr.hasNext()) { + out.print("\n-------\n"); + } } } + out.print("</pre></td>"); + out.print("<td>"); + if (taskAttemptTracker == null) { + out.print("n/a"); + } else { + String taskLogUrl = taskAttemptTracker + "/tasklog?taskid=" + + status.getTaskId(); + String tailFourKBUrl = taskLogUrl + "&start=-4097"; + String tailEightKBUrl = taskLogUrl + "&start=-8193"; + String entireLogUrl = taskLogUrl + "&all=true"; + out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>"); + out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>"); + out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>"); + } + out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid + + "&tipid=" + tipid + "&taskid=" + status.getTaskId() + "\">" + + status.getCounters().size() + "</a></td>"); + out.print("<td>"); + if (privateActions + && status.getRunState() == TaskStatus.State.RUNNING) { + out.print("<a href=\"/taskdetails.jsp?action=confirm" + + "&subaction=kill-task" + "&jobid=" + jobid + "&tipid=" + + tipid + "&taskid=" + status.getTaskId() + "\" > Kill </a>"); + out.print("<br><a href=\"/taskdetails.jsp?action=confirm" + + 
"&subaction=fail-task" + "&jobid=" + jobid + "&tipid=" + + tipid + "&taskid=" + status.getTaskId() + "\" > Fail </a>"); + } + else + out.print("<pre> </pre>"); + out.println("</td></tr>"); } - out.print("</pre></td>"); - out.print("<td>"); - if (taskAttemptTracker == null) { - out.print("n/a"); - } else { - String taskLogUrl = taskAttemptTracker + "/tasklog?taskid=" + - status.getTaskId(); - String tailFourKBUrl = taskLogUrl + "&start=-4097"; - String tailEightKBUrl = taskLogUrl + "&start=-8193"; - String entireLogUrl = taskLogUrl + "&all=true"; - out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>"); - out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>"); - out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>"); - } - out.println("</td><td>" + - "<a href=\"/taskstats.jsp?jobid=" + jobid + - "&tipid=" + tipid + "&taskid=" + status.getTaskId() + - "\">" + status.getCounters().size() + - "</a></td></tr>"); } - } %> </table> </center>