Author: yhemanth
Date: Tue Jan 20 11:18:58 2009
New Revision: 736070
URL: http://svn.apache.org/viewvc?rev=736070&view=rev
Log:
HADOOP-5048. Fix capacity scheduler to correctly cleanup jobs that are killed
after initialization, but before running. Contributed by Sreekanth Ramakrishnan.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=736070&r1=736069&r2=736070&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jan 20 11:18:58 2009
@@ -617,6 +617,10 @@
HADOOP-4993. Fix Chukwa agent configuration and startup to make it both
more modular and testable. (Ari Rabkin via cdouglas)
+ HADOOP-5048. Fix capacity scheduler to correctly cleanup jobs that are
+ killed after initialization, but before running.
+ (Sreekanth Ramakrishnan via yhemanth)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified:
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=736070&r1=736069&r2=736070&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
(original)
+++
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
Tue Jan 20 11:18:58 2009
@@ -1276,7 +1276,7 @@
}
//update stats on waiting jobs
- for(JobInProgress j: jobQueuesManager.getJobs(qsi.queueName)) {
+ for(JobInProgress j: jobQueuesManager.getWaitingJobs(qsi.queueName)) {
// pending tasks
if ((qsi.mapTSI.numPendingTasks > mapClusterCapacity) &&
(qsi.reduceTSI.numPendingTasks > reduceClusterCapacity)) {
@@ -1498,7 +1498,7 @@
jobCollection.addAll(runningJobs);
}
Collection<JobInProgress> waitingJobs =
- jobQueuesManager.getJobs(queueName);
+ jobQueuesManager.getWaitingJobs(queueName);
Collection<JobInProgress> tempCollection = new ArrayList<JobInProgress>();
if(waitingJobs != null) {
tempCollection.addAll(waitingJobs);
Modified:
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=736070&r1=736069&r2=736070&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
(original)
+++
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
Tue Jan 20 11:18:58 2009
@@ -243,7 +243,7 @@
* Set of jobs which have been passed to Initialization threads.
* This is maintained so that we dont call initTasks() for same job twice.
*/
- private HashSet<JobID> initializedJobs;
+ private HashMap<JobID,JobInProgress> initializedJobs;
private volatile boolean running;
@@ -255,7 +255,7 @@
public JobInitializationPoller(JobQueuesManager mgr,
CapacitySchedulerConf rmConf, Set<String> queue) {
- initializedJobs = new HashSet<JobID>();
+ initializedJobs = new HashMap<JobID,JobInProgress>();
jobQueues = new HashMap<String, QueueInfo>();
this.jobQueueManager = mgr;
threadsToQueueMap = new HashMap<String, JobInitializationThread>();
@@ -293,9 +293,20 @@
}
}
+ /**
+ * This is the main thread of the initialization poller. We essentially do
+ * the following in the main thread:
+ *
+ * <ol>
+ * <li> Clean up the list of initialized jobs which the poller maintains
+ * </li>
+ * <li> Select jobs to initialize in the polling interval.</li>
+ * </ol>
+ */
public void run() {
while (running) {
try {
+ cleanUpInitializedJobsList();
selectJobsToInitialize();
if (!this.isInterrupted()) {
Thread.sleep(sleepInterval);
@@ -307,17 +318,18 @@
}
}
- // The key method that picks up jobs to initialize for each queue.
- // The jobs picked up are added to the worker thread that is handling
- // initialization for that queue.
- // The method is package private to allow tests to call it synchronously
- // in a controlled manner.
+ /**
+ * The key method which selects jobs to be initialized across
+ * queues and assigns those jobs to their appropriate init-worker threads.
+ * <br/>
+ * This method is overridden in the test case which is used to test the job
+ * initialization poller.
+ *
+ */
void selectJobsToInitialize() {
for (String queue : jobQueues.keySet()) {
ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
- //if (LOG.isDebugEnabled()) {
- printJobs(jobsToInitialize);
- //}
+ printJobs(jobsToInitialize);
JobInitializationThread t = threadsToQueueMap.get(queue);
for (JobInProgress job : jobsToInitialize) {
t.addJobsToQueue(queue, job);
@@ -325,6 +337,13 @@
}
}
+ /**
+ * Method used to print log statements about which jobs are being
+ * passed to init-threads.
+ *
+ * @param jobsToInitialize list of jobs which are passed to the
+ * init-threads.
+ */
private void printJobs(ArrayList<JobInProgress> jobsToInitialize) {
for (JobInProgress job : jobsToInitialize) {
LOG.info("Passing to Initializer Job Id :" + job.getJobID()
@@ -333,13 +352,25 @@
}
}
- // This method exists to be overridden by test cases that wish to
- // create a test-friendly worker thread which can be controlled
- // synchronously.
+ /**
+ * This method exists to be overridden by test cases that wish to
+ * create a test-friendly worker thread which can be controlled
+ * synchronously.
+ *
+ * @return Instance of worker init-threads.
+ */
JobInitializationThread createJobInitializationThread() {
return new JobInitializationThread();
}
+ /**
+ * Method which is used by the poller to assign appropriate worker thread
+ * to a queue. The number of threads would be always less than or equal
+ * to number of queues in a system. If number of threads is configured to
+ * be more than number of queues then poller does not create threads more
+ * than number of queues.
+ *
+ */
private void assignThreadsToQueues() {
int countOfQueues = jobQueues.size();
String[] queues = (String[]) jobQueues.keySet().toArray(
@@ -369,14 +400,38 @@
}
}
- /*
- * Select jobs to be initialized for a given queue.
+ /**
+ *
+ * Method used to select jobs to be initialized for a given queue. <br/>
+ *
+ * We want to ensure that enough jobs have been initialized, so that when the
+ * Scheduler wants to consider a new job to run, it's ready. We clearly don't
+ * want to initialize too many jobs as each initialized job has a memory
+ * footprint, sometimes significant.
*
- * The jobs are selected such that they are within the limits
- * for number of users and number of jobs per user in the queue.
- * The only exception is if high priority jobs are waiting to be
- * initialized. In that case, we could exceed the configured limits.
- * However, we try to restrict the excess to a minimum.
+ * Number of jobs to be initialized is restricted by two values: - Maximum
+ * number of users whose jobs we want to initialize, which is equal to
+ * the number of concurrent users the queue can support. - Maximum number
+ * of initialized jobs per user. The product of these two gives us the
+ * total number of initialized jobs.
+ *
+ * Note that this is a rough number, meant for decreasing extra memory
+ * footprint. It's OK if we go over it once in a while, if we have to.
+ *
+ * This can happen as follows. Suppose we have initialized 3 jobs for a
+ * user. Now, suppose the user submits a job whose priority is higher than
+ * that of the 3 jobs initialized. This job needs to be initialized, since it
+ * will run earlier than the 3 jobs. We'll now have 4 initialized jobs for the
+ * user. If memory becomes a problem, we should ideally un-initialize one of
+ * the 3 jobs, to keep the count of initialized jobs at 3, but that's
+ * something we don't do for now. This situation can also arise when a new
+ * user submits a high priority job, thus superseding a user whose jobs have
+ * already been initialized. The latter user's initialized jobs are redundant,
+ * but we'll leave them initialized.
+ *
+ * @param queue name of the queue to pick the jobs to initialize.
+ * @return list of jobs to be initialized in a queue. An empty list is
+ * returned if no jobs are found.
*/
ArrayList<JobInProgress> getJobsToInitialize(String queue) {
QueueInfo qi = jobQueues.get(queue);
@@ -385,116 +440,131 @@
// queue.
int maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
int maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
- // calculate maximum number of jobs which can be allowed to initialize
- // for this queue.
- // This value is used when a user submits a high priority job after we
- // have initialized jobs for that queue and none of them is scheduled.
- // This would prevent us from initializing extra jobs for that particular
- // user. Explanation given at end of method.
int maxJobsPerQueueToInitialize = maximumUsersAllowedToInitialize
* maxJobsPerUserAllowedToInitialize;
- Collection<JobInProgress> jobs = jobQueueManager.getJobs(queue);
int countOfJobsInitialized = 0;
HashMap<String, Integer> userJobsInitialized = new HashMap<String,
Integer>();
+ Collection<JobInProgress> jobs = jobQueueManager.getWaitingJobs(queue);
+ /*
+ * Walk through the collection of waiting jobs.
+ * We maintain a map of jobs that have already been initialized. If a
+ * job exists in that map, increment the count for that job's user
+ * and move on to the next job.
+ *
+ * If the job doesn't exist, see whether we want to initialize it.
+ * We initialize it if: - at least one job of the user has already
+ * been initialized, but the user's total initialized jobs are below
+ * the limit, OR - this is a new user, and we haven't reached the limit
+ * for the number of users whose jobs we want to initialize. We break
+ * when we've reached the limit of maximum jobs to initialize.
+ */
for (JobInProgress job : jobs) {
- /*
- * First check if job has been scheduled or completed or killed. If so
- * then remove from uninitialised jobs. Remove from Job queue
- */
- if ((job.getStatus().getRunState() == JobStatus.RUNNING)
- && (job.runningMaps() > 0 || job.runningReduces() > 0
- || job.finishedMaps() > 0 || job.finishedReduces() > 0)) {
- LOG.debug("Removing from the queue " + job.getJobID());
- initializedJobs.remove(job.getJobID());
- jobQueueManager.removeJobFromQueue(job);
- continue;
- } else if (job.isComplete()) {
- LOG.debug("Removing from completed job from " + "the queue "
- + job.getJobID());
- initializedJobs.remove(job.getJobID());
- jobQueueManager.removeJobFromQueue(job);
- continue;
- }
String user = job.getProfile().getUser();
int numberOfJobs = userJobsInitialized.get(user) == null ? 0
: userJobsInitialized.get(user);
// If the job is already initialized then add the count against user
// then continue.
- if (initializedJobs.contains(job.getJobID())) {
+ if (initializedJobs.containsKey(job.getJobID())) {
userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
countOfJobsInitialized++;
continue;
}
boolean isUserPresent = userJobsInitialized.containsKey(user);
- /*
- * If the user is present in user list and size of user list is less
- * maximum allowed users initialize then initialize this job and add this
- * user to the global list.
- *
- * Else if he is present we check if his number of jobs has not crossed
- * his quota and global quota.
- *
- * The logic behind using a global per queue job can be understood by example
- * below: Consider 3 users submitting normal priority job in a job queue with
- * user limit as 100. (Max jobs per user = 2)
- *
- * U1J1,U1J2,U1J3....,U3J3.
- *
- * Jobs initialized would be
- *
- * U1J1,U1J2,U2J1,U2J2,U3J1,U3J2
- *
- * Now consider a case where U4 comes in and submits a high priority job.
- *
- * U4J1 --- High Priority JOb, U4J2---- Normal priority job.
- *
- * So, if we dont use global per queue value we would end up initializing both
- * U4 jobs which is not correct.
- *
- * By using a global value we ensure that we dont initialize any extra jobs
- * for a user.
- */
if (!isUserPresent
&& userJobsInitialized.size() < maximumUsersAllowedToInitialize) {
// this is a new user being considered and the number of users
// is within limits.
userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
jobsToInitialize.add(job);
- initializedJobs.add(job.getJobID());
+ initializedJobs.put(job.getJobID(),job);
countOfJobsInitialized++;
} else if (isUserPresent
- && numberOfJobs < maxJobsPerUserAllowedToInitialize
- && countOfJobsInitialized < maxJobsPerQueueToInitialize) {
- /*
- * this is an existing user and the number of jobs per user
- * is within limits, as also the number of jobs per queue.
- * We need the check on number of jobs per queue to restrict
- * the number of jobs we initialize over the limit due to high
- * priority jobs.
- *
- * For e.g Consider 3 users submitting normal priority job in
- * a job queue with user limit as 100 and max jobs per user as 2
- * Say the jobs are U1J1,U1J2,U1J3....,U3J3.
- *
- * Jobs initialized would be U1J1,U1J2,U2J1,U2J2,U3J1,U3J2
- *
- * Now consider a case where U4 comes in and submits a high priority job
- * and a normal priority job. Say U4J1 and U4J2
- *
- * If we dont consider the number of jobs per queue we would end up
- * initializing both jobs from U4. Initializing the second job is
- * unnecessary.
- */
+ && numberOfJobs < maxJobsPerUserAllowedToInitialize) {
userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
jobsToInitialize.add(job);
- initializedJobs.add(job.getJobID());
+ initializedJobs.put(job.getJobID(),job);
countOfJobsInitialized++;
}
+ /*
+ * if the maximum number of jobs to initialize for a queue is reached
+ * then we stop looking at further jobs. The jobs beyond this number
+ * can be initialized.
+ */
+ if(countOfJobsInitialized > maxJobsPerQueueToInitialize) {
+ break;
+ }
}
return jobsToInitialize;
}
+ /**
+ * Method which is used internally to clean up the initialized jobs
+ * data structure which the job initialization poller uses to check
+ * if a job is initialized or not.
+ *
+ * Algorithm for cleaning up task is as follows:
+ *
+ * <ul>
+ * <li> For jobs in <b>initializedJobs</b> list </li>
+ * <ul>
+ * <li> If job is running</li>
+ * <ul>
+ * <li> If job is scheduled then remove the job from the waiting queue
+ * of the scheduler and <b>initializedJobs</b>.<br/>
+ * The check for whether a job is scheduled is done by the following
+ * formula:<br/>
+ * if pending <i>task</i> &lt; desired <i>task</i> then scheduled else
+ * not scheduled.<br/>
+ * The formula returns <i>scheduled</i> if one task has run or failed;
+ * in cases where there has been a failure but not enough to mark the task
+ * as failed, the formula returns <i>not scheduled</i>.
+ * </li>
+ * </ul>
+ *
+ * <li> If job is complete, then remove the job from <b>initializedJobs</b>.
+ * </li>
+ *
+ * </ul>
+ * </ul>
+ *
+ */
+ void cleanUpInitializedJobsList() {
+ Iterator<Entry<JobID, JobInProgress>> jobsIterator =
+ initializedJobs.entrySet().iterator();
+ while(jobsIterator.hasNext()) {
+ Entry<JobID,JobInProgress> entry = jobsIterator.next();
+ JobInProgress job = entry.getValue();
+ if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+ if (isScheduled(job)) {
+ LOG.info("Removing scheduled jobs from waiting queue"
+ + job.getJobID());
+ jobsIterator.remove();
+ jobQueueManager.removeJobFromWaitingQueue(job);
+ continue;
+ }
+ }
+ if(job.isComplete()) {
+ LOG.info("Removing killed/completed job from initalized jobs " +
+ "list : "+ job.getJobID());
+ jobsIterator.remove();
+ }
+ }
+ }
+
+ /**
+ * Convenience method to check if a job has been scheduled or not.
+ *
+ * The method may return false for a job which has had a failure that
+ * has not failed the tip.
+ * @param job the job to check.
+ * @return true if pending maps or pending reduces are below the desired count.
+ */
+ private boolean isScheduled(JobInProgress job) {
+ return ((job.pendingMaps() < job.desiredMaps())
+ || (job.pendingReduces() < job.desiredReduces()));
+ }
+
void terminate() {
running = false;
for (Entry<String, JobInitializationThread> entry : threadsToQueueMap
@@ -519,7 +589,7 @@
}
}
- HashSet<JobID> getInitializedJobList() {
- return initializedJobs;
+ Set<JobID> getInitializedJobList() {
+ return initializedJobs.keySet();
}
}
Modified:
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=736070&r1=736069&r2=736070&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
(original)
+++
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
Tue Jan 20 11:18:58 2009
@@ -65,7 +65,7 @@
// whether the queue supports priorities
boolean supportsPriorities;
- Map<JobSchedulingInfo, JobInProgress> jobList; // for waiting jobs
+ Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
public Comparator<JobSchedulingInfo> comparator;
@@ -79,14 +79,14 @@
else {
comparator = STARTTIME_JOB_COMPARATOR;
}
- jobList = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
+ waitingJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
}
- Collection<JobInProgress> getJobs() {
- synchronized (jobList) {
+ Collection<JobInProgress> getWaitingJobs() {
+ synchronized (waitingJobs) {
return Collections.unmodifiableCollection(
- new LinkedList<JobInProgress>(jobList.values()));
+ new LinkedList<JobInProgress>(waitingJobs.values()));
}
}
@@ -109,21 +109,21 @@
}
}
- JobInProgress removeJob(JobSchedulingInfo schedInfo) {
- synchronized (jobList) {
- return jobList.remove(schedInfo);
+ JobInProgress removeWaitingJob(JobSchedulingInfo schedInfo) {
+ synchronized (waitingJobs) {
+ return waitingJobs.remove(schedInfo);
}
}
- void addJob(JobInProgress job) {
- synchronized (jobList) {
- jobList.put(new JobSchedulingInfo(job), job);
+ void addWaitingJob(JobInProgress job) {
+ synchronized (waitingJobs) {
+ waitingJobs.put(new JobSchedulingInfo(job), job);
}
}
int getWaitingJobCount() {
- synchronized (jobList) {
- return jobList.size();
+ synchronized (waitingJobs) {
+ return waitingJobs.size();
}
}
@@ -157,11 +157,11 @@
}
/**
- * Returns the queue of Uninitialised jobs associated with queue name.
+ * Returns the queue of waiting jobs associated with queue name.
*
*/
- public Collection<JobInProgress> getJobs(String queueName) {
- return jobQueues.get(queueName).getJobs();
+ Collection<JobInProgress> getWaitingJobs(String queueName) {
+ return jobQueues.get(queueName).getWaitingJobs();
}
@Override
@@ -178,21 +178,23 @@
}
// add job to waiting queue. It will end up in the right place,
// based on priority.
- qi.addJob(job);
+ qi.addWaitingJob(job);
// let scheduler know.
scheduler.jobAdded(job);
}
/*
- * The removal of the running jobs alone is done by the JobQueueManager.
- * The removal of the jobs in the job queue is taken care by the
- * JobInitializationPoller.
+ * Removes the job from both the running and waiting job queues in the
+ * job queue manager.
*/
private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo,
QueueInfo qi) {
LOG.info("Job " + job.getJobID().toString() + " submitted to queue "
+ job.getProfile().getQueueName() + " has completed");
+ //remove the job from both queues; a job can be in the
+ //running and waiting queues at the same time.
qi.removeRunningJob(oldInfo);
+ qi.removeWaitingJob(oldInfo);
// let scheduler know
scheduler.jobCompleted(job);
}
@@ -206,8 +208,8 @@
private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo,
QueueInfo qi) {
- if(qi.removeJob(oldInfo) != null) {
- qi.addJob(job);
+ if(qi.removeWaitingJob(oldInfo) != null) {
+ qi.addWaitingJob(job);
}
if(qi.removeRunningJob(oldInfo) != null) {
qi.addRunningJob(job);
@@ -264,10 +266,10 @@
}
}
- void removeJobFromQueue(JobInProgress job) {
+ void removeJobFromWaitingQueue(JobInProgress job) {
String queue = job.getProfile().getQueueName();
QueueInfo qi = jobQueues.get(queue);
- qi.removeJob(new JobSchedulingInfo(job));
+ qi.removeWaitingJob(new JobSchedulingInfo(job));
}
Comparator<JobSchedulingInfo> getComparator(String queue) {
Modified:
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=736070&r1=736069&r2=736070&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
(original)
+++
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Tue Jan 20 11:18:58 2009
@@ -128,6 +128,7 @@
@Override
void selectJobsToInitialize() {
+ super.cleanUpInitializedJobsList();
super.selectJobsToInitialize();
for (ControlledJobInitializer t : workers) {
t.initializeJobs();
@@ -704,7 +705,7 @@
// check if the jobs are missing from the waiting queue
// The jobs are not removed from waiting queue until they are scheduled
assertEquals("Waiting queue is garbled on job init", 2,
- scheduler.jobQueuesManager.getJobs("default")
+ scheduler.jobQueuesManager.getWaitingJobs("default")
.size());
// test if changing the job priority/start-time works as expected in the
@@ -788,7 +789,7 @@
private JobInProgress[] getJobsInQueue(boolean waiting) {
Collection<JobInProgress> queue =
waiting
- ? scheduler.jobQueuesManager.getJobs("default")
+ ? scheduler.jobQueuesManager.getWaitingJobs("default")
: scheduler.jobQueuesManager.getRunningJobQueue("default");
return queue.toArray(new JobInProgress[0]);
}
@@ -897,7 +898,7 @@
JobQueuesManager mgr = scheduler.jobQueuesManager;
- while(mgr.getJobs("default").size() < 4){
+ while(mgr.getWaitingJobs("default").size() < 4){
Thread.sleep(1);
}
//Raise status change events for jobs submitted.
@@ -2147,10 +2148,10 @@
// reference to the initializedJobs data structure
// changes are reflected in the set as they are made by the poller
- HashSet<JobID> initializedJobs = initPoller.getInitializedJobList();
+ Set<JobID> initializedJobs = initPoller.getInitializedJobList();
// we should have 12 (3 x 4) jobs in the job queue
- assertEquals(mgr.getJobs("default").size(), 12);
+ assertEquals(mgr.getWaitingJobs("default").size(), 12);
// run one poller iteration.
p.selectJobsToInitialize();
@@ -2264,7 +2265,7 @@
scheduler.setInitializationPoller(p);
scheduler.start();
JobInitializationPoller initPoller = scheduler.getInitializationPoller();
- HashSet<JobID> initializedJobsList = initPoller.getInitializedJobList();
+ Set<JobID> initializedJobsList = initPoller.getInitializedJobList();
// submit 3 jobs for 3 users
submitJobs(3,3,"default");
@@ -2317,22 +2318,38 @@
scheduler.start();
JobQueuesManager mgr = scheduler.jobQueuesManager;
- JobInitializationPoller initPoller = scheduler.getInitializationPoller();
- HashSet<JobID> initializedJobsList = initPoller.getInitializedJobList();
+ // check proper running job movement and completion
+ checkRunningJobMovementAndCompletion();
+
+ // check failed running job movement
+ checkFailedRunningJobMovement();
+
+ // Check job movement of failed initialized job
+ checkFailedInitializedJobMovement();
+
+ // Check failed waiting job movement
+ checkFailedWaitingJobMovement();
+
+ }
+
+ private void checkRunningJobMovementAndCompletion() throws IOException {
+
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
+ JobInitializationPoller p = scheduler.getInitializationPoller();
// submit a job
FakeJobInProgress job =
submitJob(JobStatus.PREP, 1, 1, "default", "u1");
p.selectJobsToInitialize();
- assertEquals(initializedJobsList.size(), 1);
+ assertEquals(p.getInitializedJobList().size(), 1);
// make it running.
raiseStatusChangeEvents(mgr);
// it should be there in both the queues.
assertTrue("Job not present in Job Queue",
- mgr.getJobs("default").contains(job));
+ mgr.getWaitingJobs("default").contains(job));
assertTrue("Job not present in Running Queue",
mgr.getRunningJobQueue("default").contains(job));
@@ -2343,12 +2360,12 @@
p.selectJobsToInitialize();
// now this task should be removed from the initialized list.
- assertTrue(initializedJobsList.isEmpty());
+ assertTrue(p.getInitializedJobList().isEmpty());
// the job should also be removed from the job queue as tasks
// are scheduled
assertFalse("Job present in Job Queue",
- mgr.getJobs("default").contains(job));
+ mgr.getWaitingJobs("default").contains(job));
// complete tasks and job
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job);
@@ -2360,8 +2377,78 @@
mgr.getRunningJobQueue("default").contains(job));
}
+ private void checkFailedRunningJobMovement() throws IOException {
+
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
+
+ //submit a job and initialize it
+ FakeJobInProgress job =
+ submitJobAndInit(JobStatus.RUNNING, 1, 1, "default", "u1");
+
+ //check if the job is present in running queue.
+ assertTrue("Running jobs list does not contain submitted job",
+ mgr.getRunningJobQueue("default").contains(job));
+
+ taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
+
+ //check if the job is properly removed from running queue.
+ assertFalse("Running jobs list does not contain submitted job",
+ mgr.getRunningJobQueue("default").contains(job));
+
+ }
+
+ private void checkFailedInitializedJobMovement() throws IOException {
+
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
+ JobInitializationPoller p = scheduler.getInitializationPoller();
+
+ //submit a job
+ FakeJobInProgress job = submitJob(JobStatus.PREP, 1, 1, "default", "u1");
+ //Initialize the job
+ p.selectJobsToInitialize();
+ //Don't raise the status change event.
+
+ //check in waiting and initialized jobs list.
+ assertTrue("Waiting jobs list does not contain the job",
+ mgr.getWaitingJobs("default").contains(job));
+
+ assertTrue("Initialized job does not contain the job",
+ p.getInitializedJobList().contains(job.getJobID()));
+
+ //fail the initialized job
+ taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
+
+ //Check if the job is present in waiting queue
+ assertFalse("Waiting jobs list contains failed job",
+ mgr.getWaitingJobs("default").contains(job));
+
+ //run the poller to do the cleanup
+ p.selectJobsToInitialize();
+
+ //check for failed job in the initialized job list
+ assertFalse("Initialized jobs contains failed job",
+ p.getInitializedJobList().contains(job.getJobID()));
+ }
+
+ private void checkFailedWaitingJobMovement() throws IOException {
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
+ // submit a job
+ FakeJobInProgress job = submitJob(JobStatus.PREP, 1, 1, "default",
+ "u1");
+
+ // check that the job is in the waiting jobs list.
+ assertTrue("Waiting jobs list does not contain the job", mgr
+ .getWaitingJobs("default").contains(job));
+ // fail the waiting job
+ taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
+
+ // Check if the job is present in waiting queue
+ assertFalse("Waiting jobs list contains failed job", mgr
+ .getWaitingJobs("default").contains(job));
+ }
+
private void raiseStatusChangeEvents(JobQueuesManager mgr) {
- Collection<JobInProgress> jips = mgr.getJobs("default");
+ Collection<JobInProgress> jips = mgr.getWaitingJobs("default");
for(JobInProgress jip : jips) {
if(jip.getStatus().getRunState() == JobStatus.RUNNING) {
JobStatusChangeEvent evt = new JobStatusChangeEvent(jip,