Author: acmurthy
Date: Sun Sep 11 23:57:37 2011
New Revision: 1169585

URL: http://svn.apache.org/viewvc?rev=1169585&view=rev
Log:
MAPREDUCE-2981. Backport FairScheduler from trunk. Contributed by Matei Zaharia.

Added:
    hadoop/common/branches/branch-0.20-security/conf/fair-scheduler.xml.template
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/designdoc/
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerSystem.java
    
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestLinuxTaskControllerLaunchArgs.java
    
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/TestSleepJob.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    
hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/ivy.xml
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java
    
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    
hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
    
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
    
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sun Sep 11 23:57:37 
2011
@@ -202,6 +202,9 @@ Release 0.20.205.0 - unreleased
     HADOOP-7599. Script improvements to setup a secure Hadoop cluster 
     (Eric Yang via ddas)
 
+    MAPREDUCE-2981. Backport FairScheduler from trunk. (Matei Zaharia via
+    acmurthy) 
+
 Release 0.20.204.0 - 2011-8-25
 
   NEW FEATURES

Added: 
hadoop/common/branches/branch-0.20-security/conf/fair-scheduler.xml.template
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/conf/fair-scheduler.xml.template?rev=1169585&view=auto
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security/conf/fair-scheduler.xml.template 
(added)
+++ 
hadoop/common/branches/branch-0.20-security/conf/fair-scheduler.xml.template 
Sun Sep 11 23:57:37 2011
@@ -0,0 +1,12 @@
+<?xml version="1.0"?>
+
+<!--
+  This file contains pool and user allocations for the Fair Scheduler.
+  Its format is explained in the Fair Scheduler documentation at
+  http://hadoop.apache.org/common/docs/r0.20.205.0/fair_scheduler.html.
+  The documentation also includes a sample config file.
+-->
+
+<allocations>
+
+</allocations>

Modified: 
hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
 Sun Sep 11 23:57:37 2011
@@ -509,8 +509,9 @@ class CapacityTaskScheduler extends Task
       job.schedulingOpportunity();
       
       // First, try to get a 'local' task
-      Task t = 
-        job.obtainNewLocalMapTask(taskTracker, numTaskTrackers, 
numUniqueHosts);
+      Task t = job.obtainNewNodeOrRackLocalMapTask(taskTracker,
+                                                   numTaskTrackers,
+                                                   numUniqueHosts);
       
       if (t != null) {
         return TaskLookupResult.getTaskFoundResult(t, job); 

Modified: 
hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
 Sun Sep 11 23:57:37 2011
@@ -195,8 +195,8 @@ public class TestCapacityScheduler exten
     }
 
     @Override
-    public Task obtainNewLocalMapTask(final TaskTrackerStatus tts, int 
clusterSize,
-        int ignored) throws IOException {
+    public Task obtainNewNodeOrRackLocalMapTask(final TaskTrackerStatus tts,
+        int clusterSize, int ignored) throws IOException {
       return obtainNewMapTask(tts, clusterSize, ignored);
     }
     
@@ -553,6 +553,12 @@ public class TestCapacityScheduler exten
       return statuses;
     }
 
+    @Override
+    public boolean killTask(TaskAttemptID taskid, boolean shouldFail)
+      throws IOException {
+      return false;
+    }
+
 
     public void addJobInProgressListener(JobInProgressListener listener) {
       mylisteners.add(listener);

Added: 
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex?rev=1169585&view=auto
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex
 (added)
+++ 
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex
 Sun Sep 11 23:57:37 2011
@@ -0,0 +1,253 @@
+% Licensed to the Apache Software Foundation (ASF) under one
+% or more contributor license agreements.  See the NOTICE file
+% distributed with this work for additional information
+% regarding copyright ownership.  The ASF licenses this file
+% to you under the Apache License, Version 2.0 (the
+% "License"); you may not use this file except in compliance
+% with the License.  You may obtain a copy of the License at
+% 
+%     http://www.apache.org/licenses/LICENSE-2.0
+% 
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+
+\documentclass[11pt]{article}
+\usepackage{geometry}
+\geometry{letterpaper}
+
+\begin{document}
+
+\title{Hadoop Fair Scheduler Design Document}
+\author{}
+\maketitle
+\tableofcontents
+
+\section{Introduction}
+
+The Hadoop Fair Scheduler started as a simple means to share MapReduce 
clusters. Over time, it has grown in functionality to support hierarchical 
scheduling, preemption, and multiple ways of organizing and weighting jobs. This 
document explains the goals and features of the Fair Scheduler and its internal 
design.
+
+\section{Fair Scheduler Goals}
+
+The Fair Scheduler was designed with four main goals:
+\begin{enumerate}
+  \item Run small jobs quickly even if they are sharing a cluster with large 
jobs. Unlike Hadoop's built-in FIFO scheduler, fair scheduling lets small jobs 
make progress even if a large job is running, without starving the large job.
+  \item Provide guaranteed service levels to ``production'' jobs, to let them 
run alongside experimental jobs in a shared cluster.
+  \item Be simple to administer and configure. The scheduler should do 
something reasonable ``out of the box,'' and users should only need to configure 
it as they discover that they want to use more advanced features.
+  \item Support reconfiguration at runtime, without requiring a cluster 
restart.
+\end{enumerate}
+
+\section{Scheduler Features}
+
+This section provides a quick overview of the features of the Fair Scheduler. 
A detailed usage guide is available in the Hadoop documentation in {\tt 
build/docs/fair\_scheduler.html}.
+
+\subsection{Pools}
+
+The Fair Scheduler groups jobs into ``pools'' and performs fair sharing between 
these pools. Each pool can use either FIFO or fair sharing to schedule jobs 
internal to the pool. The pool that a job is placed in is determined by a 
JobConf property, the ``pool name property''. By default, this is {\tt 
user.name}, so that there is one pool per user. However, different properties 
can be used, e.g.~{\tt group.name} to have one pool per Unix group.
+
+A common trick is to set the pool name property to an unused property name 
such as {\tt pool.name} and make this default to {\tt user.name}, so that there 
is one pool per user but it is also possible to place jobs into ``special'' 
pools by setting their {\tt pool.name} directly. The {\tt mapred-site.xml} 
snippet below shows how to do this:
+
+\begin{verbatim}
+<property>
+  <name>mapred.fairscheduler.poolnameproperty</name>
+  <value>pool.name</value>
+</property>
+
+<property>
+  <name>pool.name</name>
+  <value>${user.name}</value>
+</property>
+\end{verbatim}
+
+\subsection{Minimum Shares}
+
+Normally, active pools (those that contain jobs) will get equal shares of the 
map and reduce task slots in the cluster. However, it is also possible to set a 
\emph{minimum share} of map and reduce slots on a given pool, which is a number 
of slots that it will always get when it is active, even if its fair share 
would be below this number. This is useful for guaranteeing that production 
jobs get a certain desired level of service when sharing a cluster with 
non-production jobs. Minimum shares have three effects:
+\begin{enumerate}
+  \item The pool's fair share will always be at least as large as its minimum 
share. Slots are taken from the share of other pools to achieve this. The only 
exception is if the minimum shares of the active pools add up to more than the 
total number of slots in the cluster; in this case, each pool's share will be 
scaled down proportionally.
+  \item Pools whose running task count is below their minimum share get 
assigned slots first when slots are available.
+  \item It is possible to set a \emph{preemption timeout} on the pool after 
which, if it has not received enough task slots to meet its minimum share, it 
is allowed to kill tasks in other jobs to meet its share. Minimum shares with 
preemption timeouts thus act like SLAs.
+\end{enumerate}
+
+Note that when a pool is inactive (contains no jobs), its minimum share is not 
``reserved'' for it -- the slots are split up among the other pools.
+
+\subsection{Preemption}
+
+As explained above, the scheduler may kill tasks from a job in one pool in 
order to meet the minimum share of another pool. We call this preemption, 
although this usage of the word is somewhat strange given the normal definition 
of preemption as pausing; really it is the \emph{job} that gets preempted, 
while the task gets killed. The feature explained above is called \emph{min 
share preemption}. In addition, the scheduler supports \emph{fair share 
preemption}, to kill tasks when a pool's fair share is not being met. Fair 
share preemption is much more conservative than min share preemption, because 
pools without min shares are expected to contain non-production jobs where some 
amount of unfairness is tolerable. In particular, fair share preemption 
activates if a pool has been below \emph{half} of its fair share for a 
configurable fair share preemption timeout, which is recommended to be set 
fairly high (e.g.~10 minutes).
+
+In both types of preemption, the scheduler kills the most recently launched 
tasks from over-scheduled pools, to minimize the amount of computation wasted 
by preemption.
+
+\subsection{Running Job Limits}
+
+The fair scheduler can limit the number of concurrently running jobs from each 
user and from each pool. This is useful for limiting the amount of intermediate 
data generated on the cluster. The jobs that will run are chosen in order of 
submit time and priority. Jobs submitted beyond the limit wait for one of the 
running jobs to finish.
+
+\subsection{Job Priorities}
+
+Within a pool, job priorities can be used to control the scheduling of jobs, 
whether the pool's internal scheduling mode is FIFO or fair sharing:
+\begin{itemize}
+  \item In FIFO pools, jobs are ordered first by priority and then by submit 
time, as in Hadoop's default scheduler.
+  \item In fair sharing pools, job priorities are used as weights to control 
how much share a job gets. The normal priority corresponds to a weight of 1.0, 
and each level gives 2x more weight. For example, a high-priority job gets a 
weight of 2.0, and will therefore get 2x the share of a normal-priority job. 
+\end{itemize}
+
+\subsection{Pool Weights}
+
+Pools can be given weights to achieve unequal sharing of the cluster. For 
example, a pool with weight 2.0 gets 2x the share of a pool with weight 1.0.
+
+\subsection{Delay Scheduling}
+
+The Fair Scheduler contains an algorithm called delay scheduling to improve 
data locality. Jobs that cannot launch a data-local map task wait for some 
period of time before they are allowed to launch non-data-local tasks, ensuring 
that they will run locally if some node in the cluster has the relevant data. 
Delay scheduling is described in detail in Section \ref{sec:delay-scheduling}.
+
+\subsection{Administration}
+
+The Fair Scheduler includes a web UI for displaying the active pools and jobs and 
their fair shares, moving jobs between pools, and changing job priorities.
+In addition, the Fair Scheduler's allocation file (specifying min shares and 
preemption timeouts for the pools) is automatically reloaded if it is modified 
on disk, to allow runtime reconfiguration.
+
+\section{Implementation}
+
+\subsection{Hadoop Scheduling Background}
+
+Hadoop jobs consist of a number of map and reduce \emph{tasks}. These tasks run 
in \emph{slots} on the nodes of the cluster. Each node is configured with a 
number of map slots and reduce slots based on its computational resources 
(typically one slot per core). The role of the scheduler is to assign tasks to 
any slots that are free.
+
+All schedulers in Hadoop, including the Fair Scheduler, inherit from the {\tt 
TaskScheduler} abstract class. This class provides access to a {\tt 
TaskTrackerManager} -- an interface to the JobTracker -- as well as a {\tt 
Configuration} instance. It also asks the scheduler to implement three abstract 
methods: the lifecycle methods {\tt start} and {\tt terminate}, and a method 
called {\tt assignTasks} to launch tasks on a given TaskTracker.
+Task assignment in Hadoop is reactive. TaskTrackers periodically send 
heartbeats to the JobTracker with their {\tt TaskTrackerStatus}, which contains 
a list of running tasks, the number of slots on the node, and other 
information. The JobTracker then calls {\tt assignTasks} on the scheduler to 
obtain tasks to launch. These are returned with the heartbeat response.
+
+Apart from reacting to heartbeats through {\tt assignTasks}, schedulers can 
also be notified when jobs have been submitted to the cluster, killed, or 
removed by adding listeners to the {\tt TaskTrackerManager}. The Fair Scheduler 
sets up these listeners in its {\tt start} method. An important role of the 
listeners is to initialize jobs that are submitted -- until a job is 
initialized, it cannot launch tasks. The Fair Scheduler currently initializes 
all jobs right away, but it may also be desirable to hold off initializing jobs 
if too many are submitted, to limit memory usage on the JobTracker.
+
+Selection of tasks \emph{within} a job is mostly done by the {\tt 
JobInProgress} class, and not by individual schedulers. {\tt JobInProgress} 
exposes two methods, {\tt obtainNewMapTask} and {\tt obtainNewReduceTask}, to 
launch a task of either type. Both methods return either a {\tt Task} 
object or {\tt null} if the job does not wish to launch a task. Whether a job 
wishes to launch a task may change back and forth during its lifetime. Even 
after all tasks in the job have been started, the job may wish to run another 
task for speculative execution. In addition, if the node holding the output of a 
completed map task fails, the job will wish to re-run that task to rebuild its 
output for use in the reduce tasks. Schedulers may therefore need to poll 
multiple jobs until they find one with a task to run.
+
+Finally, for map tasks, an important scheduling criterion is data locality: 
running the task on a node or rack that contains its input data. Normally, {\tt 
JobInProgress.obtainNewMapTask} returns the ``closest'' map task to a given 
node. However, to give schedulers slightly more control over data locality, 
there is also a version of {\tt obtainNewMapTask} that allows the scheduler to 
cap the level of non-locality allowed for the task (e.g.~request a task only on 
the same node, or {\tt null} if none is available). The Fair Scheduler uses 
this method with an algorithm called delay scheduling (Section 
\ref{sec:delay-scheduling}) to optimize data locality.
+
+\subsection{Fair Scheduler Basics}
+
+At a high level, the Fair Scheduler uses hierarchical scheduling to assign 
tasks. First it selects a pool to assign a task to according to the fair 
sharing algorithm in Section \ref{sec:fair-sharing-alg}. Then it asks the pool 
to obtain a task. The pool chooses among its jobs according to its internal 
scheduling order (FIFO or fair sharing).
+
+In fact, because jobs might not have tasks to launch ({\tt 
obtainNew(Map|Reduce)Task} can return {\tt null}), the scheduler actually establishes 
an ordering on jobs and asks them for tasks in turn. Within a pool, jobs are 
sorted either by priority and start time (for FIFO) or by distance below fair 
share (for fair sharing). If the first job in the ordering does not have a task to launch, the 
pool will ask the second, third, etc.~jobs. Pools themselves are sorted by 
distance below min share and fair share, so if the first pool does not have any 
jobs that can launch tasks, the second pool is asked, etc. This makes it 
straightforward to implement features like delay scheduling (Section 
\ref{sec:delay-scheduling}) that may cause jobs to ``pass'' on a slot.
+
+Apart from the assign tasks code path, the Fair Scheduler also has a periodic 
update thread that calls {\tt update} every few seconds. This thread is 
responsible for recomputing fair shares to display them on the UI (Section 
\ref{sec:fair-share-computation}), checking whether jobs need to be preempted 
(Section \ref{sec:preemption}), and checking whether the allocations file has 
changed to reload pool allocations (through {\tt PoolManager}).
+
+\subsection{The {\tt Schedulable} Class}
+
+To allow the same fair sharing algorithm to be used both between pools and 
within a pool, the Fair Scheduler uses an abstract class called {\tt 
Schedulable} to represent both pools and jobs. Its subclasses for these roles 
are {\tt PoolSchedulable} and {\tt JobSchedulable}. A {\tt Schedulable} is 
responsible for three roles:
+\begin{enumerate}
+  \item It can be asked to obtain a task through {\tt assignTask}. This may 
return {\tt null} if the {\tt Schedulable} has no tasks to launch.
+  \item It can be queried for information about the pool/job to use in 
scheduling, such as:
+  \begin{itemize}
+    \item Number of running tasks.
+    \item Demand (number of tasks the {\tt Schedulable} \emph{wants} to run; 
this is equal to the number of running tasks plus the number of unlaunched tasks).
+    \item Min share assigned through config file.
+    \item Weight (for fair sharing).
+    \item Priority and start time (for FIFO scheduling).
+  \end{itemize}
+  \item It can be assigned a fair share through {\tt setFairShare}.
+\end{enumerate}
+
+There are separate {\tt Schedulable}s for map and reduce tasks, to make it 
possible to use the same algorithm on both types of tasks.
+
+\subsection{Fair Sharing Algorithm}
+\label{sec:fair-sharing-alg}
+
+A simple way to achieve fair sharing is the following: whenever a slot is 
available, assign it to the pool that has the fewest running tasks. This will 
ensure that all pools get an equal number of slots, unless a pool's demand is 
less than its fair share, in which case the extra slots are divided evenly 
among the other pools. Two features of the Fair Scheduler complicate this 
algorithm a little:
+\begin{itemize}
+  \item Pool weights mean that some pools should get more slots than others. 
For example, a pool with weight 2 should get 2x more slots than a pool with 
weight 1. This is accomplished by changing the scheduling rule to ``assign the 
slot to the pool whose value of $runningTasks/weight$ is smallest.''
+  \item Minimum shares mean that pools below their min share should get slots 
first. When we sort pools to choose which ones to schedule next, we place pools 
below their min share ahead of pools above their min share. We order the pools 
below their min share by how far they are below it as a percentage of the share.
+\end{itemize}
+
+This fair sharing algorithm is implemented in {\tt FairShareComparator} in the 
{\tt SchedulingAlgorithms} class. The comparator orders jobs by distance below 
min share and then by $runningTasks/weight$.
+
+\subsection{Preemption}
+\label{sec:preemption}
+
+To determine when to preempt tasks, the Fair Scheduler maintains two values 
for each {\tt PoolSchedulable}: the last time when the pool was at its min 
share, and the last time when the pool was at half its fair share. These 
conditions are checked periodically by the update thread in {\tt 
FairScheduler.updatePreemptionVariables}, using the methods {\tt 
isStarvedForMinShare} and {\tt isStarvedForFairShare}. These methods also take 
into account the demand of the pool, so that a pool is not counted as starving 
if its demand is below its min/fair share but is otherwise met.
+
+When preempting tasks, the scheduler kills the most recently launched tasks 
from over-scheduled pools. This minimizes the amount of computation wasted by 
preemption and ensures that all jobs can eventually finish (it is as if the 
preempted jobs just never got their last few slots). The tasks are chosen and 
preempted in {\tt preemptTasks}.
+
+Note that for min share preemption, it is clear when a pool is below its min 
share because the min share is given as a number of slots, but for fair share 
preemption, we must be able to compute a pool's fair share to determine when it 
is being starved. This computation is trickier than dividing the number of 
slots by the number of pools due to weights, min shares and demands. Section 
\ref{sec:fair-share-computation} explains how fair shares are computed.
+
+\subsection{Fair Share Computation}
+\label{sec:fair-share-computation}
+
+The scheduling algorithm in Section \ref{sec:fair-sharing-alg} achieves fair 
shares without actually needing to compute pools' numerical shares beforehand. 
However, for preemption and for displaying shares in the Web UI, we want to 
know what a pool's fair share is even if the pool is not currently at its 
share. That is, we want to know how many slots the pool \emph{would} get if we 
started with all slots being empty and ran the algorithm in Section 
\ref{sec:fair-sharing-alg} until we filled them.
+One way to compute these shares would be to simulate starting out with empty 
slots and calling {\tt assignTasks} repeatedly until they are filled, but this is 
expensive, because each scheduling decision takes $O(numJobs)$ time and we need 
to make $O(numSlots)$ decisions.
+
+To compute fair shares efficiently, the Fair Scheduler includes an algorithm 
based on binary search in {\tt SchedulingAlgorithms.computeFairShares}. This 
algorithm is based on the following observation. If all slots had been assigned 
according to weighted fair sharing respecting pools' demands and min shares, 
then there would exist a ratio $r$ such that:
+\begin{enumerate}
+  \item Pools whose demand $d_i$ is less than $r w_i$ (where $w_i$ is the 
weight of the pool) are assigned $d_i$ slots.
+  \item Pools whose min share $m_i$ is more than $r w_i$ are assigned 
$\min(m_i, d_i)$ slots.
+  \item All other pools are assigned $r w_i$ slots.
+  \item The pools' shares sum up to the total number of slots $t$.
+\end{enumerate}
+
+The Fair Scheduler uses binary search to compute the correct $r$. We define a 
function $f(r)$ as the number of slots that would be used for a given $r$ if 
conditions 1-3 above were met, and then find a value of $r$ that makes 
$f(r)=t$. More precisely, $f(r)$ is defined as:
+$$f(r) = \sum_i \min(d_i, \max(r w_i, m_i)).$$
+
+Note that $f(r)$ is non-decreasing in $r$ because every term of the sum is 
non-decreasing, so the equation $f(r) = t$ can be solved by binary search. We 
choose 0 as a lower bound of our binary search because with $r=0$, only min 
shares are assigned. (An earlier check in {\tt computeFairShares} checks 
whether the min shares add up to more than the total number of slots, and if 
so, computes fair shares by scaling down the min shares proportionally and 
returns.) To compute an upper bound for the binary search, we try 
$r=1,2,4,8,\dots$ until we find a value large enough that either more than $t$ 
slots are used or all pools' demands are met (in case the demands added up to 
less than $t$).
+
+The steps of the algorithm are explained in detail in {\tt 
SchedulingAlgorithms.java}.
+
+This algorithm runs in time $O(NP)$, where $N$ is the number of jobs/pools and 
$P$ is the desired number of bits of precision in the computed values (number 
of iterations of binary search), which we've set to 25. It thus scales linearly 
in the number of jobs and pools.
+
+\subsection{Running Job Limits}
+
+Running job limits are implemented by marking jobs as not runnable if there 
are too many jobs submitted by the same user or pool. This is done in {\tt 
FairScheduler.updateRunnability}. A job that is not runnable declares its 
demand as 0 and always returns {\tt null} from {\tt assignTasks}.
+
+\subsection{Delay Scheduling}
+\label{sec:delay-scheduling}
+
+In Hadoop, running map tasks on the nodes or racks that contain their input 
data is critical for performance, because it avoids shipping the data over the 
network. However, always assigning slots to the first job in order of pool 
shares and in-pool ordering (the ``head-of-line job'') can sometimes lead to 
poor locality:
+\begin{itemize}
+  \item If the head-of-line job is small, the chance of it having data on the 
node that a heartbeat was received from is small. Therefore, locality would be 
poor in a small-job workload if we always assigned slots to the head-of-line 
job.
+  \item When fair sharing is used, there is a strong bias for a job to be 
reassigned into a slot that it just finished a task in, because when it 
finishes the task, the job falls below its fair share. This can mean that jobs 
have a difficult time running in slots that other jobs have taken and thus 
achieve poor locality.
+\end{itemize}
+
+To deal with both of these situations, the Fair Scheduler can sacrifice 
fairness temporarily to improve locality through an algorithm called delay 
scheduling. If the head-of-line job cannot launch a local task on the 
TaskTracker that sent a heartbeat, then it is skipped, and other running jobs 
are looked at in order of pool shares and in-pool scheduling rules to find a 
job with a local task. However, if the head-of-line job has been skipped for a 
sufficiently long time, it is allowed to launch rack-local tasks. Then, if it 
is skipped for a longer time, it is also allowed to launch off-rack tasks. 
These skip times are called locality delays. Delays of a few seconds are 
sufficient to drastically increase locality.
+
+The Fair Scheduler allows locality delays to be set through {\tt 
mapred-site.xml} or to be turned off by setting them to zero. However, by 
default, it computes the delay automatically based on the heartbeat interval of 
the cluster. The delay is set to 1.5x the heartbeat interval.
+
+When a job that has been allowed to launch non-local tasks ends up launching a 
local task again, its ``locality level'' resets and it must wait again before 
launching non-local tasks. This is done so that a job that gets ``unlucky'' 
early in its lifetime does not continue to launch non-local tasks throughout 
its life.
+
+Delay scheduling is implemented by keeping track of two variables on each job: 
the locality level of the last map it launched (0 for node-local, 1 for 
rack-local and 2 for off-rack) and the time it has spent being skipped for a 
task. These are kept in a {\tt JobInfo} structure associated with each job in 
{\tt FairScheduler.java}. Whenever a job is asked for tasks, it checks the 
locality level it is allowed to launch them at through {\tt 
FairScheduler.getAllowedLocalityLevel}. If it does not launch a task, it is 
marked as ``visited'' on that heartbeat by appending itself to a {\tt visited} 
job list that is passed around between calls to {\tt assignTasks} on the same 
heartbeat. Jobs that are visited on a heartbeat but do not launch any tasks 
during it are considered skipped for the time interval between this 
heartbeat and the next. Code at the beginning of {\tt 
FairScheduler.assignTasks} increments the wait time of each skipped job by the 
time elapsed since the last heartbeat. Once a job has been skipped for more than the locality delay, {\tt 
getAllowedLocalityLevel} starts returning a higher locality level so that it is allowed 
to launch less-local tasks. Whenever the job launches a task, its wait time is 
reset, but we remember the locality level of the launched task so that the job 
is allowed to launch more tasks at this level without further waiting.
+
+\subsection{Locking Order}
+
+Fair Scheduler data structures can be touched by several threads. Most 
commonly, the JobTracker invokes {\tt assignTasks}. This happens inside a block 
of code where the JobTracker has locked itself already. Therefore, to prevent 
deadlocks, we always ensure that \emph{if both the FairScheduler and the 
JobTracker must be locked, the JobTracker is locked first}. Other threads that 
can lock the FairScheduler include the update thread and the web UI.
+
+\subsection{Unit Tests}
+
+The Fair Scheduler contains extensive unit tests using mock {\tt 
TaskTrackerManager}, {\tt JobInProgress}, {\tt TaskInProgress}, and {\tt 
Schedulable} objects. Scheduler tests are in {\tt TestFairScheduler.java}. The 
{\tt computeFairShares} algorithm is tested separately in {\tt 
TestComputeFairShares.java}. All tests use accelerated time via a fake {\tt 
Clock} class.
+
+\pagebreak
+\section{Code Guide}
+
+The following table lists some key source files in the Fair Scheduler:
+
+\begin{center}
+\begin{tabular}{|l|p{0.7\columnwidth}|}
+  \hline
+  {\bf File} & {\bf Contents} 
+  \\ \hline
+  {\tt FairScheduler.java} & Scheduler entry point. Also contains update 
thread, and logic for preemption, delay scheduling, and running job limits.
+  \\ \hline
+  {\tt Schedulable.java} & Definition of the {\tt Schedulable} class. Extended 
by {\tt JobSchedulable} and {\tt PoolSchedulable}.
+  \\ \hline
+  {\tt SchedulingAlgorithms.java} & Contains FIFO and fair sharing 
comparators, as well as the {\tt computeFairShares} algorithm in Section 
\ref{sec:fair-share-computation}.
+  \\ \hline
+  {\tt PoolManager.java} & Reads pool properties from the allocation file and 
maintains a collection of {\tt Pool} objects. Pools are created on demand.
+  \\ \hline
+  {\tt Pool.java} & Represents a pool and stores its map and reduce {\tt 
Schedulables}.
+  \\ \hline
+  {\tt FairSchedulerServlet.java} & Implements the scheduler's web UI.
+  \\ \hline
+  {\tt FairSchedulerEventLog.java} & An easy-to-parse event log for debugging. 
Must be enabled through {\tt mapred.fairscheduler.eventlog.enabled}.
+  If enabled, logs are placed in {\tt \$HADOOP\_LOG\_DIR/fairscheduler}.
+  \\ \hline
+  {\tt TaskSelector.java} & A pluggable class responsible for picking tasks 
within a job. Currently, {\tt DefaultTaskSelector} delegates to {\tt 
JobInProgress}, but this would be a useful place to experiment with new 
algorithms for speculative execution and locality.
+  \\ \hline
+  {\tt LoadManager.java} & A pluggable class responsible for determining when 
to launch more tasks on a TaskTracker. Currently, {\tt CapBasedLoadManager} 
uses slot counts, but this would be a useful place to experiment with 
scheduling based on machine load.
+  \\ \hline
+  {\tt WeightAdjuster.java} & A pluggable class responsible for setting job 
weights. An example, {\tt NewJobWeightBooster}, is provided, which increases 
weight temporarily for new jobs.
+  \\ \hline
+\end{tabular}
+\end{center}
+
+\end{document}

Modified: 
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/ivy.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/ivy.xml?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/ivy.xml 
(original)
+++ 
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/ivy.xml 
Sun Sep 11 23:57:37 2011
@@ -26,6 +26,14 @@
       name="commons-logging"
       rev="${commons-logging.version}"
       conf="common->default"/>
+    <dependency org="commons-collections"
+      name="commons-collections"
+      rev="${commons-collections.version}"
+      conf="common->default"/>
+    <dependency org="commons-cli"
+      name="commons-cli"
+      rev="${commons-cli.version}"
+      conf="common->default"/>
     <dependency org="log4j"
       name="log4j"
       rev="${log4j.version}"

Modified: 
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
 Sun Sep 11 23:57:37 2011
@@ -18,12 +18,23 @@
 
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.conf.Configuration;
+
 /**
  * A {@link LoadManager} for use by the {@link FairScheduler} that allocates
  * tasks evenly across nodes up to their per-node maximum, using the default
  * load management algorithm in Hadoop.
  */
 public class CapBasedLoadManager extends LoadManager {
+  
+  float maxDiff = 0.0f;
+  
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    maxDiff = conf.getFloat("mapred.fairscheduler.load.max.diff", 0.0f);
+  }
+  
   /**
    * Determine how many tasks of a given type we want to run on a TaskTracker. 
    * This cap is chosen based on how many tasks of that type are outstanding in
@@ -32,7 +43,7 @@ public class CapBasedLoadManager extends
    * machines sent out heartbeats earliest.
    */
   int getCap(int totalRunnableTasks, int localMaxTasks, int totalSlots) {
-    double load = ((double)totalRunnableTasks) / totalSlots;
+    double load = maxDiff + ((double)totalRunnableTasks) / totalSlots;
     return (int) Math.ceil(localMaxTasks * Math.min(1.0, load));
   }
 
@@ -49,4 +60,10 @@ public class CapBasedLoadManager extends
     return tracker.countReduceTasks() < getCap(totalRunnableReduces,
         tracker.getMaxReduceSlots(), totalReduceSlots);
   }
+
+  @Override
+  public boolean canLaunchTask(TaskTrackerStatus tracker,
+      JobInProgress job,  TaskType type) {
+    return true;
+  }
 }

Modified: 
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
 (original)
+++ 
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
 Sun Sep 11 23:57:37 2011
@@ -56,12 +56,21 @@ public class DefaultTaskSelector extends
   }
 
   @Override
-  public Task obtainNewMapTask(TaskTrackerStatus taskTracker, JobInProgress 
job)
-      throws IOException {
+  public Task obtainNewMapTask(TaskTrackerStatus taskTracker, JobInProgress 
job,
+      int localityLevel) throws IOException {
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     int numTaskTrackers = clusterStatus.getTaskTrackers();
-    return job.obtainNewMapTask(taskTracker, numTaskTrackers,
-        taskTrackerManager.getNumberOfUniqueHosts());
+    switch (localityLevel) {
+      case 1:
+        return job.obtainNewNodeLocalMapTask(taskTracker, numTaskTrackers,
+          taskTrackerManager.getNumberOfUniqueHosts());
+      case 2:
+        return job.obtainNewNodeOrRackLocalMapTask(taskTracker, 
numTaskTrackers,
+          taskTrackerManager.getNumberOfUniqueHosts());
+      default:
+        return job.obtainNewMapTask(taskTracker, numTaskTrackers,
+          taskTrackerManager.getNumberOfUniqueHosts());
+    }
   }
 
   @Override


Reply via email to