[ 
https://issues.apache.org/jira/browse/HADOOP-1558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12511920
 ] 

Alejandro Abdelnur commented on HADOOP-1558:
--------------------------------------------

Doug, I understand the angle you are coming from. 

I've spent some time looking at making this a Task but decided against it. 
Mostly because refactoring Task to other things than running Map/Reduce -and to 
allow them to run in the JT box- brings much higher risk into the code.

Because of that I've taken a compromise path implemented in the patch.

The decoupling the initialize/commit from the OutputFormat into the 
OutputHandler approach relies on a couple of assumptions:

* It is far more common than jobs will use custom OutputFormats than custom 
persistent stores. In other words, as a MapReduce developer I may come up with 
custom OutputFormats on job basis but hardly introduce a new persistent store 
(DFS, HBase, S3) on job basis.

* Leaving to the MapReduce developer implementing OutputFormat the 
initialize/commit logic has a high risk in shared cluster environments as the 
decision on where temporary output directories are created could clash with out 
OutputFormat implementations from other jobs. IMO it seems a good thing for 
Hadoop code to keep control on this.

Regarding extensibility:

* The OutputHandler is an interface and custom implementations can be added to 
the Hadoop cluster classpath to be available for use by MapReduce jobs. Even 
for existing OutputFormats as the default OutputHandler can be overridden in 
the JobConf. As I think this a much less frequent situation I see this approach 
acceptable.

Regarding stores that are not file based and the 'Path getUncommitedPath(Job)' 
method. I see 2 options:

* This method could be ignored by non-file-based OutputHandlers, they would 
just care about the initialize and commit methods.

* Change this method to 'String getUncommittedName(Job)'. In the case of of 
file-based OutputHandler this would be interpreted as the Path to use by the 
OutputFormats. In the case of non-file-based this would be interpreted 
according to the store implementation, for example in the case of HBase it 
could be the value for a 'uncommitted' column, thus records of a non-completed 
jobs could be easily tracked and cleaned up, the initialize() would remove all 
records with this name (from a failed prior run), the commit() would set this 
column to null for all records of the job.

Thoughts?


> changes to OutputFormat to work on temporary directory to enable re-running 
> crashed jobs (Issue: 1121)
> ------------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-1558
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1558
>             Project: Hadoop
>          Issue Type: Improvement
>          Components: mapred
>         Environment: all
>            Reporter: Alejandro Abdelnur
>             Fix For: 0.14.0
>
>         Attachments: hadoop-1558-JUN1007-1934.txt, 
> hadoop-1558-JUN1107-1533.txt
>
>
> Add  OutputFormat methods like:
> /** Called to initialize output for this job. */
> void initialize(JobConf job) throws IOException;
> /** Called to finalize output for this job. */
> void commit(JobConf job) throws IOException;
> In the base implemenation for FileSystem output, initialize() might then 
> create a temporary directory for the job, removing any that already exists, 
> and commit could rename the temporary output directory to the final name. 
> The existing checkOutputSpecs() would continue to throw an exception if the 
> final output already exists.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to