Author: cdouglas
Date: Mon May 4 21:27:58 2009
New Revision: 771449
URL: http://svn.apache.org/viewvc?rev=771449&view=rev
Log:
HADOOP-5738. Split "waiting_tasks" JobTracker metric into waiting maps and
waiting reduces. Contributed by Sreekanth Ramakrishnan.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=771449&r1=771448&r2=771449&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May 4 21:27:58 2009
@@ -47,6 +47,9 @@
HADOOP-5668. Change TotalOrderPartitioner to use new API. (Amareshwari
Sriramadasu via cdouglas)
+ HADOOP-5738. Split "waiting_tasks" JobTracker metric into waiting maps and
+ waiting reduces. (Sreekanth Ramakrishnan via cdouglas)
+
NEW FEATURES
HADOOP-4268. Change fsck to use ClientProtocol methods so that the
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=771449&r1=771448&r2=771449&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon May 4 21:27:58 2009
@@ -415,9 +415,9 @@
(numMapTasks + numReduceTasks) +
" exceeds the configured limit " + maxTasks);
}
- jobtracker.getInstrumentation().addWaiting(
- getJobID(), numMapTasks + numReduceTasks);
-
+ jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
+ jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
+
maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
inputLength += splits[i].getDataLength();
@@ -2466,8 +2466,8 @@
*/
synchronized void garbageCollect() {
// Let the JobTracker know that a job is complete
- jobtracker.getInstrumentation(
- ).decWaiting(getJobID(), pendingMaps() + pendingReduces());
+ jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
+ jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
jobtracker.storeCompletedJob(this);
jobtracker.finalizeJob(this);
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java?rev=771449&r1=771448&r2=771449&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java Mon May 4 21:27:58 2009
@@ -55,9 +55,15 @@
public void finalizeJob(JobConf conf, JobID id)
{ }
- public void addWaiting(JobID id, int tasks)
+ public void addWaitingMaps(JobID id, int task)
{ }
-
- public void decWaiting(JobID id, int tasks)
+
+ public void decWaitingMaps(JobID id, int task)
+ { }
+
+ public void addWaitingReduces(JobID id, int task)
+ { }
+
+ public void decWaitingReduces(JobID id, int task)
{ }
}
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=771449&r1=771448&r2=771449&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Mon May 4 21:27:58 2009
@@ -36,8 +36,9 @@
private int numReduceTasksFailed = 0;
private int numJobsSubmitted = 0;
private int numJobsCompleted = 0;
- private int numWaitingTasks = 0;
-
+ private int numWaitingMaps = 0;
+ private int numWaitingReduces = 0;
+
public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
super(tracker, conf);
String sessionId = conf.getSessionId();
@@ -64,7 +65,8 @@
metricsRecord.incrMetric("reduces_failed", numReduceTasksFailed);
metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
- metricsRecord.incrMetric("waiting_tasks", numWaitingTasks);
+ metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
+ metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
numMapTasksLaunched = 0;
numMapTasksCompleted = 0;
@@ -72,9 +74,10 @@
numReduceTasksLaunched = 0;
numReduceTasksCompleted = 0;
numReduceTasksFailed = 0;
- numWaitingTasks = 0;
numJobsSubmitted = 0;
numJobsCompleted = 0;
+ numWaitingMaps = 0;
+ numWaitingReduces = 0;
}
metricsRecord.update();
@@ -88,7 +91,7 @@
@Override
public synchronized void launchMap(TaskAttemptID taskAttemptID) {
++numMapTasksLaunched;
- decWaiting(taskAttemptID.getJobID(), 1);
+ decWaitingMaps(taskAttemptID.getJobID(), 1);
}
@Override
@@ -99,13 +102,13 @@
@Override
public synchronized void failedMap(TaskAttemptID taskAttemptID) {
++numMapTasksFailed;
- addWaiting(taskAttemptID.getJobID(), 1);
+ addWaitingMaps(taskAttemptID.getJobID(), 1);
}
@Override
public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
++numReduceTasksLaunched;
- decWaiting(taskAttemptID.getJobID(), 1);
+ decWaitingReduces(taskAttemptID.getJobID(), 1);
}
@Override
@@ -116,7 +119,7 @@
@Override
public synchronized void failedReduce(TaskAttemptID taskAttemptID) {
++numReduceTasksFailed;
- addWaiting(taskAttemptID.getJobID(), 1);
+ addWaitingReduces(taskAttemptID.getJobID(), 1);
}
@Override
@@ -130,12 +133,22 @@
}
@Override
- public synchronized void addWaiting(JobID id, int tasks) {
- numWaitingTasks += tasks;
+ public synchronized void addWaitingMaps(JobID id, int task) {
+ numWaitingMaps += task;
}
-
+
+ @Override
+ public synchronized void decWaitingMaps(JobID id, int task) {
+ numWaitingMaps -= task;
+ }
+
+ @Override
+ public synchronized void addWaitingReduces(JobID id, int task) {
+ numWaitingReduces += task;
+ }
+
@Override
- public synchronized void decWaiting(JobID id, int tasks) {
- numWaitingTasks -= tasks;
+ public synchronized void decWaitingReduces(JobID id, int task){
+ numWaitingReduces -= task;
}
}