Author: cutting Date: Tue Jan 16 13:28:50 2007 New Revision: 496864 URL: http://svn.apache.org/viewvc?view=rev&rev=496864 Log: HADOOP-801. Add to jobtracker a log of task completion events. Contributed by Sanjay.
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java lucene/hadoop/trunk/src/webapps/task/tasklog.jsp Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=496864&r1=496863&r2=496864 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Jan 16 13:28:50 2007 @@ -25,6 +25,9 @@ for better compatibility with some monitoring systems. (Nigel Daley via cutting) + 7. HADOOP-801. Add to jobtracker a log of task completion events. + (Sanjay Dahiya via cutting) + Release 0.10.1 - 2007-01-10 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?view=diff&rev=496864&r1=496863&r2=496864 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Tue Jan 16 13:28:50 2007 @@ -91,6 +91,17 @@ public void reportTaskTrackerError(String taskTracker, String errorClass, String errorMessage) throws IOException; + /** + * Get task completion events for the jobid, starting from fromEventId. 
+ * Returns empty array if no events are available. + * @param jobid job id + * @param fromEventId event id to start from. + * @return array of task completion events. + * @throws IOException + */ + TaskCompletionEvent[] getTaskCompletionEvents( + String jobid, int fromEventId) throws IOException; + } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=496864&r1=496863&r2=496864 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Jan 16 13:28:50 2007 @@ -17,7 +17,6 @@ */ package org.apache.hadoop.mapred; -import org.apache.commons.cli2.validation.InvalidArgumentException; import org.apache.commons.logging.*; import org.apache.hadoop.fs.*; @@ -39,6 +38,8 @@ *******************************************************/ public class JobClient extends ToolBase implements MRConstants { private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobClient"); + public static enum TaskStatusFilter { NONE, FAILED, SUCCEEDED, ALL }; + private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; static long MAX_JOBPROFILE_AGE = 1000 * 2; @@ -150,6 +151,14 @@ public synchronized void killJob() throws IOException { jobSubmitClient.killJob(getJobID()); } + /** + * Fetch task completion events from jobtracker for this job. 
+ */ + public synchronized TaskCompletionEvent[] getTaskCompletionEvents( + int startFrom) throws IOException{ + return jobSubmitClient.getTaskCompletionEvents( + getJobID(), startFrom); + } /** * Dump stats to screen @@ -367,10 +376,22 @@ String lastReport = null; final int MAX_RETRIES = 5; int retries = MAX_RETRIES; + String outputFilterName = job.get("jobclient.output.filter"); + + if (null != outputFilterName) { + try { + jc.setTaskOutputFilter(TaskStatusFilter.valueOf(outputFilterName)); + } catch(IllegalArgumentException e) { + LOG.warn("Invalid Output filter : " + outputFilterName + + " Valid values are : NONE, FAILED, SUCCEEDED, ALL"); + } + } try { running = jc.submitJob(job); String jobId = running.getJobID(); LOG.info("Running job: " + jobId); + int eventCounter = 0 ; + while (true) { try { Thread.sleep(1000); @@ -388,6 +409,34 @@ LOG.info(report); lastReport = report; } + + if( jc.getTaskOutputFilter() != TaskStatusFilter.NONE){ + TaskCompletionEvent[] events = + running.getTaskCompletionEvents(eventCounter); + eventCounter += events.length ; + for(TaskCompletionEvent event : events ){ + switch( jc.getTaskOutputFilter() ){ + case SUCCEEDED: + if( event.getTaskStatus() == + TaskCompletionEvent.Status.SUCCEEDED){ + LOG.info(event.toString()); + printHttpFile(event.getTaskTrackerHttp()); + } + break; + case FAILED: + if( event.getTaskStatus() == + TaskCompletionEvent.Status.FAILED){ + LOG.info(event.toString()); + printHttpFile(event.getTaskTrackerHttp()); + } + break ; + case ALL: + LOG.info(event.toString()); + printHttpFile(event.getTaskTrackerHttp()); + break; + } + } + } retries = MAX_RETRIES; } catch (IOException ie) { if (--retries == 0) { @@ -410,6 +459,35 @@ jc.close(); } } + + static void printHttpFile(String httpURL ) throws IOException { + boolean good = false; + long totalBytes = 0; + URL path = new URL(httpURL); + try { + URLConnection connection = path.openConnection(); + InputStream input = connection.getInputStream(); + try { + byte[] 
buffer = new byte[64 * 1024]; + int len = input.read(buffer); + while (len > 0) { + totalBytes += len; + LOG.info(new String(buffer).trim()); + len = input.read(buffer); + } + good = ((int) totalBytes) == connection.getContentLength(); + if (!good) { + LOG.warn("Incomplete task output received for " + path + + " (" + totalBytes + " instead of " + + connection.getContentLength() + ")"); + } + }finally { + input.close(); + } + }catch(IOException ioe){ + LOG.warn("Error reading task output" + ioe.getMessage()); + } + } static Configuration getConfiguration(String jobTrackerSpec) { @@ -428,8 +506,23 @@ } return conf; } - + /** + * Sets the output filter for tasks. only those tasks are printed whose + * output matches the filter. + * @param newValue task filter. + */ + public void setTaskOutputFilter(TaskStatusFilter newValue){ + this.taskOutputFilter = newValue ; + } + /** + * Returns task output filter. + * @return task filter. + */ + public TaskStatusFilter getTaskOutputFilter(){ + return this.taskOutputFilter; + } + public int run(String[] argv) throws Exception { if (argv.length < 2) { System.out.println("JobClient -submit <job> | -status <id> | -kill <id> [-jt <jobtracker:port>|<config>]"); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=496864&r1=496863&r2=496864 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Jan 16 13:28:50 2007 @@ -53,6 +53,8 @@ int failedReduceTasks = 0 ; JobTracker jobtracker = null; HashMap hostToMaps = new HashMap(); + private int taskCompletionEventTracker = 0 ; + List<TaskCompletionEvent> taskCompletionEvents ; long startTime; long finishTime; @@ -95,6 +97,8 @@ this.numMapTasks = 
conf.getNumMapTasks(); this.numReduceTasks = conf.getNumReduceTasks(); + this.taskCompletionEvents = new ArrayList( + numMapTasks + numReduceTasks + 10); JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), System.currentTimeMillis(), jobFile); @@ -276,10 +280,29 @@ boolean change = tip.updateStatus(status); if (change) { TaskStatus.State state = status.getRunState(); + TaskTrackerStatus ttStatus = + this.jobtracker.getTaskTracker(status.getTaskTracker()); + String httpTaskLogLocation = null; + if( null != ttStatus ){ + httpTaskLogLocation = "http://" + ttStatus.getHost() + ":" + + ttStatus.getHttpPort() + "/tasklog.jsp?plaintext=true&taskid=" + + status.getTaskId() + "&all=true"; + } + if (state == TaskStatus.State.SUCCEEDED) { + this.taskCompletionEvents.add( new TaskCompletionEvent( + taskCompletionEventTracker++, + status.getTaskId(), + TaskCompletionEvent.Status.SUCCEEDED, + httpTaskLogLocation )); completedTask(tip, status, metrics); } else if (state == TaskStatus.State.FAILED || state == TaskStatus.State.KILLED) { + this.taskCompletionEvents.add( new TaskCompletionEvent( + taskCompletionEventTracker++, + status.getTaskId(), + TaskCompletionEvent.Status.FAILED, + httpTaskLogLocation )); // Tell the job to fail the relevant task failedTask(tip, status.getTaskId(), status, status.getTaskTracker(), wasRunning, wasComplete); @@ -753,5 +776,15 @@ } } return null; + } + + public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId) { + TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY; + if( taskCompletionEvents.size() > fromEventId) { + events = (TaskCompletionEvent[])taskCompletionEvents.subList( + fromEventId, taskCompletionEvents.size()). 
+ toArray(events); + } + return events; } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?view=diff&rev=496864&r1=496863&r2=496864 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Tue Jan 16 13:28:50 2007 @@ -80,4 +80,16 @@ * jobs. */ public JobStatus[] jobsToComplete() throws IOException; + + /** + * Get task completion events for the jobid, starting from fromEventId. + * Returns empty array if no events are available. + * @param jobid job id + * @param fromEventId event id to start from. + * @return array of task completion events. + * @throws IOException + */ + public TaskCompletionEvent[] getTaskCompletionEvents( + String jobid, int fromEventId) throws IOException; + } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=496864&r1=496863&r2=496864 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Jan 16 13:28:50 2007 @@ -1431,6 +1431,21 @@ return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]); } } + + /* + * Returns a list of TaskCompletionEvent for the given job, + * starting from fromEventId. 
+ * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int) + */ + public synchronized TaskCompletionEvent[] getTaskCompletionEvents( + String jobid, int fromEventId) throws IOException{ + TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY; + JobInProgress job = (JobInProgress)this.jobs.get(jobid); + if (null != job) { + events = job.getTaskCompletionEvents(fromEventId); + } + return events; + } /** * Get the diagnostics for a given task Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=496864&r1=496863&r2=496864 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jan 16 13:28:50 2007 @@ -243,4 +243,9 @@ } public JobStatus[] jobsToComplete() {return null;} + public TaskCompletionEvent[] getTaskCompletionEvents( + String jobid, int fromEventId) throws IOException{ + return TaskCompletionEvent.EMPTY_ARRAY; + } + } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?view=diff&rev=496864&r1=496863&r2=496864 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Tue Jan 16 13:28:50 2007 @@ -76,4 +76,8 @@ * killed as well. If the job is no longer running, it simply returns. 
*/ public void killJob() throws IOException; + + public TaskCompletionEvent[] getTaskCompletionEvents( + int startFrom) throws IOException; + } Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?view=auto&rev=496864 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Tue Jan 16 13:28:50 2007 @@ -0,0 +1,131 @@ +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +/** + * This is used to track task completion events on + * job tracker. + * + */ +public class TaskCompletionEvent implements Writable{ + static public enum Status {FAILED, SUCCEEDED}; + + private int eventId ; + private String taskTrackerHttp ; + private String taskId ; + Status status ; + public static final TaskCompletionEvent[] EMPTY_ARRAY = + new TaskCompletionEvent[0]; + /** + * Default constructor for Writable. + * + */ + public TaskCompletionEvent(){} + /** + * Constructor. eventId should be created externally and incremented + * per event for each job. + * @param eventId event id, event id should be unique and assigned in + * incrementally, starting from 0. + * @param taskId task id + * @param status task's status + * @param taskTrackerHttp task tracker's host:port for http. + */ + public TaskCompletionEvent(int eventId, + String taskId, + Status status, + String taskTrackerHttp){ + + this.taskId = taskId ; + this.eventId = eventId ; + this.status =status ; + this.taskTrackerHttp = taskTrackerHttp ; + } + /** + * Returns event Id. 
+ * @return event id + */ + public int getEventId() { + return eventId; + } + /** + * Returns task id. + * @return task id + */ + public String getTaskId() { + return taskId; + } + /** + * Returns enum Status.SUCCEEDED or Status.FAILED. + * @return task tracker status + */ + public Status getTaskStatus() { + return status; + } + /** + * http location of the tasktracker where this task ran. + * @return http location of tasktracker user logs + */ + public String getTaskTrackerHttp() { + return taskTrackerHttp; + } + /** + * set event Id. should be assigned incrementally starting from 0. + * @param eventId + */ + public void setEventId( + int eventId) { + this.eventId = eventId; + } + /** + * Sets task id. + * @param taskId + */ + public void setTaskId( + String taskId) { + this.taskId = taskId; + } + /** + * Set task status. + * @param status + */ + public void setTaskStatus( + Status status) { + this.status = status; + } + /** + * Set task tracker http location. + * @param taskTrackerHttp + */ + public void setTaskTrackerHttp( + String taskTrackerHttp) { + this.taskTrackerHttp = taskTrackerHttp; + } + + public String toString(){ + StringBuffer buf = new StringBuffer(); + buf.append("Task Id : "); + buf.append( taskId ) ; + buf.append(", Status : "); + buf.append( status.name() ) ; + return buf.toString(); + } + + ////////////////////////////////////////////// + // Writable + ////////////////////////////////////////////// + public void write(DataOutput out) throws IOException { + WritableUtils.writeString(out, taskId); + WritableUtils.writeEnum(out, status); + WritableUtils.writeString(out, taskTrackerHttp); + } + + public void readFields(DataInput in) throws IOException { + this.taskId = WritableUtils.readString(in) ; + this.status = WritableUtils.readEnum(in, Status.class); + this.taskTrackerHttp = WritableUtils.readString(in); + } +} Modified: lucene/hadoop/trunk/src/webapps/task/tasklog.jsp URL: 
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/task/tasklog.jsp?view=diff&rev=496864&r1=496863&r2=496864 ============================================================================== --- lucene/hadoop/trunk/src/webapps/task/tasklog.jsp (original) +++ lucene/hadoop/trunk/src/webapps/task/tasklog.jsp Tue Jan 16 13:28:50 2007 @@ -10,6 +10,7 @@ long tailSize = 1024; int tailWindow = 1; boolean entireLog = false; + boolean plainText = false; String taskId = request.getParameter("taskid"); if (taskId == null) { @@ -46,6 +47,11 @@ if (sTailWindow != null) { tailWindow = Integer.valueOf(sTailWindow).intValue(); } + + String sPlainText = request.getParameter("plaintext"); + if (sPlainText != null) { + plainText = Boolean.valueOf(sPlainText); + } if (logOffset == -1 || logLength == -1) { tailLog = true; @@ -55,17 +61,18 @@ if (entireLog) { tailLog = false; } -%> - -<html> - -<title><%= taskId %> Task Logs</title> + + if( !plainText ) { + out.println("<html>"); + out.println("<title>" + taskId + "Task Logs </title>"); + out.println("<body>"); + out.println("<h1>" + taskId + "Task Logs</h1><br>"); + out.println("<h2>Task Logs</h2>"); + out.println("<pre>"); -<body> -<h1><%= taskId %> Task Logs</h1><br> + } +%> -<h2>Task Logs</h2> -<pre> <% boolean gotRequiredData = true; try { @@ -91,20 +98,30 @@ if (bytesRead != targetLength && targetLength <= taskLogReader.getTotalLogSize()) { - out.println("<b>Warning: Could not fetch " + targetLength + - " bytes from the task-logs; probably purged!</b><br/>"); + if( !plainText) { + out.println("<b>Warning: Could not fetch " + targetLength + + " bytes from the task-logs; probably purged!</b><br/>"); + }else{ + out.println("Warning: Could not fetch " + targetLength + + " bytes from the task-logs; probably purged!"); + } gotRequiredData = false; } + if( plainText ) { + response.setContentLength(bytesRead); + } String logData = new String(b, 0, bytesRead); out.println(logData); } catch (IOException ioe) { out.println("Failed to 
retrieve logs for task: " + taskId); } + + if( !plainText ) { + out.println("</pre>"); + } %> -</pre> - <% - if (!entireLog) { + if (!entireLog && !plainText) { if (tailLog) { if (gotRequiredData) { out.println("<a href='/tasklog.jsp?taskid=" + taskId + @@ -127,9 +144,10 @@ "&len=" + logLength + "'>Later</a>"); } } + if( !plainText ) { + out.println("<hr>"); + out.println("<a href='http://lucene.apache.org/hadoop'>Hadoop</a>, 2006.<br>"); + out.println("</body>"); + out.println("</html>"); + } %> - -<hr> -<a href="http://lucene.apache.org/hadoop">Hadoop</a>, 2006.<br> -</body> -</html>