Author: cdouglas
Date: Fri Jun 3 22:51:26 2011
New Revision: 1131280
URL: http://svn.apache.org/viewvc?rev=1131280&view=rev
Log:
MAPREDUCE-2558. Add JobTracker metrics for scheduling queues.
Contributed by Jeffrey Naisbitt
Added:
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Queue.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueManager.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Jun 3 22:51:26 2011
@@ -44,6 +44,9 @@ Release 0.20.205.0 - unreleased
MAPREDUCE-2555. Avoid sprious logging from completedtasks. (Thomas Graves
via cdouglas)
+ MAPREDUCE-2558. Add JobTracker metrics for scheduling queues. (Jeffrey
+ Naisbitt via cdouglas)
+
Release 0.20.204.0 - unreleased
NEW FEATURES
Modified:
hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Jun 3 22:51:26 2011
@@ -432,7 +432,8 @@ public class TestCapacityScheduler exten
queues.clear();
for (String qName : newQueues) {
try {
- queues.put(qName, new Queue(qName, acls, Queue.QueueState.RUNNING));
+ queues.put(qName, new Queue(qName, acls, Queue.QueueState.RUNNING,
+ QueueMetrics.create(qName, new Configuration())));
} catch (Throwable t) {
throw new RuntimeException("Unable to initialize queue " + qName, t);
}
Modified:
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Jun 3 22:51:26 2011
@@ -91,6 +91,7 @@ public class JobInProgress {
JobStatus status;
String jobFile = null;
Path localJobFile = null;
+ final QueueMetrics queueMetrics;
TaskInProgress maps[] = new TaskInProgress[0];
TaskInProgress reduces[] = new TaskInProgress[0];
@@ -339,14 +340,23 @@ public class JobInProgress {
this.resourceEstimator = new ResourceEstimator(this);
this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
this.status.setUsername(conf.getUser());
+    String queueName = conf.getQueueName();
this.profile = new JobProfile(conf.getUser(), jobid, "", "",
- conf.getJobName(), conf.getQueueName());
+        conf.getJobName(), queueName);
this.memoryPerMap = conf.getMemoryForMapTask();
this.memoryPerReduce = conf.getMemoryForReduceTask();
this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+    // Resolve the job's queue up front: an unconfigured queue name would
+    // otherwise surface as a NullPointerException from getMetrics().
+    Queue jobQueue = this.jobtracker.getQueueManager().getQueue(queueName);
+    if (jobQueue == null) {
+      throw new IOException("Queue \"" + queueName + "\" does not exist");
+    }
+    this.queueMetrics = jobQueue.getMetrics();
+
// Check task limits
checkTaskLimits();
@@ -371,6 +375,8 @@ public class JobInProgress {
this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
this.status.setUsername(jobInfo.getUser().toString());
this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+    // The queue-level addPrepJob happens further down, once the profile
+    // (and therefore the job's queue name) has been initialized.
this.startTime = jobtracker.getClock().getTime();
status.setStartTime(startTime);
this.localFs = jobtracker.getLocalFileSystem();
@@ -418,9 +423,18 @@ public class JobInProgress {
this.priority = conf.getJobPriority();
this.status.setJobPriority(this.priority);
+    String queueName = conf.getQueueName();
this.profile = new JobProfile(user, jobId,
- jobFile, url, conf.getJobName(),
- conf.getQueueName());
+        jobFile, url, conf.getJobName(), queueName);
+
+    // Fail with a clear message rather than a NullPointerException when the
+    // job names a queue that is not configured.
+    Queue jobQueue = this.jobtracker.getQueueManager().getQueue(queueName);
+    if (jobQueue == null) {
+      throw new IOException("Queue \"" + queueName + "\" does not exist");
+    }
+    this.queueMetrics = jobQueue.getMetrics();
+    this.queueMetrics.addPrepJob(conf, jobId);
this.submitHostName = conf.getJobSubmitHostName();
this.submitHostAddress = conf.getJobSubmitHostAddress();
@@ -474,7 +482,15 @@ public class JobInProgress {
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
}
}
-
+
+  /**
+   * Returns the per-queue metrics this job reports its activity to.
+   * @return the {@link QueueMetrics} of the job's queue
+   */
+  public QueueMetrics getQueueMetrics() {
+    return queueMetrics;
+  }
+
private void checkTaskLimits() throws IOException {
// if the number of tasks is larger than a configured value
// then fail the job.
@@ -682,6 +698,8 @@ public class JobInProgress {
jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
jobtracker.getInstrumentation().addWaitingReduces(getJobID(),
numReduceTasks);
+ this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
+ this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);
maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
@@ -1682,6 +1700,7 @@ public class JobInProgress {
if (tip.getActiveTasks().size() > 1)
speculativeMapTasks++;
metrics.launchMap(id);
+ this.queueMetrics.launchMap(id);
} else {
++runningReduceTasks;
name = Values.REDUCE.name();
@@ -1689,6 +1708,7 @@ public class JobInProgress {
if (tip.getActiveTasks().size() > 1)
speculativeReduceTasks++;
metrics.launchReduce(id);
+ this.queueMetrics.launchReduce(id);
}
// Note that the logs are for the scheduled tasks only. Tasks that join on
// restart has already their logs in place.
@@ -1839,9 +1859,11 @@ public class JobInProgress {
map.put(taskTracker, info);
if (type == TaskType.MAP) {
jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
+ this.queueMetrics.addReservedMapSlots(reservedSlots);
}
else {
jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
+ this.queueMetrics.addReservedReduceSlots(reservedSlots);
}
jobtracker.incrementReservations(type, reservedSlots);
}
@@ -1871,10 +1893,12 @@ public class JobInProgress {
map.remove(taskTracker);
if (type == TaskType.MAP) {
jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
+ this.queueMetrics.decReservedMapSlots(info.getNumSlots());
}
else {
jobtracker.getInstrumentation().decReservedReduceSlots(
info.getNumSlots());
+ this.queueMetrics.decReservedReduceSlots(info.getNumSlots());
}
jobtracker.decrementReservations(type, info.getNumSlots());
}
@@ -2583,6 +2607,7 @@ public class JobInProgress {
}
finishedMapTasks += 1;
metrics.completeMap(taskid);
+ this.queueMetrics.completeMap(taskid);
// remove the completed map from the resp running caches
retireMap(tip);
if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
@@ -2598,6 +2623,7 @@ public class JobInProgress {
}
finishedReduceTasks += 1;
metrics.completeReduce(taskid);
+ this.queueMetrics.completeReduce(taskid);
// remove the completed reduces from the running reducers set
retireReduce(tip);
if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
@@ -2642,14 +2668,18 @@ public class JobInProgress {
//update the metrics
if (oldState == JobStatus.PREP) {
this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
+ this.queueMetrics.decPrepJob(conf, jobId);
} else if (oldState == JobStatus.RUNNING) {
this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
+ this.queueMetrics.decRunningJob(conf, jobId);
}
if (newState == JobStatus.PREP) {
this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
+ this.queueMetrics.addPrepJob(conf, jobId);
} else if (newState == JobStatus.RUNNING) {
this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
+ this.queueMetrics.addRunningJob(conf, jobId);
}
}
@@ -2704,6 +2734,7 @@ public class JobInProgress {
garbageCollect();
metrics.completeJob(this.conf, this.status.getJobID());
+ this.queueMetrics.completeJob(this.conf, this.status.getJobID());
}
}
@@ -2744,9 +2775,11 @@ public class JobInProgress {
if (jobTerminationState == JobStatus.FAILED) {
jobtracker.getInstrumentation().failedJob(
this.conf, this.status.getJobID());
+ this.queueMetrics.failedJob(this.conf, this.status.getJobID());
} else {
jobtracker.getInstrumentation().killedJob(
this.conf, this.status.getJobID());
+ this.queueMetrics.killedJob(this.conf, this.status.getJobID());
}
}
}
@@ -2897,9 +2930,11 @@ public class JobInProgress {
if (tip.isMapTask() && !metricsDone) {
runningMapTasks -= 1;
metrics.failedMap(taskid);
+ this.queueMetrics.failedMap(taskid);
} else if (!metricsDone) {
runningReduceTasks -= 1;
metrics.failedReduce(taskid);
+ this.queueMetrics.failedReduce(taskid);
}
}
@@ -3142,6 +3177,8 @@ public class JobInProgress {
// Let the JobTracker know that a job is complete
jobtracker.getInstrumentation().decWaitingMaps(getJobID(),
pendingMaps());
jobtracker.getInstrumentation().decWaitingReduces(getJobID(),
pendingReduces());
+ this.queueMetrics.decWaitingMaps(getJobID(), pendingMaps());
+ this.queueMetrics.decWaitingReduces(getJobID(), pendingReduces());
jobtracker.storeCompletedJob(this);
jobtracker.finalizeJob(this);
Modified:
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Jun 3 22:51:26 2011
@@ -4033,6 +4033,8 @@ public class JobTracker implements MRCon
}
}
myInstrumentation.submitJob(job.getJobConf(), jobId);
+ job.getQueueMetrics().submitJob(job.getJobConf(), jobId);
+
LOG.info("Job " + jobId + " added successfully for user '"
+ job.getJobConf().getUser() + "' to queue '"
+ job.getJobConf().getQueueName() + "'");
Modified:
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Jun 3 22:51:26 2011
@@ -61,6 +61,7 @@ class LocalJobRunner implements JobSubmi
private final TaskController taskController = new DefaultTaskController();
private JobTrackerInstrumentation myMetrics = null;
+ private QueueMetrics queueMetrics = null;
private static final String jobDir = "localRunner/";
@@ -207,8 +208,10 @@ class LocalJobRunner implements JobSubmi
map.setConf(localConf);
map_tasks += 1;
myMetrics.launchMap(mapId);
+ queueMetrics.launchMap(mapId);
map.run(localConf, this);
myMetrics.completeMap(mapId);
+ queueMetrics.completeMap(mapId);
map_tasks -= 1;
updateCounters(map);
} else {
@@ -253,8 +256,10 @@ class LocalJobRunner implements JobSubmi
reduce.setConf(localConf);
reduce_tasks += 1;
myMetrics.launchReduce(reduce.getTaskID());
+ queueMetrics.launchReduce(reduce.getTaskID());
reduce.run(localConf, this);
myMetrics.completeReduce(reduce.getTaskID());
+ queueMetrics.completeReduce(reduce.getTaskID());
reduce_tasks -= 1;
updateCounters(reduce);
} else {
@@ -413,6 +418,7 @@ class LocalJobRunner implements JobSubmi
this.fs = FileSystem.getLocal(conf);
this.conf = conf;
myMetrics = JobTrackerInstrumentation.create(null, new JobConf(conf));
+ queueMetrics = QueueMetrics.create(conf.getQueueName(), new JobConf(conf));
taskController.setConf(conf);
}
Modified:
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Queue.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Queue.java?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Queue.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Queue.java Fri Jun 3 22:51:26 2011
@@ -37,6 +37,7 @@ class Queue {
private String name;
private Map<String,AccessControlList> acls;
private QueueState state = QueueState.RUNNING;
+ private QueueMetrics queueMetrics;
/**
* An Object that can be used by schedulers to fill in
@@ -69,10 +70,13 @@ class Queue {
* @param acls ACLs for the queue
* @param state state of the queue
+   * @param metrics metrics source for this queue, returned by getMetrics()
*/
- Queue(String name, Map<String, AccessControlList> acls, QueueState state) {
+  Queue(String name, Map<String, AccessControlList> acls, QueueState state,
+        QueueMetrics metrics) {
this.name = name;
this.acls = acls;
this.state = state;
+    this.queueMetrics = metrics;
}
/**
@@ -149,4 +152,12 @@ class Queue {
void setSchedulingInfo(Object schedulingInfo) {
this.schedulingInfo = schedulingInfo;
}
+
+  /**
+   * Get the metrics source associated with this queue.
+   * @return this queue's {@link QueueMetrics}
+   */
+  public QueueMetrics getMetrics() {
+    return queueMetrics;
+  }
}
Modified:
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueManager.java?rev=1131280&r1=1131279&r2=1131280&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueManager.java Fri Jun 3 22:51:26 2011
@@ -118,7 +118,7 @@ class QueueManager {
LOG.error("The queue, " + name + " does not have a configured ACL
list");
}
queues.put(name, new Queue(name, getQueueAcls(name, conf),
- getQueueState(name, conf)));
+ getQueueState(name, conf), QueueMetrics.create(name, conf)));
}
return queues;
@@ -136,7 +136,17 @@ class QueueManager {
public synchronized Set<String> getQueues() {
return queues.keySet();
}
-
+
+ /**
+ * Return a specific queue configured in the system.
+ *
+ * @param queueName Name of the queue requested
+ * @return the Queue named queueName, or null if no such queue is configured
+ */
+ public synchronized Queue getQueue(String queueName) {
+ return queues.get(queueName);
+ }
+
/**
* Return true if the given user is part of the ACL for the given
* {@link QueueACL} name for the given queue.
Added:
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java?rev=1131280&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/QueueMetrics.java Fri Jun 3 22:51:26 2011
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MetricMutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+
+/**
+ * Per-queue scheduling metrics for the JobTracker.
+ *
+ * <p>Each instance is registered with a {@link MetricsSystem} under the
+ * source name {@code "QueueMetrics,q=" + queueName} and emits records in
+ * the {@code mapred} context, tagged with {@code sessionId} and
+ * {@code Queue}. The counters and gauges parallel the cluster-wide
+ * JobTracker instrumentation, scoped to a single queue.
+ */
+@SuppressWarnings("deprecation")
+class QueueMetrics implements MetricsSource {
+  private static final Log LOG =
+    LogFactory.getLog(QueueMetrics.class);
+
+  final MetricsRegistry registry = new MetricsRegistry("Queue");
+  final MetricMutableCounterInt mapsLaunched =
+    registry.newCounter("maps_launched", "", 0);
+  final MetricMutableCounterInt mapsCompleted =
+    registry.newCounter("maps_completed", "", 0);
+  final MetricMutableCounterInt mapsFailed =
+    registry.newCounter("maps_failed", "", 0);
+  final MetricMutableCounterInt redsLaunched =
+    registry.newCounter("reduces_launched", "", 0);
+  final MetricMutableCounterInt redsCompleted =
+    registry.newCounter("reduces_completed", "", 0);
+  final MetricMutableCounterInt redsFailed =
+    registry.newCounter("reduces_failed", "", 0);
+  final MetricMutableCounterInt jobsSubmitted =
+    registry.newCounter("jobs_submitted", "", 0);
+  final MetricMutableCounterInt jobsCompleted =
+    registry.newCounter("jobs_completed", "", 0);
+  final MetricMutableGaugeInt waitingMaps =
+    registry.newGauge("waiting_maps", "", 0);
+  final MetricMutableGaugeInt waitingReds =
+    registry.newGauge("waiting_reduces", "", 0);
+  final MetricMutableGaugeInt reservedMapSlots =
+    registry.newGauge("reserved_map_slots", "", 0);
+  final MetricMutableGaugeInt reservedRedSlots =
+    registry.newGauge("reserved_reduce_slots", "", 0);
+  final MetricMutableCounterInt jobsFailed =
+    registry.newCounter("jobs_failed", "", 0);
+  final MetricMutableCounterInt jobsKilled =
+    registry.newCounter("jobs_killed", "", 0);
+  final MetricMutableGaugeInt jobsPreparing =
+    registry.newGauge("jobs_preparing", "", 0);
+  final MetricMutableGaugeInt jobsRunning =
+    registry.newGauge("jobs_running", "", 0);
+  final MetricMutableCounterInt mapsKilled =
+    registry.newCounter("maps_killed", "", 0);
+  final MetricMutableCounterInt redsKilled =
+    registry.newCounter("reduces_killed", "", 0);
+
+  final String sessionId;
+  private final String queueName;
+
+  /**
+   * Create (but do not register) the metrics for one queue.
+   *
+   * @param queueName name of the queue, used for the {@code Queue} tag
+   * @param conf configuration; {@code session.id} (default "") supplies
+   *             the {@code sessionId} tag
+   */
+  public QueueMetrics(String queueName, Configuration conf) {
+    this.queueName = queueName;
+    sessionId = conf.get("session.id", "");
+    registry.setContext("mapred").tag("sessionId", "", sessionId);
+    registry.tag("Queue", "Metrics by queue", queueName);
+  }
+
+  /** @return the name of the queue these metrics describe */
+  public String getQueueName() {
+    return this.queueName;
+  }
+
+  /** Snapshot all registered metrics into one record. */
+  public void getMetrics(MetricsBuilder builder, boolean all) {
+    registry.snapshot(builder.addRecord(registry.name()), all);
+  }
+
+  /** Count a launched map and remove it from the waiting gauge. */
+  public void launchMap(TaskAttemptID taskAttemptID) {
+    mapsLaunched.incr();
+    decWaitingMaps(taskAttemptID.getJobID(), 1);
+  }
+
+  /** Count a successfully completed map. */
+  public void completeMap(TaskAttemptID taskAttemptID) {
+    mapsCompleted.incr();
+  }
+
+  /** Count a failed map and return it to the waiting gauge. */
+  public void failedMap(TaskAttemptID taskAttemptID) {
+    mapsFailed.incr();
+    addWaitingMaps(taskAttemptID.getJobID(), 1);
+  }
+
+  /** Count a launched reduce and remove it from the waiting gauge. */
+  public void launchReduce(TaskAttemptID taskAttemptID) {
+    redsLaunched.incr();
+    decWaitingReduces(taskAttemptID.getJobID(), 1);
+  }
+
+  /** Count a successfully completed reduce. */
+  public void completeReduce(TaskAttemptID taskAttemptID) {
+    redsCompleted.incr();
+  }
+
+  /** Count a failed reduce and return it to the waiting gauge. */
+  public void failedReduce(TaskAttemptID taskAttemptID) {
+    redsFailed.incr();
+    addWaitingReduces(taskAttemptID.getJobID(), 1);
+  }
+
+  /** Count a job submitted to this queue. */
+  public void submitJob(JobConf conf, JobID id) {
+    jobsSubmitted.incr();
+  }
+
+  /** Count a successfully completed job. */
+  public void completeJob(JobConf conf, JobID id) {
+    jobsCompleted.incr();
+  }
+
+  /** Add {@code task} maps to the waiting-maps gauge. */
+  public void addWaitingMaps(JobID id, int task) {
+    waitingMaps.incr(task);
+  }
+
+  /** Remove {@code task} maps from the waiting-maps gauge. */
+  public void decWaitingMaps(JobID id, int task) {
+    waitingMaps.decr(task);
+  }
+
+  /** Add {@code task} reduces to the waiting-reduces gauge. */
+  public void addWaitingReduces(JobID id, int task) {
+    waitingReds.incr(task);
+  }
+
+  /** Remove {@code task} reduces from the waiting-reduces gauge. */
+  public void decWaitingReduces(JobID id, int task) {
+    waitingReds.decr(task);
+  }
+
+  /** Add {@code slots} to the reserved-map-slots gauge. */
+  public void addReservedMapSlots(int slots) {
+    reservedMapSlots.incr(slots);
+  }
+
+  /** Remove {@code slots} from the reserved-map-slots gauge. */
+  public void decReservedMapSlots(int slots) {
+    reservedMapSlots.decr(slots);
+  }
+
+  /** Add {@code slots} to the reserved-reduce-slots gauge. */
+  public void addReservedReduceSlots(int slots) {
+    reservedRedSlots.incr(slots);
+  }
+
+  /** Remove {@code slots} from the reserved-reduce-slots gauge. */
+  public void decReservedReduceSlots(int slots) {
+    reservedRedSlots.decr(slots);
+  }
+
+  /** Count a failed job. */
+  public void failedJob(JobConf conf, JobID id) {
+    jobsFailed.incr();
+  }
+
+  /** Count a killed job. */
+  public void killedJob(JobConf conf, JobID id) {
+    jobsKilled.incr();
+  }
+
+  /** A job entered the PREP state. */
+  public void addPrepJob(JobConf conf, JobID id) {
+    jobsPreparing.incr();
+  }
+
+  /** A job left the PREP state. */
+  public void decPrepJob(JobConf conf, JobID id) {
+    jobsPreparing.decr();
+  }
+
+  /** A job entered the RUNNING state. */
+  public void addRunningJob(JobConf conf, JobID id) {
+    jobsRunning.incr();
+  }
+
+  /** A job left the RUNNING state. */
+  public void decRunningJob(JobConf conf, JobID id) {
+    jobsRunning.decr();
+  }
+
+  /** Count a killed map attempt. */
+  public void killedMap(TaskAttemptID taskAttemptID) {
+    mapsKilled.incr();
+  }
+
+  /** Count a killed reduce attempt. */
+  public void killedReduce(TaskAttemptID taskAttemptID) {
+    redsKilled.incr();
+  }
+
+  /**
+   * Create the metrics for {@code queueName} and register them with
+   * {@link DefaultMetricsSystem#INSTANCE}.
+   */
+  static QueueMetrics create(String queueName, Configuration conf) {
+    return create(queueName, conf, DefaultMetricsSystem.INSTANCE);
+  }
+
+  /**
+   * Create the metrics for {@code queueName} and register them with
+   * {@code ms} under the source name {@code "QueueMetrics,q=" + queueName}.
+   *
+   * @return the source returned by {@code ms.register}
+   */
+  static QueueMetrics create(String queueName, Configuration conf,
+                             MetricsSystem ms) {
+    return ms.register("QueueMetrics,q=" + queueName, "Queue metrics",
+                       new QueueMetrics(queueName, conf));
+  }
+}
Added:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java?rev=1131280&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestQueueMetrics.java Fri Jun 3 22:51:26 2011
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link QueueMetrics}: drive every mutator on a single
+ * queue and check the resulting counter and gauge values.
+ */
+@SuppressWarnings("deprecation")
+public class TestQueueMetrics extends TestCase {
+
+  static int jobIdCounter = 0;
+  static final String jtIdentifier = "queue_jt";
+
+  /** @return a fresh JobID for the fake job tracker identifier */
+  private static JobID getJobId() {
+    return new JobID(TestQueueMetrics.jtIdentifier, jobIdCounter++);
+  }
+
+  public void testDefaultSingleQueueMetrics() {
+    String queueName = "single";
+    TaskAttemptID taskAttemptID = mock(TaskAttemptID.class);
+    when(taskAttemptID.getJobID()).thenReturn(TestQueueMetrics.getJobId());
+
+    QueueMetrics metrics = QueueMetrics.create(queueName, new Configuration());
+
+    assertEquals("single", metrics.getQueueName());
+    // launchMap decrements waiting_maps, so the gauge can go negative here
+    metrics.launchMap(taskAttemptID);
+    checkMaps(metrics, 1, 0, 0, 0, -1, 0);
+    metrics.addWaitingMaps(taskAttemptID.getJobID(), 5);
+    metrics.launchMap(taskAttemptID);
+    checkMaps(metrics, 2, 0, 0, 0, 3, 0);
+    checkReduces(metrics, 0, 0, 0, 0, 0, 0);
+
+    // failedMap puts the map back on the waiting gauge
+    metrics.completeMap(taskAttemptID);
+    metrics.failedMap(taskAttemptID);
+    checkMaps(metrics, 2, 1, 1, 0, 4, 0);
+    checkReduces(metrics, 0, 0, 0, 0, 0, 0);
+
+    metrics.launchReduce(taskAttemptID);
+    metrics.completeReduce(taskAttemptID);
+    metrics.failedReduce(taskAttemptID);
+    checkMaps(metrics, 2, 1, 1, 0, 4, 0);
+    checkReduces(metrics, 1, 1, 1, 0, 0, 0);
+
+    metrics.addWaitingMaps(null, 20);
+    metrics.decWaitingMaps(null, 10);
+    metrics.addWaitingReduces(null, 20);
+    metrics.decWaitingReduces(null, 10);
+    checkMaps(metrics, 2, 1, 1, 0, 14, 0);
+    checkReduces(metrics, 1, 1, 1, 0, 10, 0);
+
+    metrics.addReservedMapSlots(10);
+    metrics.addReservedReduceSlots(10);
+    checkMaps(metrics, 2, 1, 1, 0, 14, 10);
+    checkReduces(metrics, 1, 1, 1, 0, 10, 10);
+    metrics.decReservedReduceSlots(5);
+    metrics.decReservedMapSlots(5);
+    checkMaps(metrics, 2, 1, 1, 0, 14, 5);
+    checkReduces(metrics, 1, 1, 1, 0, 10, 5);
+
+    metrics.killedMap(taskAttemptID);
+    metrics.killedReduce(taskAttemptID);
+    checkMaps(metrics, 2, 1, 1, 1, 14, 5);
+    checkReduces(metrics, 1, 1, 1, 1, 10, 5);
+    checkJobs(metrics, 0, 0, 0, 0, 0, 0);
+
+    metrics.submitJob(null, null);
+    metrics.completeJob(null, null);
+    metrics.failedJob(null, null);
+    metrics.killedJob(null, null);
+    checkJobs(metrics, 1, 1, 1, 1, 0, 0);
+
+    metrics.addPrepJob(null, null);
+    metrics.addRunningJob(null, null);
+    metrics.addPrepJob(null, null);
+    metrics.addRunningJob(null, null);
+    checkJobs(metrics, 1, 1, 1, 1, 2, 2);
+    metrics.decPrepJob(null, null);
+    metrics.decRunningJob(null, null);
+    checkJobs(metrics, 1, 1, 1, 1, 1, 1);
+    checkMaps(metrics, 2, 1, 1, 1, 14, 5);
+    checkReduces(metrics, 1, 1, 1, 1, 10, 5);
+  }
+
+  /** Assert every map-related counter and gauge of {@code metrics}. */
+  public static void checkMaps(QueueMetrics metrics,
+      int maps_launched, int maps_completed, int maps_failed, int maps_killed,
+      int waiting_maps, int reserved_map_slots) {
+    assertCounter("maps_launched", maps_launched, metrics);
+    assertCounter("maps_completed", maps_completed, metrics);
+    assertCounter("maps_failed", maps_failed, metrics);
+    assertCounter("maps_killed", maps_killed, metrics);
+    assertGauge("waiting_maps", waiting_maps, metrics);
+    assertGauge("reserved_map_slots", reserved_map_slots, metrics);
+  }
+
+  /** Assert every reduce-related counter and gauge of {@code metrics}. */
+  public static void checkReduces(QueueMetrics metrics,
+      int reduces_launched, int reduces_completed, int reduces_failed,
+      int reduces_killed, int waiting_reduces, int reserved_reduce_slots) {
+    assertCounter("reduces_launched", reduces_launched, metrics);
+    assertCounter("reduces_completed", reduces_completed, metrics);
+    assertCounter("reduces_failed", reduces_failed, metrics);
+    assertCounter("reduces_killed", reduces_killed, metrics);
+    assertGauge("waiting_reduces", waiting_reduces, metrics);
+    assertGauge("reserved_reduce_slots", reserved_reduce_slots, metrics);
+  }
+
+  /** Assert every job-related counter and gauge of {@code metrics}. */
+  public static void checkJobs(QueueMetrics metrics, int jobs_submitted,
+      int jobs_completed, int jobs_failed, int jobs_killed,
+      int jobs_preparing, int jobs_running) {
+    assertCounter("jobs_submitted", jobs_submitted, metrics);
+    assertCounter("jobs_completed", jobs_completed, metrics);
+    assertCounter("jobs_failed", jobs_failed, metrics);
+    assertCounter("jobs_killed", jobs_killed, metrics);
+    assertGauge("jobs_preparing", jobs_preparing, metrics);
+    assertGauge("jobs_running", jobs_running, metrics);
+  }
+}