[
https://issues.apache.org/jira/browse/HADOOP-3150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Amareshwari Sriramadasu updated HADOOP-3150:
--------------------------------------------
Attachment: patch-3150.txt
Here is a patch for review, reworked from Devaraj's earlier patch.
The new APIs look like the following:
{code}
public class JobContext {
  public JobConf getJobConf();
}
public class TaskAttemptContext {
  public TaskAttemptID getTaskAttemptID();
  public JobConf getJobConf();
  public Progressable getProgressable();
}
public abstract class OutputFormat<K, V> {
  public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      throws IOException;
  public abstract void checkOutputSpecs(JobContext context) throws IOException;
  public abstract void setupJob(JobContext context) throws IOException;
  public abstract void cleanupJob(JobContext context) throws IOException;
  public abstract void setupTask(TaskAttemptContext context) throws IOException;
  public abstract void cleanupTask(TaskAttemptContext context, boolean promote)
      throws IOException;
}
{code}
I did not move getProgressable() to JobContext, because {setup/cleanup}Job() are
not done during the job, so they need not report progress.
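The hooks in the listing suggest a call order for one job with one task attempt. The following is a minimal sketch of that order, not code from the patch: the Sketch* classes are hypothetical, stripped-down stand-ins (no JobConf, no IOException), and the sequence itself is an assumption read off the method names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down stand-ins for JobContext and TaskAttemptContext.
class SketchJobContext {
    final String jobId;
    SketchJobContext(String jobId) { this.jobId = jobId; }
}

class SketchTaskContext {
    final String attemptId;
    SketchTaskContext(String attemptId) { this.attemptId = attemptId; }
}

// Mirrors only the four lifecycle hooks from the listing above.
abstract class SketchOutputFormat {
    abstract void setupJob(SketchJobContext context);
    abstract void setupTask(SketchTaskContext context);
    abstract void cleanupTask(SketchTaskContext context, boolean promote);
    abstract void cleanupJob(SketchJobContext context);
}

// Records each call so the order can be inspected.
class RecordingOutputFormat extends SketchOutputFormat {
    final List<String> calls = new ArrayList<>();

    @Override void setupJob(SketchJobContext c) { calls.add("setupJob " + c.jobId); }
    @Override void setupTask(SketchTaskContext c) { calls.add("setupTask " + c.attemptId); }
    @Override void cleanupTask(SketchTaskContext c, boolean promote) {
        calls.add("cleanupTask " + c.attemptId + " promote=" + promote);
    }
    @Override void cleanupJob(SketchJobContext c) { calls.add("cleanupJob " + c.jobId); }
}

class LifecycleSketch {
    // Assumed call order for a job with a single task attempt.
    static List<String> run(boolean promote) {
        RecordingOutputFormat format = new RecordingOutputFormat();
        SketchJobContext job = new SketchJobContext("job_1");
        SketchTaskContext task = new SketchTaskContext("attempt_0");
        format.setupJob(job);
        format.setupTask(task);
        format.cleanupTask(task, promote); // promote=true is assumed to mean "commit the output"
        format.cleanupJob(job);
        return format.calls;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
    }
}
```

Only the task-level hooks take the context that carries a Progressable in the real API, which matches the point that the job-level hooks need not report progress.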
Output Commit is done as follows:
1) The task framework code knows when it is done with the task. It then sets
its own state to *COMMIT_PENDING* and sends that status to the tasktracker
through an RPC, _commitPending()_.
The RPC also wakes up the tasktracker to send a heartbeat immediately, so the
tasktracker forwards this status to the JobTracker right away.
Note: If the job's output format is FileOutputFormat and there are no files to
be committed in the output directory, the task is declared SUCCESSFUL without
any commit.
2) The JobTracker sends a COMMITTASKACTION with commitResponse *DO_COMMIT* for
that task if this is the first attempt trying to commit. It sends a
COMMITTASKACTION with commitResponse *DISCARD* to every other attempt of the
task that reports COMMIT_PENDING. If two status messages with COMMIT_PENDING
arrive at the same time from two running attempts, the first one wins.
3) The tasktracker gets the COMMITTASKACTION and notes it.
4) The task continuously polls the tasktracker until it gets a response on
whether to commit or not. The tasktracker responds to the task with *DO_WAIT*
until it gets a response from the JobTracker. This is done through an RPC,
_canCommit()_.
5) The task also reports its status as COMMIT_PENDING while polling and until
it completes the commit.
6) If the commit fails, the task attempt fails. The JobTracker then re-executes
the task.
7) When a task gets the commitResponse DISCARD, it discards its output and
kills itself.
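The arbitration in steps 2 to 4 can be sketched as follows. This is an illustration under stated assumptions, not code from the patch: CommitResponse, CommitArbiter and TaskTrackerStub are hypothetical names, ids are plain strings, and releasing the commit slot on failure is an assumed way to let a re-executed attempt commit (step 6).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the commitResponse values named in the steps above.
enum CommitResponse { DO_WAIT, DO_COMMIT, DISCARD }

// Hypothetical JobTracker-side arbiter: at most one committing attempt per task.
class CommitArbiter {
    // task id -> attempt id that was granted the commit
    private final Map<String, String> committer = new HashMap<>();

    // Called when an attempt reports COMMIT_PENDING. The method is synchronized,
    // so simultaneous reports are serialized and the first one wins.
    synchronized CommitResponse onCommitPending(String taskId, String attemptId) {
        String winner = committer.putIfAbsent(taskId, attemptId);
        if (winner == null || winner.equals(attemptId)) {
            return CommitResponse.DO_COMMIT;
        }
        return CommitResponse.DISCARD;
    }

    // Assumption: a failed commit frees the slot so a later attempt can commit.
    synchronized void onCommitFailed(String taskId, String attemptId) {
        committer.remove(taskId, attemptId);
    }
}

// Hypothetical tasktracker side: notes the action and answers the task's polls.
class TaskTrackerStub {
    private final Map<String, CommitResponse> noted = new HashMap<>();

    synchronized void noteCommitAction(String attemptId, CommitResponse response) {
        noted.put(attemptId, response);
    }

    // Answers DO_WAIT until the JobTracker's decision has been noted.
    synchronized CommitResponse poll(String attemptId) {
        return noted.getOrDefault(attemptId, CommitResponse.DO_WAIT);
    }
}

class CommitDemo {
    public static void main(String[] args) {
        CommitArbiter jobTracker = new CommitArbiter();
        TaskTrackerStub taskTracker = new TaskTrackerStub();

        System.out.println(taskTracker.poll("attempt_0")); // DO_WAIT
        taskTracker.noteCommitAction("attempt_0",
                jobTracker.onCommitPending("task_1", "attempt_0"));
        taskTracker.noteCommitAction("attempt_1",
                jobTracker.onCommitPending("task_1", "attempt_1"));
        System.out.println(taskTracker.poll("attempt_0")); // DO_COMMIT
        System.out.println(taskTracker.poll("attempt_1")); // DISCARD
    }
}
```

Keying the decision by task rather than by attempt is what makes a speculative second attempt receive DISCARD.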
The other issue is to support side files. Supporting side files involves
three issues:
1) The framework should set the configuration property "mapred.work.output.dir"
before the task execution starts.
2) Applications should know the side files directory.
3) Framework should commit the side files.
The following describes how the side files are supported with the patch.
Case 1: Job's OutputFormat is a FileOutputFormat : Side files are handled by
the output format.
1. FileOutputFormat.setupTask sets "mapred.work.output.dir".
2. The user accesses the side file directory through the API
FileOutputFormat.getWorkOutputPath() (as is today)
3. Commit of side files will be included in OutputFormat's commit, since they
share the same output directory.
Case 2: Job's OutputFormat is not a FileOutputFormat
1. User sets the output path using FileOutputFormat.setOutputPath(outputdir).
The API sets mapred.output.dir. (as is today)
2. During task localization, the task (i.e. the tasktracker) sets
mapred.work.output.dir to ${mapred.output.dir}/_temporary/_taskid. This
is done through FileOutputFormat.setWorkOutputPath. (as is today)
3. User will create side files in mapred.work.output.dir obtained thru
FileOutputFormat.getWorkOutputPath(). (as is today)
4. When the task is complete, the task checks whether there are any side files
to be committed using FileOutputFormat.isTaskCommitRequired(). If so, it
commits the files using FileOutputFormat.commitTaskOutput(). The task also
calls jobOutputFormat.commit()
5. At the end of the job, JIP.garbageCollect() cleans up the _temporary
directory. It calls jobOutputFormat.cleanupJob() and
FileOutputFormat.cleanupTempJobOutput().
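The side-file flow in Case 2 can be illustrated on a local filesystem. This is a rough sketch, not the patch's implementation: SideFileSketch and its methods are hypothetical, java.nio.file stands in for the Hadoop FileSystem API, and "commit" is assumed to mean moving files from the work directory into the output directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SideFileSketch {
    // Mirrors ${mapred.output.dir}/_temporary/_taskid from step 2.
    static Path workOutputPath(Path outputDir, String taskId) {
        return outputDir.resolve("_temporary").resolve("_" + taskId);
    }

    // Analogue of the "is a commit required" check: any file in the work dir?
    static boolean isCommitRequired(Path workDir) throws IOException {
        if (!Files.isDirectory(workDir)) {
            return false;
        }
        try (Stream<Path> entries = Files.list(workDir)) {
            return entries.findAny().isPresent();
        }
    }

    // Assumed commit: move every entry of the work dir into the output dir,
    // then remove the now-empty work dir.
    static void commit(Path workDir, Path outputDir) throws IOException {
        List<Path> entries;
        try (Stream<Path> stream = Files.list(workDir)) {
            entries = stream.collect(Collectors.toList());
        }
        for (Path entry : entries) {
            Files.move(entry, outputDir.resolve(entry.getFileName()),
                    StandardCopyOption.REPLACE_EXISTING);
        }
        Files.delete(workDir);
    }

    public static void main(String[] args) throws IOException {
        Path outputDir = Files.createTempDirectory("output");
        Path workDir = workOutputPath(outputDir, "attempt_0");
        Files.createDirectories(workDir);
        Files.write(workDir.resolve("side-file.txt"), "side data".getBytes());

        if (isCommitRequired(workDir)) {
            commit(workDir, outputDir);
        }
        System.out.println(Files.exists(outputDir.resolve("side-file.txt")));
    }
}
```

In this sketch the job-level cleanup of step 5 would amount to deleting the remaining _temporary directory under the output directory.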
Many of the changes in the patch follow from the API changes. The core changes
are in OutputFormat.java, FileOutputFormat.java, Task.java, TaskTracker.java,
JobInProgress.java, TaskInProgress.java and LocalJobRunner.java.
Note: Since the commit of a task is done by the OutputFormat, we cannot support
the old OutputFormat interface. I don't see a way to move the commit to the
output format and still support the old interface.
Thoughts?
> Move task file promotion into the task
> --------------------------------------
>
> Key: HADOOP-3150
> URL: https://issues.apache.org/jira/browse/HADOOP-3150
> Project: Hadoop Core
> Issue Type: Improvement
> Components: mapred
> Reporter: Owen O'Malley
> Assignee: Amareshwari Sriramadasu
> Fix For: 0.19.0
>
> Attachments: 3150.patch, patch-3150.txt
>
>
> We need to move the task file promotion from the JobTracker to the Task and
> move it down into the output format.