[ https://issues.apache.org/jira/browse/HADOOP-3445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12630103#action_12630103 ]

Sameer Paranjpye commented on HADOOP-3445:
------------------------------------------

The sanity checks are pretty important. The defaults for queues are nice to 
have but not essential immediately. So yes, they can be introduced in a 
subsequent JIRA, possibly even after 0.19, since it appears to be a 
backward-compatible change. 

> Implementing core scheduler functionality in Resource Manager (V1) for Hadoop
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-3445
>                 URL: https://issues.apache.org/jira/browse/HADOOP-3445
>             Project: Hadoop Core
>          Issue Type: New Feature
>            Reporter: Vivek Ratan
>            Assignee: Vivek Ratan
>         Attachments: 3445.1.patch, 3445.10.patch, 3445.2.patch, 3445.3.patch, 
> 3445.4.patch, 3445.5.patch, 3445.6.patch, 3445.7.patch, 3445.8.patch, 
> 3445.9.patch
>
>
> The architecture for the Hadoop Resource Manager (V1) is described in 
> HADOOP-3444. This Jira proposes implementation details on the core scheduling 
> piece - the changes to the JT to handle Orgs, queues, guaranteed capacities, 
> user limits, and ultimately, scheduling a task on a TT. 
> As per the architecture in HADOOP-3444, the JT contains a new component, Job 
> Queue Manager (JQM), to handle queues of jobs. Each queue represents a queue 
> in an Org (one queue per Org). Job queues are backed by disk-based 
> storage. 
> We now look at some details. Terminology: 
> * A queue has *excess capacity* if it does not have enough jobs (queued or 
> running) to use its guaranteed capacity. Excess capacity needs to be 
> distributed to queues that need it. 
> * Queues that have given up excess capacity to other queues are called *low 
> queues*, for the sake of this discussion. Queues that are running on 
> additional capacity are called *high queues*.
> For each queue, the JT keeps track of the following: 
> * Guaranteed capacity (GC): the capacity guaranteed to the queue, set up 
> through configuration. The sum of all GCs is equal to the grid capacity. 
> Since we're handling Map and Reduce slots differently, we will have a GC for 
> each, i.e., a GC-M for maps and a GC-R for reducers. The sum of all GC-Ms is 
> equal to the sum of all map slots available in the Grid, and the same for 
> GC-Rs. 
> * Allocated capacity (AC): the current capacity of the queue. This can be 
> higher or lower than the GC because of excess capacity distribution. The sum 
> of all ACs is equal to the grid capacity. As above, each queue will have an 
> AC-M and an AC-R. 
> * Timer for claiming containers: can just be the # of seconds the queue can 
> wait till it needs its capacity back. There will be separate timers for 
> claiming map and reduce slots (we will likely want to take more time to claim 
> reduce slots, as reducers take longer to run). 
> * # of containers being used, i.e., the number of running tasks associated 
> with the queue (C-RUN). Each queue will have a C-RUN-M and a C-RUN-R. 
> * Whether any jobs are queued. 
> * The number of Map and Reduce containers used by each user. 
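>
> As an illustration, the per-queue bookkeeping above might look something 
> like the following minimal Java sketch (all names are illustrative, not 
> taken from the attached patches):
>
>     import java.util.HashMap;
>     import java.util.Map;
>
>     // Illustrative per-queue state for one task type (Maps or Reduces);
>     // the JT would keep one instance per queue per task type.
>     class QueueState {
>         final String name;
>         final int guaranteedCapacity;   // GC, set via configuration
>         int allocatedCapacity;          // AC, adjusted by redistribution
>         int numRunningTasks;            // C-RUN
>         int numQueuedJobs;              // whether any jobs are waiting
>         int reclaimTimerSecs = -1;      // -1 means no pending claim
>         // # of containers used by each user in this queue.
>         final Map<String, Integer> tasksPerUser = new HashMap<String, Integer>();
>
>         QueueState(String name, int gc) {
>             this.name = name;
>             this.guaranteedCapacity = gc;
>             this.allocatedCapacity = gc;   // AC starts at GC
>         }
>
>         // Excess capacity: running fewer tasks than the allocation,
>         // with no jobs queued (the looser definition used below).
>         boolean hasExcessCapacity() {
>             return numRunningTasks < allocatedCapacity && numQueuedJobs == 0;
>         }
>
>         // Needs capacity: running at full allocation with jobs queued.
>         boolean needsCapacity() {
>             return numRunningTasks >= allocatedCapacity && numQueuedJobs > 0;
>         }
>     }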
> Every once in a while (this can be done periodically, or based on events), 
> the JT looks at redistributing capacity. This can result in excess capacity 
> being given to queues that need it, and capacity being claimed back by queues. 
> *Excess capacity is redistributed as follows*:
>    * The algorithm below is in terms of tasks, which can be map or reduce 
> tasks. It is the same for both. The JT will run the algorithm to redistribute 
> excess capacity for both Maps and Reduces. 
>    * The JT checks each queue to see if it has excess capacity. A queue has 
> excess capacity if the number of running tasks associated with the queue is 
> less than the allocated capacity of the queue (i.e., if C-RUN < AC) and there 
> are no jobs queued. 
>       ** Note: a tighter definition is if C-RUN plus the number of tasks 
> required by the waiting jobs is less than AC, but we don't need that level of 
> detail. 
>    * If there is at least one queue with excess capacity, the total excess 
> capacity is the sum of excess capacities of each queue. The JT figures out 
> the queues that this capacity can be distributed to. These are queues that 
> need capacity, where C-RUN = AC (i.e., the queue is running at max capacity) 
> and there are queued jobs. 
>    * The JT now figures out how much excess capacity to distribute to each 
> queue that needs it. This can be done in many ways. 
>       ** Distribute capacity in the ratio of each Org's guaranteed capacity. 
> So if queues Q1, Q2, and Q3 have guaranteed capacities of GC1, GC2, and GC3, 
> and if Q3 has N containers of excess capacity, Q1 gets (GC1*N)/(GC1+GC2) 
> additional capacity, while Q2 gets (GC2*N)/(GC1+GC2). 
>       ** You could use some other ratio that uses the number of waiting jobs. 
> The more waiting jobs a queue has, the more its share of excess capacity.
>    * For each queue that needs capacity, the JT increments its AC with the 
> capacity it is allocated. At the same time, the JT appropriately decrements 
> the AC of queues with excess capacity. 
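>
> A minimal sketch of that redistribution pass, building on the QueueState 
> sketch above and using the GC-ratio option (integer rounding is ignored 
> for brevity):
>
>     import java.util.List;
>
>     class Redistribution {
>         // Move excess capacity from queues that can't use it to queues
>         // that need it, in the ratio of the needy queues' GCs. The sum
>         // of all ACs stays equal to the grid capacity (up to rounding).
>         static void redistributeExcess(List<QueueState> queues) {
>             int totalExcess = 0;
>             int totalNeedyGC = 0;
>             for (QueueState q : queues) {
>                 if (q.hasExcessCapacity()) {
>                     totalExcess += q.allocatedCapacity - q.numRunningTasks;
>                 } else if (q.needsCapacity()) {
>                     totalNeedyGC += q.guaranteedCapacity;
>                 }
>             }
>             if (totalExcess == 0 || totalNeedyGC == 0) return;
>             for (QueueState q : queues) {
>                 if (q.hasExcessCapacity()) {
>                     q.allocatedCapacity = q.numRunningTasks;  // give up the excess
>                 } else if (q.needsCapacity()) {
>                     q.allocatedCapacity +=
>                         (q.guaranteedCapacity * totalExcess) / totalNeedyGC;
>                 }
>             }
>         }
>     }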
> *Excess capacity is reclaimed as follows*: 
> * The algorithm below is in terms of tasks, which can be map or reduce tasks. 
> It is the same for both. The JT will run the algorithm to reclaim excess 
> capacity for both Maps and Reduces. 
> * The JT determines which queues are low queues (where AC < GC). If a low queue 
> has a job waiting, then we need to reclaim its resources. Capacity to be 
> reclaimed = GC - AC. 
> * Capacity is reclaimed from any of the high queues (where AC > GC). 
> * JT decrements the AC of the high queue from which capacity is to be 
> claimed, and increments the AC of the low queue. The decremented AC of the 
> high queue cannot go below its GC, so the low queue may get its capacity back 
> from more than one queue. 
> * The JT also starts a timer for the low queue (this can be an actual timer, 
> or just a count, perhaps representing seconds, which can be decremented by 
> the JT periodically). 
> * If a timer goes off, the JT needs to instruct some high queue to kill some 
> of its tasks. How do we decide which high queues to claim capacity from? 
> ** The candidates are those high queues which are running more tasks than 
> they should be, i.e., where C-RUN > AC. 
> ** Among these queues, the JT can pick those that are using the most excess 
> capacity (i.e., queues with a higher value of (C-RUN - AC)/AC). 
> * How does a high queue decide which tasks to kill? 
> ** Ideally, you want to kill tasks that have started recently or made the 
> least progress. You might want to use the same algorithm you use to decide 
> which tasks to speculatively run (though that algorithm needs to be fixed). 
> ** Note: it is expensive to kill tasks, so we need to focus on getting better 
> at deciding which tasks to kill. 
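>
> The reclaim pass might then look like this (again building on the 
> QueueState sketch; the timeout constant is an assumed placeholder):
>
>     import java.util.List;
>
>     class Reclaim {
>         static final int CLAIM_TIMEOUT_SECS = 300;  // assumed value
>
>         // Low queues (AC < GC) with jobs waiting take back up to GC - AC,
>         // pulling AC from high queues without driving any high queue
>         // below its own GC (so one low queue may draw from several).
>         static void reclaimCapacity(List<QueueState> queues) {
>             for (QueueState low : queues) {
>                 int toClaim = low.guaranteedCapacity - low.allocatedCapacity;
>                 if (toClaim <= 0 || low.numQueuedJobs == 0) continue;
>                 for (QueueState high : queues) {
>                     if (toClaim == 0) break;
>                     int spare = high.allocatedCapacity - high.guaranteedCapacity;
>                     if (spare <= 0) continue;
>                     int taken = Math.min(spare, toClaim);
>                     high.allocatedCapacity -= taken;
>                     low.allocatedCapacity += taken;
>                     toClaim -= taken;
>                 }
>                 // If this timer expires before C-RUN catches up with AC,
>                 // the JT tells the worst-offending high queue (largest
>                 // (C-RUN - AC)/AC) to kill some of its tasks.
>                 low.reclaimTimerSecs = CLAIM_TIMEOUT_SECS;
>             }
>         }
>     }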
> Within a queue, a user's limit can dynamically change depending on how many 
> users have submitted jobs. This needs to be handled in a way similar to how 
> we handle excess capacity between queues. 
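>
> One simple policy for the dynamic per-user limit (an assumption for 
> illustration, not something the proposal pins down) is an even split of 
> the queue's current allocation among its active users:
>
>     class UserLimits {
>         // Hypothetical per-user limit: the queue's AC divided evenly
>         // among users that currently have running or queued jobs.
>         static int userLimit(QueueState q, int numActiveUsers) {
>             return q.allocatedCapacity / Math.max(1, numActiveUsers);
>         }
>     }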
> *When a TT has a free Map slot*:
> # The TT contacts the JT and asks for a Map task to run. 
> # JT figures out which queue to approach first (among all queues that have 
> capacity, i.e., where C-RUN-M < AC-M). This can be done in a few ways:
>       ** Round-robin, so every queue/Org has the same chance to get a free 
> container. 
>       ** JT can pick the queue with the maximum unused capacity. 
> # JT needs to pick a job which can use the slot. 
>       ** If it has no running jobs from that queue, it gets one from the JQM. 
>          *** JT asks for the first Job in the selected queue, via the JQM. If 
> the job's user's limit is maxed out, the job is returned to the queue and JT 
> asks for the next job. This continues till the JT finds a suitable job. 
>          *** Or else, JT has a list of users in the queue whose jobs it is 
> running, and it can figure out which of these users are over their limit. It 
> asks the JQM for the first job in the queue whose user is not in a list of 
> maxed-out users it provides. 
>       ** If the JT already has a list of running jobs from the queue, it 
> looks at each (in order of priority) till it finds one whose user's limit has 
> not been exceeded.
> # If there is no job in the queue that is eligible to run (the queue may 
> have no queued jobs), the JT picks another queue using the same steps. 
> # The JT figures out which Map task from the job to run on the free TT 
> using the same algorithm as today (find a locality match using the job's 
> cache, then look for failed tasks or tasks on the rack, etc.). 
> # JT increments C-RUN-M and the number of Map containers used by the job's 
> user. It then returns the task to the TT. 
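>
> Putting the steps above together, a sketch of the selection loop for one 
> queue (the Job and JobQueueManager types, and the max-unused-capacity 
> choice, are illustrative assumptions):
>
>     import java.util.List;
>
>     interface Job { String getUser(); }
>     interface JobQueueManager { List<Job> jobsInPriorityOrder(String queue); }
>
>     class MapSlotAssignment {
>         // Pick the queue with the most unused Map capacity, then the
>         // first job (in priority order) whose user is under the limit.
>         static Job pickJob(List<QueueState> queues, JobQueueManager jqm) {
>             QueueState best = null;
>             for (QueueState q : queues) {
>                 int unused = q.allocatedCapacity - q.numRunningTasks;
>                 if (unused > 0 && (best == null ||
>                         unused > best.allocatedCapacity - best.numRunningTasks)) {
>                     best = q;
>                 }
>             }
>             if (best == null) return null;
>             int activeUsers = Math.max(1, best.tasksPerUser.size());
>             for (Job job : jqm.jobsInPriorityOrder(best.name)) {
>                 Integer used = best.tasksPerUser.get(job.getUser());
>                 if (used == null || used < UserLimits.userLimit(best, activeUsers)) {
>                     return job;  // caller then picks a locality-matched task
>                 }
>             }
>             return null;  // no eligible job; the JT would try the next queue
>         }
>     }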
> *When a TT has a free Reduce slot*: This is similar to what happens with a 
> free Map slot, except that: 
>    * we can use a different algorithm to decide which Reduce task to run from 
> a given job. I'm not sure what we do today for Reduce tasks (I think we just 
> pick the first one), but if it needs to be improved, that's a separate issue. 
>    * Since there is no preemption of jobs based on priorities, we will not 
> have the situation where a job's Reducers are blocking containers while 
> waiting for Maps to run when there are no Map slots available. 
> *When a task fails or completes*: JT decrements C-RUN and the # of containers 
> used by the user. 
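>
> The completion path is the inverse of the bookkeeping done at assignment 
> time (again on the QueueState sketch):
>
>     // On task completion or failure: undo the counters bumped when the
>     // task was handed to the TT.
>     static void taskFinished(QueueState q, String user) {
>         q.numRunningTasks--;
>         Integer used = q.tasksPerUser.get(user);
>         if (used != null && used > 0) {
>             q.tasksPerUser.put(user, used - 1);
>         }
>     }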

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
