Author: cdouglas
Date: Wed Mar 18 07:07:29 2009
New Revision: 755495
URL: http://svn.apache.org/viewvc?rev=755495&view=rev
Log:
HADOOP-5514. Fix JobTracker metrics and add metrics for waiting, failed tasks.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=755495&r1=755494&r2=755495&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 18 07:07:29 2009
@@ -1022,6 +1022,9 @@
HADOOP-5463. Balancer throws "Not a host:port pair" unless port is
specified in fs.default.name. (Stuart White via hairong)
+ HADOOP-5514. Fix JobTracker metrics and add metrics for waiting, failed
+ tasks. (cdouglas)
+
Release 0.19.2 - Unreleased
BUG FIXES
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=755495&r1=755494&r2=755495&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Wed Mar 18 07:07:29 2009
@@ -415,6 +415,8 @@
(numMapTasks + numReduceTasks) +
" exceeds the configured limit " + maxTasks);
}
+ jobtracker.getInstrumentation().addWaiting(
+ getJobID(), numMapTasks + numReduceTasks);
maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
@@ -734,8 +736,7 @@
// Status update methods
////////////////////////////////////////////////////
public synchronized void updateTaskStatus(TaskInProgress tip,
- TaskStatus status,
- JobTrackerInstrumentation metrics)
{
+ TaskStatus status) {
double oldProgress = tip.getProgress(); // save old progress
boolean wasRunning = tip.isRunning();
@@ -833,7 +834,7 @@
// Tell the job to fail the relevant task
failedTask(tip, taskid, status, ttStatus,
- wasRunning, wasComplete, metrics);
+ wasRunning, wasComplete);
// Did the task failure lead to tip failure?
TaskCompletionEvent.Status taskCompletionStatus =
@@ -864,7 +865,7 @@
this.taskCompletionEvents.add(taskEvent);
taskCompletionEventTracker++;
if (state == TaskStatus.State.SUCCEEDED) {
- completedTask(tip, status, metrics);
+ completedTask(tip, status);
}
}
}
@@ -1267,6 +1268,7 @@
if (!isScheduled) {
tip.addRunningTask(id, tts.getTrackerName());
}
+ final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
// keeping the earlier ordering intact
String name;
@@ -1285,12 +1287,14 @@
splits = tip.getSplitNodes();
if (tip.getActiveTasks().size() > 1)
speculativeMapTasks++;
+ metrics.launchMap(id);
} else {
++runningReduceTasks;
name = Values.REDUCE.name();
counter = Counter.TOTAL_LAUNCHED_REDUCES;
if (tip.getActiveTasks().size() > 1)
speculativeReduceTasks++;
+ metrics.launchReduce(id);
}
// Note that the logs are for the scheduled tasks only. Tasks that join on
// restart has already their logs in place.
@@ -1959,11 +1963,11 @@
* A taskid assigned to this JobInProgress has reported in successfully.
*/
public synchronized boolean completedTask(TaskInProgress tip,
- TaskStatus status,
- JobTrackerInstrumentation metrics)
+ TaskStatus status)
{
TaskAttemptID taskid = status.getTaskID();
int oldNumAttempts = tip.getActiveTasks().size();
+ final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
// Sanity check: is the TIP already complete?
// It _is_ safe to not decrement running{Map|Reduce}Tasks and
@@ -2047,7 +2051,7 @@
terminateJob(JobStatus.KILLED);
}
else {
- jobComplete(metrics);
+ jobComplete();
}
// The job has been killed/failed/successful
// JobTracker should cleanup this task
@@ -2085,10 +2089,9 @@
/**
* The job is done since all it's component tasks are either
* successful or have failed.
- *
- * @param metrics job-tracker metrics
*/
- private void jobComplete(JobTrackerInstrumentation metrics) {
+ private void jobComplete() {
+ final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
//
// All tasks are complete, then the job is done!
//
@@ -2186,12 +2189,12 @@
while (!mapCleanupTasks.isEmpty()) {
taskid = mapCleanupTasks.remove(0);
tip = maps[taskid.getTaskID().getId()];
- updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+ updateTaskStatus(tip, tip.getTaskStatus(taskid));
}
while (!reduceCleanupTasks.isEmpty()) {
taskid = reduceCleanupTasks.remove(0);
tip = reduces[taskid.getTaskID().getId()];
- updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+ updateTaskStatus(tip, tip.getTaskStatus(taskid));
}
}
@@ -2239,8 +2242,8 @@
private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
TaskStatus status,
TaskTrackerStatus taskTrackerStatus,
- boolean wasRunning, boolean wasComplete,
- JobTrackerInstrumentation metrics) {
+ boolean wasRunning, boolean wasComplete) {
+ final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
// check if the TIP is already failed
boolean wasFailed = tip.isFailed();
@@ -2258,6 +2261,7 @@
launchedSetup = false;
} else if (tip.isMapTask()) {
runningMapTasks -= 1;
+ metrics.failedMap(taskid);
// remove from the running queue and put it in the non-running cache
// if the tip is not complete i.e if the tip still needs to be run
if (!isComplete) {
@@ -2266,6 +2270,7 @@
}
} else {
runningReduceTasks -= 1;
+ metrics.failedReduce(taskid);
// remove from the running queue and put in the failed queue if the tip
// is not complete
if (!isComplete) {
@@ -2417,7 +2422,7 @@
*/
public void failedTask(TaskInProgress tip, TaskAttemptID taskid, String
reason,
TaskStatus.Phase phase, TaskStatus.State state,
- String trackerName, JobTrackerInstrumentation
metrics) {
+ String trackerName) {
TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(),
taskid,
0.0f,
@@ -2434,7 +2439,7 @@
status.setStartTime(startTime);
status.setFinishTime(System.currentTimeMillis());
boolean wasComplete = tip.isComplete();
- updateTaskStatus(tip, status, metrics);
+ updateTaskStatus(tip, status);
boolean isComplete = tip.isComplete();
if (wasComplete && !isComplete) { // mark a successful tip as failed
String taskType = getTaskType(tip);
@@ -2451,6 +2456,8 @@
*/
synchronized void garbageCollect() {
// Let the JobTracker know that a job is complete
+ jobtracker.getInstrumentation(
+ ).decWaiting(getJobID(), pendingMaps() + pendingReduces());
jobtracker.storeCompletedJob(this);
jobtracker.finalizeJob(this);
@@ -2556,8 +2563,7 @@
synchronized void fetchFailureNotification(TaskInProgress tip,
TaskAttemptID mapTaskId,
- String trackerName,
- JobTrackerInstrumentation
metrics) {
+ String trackerName) {
Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures);
@@ -2577,7 +2583,7 @@
failedTask(tip, mapTaskId, "Too many fetch-failures",
(tip.isMapTask() ? TaskStatus.Phase.MAP :
TaskStatus.Phase.REDUCE),
- TaskStatus.State.FAILED, trackerName, metrics);
+ TaskStatus.State.FAILED, trackerName);
mapTaskIdToFetchFailuresMap.remove(mapTaskId);
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=755495&r1=755494&r2=755495&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed
Mar 18 07:07:29 2009
@@ -253,7 +253,7 @@
tip.isMapTask()? TaskStatus.Phase.MAP:
TaskStatus.Phase.STARTING,
TaskStatus.State.FAILED,
- trackerName, myInstrumentation);
+ trackerName);
}
itr.remove();
} else {
@@ -931,7 +931,7 @@
// This will add the tip failed event in the new log
tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(),
status.getPhase(), status.getRunState(),
- status.getTaskTracker(), myInstrumentation);
+ status.getTaskTracker());
}
}
@@ -1045,7 +1045,7 @@
taskStatus.setCounters(counter);
// II. Replay the status
- job.updateTaskStatus(tip, taskStatus, myInstrumentation);
+ job.updateTaskStatus(tip, taskStatus);
// III. Prevent the task from expiry
expireLaunchingTasks.removeTask(attemptId);
@@ -1082,7 +1082,7 @@
taskStatus.setDiagnosticInfo(diagInfo); // diag info
// II. Update the task status
- job.updateTaskStatus(tip, taskStatus, myInstrumentation);
+ job.updateTaskStatus(tip, taskStatus);
// III. Prevent the task from expiry
expireLaunchingTasks.removeTask(attemptId);
@@ -1231,7 +1231,7 @@
}
}
- private JobTrackerInstrumentation myInstrumentation = null;
+ private final JobTrackerInstrumentation myInstrumentation;
/////////////////////////////////////////////////////////////////
// The real JobTracker
@@ -1457,18 +1457,21 @@
trackerIdentifier = getDateFormat().format(new Date());
- Class<? extends JobTrackerInstrumentation> metricsInst =
getInstrumentationClass(jobConf);
+ // Initialize instrumentation
+ JobTrackerInstrumentation tmp;
+ Class<? extends JobTrackerInstrumentation> metricsInst =
+ getInstrumentationClass(jobConf);
try {
java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
metricsInst.getConstructor(new Class[] {JobTracker.class,
JobConf.class} );
- this.myInstrumentation = c.newInstance(this, jobConf);
+ tmp = c.newInstance(this, jobConf);
} catch(Exception e) {
//Reflection can throw lots of exceptions -- handle them all by
//falling back on the default.
LOG.error("failed to initialize job tracker metrics", e);
- this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
+ tmp = new JobTrackerMetricsInst(this, jobConf);
}
-
+ myInstrumentation = tmp;
// The rpc/web-server ports can be ephemeral ports...
// ... ensure we have the correct info
@@ -1611,6 +1614,10 @@
t, JobTrackerInstrumentation.class);
}
+ JobTrackerInstrumentation getInstrumentation() {
+ return myInstrumentation;
+ }
+
public static InetSocketAddress getAddress(Configuration conf) {
String jobTrackerStr =
conf.get("mapred.job.tracker", "localhost:8012");
@@ -1736,12 +1743,6 @@
// taskid --> TIP
taskidToTIPMap.put(taskid, tip);
- // Note this launch
- if (taskid.isMap()) {
- myInstrumentation.launchMap(taskid);
- } else {
- myInstrumentation.launchReduce(taskid);
- }
}
void removeTaskEntry(TaskAttemptID taskid) {
@@ -3249,7 +3250,7 @@
// Update the job and inform the listeners if necessary
JobStatus prevStatus = (JobStatus)job.getStatus().clone();
- job.updateTaskStatus(tip, report, myInstrumentation);
+ job.updateTaskStatus(tip, report);
JobStatus newStatus = (JobStatus)job.getStatus().clone();
// Update the listeners if an incomplete job completes
@@ -3278,8 +3279,7 @@
}
failedFetchMap.getJob().fetchFailureNotification(failedFetchMap,
mapTaskId,
-
failedFetchTrackerName,
-
myInstrumentation);
+
failedFetchTrackerName);
}
}
}
@@ -3329,7 +3329,7 @@
TaskStatus.Phase.MAP :
TaskStatus.Phase.REDUCE),
killState,
- trackerName, myInstrumentation);
+ trackerName);
jobsWithFailures.add(job);
}
} else {
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=755495&r1=755494&r2=755495&view=diff
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
(original)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
Wed Mar 18 07:07:29 2009
@@ -31,16 +31,27 @@
public void completeMap(TaskAttemptID taskAttemptID)
{ }
+ public void failedMap(TaskAttemptID taskAttemptID)
+ { }
+
public void launchReduce(TaskAttemptID taskAttemptID)
{ }
public void completeReduce(TaskAttemptID taskAttemptID)
- { }
+ { }
+ public void failedReduce(TaskAttemptID taskAttemptID)
+ { }
+
public void submitJob(JobConf conf, JobID id)
{ }
public void completeJob(JobConf conf, JobID id)
{ }
+ public void addWaiting(JobID id, int tasks)
+ { }
+
+ public void decWaiting(JobID id, int tasks)
+ { }
}
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=755495&r1=755494&r2=755495&view=diff
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
(original)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
Wed Mar 18 07:07:29 2009
@@ -22,15 +22,21 @@
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
class JobTrackerMetricsInst extends JobTrackerInstrumentation implements
Updater {
- private MetricsRecord metricsRecord = null;
- int numMapTasksLaunched = 0;
- int numMapTasksCompleted = 0;
- int numReduceTasksLaunched = 0;
- int numReduceTasksCompleted = 0;
+ private final MetricsRecord metricsRecord;
+
+ private int numMapTasksLaunched = 0;
+ private int numMapTasksCompleted = 0;
+ private int numMapTasksFailed = 0;
+ private int numReduceTasksLaunched = 0;
+ private int numReduceTasksCompleted = 0;
+ private int numReduceTasksFailed = 0;
private int numJobsSubmitted = 0;
private int numJobsCompleted = 0;
+ private int numWaitingTasks = 0;
public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
super(tracker, conf);
@@ -52,15 +58,21 @@
synchronized (this) {
metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+ metricsRecord.incrMetric("maps_failed", numMapTasksFailed);
metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+ metricsRecord.incrMetric("reduces_failed", numReduceTasksFailed);
metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
-
+ metricsRecord.incrMetric("waiting_tasks", numWaitingTasks);
+
numMapTasksLaunched = 0;
numMapTasksCompleted = 0;
+ numMapTasksFailed = 0;
numReduceTasksLaunched = 0;
numReduceTasksCompleted = 0;
+ numReduceTasksFailed = 0;
+ numWaitingTasks = 0;
numJobsSubmitted = 0;
numJobsCompleted = 0;
}
@@ -76,6 +88,7 @@
@Override
public synchronized void launchMap(TaskAttemptID taskAttemptID) {
++numMapTasksLaunched;
+ decWaiting(taskAttemptID.getJobID(), 1);
}
@Override
@@ -84,8 +97,15 @@
}
@Override
+ public synchronized void failedMap(TaskAttemptID taskAttemptID) {
+ ++numMapTasksFailed;
+ addWaiting(taskAttemptID.getJobID(), 1);
+ }
+
+ @Override
public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
++numReduceTasksLaunched;
+ decWaiting(taskAttemptID.getJobID(), 1);
}
@Override
@@ -94,6 +114,12 @@
}
@Override
+ public synchronized void failedReduce(TaskAttemptID taskAttemptID) {
+ ++numReduceTasksFailed;
+ addWaiting(taskAttemptID.getJobID(), 1);
+ }
+
+ @Override
public synchronized void submitJob(JobConf conf, JobID id) {
++numJobsSubmitted;
}
@@ -102,4 +128,14 @@
public synchronized void completeJob(JobConf conf, JobID id) {
++numJobsCompleted;
}
+
+ @Override
+ public synchronized void addWaiting(JobID id, int tasks) {
+ numWaitingTasks += tasks;
+ }
+
+ @Override
+ public synchronized void decWaiting(JobID id, int tasks) {
+ numWaitingTasks -= tasks;
+ }
}