Author: yhemanth
Date: Fri Dec 5 08:52:23 2008
New Revision: 723788
URL: http://svn.apache.org/viewvc?rev=723788&view=rev
Log:
HADOOP-4445. Replace running task counts with running task percentage in
capacity scheduler UI. Contributed by Sreekanth Ramakrishnan.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=723788&r1=723787&r2=723788&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Dec 5 08:52:23 2008
@@ -50,6 +50,10 @@
HADOOP-3497. Fix bug in overly restrictive file globbing with a
PathFilter. (tomwhite)
+ HADOOP-4445. Replace running task counts with running task
+ percentage in capacity scheduler UI. (Sreekanth Ramakrishnan via
+ yhemanth)
+
NEW FEATURES
HADOOP-4575. Add a proxy service for relaying HsftpFileSystem requests.
Modified:
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=723788&r1=723787&r2=723788&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
(original)
+++
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
Fri Dec 5 08:52:23 2008
@@ -212,26 +212,42 @@
@Override
public String toString(){
+ float runningMaps = 0;
+ float runningReduces = 0;
+
+ Collection<JobInProgress> runningJobs =
+ mgr.getRunningJobQueue(mqsi.queueName);
+
+ for(JobInProgress job : runningJobs) {
+ runningMaps += job.runningMaps();
+ runningReduces += job.runningReduces();
+ }
+ float usedMaps = mqsi.guaranteedCapacity!= 0 ?
+ (runningMaps * 100/mqsi.guaranteedCapacity):0;
+ float usedReduces = rqsi.guaranteedCapacity != 0 ?
+ (runningReduces * 100/rqsi.guaranteedCapacity) :0;
StringBuffer sb = new StringBuffer();
- sb.append("Guaranteed Capacity (%) : ");
+ sb.append("Guaranteed Capacity : ");
sb.append(mqsi.guaranteedCapacityPercent);
- sb.append(" \n");
+ sb.append(" %\n");
sb.append(String.format("Guaranteed Capacity Maps : %d \n",
mqsi.guaranteedCapacity));
sb.append(String.format("Guaranteed Capacity Reduces : %d \n",
rqsi.guaranteedCapacity));
- sb.append(String.format("User Limit : %d \n",mqsi.ulMin));
- sb.append(String.format("Reclaim Time limit : %d \n",mqsi.reclaimTime));
- sb.append(String.format("Number of Running Maps : %d \n",
- mqsi.numRunningTasks));
- sb.append(String.format("Number of Running Reduces : %d \n",
- rqsi.numRunningTasks));
+ sb.append(String.format("User Limit : %d %s\n",mqsi.ulMin, "%"));
+ sb.append(String.format("Reclaim Time limit : %s \n",
+ StringUtils.formatTime(mqsi.reclaimTime)));
+ sb.append(String.format("Priority Supported : %s \n",
+ supportsPriority?"YES":"NO"));
+ sb.append("-------------\n");
+ sb.append(String.format("Running Maps : %s %s\n",
+ Float.valueOf(usedMaps).toString(),
+ "% of Guaranteed Capacity"));
+ sb.append(String.format("Running Reduces : %s %s\n",
+ Float.valueOf(usedReduces).toString(),
+ "% of Guaranteed Capacity" ));
sb.append(String.format("Number of Waiting Jobs : %d \n", mgr
.getWaitingJobCount(mqsi.queueName)));
- sb.append(String.format("Priority Supported : %s \n",
- supportsPriority?"YES":"NO"));
- sb.append(String.format("* Scheduling information can be off by "
- + "maximum of %s\n", StringUtils.formatTime(pollingInterval)));
return sb.toString();
}
}
Modified:
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=723788&r1=723787&r2=723788&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
(original)
+++
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Fri Dec 5 08:52:23 2008
@@ -243,10 +243,6 @@
return (Set<TaskInProgress>)reduceTips;
}
- @Override
- synchronized void fail() {
- getStatus().setRunState(JobStatus.FAILED);
- }
}
static class FakeTaskInProgress extends TaskInProgress {
@@ -1343,6 +1339,39 @@
assertEquals(j1.runningMapTasks, 2);
}
+ /*
+ * Following is the testing strategy for testing scheduling information.
+ * - start capacity scheduler with two queues.
+ * - check the scheduling information with respect to the configuration
+ * which was used to configure the queues.
+ * - Submit 5 jobs to a queue.
+ * - Check the waiting jobs count, it should be 5.
+ * - Then run the initialization poller (selectJobsToInitialize()).
+ * - Check once again the waiting queue, it should be 5 jobs again.
+ * - Then raise status change events.
+ * - Assign one task to a task tracker. (Map)
+ * - Check waiting job count, it should be 4 now and used map (%) = 100
+ * - Assign another one task (Reduce)
+ * - Check waiting job count, it should be 4 now and used map (%) = 100
+ * and used reduce (%) = 100
+ * - finish the job and then check the used percentage it should go
+ * back to zero
+ * - Then pick a job that is initialized but not yet scheduled and fail it.
+ * - Run the poller
+ * - Check the waiting job count should now be 3.
+ * - Now fail a job which has not been initialized at all.
+ * - Run the poller, so that it can clean up the job queue.
+ * - Check the count, the waiting job count should be 2.
+ * - Now raise status change events to move the initialized jobs which
+ * should be two in count to running queue.
+ * - Then schedule a map of the job in running queue.
+ * - Run the poller, because the poller is responsible for the waiting
+ * job count. Check the counts: map usage should be 100% and there
+ * should be one waiting job.
+ * - fail the running job.
+ * - Check the counts: there should now be one waiting job and zero running
+ * tasks.
+ */
public void testSchedulingInformation() throws Exception {
String[] qs = {"default", "q2"};
@@ -1371,35 +1400,20 @@
String[] infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 10);
- assertEquals(infoStrings[0] , "Guaranteed Capacity (%) : 50.0 ");
+ assertEquals(infoStrings[0] , "Guaranteed Capacity : 50.0 %");
assertEquals(infoStrings[1] , "Guaranteed Capacity Maps : " + totalMaps *
50/100 + " ");
assertEquals(infoStrings[2] , "Guaranteed Capacity Reduces : " +
totalReduces * 50/100 + " ");
- assertEquals(infoStrings[3] , "User Limit : 25 ");
- assertEquals(infoStrings[4] , "Reclaim Time limit : 1000000 " );
- assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
- assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
- assertEquals(infoStrings[7] , "Number of Waiting Jobs : 0 ");
- assertEquals(infoStrings[8] , "Priority Supported : YES ");
- assertEquals(infoStrings[9] , "* Scheduling information can be off by " +
- "maximum of "+ StringUtils.formatTime(resConf.getSleepInterval()));
+ assertEquals(infoStrings[3] , "User Limit : 25 %");
+ assertEquals(infoStrings[4] , "Reclaim Time limit : " +
+ StringUtils.formatTime(1000000) + " ");
+ assertEquals(infoStrings[5] , "Priority Supported : YES ");
+ assertEquals(infoStrings[7] ,
+ "Running Maps : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[8] ,
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 0 ");
assertEquals(schedulingInfo, schedulingInfo2);
- /*
- * Following is the testing strategy for testing scheduling information.
- * - Submit 5 jobs to a queue.
- * - Check the waiting jobs count, it should be 5.
- * - Then run initializationPoller()
- * - Check once again the waiting queue, it should be 5 jobs again.
- * - Then raise status change events.
- * - Assign one task to a task tracker.
- * - Check waiting job count, it should be 4 now.
- * - Then pick an initialized job but not scheduled job and fail it.
- * - Run the poller
- * - Check the waiting job count should now be 3.
- * - Now fail a job which has not been initialized at all.
- * - Run the poller, so that it can clean up the job queue.
- * - Check the count, the waiting job count should be 2.
- */
//Testing with actual job submission.
ArrayList<FakeJobInProgress> userJobs =
submitJobs(1, 5, "default").get("u1");
@@ -1409,9 +1423,11 @@
//waiting job should be equal to number of jobs submitted.
assertEquals(infoStrings.length, 10);
- assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
- assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
- assertEquals(infoStrings[7] , "Number of Waiting Jobs : 5 ");
+ assertEquals(infoStrings[7] ,
+ "Running Maps : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[8] ,
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 5 ");
//Initalize the jobs but don't raise events
p.selectJobsToInitialize();
@@ -1422,14 +1438,16 @@
assertEquals(infoStrings.length, 10);
//should be previous value as nothing is scheduled because no events
//has been raised after initialization.
- assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
- assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
- assertEquals(infoStrings[7] , "Number of Waiting Jobs : 5 ");
+ assertEquals(infoStrings[7] ,
+ "Running Maps : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[8] ,
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 5 ");
//Raise status change event so that jobs can move to running queue.
raiseStatusChangeEvents(scheduler.jobQueuesManager);
//assign one job
- scheduler.assignTasks(tracker("tt1")); // heartbeat
+ Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
//Initalize extra job.
p.selectJobsToInitialize();
@@ -1439,35 +1457,116 @@
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 10);
- //TODO check running task count also fix in HADOOP-4445
- assertEquals(infoStrings[7] , "Number of Waiting Jobs : 4 ");
+ assertEquals(infoStrings[7] ,
+ "Running Maps : 100.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[8] ,
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
+
+ //assign a reduce task
+
+ Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ schedulingInfo =
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
+ infoStrings = schedulingInfo.split("\n");
+ assertEquals(infoStrings.length, 10);
+ assertEquals(infoStrings[7] ,
+ "Running Maps : 100.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[8] ,
+ "Running Reduces : 100.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
+
+ //Complete the job and check the running tasks count
+ FakeJobInProgress u1j1 = userJobs.get(0);
+ taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1j1);
+ taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1j1);
+ taskTrackerManager.finalizeJob(u1j1);
+
+ schedulingInfo =
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
+ infoStrings = schedulingInfo.split("\n");
+ assertEquals(infoStrings.length, 10);
+ assertEquals(infoStrings[7] ,
+ "Running Maps : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[8] ,
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
//Fail a job which is initialized but not scheduled and check the count.
FakeJobInProgress u1j2 = userJobs.get(1);
assertTrue("User1 job 2 not initalized ",
u1j2.getStatus().getRunState() == JobStatus.RUNNING);
- u1j2.fail();
+ taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
//Run initializer to clean up failed jobs
p.selectJobsToInitialize();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 10);
- assertEquals(infoStrings[7] , "Number of Waiting Jobs : 3 ");
+ assertEquals(infoStrings[7] ,
+ "Running Maps : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[8] ,
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 3 ");
//Fail a job which is not initialized but is in the waiting queue.
FakeJobInProgress u1j5 = userJobs.get(4);
assertFalse("User1 job 5 initalized ",
u1j5.getStatus().getRunState() == JobStatus.RUNNING);
- u1j5.fail();
+
+ taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
//run initializer to clean up failed job
p.selectJobsToInitialize();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 10);
- assertEquals(infoStrings[7] , "Number of Waiting Jobs : 2 ");
+ assertEquals(infoStrings[7] ,
+ "Running Maps : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[8] ,
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 2 ");
+
+ //Raise status change events, as none of the initialized jobs would be
+ //in the running queue: we just failed the second job, which was
+ //initialized, and completed the first one.
+
+ raiseStatusChangeEvents(scheduler.jobQueuesManager);
+
+ //Now schedule a map; it should come from job3 of the user, since job1
+ //succeeded, job2 failed, and job3 is now running.
+
+ t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+ FakeJobInProgress u1j3 = userJobs.get(2);
+ assertTrue("User Job 3 not running ",
+ u1j3.getStatus().getRunState() == JobStatus.RUNNING);
+
+ //Now one map should be running and one job should be waiting. Run the
+ //poller, as it is responsible for the waiting job count.
+ p.selectJobsToInitialize();
+ schedulingInfo =
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
+ infoStrings = schedulingInfo.split("\n");
+ assertEquals(infoStrings.length, 10);
+ assertEquals(infoStrings[7] ,
+ "Running Maps : 100.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[8] ,
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 1 ");
+
+ //Fail the executing job
+ taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
+ //Now running counts should become zero
+ schedulingInfo =
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
+ infoStrings = schedulingInfo.split("\n");
+ assertEquals(infoStrings.length, 10);
+ assertEquals(infoStrings[7] ,
+ "Running Maps : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[8] ,
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 1 ");
}