[
https://issues.apache.org/jira/browse/HADOOP-3445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12610945#action_12610945
]
vivekr edited comment on HADOOP-3445 at 7/7/08 5:23 AM:
-------------------------------------------------------------
I'm attaching the first patch (3445.1.patch). This is a partial patch that has
support for:
* for queues and jobs submitted to queues
* guaranteed capacities, priorities, and user limits
* redistribution of capacities (without preemption)
In essence, this patch implements the following requirements (see HADOOP-3421)
: 1.1-1.5, 2.1, 3.1-3.3, 4.1.
The purpose of this patch is to get the basic code reviewed, and there is a
non-trivial amount of it. This is not a complete patch. The following remains
to be done:
* preemption, when reclaiming capacity (req 1.6)
* unit tests
* others such as ACLs, rereading of configuration etc, which will be tracked
elsewhere.
* cleanup, such as better variable/method names, etc.
This patch assumes that the patch for HADOOP-3479 is committed or applied.
Here's some explanation of what this patch incorporates:
* redistribution of capacity: class _RedistributeCapacity_ implements a
runnable class that periodically invokes code to redistribute capacity. The
time interval is defined by
_REDISTRIBUTE_CAPACITY_INTERVAL_, which has a default value of 5 secs, but can
be set in the config file.
* Since we now have queues of jobs, the ArrayList _jobsByPriority_ is replaced
by _jobQueues_, which is a hashmap of queue names to individual lists of
_JobInProgress_ objects.
* A new class, _QueueSchedulingInfo (QSI)_, has been introduced. This class
keeps track of per-queue information required for the new scheduling
algorithms. It's really just a collection of fields. _queueInfoMap_ is a
hashmap that maps a queue name to its QSI object. We also keep two sorted lists
of QSI objects (qsiForMaps and qsiForReduces), one for maps and one for
reduces. The class UnusuedCapacityComparator implements Comparator and is used
to sort these two lists, based on unused capacities. The JT constructor creates
QSI objects for each queue and populates the other data structures accordingly.
* There's new code to handle redistribution of capacities, along with detailed
comments on how the algorithm works. This documentation and code starts at line
1695.
* _getNewTaskForTaskTracker()_ has been changed. If a TT has free map and
reduce slots, we first decide whether to give it a Map or Reduce task. This
logic was earlier based on computing M/R loads, but as I had explained in an
email to core-dev, it seemed unnecessary and also had a few problems. Now that
we have a central scheduler that can look across multiple jobs, the logic to
pick a Map or Reduce task can be simplified. I pick one, depending on how many
unused Map or Reduce slots the TT has. We can probably do better, but this
seems like a decent start. Once we decide whether we need a Map or Reduce task,
we pick a queue (based on how far behind the queue is; again, there are
probably better/different ways to do this, one of which is suggested). we also
pick a queue, and then a job in the queue, based on how much capacity the queue
is using and user limits of the highest priority jobs in the queue.
* _submitJob()_ has also been changed. When a job gets submitted, it gets
placed in the right position in the right queue.
* _JobInProgress_ and _TaskInProgress_ have been updated to keep track of
speculative tasks. This lets us ultimately keep task of how many tasks in a job
need to run and how many are running, which ties in to capacity planning.
* _JobInProgress_ is also changed to expose the queue a job is submitted to. If
no queue is mentioned in the user's conf, the job is assigned to the first
queue (to maintain backwards compatibility).
Please note that the algorithms in the code are a bit different than what was
detailed in the description of this Jira as they've evolved over time. Please
also note that these are not the best algorithms, and that it is assumed that
over time, we will get a lot better at refining them. But they enable us to get
started.
was (Author: vivekr):
I'm attaching the first patch (3445.1.patch). This is a partial patch that
has support for:
* for queues and jobs submitted to queues
* guaranteed capacities, priorities, and user limits
* redistribution of capacities (without preemption)
In essence, this patch implements the following requirements (see HADOOP-3421)
: 1.1-1.5, 2.1, 3.1-3.3, 4.1.
The purpose of this patch is to get the basic code reviewed, and there is a
non-trivial amount of it. This is not a complete patch. The following remains
to be done:
* preemption, when reclaiming capacity (req 1.6)
* unit tests
* others such as ACLs, rereading of configuration etc, which will be tracked
elsewhere.
* cleanup, such as better variable/method names, etc.
This patch assumes that the patch for HADOOP-3470 is committed or applied.
Here's some explanation of what this patch incorporates:
* redistribution of capacity: class _RedistributeCapacity_ implements a
runnable class that periodically invokes code to redistribute capacity. The
time interval is defined by
_REDISTRIBUTE_CAPACITY_INTERVAL_, which has a default value of 5 secs, but can
be set in the config file.
* Since we now have queues of jobs, the ArrayList _jobsByPriority_ is replaced
by _jobQueues_, which is a hashmap of queue names to individual lists of
_JobInProgress_ objects.
* A new class, _QueueSchedulingInfo (QSI)_, has been introduced. This class
keeps track of per-queue information required for the new scheduling
algorithms. It's really just a collection of fields. _queueInfoMap_ is a
hashmap that maps a queue name to its QSI object. We also keep two sorted lists
of QSI objects (qsiForMaps and qsiForReduces), one for maps and one for
reduces. The class UnusuedCapacityComparator implements Comparator and is used
to sort these two lists, based on unused capacities. The JT constructor creates
QSI objects for each queue and populates the other data structures accordingly.
* There's new code to handle redistribution of capacities, along with detailed
comments on how the algorithm works. This documentation and code starts at line
1695.
* _getNewTaskForTaskTracker()_ has been changed. If a TT has free map and
reduce slots, we first decide whether to give it a Map or Reduce task. This
logic was earlier based on computing M/R loads, but as I had explained in an
email to core-dev, it seemed unnecessary and also had a few problems. Now that
we have a central scheduler that can look across multiple jobs, the logic to
pick a Map or Reduce task can be simplified. I pick one, depending on how many
unused Map or Reduce slots the TT has. We can probably do better, but this
seems like a decent start. Once we decide whether we need a Map or Reduce task,
we pick a queue (based on how far behind the queue is; again, there are
probably better/different ways to do this, one of which is suggested). we also
pick a queue, and then a job in the queue, based on how much capacity the queue
is using and user limits of the highest priority jobs in the queue.
* _submitJob()_ has also been changed. When a job gets submitted, it gets
placed in the right position in the right queue.
* _JobInProgress_ and _TaskInProgress_ have been updated to keep track of
speculative tasks. This lets us ultimately keep task of how many tasks in a job
need to run and how many are running, which ties in to capacity planning.
* _JobInProgress_ is also changed to expose the queue a job is submitted to. If
no queue is mentioned in the user's conf, the job is assigned to the first
queue (to maintain backwards compatibility).
Please note that the algorithms in the code are a bit different than what was
detailed in the description of this Jira as they've evolved over time. Please
also note that these are not the best algorithms, and that it is assumed that
over time, we will get a lot better at refining them. But they enable us to get
started.
> Implementing core scheduler functionality in Resource Manager (V1) for Hadoop
> -----------------------------------------------------------------------------
>
> Key: HADOOP-3445
> URL: https://issues.apache.org/jira/browse/HADOOP-3445
> Project: Hadoop Core
> Issue Type: New Feature
> Reporter: Vivek Ratan
> Attachments: 3445.1.patch
>
>
> The architecture for the Hadoop Resource Manager (V1) is described in
> HADOOP-3444. This Jira proposes implementation details on the core scheduling
> piece - the changes to the JT to handle Orgs, queues, guaranteed capacities,
> user limits, and ultimately, scheduling a task on a TT.
> As per the architecture in HADOOP-3444, the JT contains a new component, Job
> Queue Manager (JQM), to handle queues of jobs. Each queue represents a queue
> in an Org (one queue per Org). Job queues are backed up by disk based
> storage.
> We now look at some details. Terminology:
> * A queue has *excess capacity* if it does not have enough jobs (queued or
> running) to take up its guaranteed capacity. Excess capacity needs to be
> distributed to queues that have none.
> * Queues that have given up excess capacity to other queues are called *low
> queues*, for the sake of this discussion. Queues that are running on
> additional capacity are called *high queues*.
> For each queue, the JT keeps track of the following:
> * Guaranteed capacity (GC): the capacity guaranteed to the queue, set up
> through configuration. The sum of all GCs is equal to the grid capacity.
> Since we're handling Map and Reduce slots differently, we will have a GC for
> each, i.e., a CG-M for maps and a GC-R for reducers. The sum of all GC-Ms is
> equal to the sum of all map slots available in the Grid, and the same for
> GC-Rs.
> * Allocated capacity (AC): the current capacity of the queue. This can be
> higher or lower than the GC because of excess capacity distribution. The sum
> of all ACs is equal to the grid capacity. As above, each queue will have a
> AC-M and AC-R.
> * Timer for claiming containers: can just be the # of seconds the queue can
> wait till it needs its capacity back. There will be separate timers for
> claiming map and reduce slots (we will likely want to take more time to claim
> reduce slots, as reducers take longer to run).
> * # of containers being used, i.e., the number of running tasks associated
> with the queue (C-RUN). Each queue will have a C-RUN-M and C-RUN-R.
> * Whether any jobs are queued.
> * The number of Map and Reduce containers used by each user.
> Every once in a while (this can be done periodically, or based on events),
> the JT looks at redistributing capacity. This can result in excess capacity
> being given to queues that need them, and capacity being claimed by queues.
> *Excess capacity is redistributed as follows*:
> * The algorithm below is in terms of tasks, which can be map or reduce
> tasks. It is the same for both. The JT will run the algorithm to redistribute
> excess capacity for both Maps and Reduces.
> * The JT checks each queue to see if it has excess capacity. A queue has
> excess capacity if the number of running tasks associated with the queue is
> less than the allocated capacity of the queue (i.e., if C-RUN < AC) and there
> are no jobs queued.
> ** Note: a tighter definition is if C-RUN plus the number of tasks
> required by the waiting jobs is less than AC, but we don't need that level of
> detail.
> * If there is at least one queue with excess capacity, the total excess
> capacity is the sum of excess capacities of each queue. The JT figures out
> the queues that this capacity can be distributed to. These are queues that
> need capacity, where C-RUN = AC (i.e., the queue is running at max capacity)
> and there are queued jobs.
> * The JT now figures out how much excess capacity to distribute to each
> queue that needs it. This can be done in many ways.
> ** Distribute capacity in the ratio of each Org's guaranteed capacity.
> So if queues Q1, Q2, and Q3 have guaranteed capacities of GC1, GC2, and GC3,
> and if Q3 has N containers of excess capacity, Q1 gets (GC1*N)/(GC1+GC2)
> additional capacity, while Q2 gets (GC2*N)/(GC1+GC2).
> ** You could use some other ratio that uses the number of waiting jobs.
> The more waiting jobs a queue has, the more its share of excess capacity.
> * For each queue that needs capacity, the JT increments its AC with the
> capacity it is allocated. At the same time, the JT appropriately decrements
> the AC of queues with excess capacity.
> *Excess capacity is reclaimed as follows*:
> * The algorithm below is in terms of tasks, which can be map or reduce tasks.
> It is the same for both. The JT will run the algorithm to reclaim excess
> capacity for both Maps and Reduces.
> * The JT determines which queues are low queues (if AC < GC). If a low queue
> has a job waiting, then we need to reclaim its resources. Capacity to be
> reclaimed = GC-AC.
> * Capacity is re-claimed from any of the high queues (where AC > GC).
> * JT decrements the AC of the high queue from which capacity is to be
> claimed, and increments the AC of the low queue. The decremented AC of the
> high queue cannot go below its GC, so the low queue may get its capacity back
> from more than one queue.
> * The JT also starts a timer for the low queue (this can be an actual timer,
> or just a count, perhaps representing seconds, which can be decremented by
> the JT periodically).
> * If a timer goes off, the JT needs to instruct some high queue to kill some
> of their tasks. How do we decide which high queues to claim capacity from?
> ** The candidates are those high queues which are running more tasks than
> they should be, i.e., where C-RUN > AC.
> ** Among these queues, the JT can pick those that are using the most excess
> capacity (i.e. queues with a higher value for (C-RUN - AC)/AC ).
> * How does a high queue decide which tasks to kill?
> ** Ideally, you want to kill tasks that have started recently or made the
> least progress. You might want to use the same algorithm you use to decide
> which tasks to speculatively run (though that algorithm needs to be fixed).
> ** Note: it is expensive to kill tasks, so we need to focus on getting better
> at deciding which tasks to kill.
> Within a queue, a user's limit can dynamically change depending on how many
> users have submitted jobs. This needs to be handled in a way similar to how
> we handle excess capacity between queues.
> *When a TT has a free Map slot*:
> # TT contacts JT to give it a Map task to run.
> # JT figures out which queue to approach first (among all queues that have
> capacity, i.e., where C-RUN-M < AC-M). This can be done in a few ways:
> ** Round-robin, so every queue/Org has the same chance to get a free
> container.
> ** JT can pick the queue with the maximum unused capacity.
> # JT needs to pick a job which can use the slot.
> ** If it has no running jobs from that queue, it gets one from the JQM.
> *** JT asks for the first Job in the selected queue, via the JQM. If
> the job's user's limit is maxed out, the job is returned to the queue and JT
> asks for the next job. This continues till the JT finds a suitable job.
> *** Or else, JT has a list of users in the queue whose jobs it is
> running, and it can figure out which of these users are over their limit. It
> asks the JQM for the first job in the queue whose user is not in a list of
> maxed-out users it provides.
> ** If the JT already has a list of running jobs from the queue, it
> looks at each (in order of priority) till it finds one whose user's limit has
> not been exceeded.
> # If there is no job in the queue that is eligible to run (the queue may
> have no queued jobs), the JT picks another queue using the same steps.
> # The JT figures out which Map task from the job to run on the free TT
> using the same algorithm as today (find a locality match using the job's
> cache, then look for failed tasks or tasks on the rack, etc).
> # JT increments C-RUN-M and the number of Map containers used by the job's
> user. It then returns the task to the TT.
> *When a TT has a free Reduce slot*: This is similar to what happens with a
> free Map slot, except that:
> * we can use a different algorithm to decide which Reduce task to run from
> a give job. I'm not sure what we do today for Reduce tasks (I think we just
> pick the first one), but if it needs to be improved, that's a separate issue.
> * Since there is no preemption of jobs based on priorities, we will not
> have the situation where a job's Reducers are blocking containers as they're
> waiting for Maps to run and there are no Map slots to run.
> *When a task fails or completes*: JT decrements C-RUN and the # of containers
> used by the user.
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.