Author: omalley
Date: Tue Jun 5 23:14:54 2007
New Revision: 544740

URL: http://svn.apache.org/viewvc?view=rev&rev=544740
Log:
HADOOP-1446. Update the TaskTracker metrics while the task is running.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=544740&r1=544739&r2=544740 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Tue Jun 5 23:14:54 2007 @@ -78,6 +78,9 @@ 25. HADOOP-1461. Fix the synchronization of the task tracker to avoid lockups in job cleanup. (Arun C Murthy via omalley) + 26. HADOOP-1446. Update the TaskTracker metrics while the task is + running. (Devaraj via omalley) + Release 0.13.0 - 2007-06-08 1. HADOOP-1047. Fix TestReplication to succeed more reliably. Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=544740&r1=544739&r2=544740 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Jun 5 23:14:54 2007 @@ -282,7 +282,8 @@ // Add main class and its arguments vargs.add(TaskTracker.Child.class.getName()); // main of Child - vargs.add(tracker.taskReportPort + ""); // pass umbilical port + // pass umbilical port + vargs.add(tracker.getTaskTrackerReportPort() + ""); vargs.add(t.getTaskId()); // pass task identifier // Run java Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=544740&r1=544739&r2=544740 ============================================================================== --- 
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jun 5 23:14:54 2007 @@ -59,6 +59,7 @@ import org.apache.hadoop.metrics.MetricsException; import org.apache.hadoop.metrics.MetricsRecord; import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.Updater; import org.apache.hadoop.net.DNS; import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.ReflectionUtils; @@ -92,7 +93,7 @@ InetSocketAddress jobTrackAddr; String taskReportBindAddress; - int taskReportPort; + private int taskReportPort; Server taskReportServer = null; InterTrackerProtocol jobClient; @@ -120,8 +121,8 @@ */ Map<String, TaskInProgress> runningTasks = null; Map<String, RunningJob> runningJobs = null; - int mapTotal = 0; - int reduceTotal = 0; + volatile int mapTotal = 0; + volatile int reduceTotal = 0; boolean justStarted = true; //dir -> DF @@ -156,25 +157,33 @@ */ private int probe_sample_size = 50; - private class TaskTrackerMetrics { + private class TaskTrackerMetrics implements Updater { private MetricsRecord metricsRecord = null; + private int numCompletedTasks = 0; TaskTrackerMetrics() { MetricsContext context = MetricsUtil.getContext("mapred"); metricsRecord = MetricsUtil.createRecord(context, "tasktracker"); + context.registerUpdater(this); } synchronized void completeTask() { - if (metricsRecord != null) { - metricsRecord.incrMetric("tasks_completed", 1); - } + ++numCompletedTasks; } - - synchronized void update() { - if (metricsRecord != null) { - metricsRecord.setMetric("maps_running", mapTotal); - metricsRecord.setMetric("reduces_running", reduceTotal); - metricsRecord.update(); + /** + * Since this object is a registered updater, this method will be called + * periodically, e.g. every 5 seconds. 
+ */ + public void doUpdates(MetricsContext unused) { + synchronized (this) { + if (metricsRecord != null) { + metricsRecord.setMetric("maps_running", mapTotal); + metricsRecord.setMetric("reduces_running", reduceTotal); + metricsRecord.setMetric("taskSlots", (short)maxCurrentTasks); + metricsRecord.incrMetric("tasks_completed", numCompletedTasks); + metricsRecord.update(); + } + numCompletedTasks = 0; } } } @@ -681,6 +690,11 @@ public FileSystem getFileSystem(){ return fs; } + + /** Return the port at which the tasktracker bound to */ + public synchronized int getTaskTrackerReportPort() { + return taskReportPort; + } /** Queries the job tracker for a set of outputs ready to be copied * @param fromEventId the first event ID we want to start from, this is @@ -769,8 +783,10 @@ String msg = "Exiting task tracker for disk error:\n" + StringUtils.stringifyException(de); LOG.error(msg); - jobClient.reportTaskTrackerError(taskTrackerName, - "DiskErrorException", msg); + synchronized (this) { + jobClient.reportTaskTrackerError(taskTrackerName, + "DiskErrorException", msg); + } return State.STALE; } catch (RemoteException re) { String reClass = re.getClassName(); @@ -852,7 +868,6 @@ } try { myMetrics.completeTask(); - myMetrics.update(); } catch (MetricsException me) { LOG.warn("Caught: " + StringUtils.stringifyException(me)); } @@ -1081,7 +1096,6 @@ } else { reduceTotal++; } - myMetrics.update(); } try { localizeJob(tip); @@ -1461,7 +1475,6 @@ failure); runningTasks.put(task.getTaskId(), this); mapTotal++; - myMetrics.update(); } else { LOG.warn("Output already reported lost:"+task.getTaskId()); }