Proposal for redesign/refactoring of the JobTracker and TaskTracker
-------------------------------------------------------------------
Key: HADOOP-869
URL: https://issues.apache.org/jira/browse/HADOOP-869
Project: Hadoop
Issue Type: Improvement
Components: mapred
Reporter: Arun C Murthy
Assigned To: Arun C Murthy
During discussions on HADOOP-815 wrt some hard-to-maintain code on the
JobTracker we all agreed that the current state-of-affairs there is brittle and
merits some rework.
Case in point: there are back-calls from TaskInProgress to JobTracker and from
JobInProgress to JobTracker which mean that synchronization is quite involved
and brittle, leading to issues like HADOOP-600. Also one is forced to lock
several data-structures individually before certain operations (taskTrackers,
trackerExpiryQueue, jobs etc.)
Hence I'd like to present some early thoughts (which have undergone a quick
iteration) on how we could do slightly better by a bit of redesign/refactoring,
also during discussions with Owen on the same we agreed that HADOOP-554 is an
integral part along the same direction... and I also feel that a good candidate
to be done along with this is HADOOP-398 (mapred package refactoring).
Context:
---------
a) The unit of communication between the JobTracker & TaskTracker is a 'task'.
b) Due to (a) the JobTracker maintains a bunch of information related on the
'taskid' i.e. taskidToTipMap, taskidToTrackerMap etc. and hence we need to
update the JobTracker's data-structures via back-calls from TaskInProgress &
JobInProgress where the context is available (complete/failed task,
already-completed task etc.)
c) This implies that we have a fairly elaborate and hard to maintain locking
structures and also some redundant information in the JobTracker; making it
harder to maintain.
Overall at both the JobTracker & TaskTracker the concept of a 'job' is
overshadowed by the 'task'; which I propose we fix.
Proposal:
----------
Here is the main flow of control:
JobTracker -> JobInProgress -> TaskInProgress -> task_attempt
The main idea is to break the existing nexus between the JobTracker &
TaskInProgress/taskid by (I've put code for illustrative purposes only, and
ignored pieces irrelevant to this discussion):
a) Making the 'job' the primary unit of communication between JobTracker &
TaskTracker.
b) TaskTrackerStatus now looks like this:
class TaskTrackerStatus {
List<JobStatus> jobStatuses; // the status of the 'jobs' running on a
TaskTracker
String getTrackerName();
}
class JobStatus {
List<TaskStatus> taskStatuses; // the status of the 'tasks' belonging to a
job
JobId getJobId();
}
c) The JobTracker maintains only a single map of jobid -> JobInProgress, and
mapping from taskTracker -> List<JobInProgress>
Map<JobId, JobInProgress> allJobs;
Map<String, List<JobInProgress>> trackerToJobsMap;
d) The JobTracker delegates a bunch of responsibilities to the JobInProgress to
reflect the fact the primary 'concept' in map/reduce is the 'job', thus
empowering the JobInProgress class:
class JobInProgress {
TaskInProgress[] mapTasks;
TaskInProgress[] reduceTasks;
Map<String, List<TaskInProgress>> trackerToTasksMap; // tracker -> tasks
running
Map<String, List<TaskAttempt>> trackerToMarkedTasksMap; // tracker ->
completed (success/failed/killed) task-attempt,
// but the tracker doesn't know it yet
void updateStatus(JobStatus jobStatus);
MapOutputLocation[] getMapOutputLocations(int[] mapTasksNeeded, int reduce);
TaskAttempt getTaskToRun(String taskTracker);
List<TaskTrackerAction> getTaskToKill(String taskTracker);
}
d) On receipt of TaskTrackerStatus from a tracker, the processeing of heartbeat
looks like this:
for (JobStatus jobStatus : taskTrackerStatus.getJobStatuses()) {
JobInProgress job = allJobs.get(jobId);
synchronized (job) {
job.updateStatus(jobStatus);
return (HeartbeatResponse(repsonseId,
job.getTaskAttemptToRun(trackerName),
job.getTaskToKill(trackerName)
));
}
}
The big change is that the JobTracker delegates a lot of responsibility to the
JobInProgress, we get away from all the complicated synchronization constructs:
simply lock the JobInProgress object at all places via allJobs/trackerToJobsMap
and we are done. This also enhances throughput since mostly we will not need to
lock up the JobTracker (even in the heartbeat loop); locking the JobInProgress
or the 2 maps is sufficient in most cases... thus enhance the inherent
parallelism of the JobTracker's inner loop (processing heartbeat) and provide
better response when multiple jobs are running on the cluster.
Hence the JobInProgress is responsible for maintaining it's TaskInProgress'es
which in turn are completely responsible for the TaskAttempt`s, the
JobInProgress also provides sufficient information as and when needed to the
JobTracker to schedule jobs/tasks and the JobTracker is blissfully unaware of
the innards of jobs/tasks.
-*-*-
I hope to articulate more a general direction towards an improved and
maintainable 'mapred' and would love to hear out how we can improve and
pitfalls to avoid... lets discuss. We could take this piecemeal an implement or
at one go...
Last, not least; I propose that while we are at this we redo the nomenclature a
bit:
JobInProgress -> Job
TaskInProgress -> Task
taskid -> replace with a new TaskAttempt
this should help clarify each class and it's roles.
Of course we will probably need a separate org.apache.hadoop.mapred.job.Task
v/s org.apache.hadoop.mapred.task.Task which is why I feel HADOOP-554
(refactoring of mapred packages) would be very important to get a complete,
coherent solution.
Thoughts?
--
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
https://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira