[
https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639545#comment-14639545
]
Fabian Hueske commented on FLINK-2394:
--------------------------------------
Hadoop handles {{OutputCommitters}} differently for its both APIs ({{mapred}}
and {{mapreduce}}).
- For the newer {{mapreduce}} API, things are quite simple.
{{org.apache.hadoop.mapreduce.OutputFormat}} has an abstract method
{{getOutputCommitter(TaskAttemptContext context)}} which provides the output
committer that must be used. So updating the Flink
{{mapreduce.HadoopOutputFormatBase}} class such that it gets the
{{OutputCommitter}} directly from the {{OutputFormat}} instead of creating a
{{FileOutputCommitter}} should be easy.
- For the older {{mapred}} API, the OutputFormat interface does not define a
{{getOutputCommitter()}} method. Instead the {{JobConf}} has a
{{getOutputCommitter()}} method which returns a {{FileOutputCommitter}} if
nothing else is defined. So we could update the
{{mapred.HadoopOutputFormatBase}} to get the {{OutputCommitter}} from the
{{JobConf}}. However, this would require that users manually set a different
{{OutputCommitter}} in the {{JobConf}} which is not really intuitive. We could
offer an additional constructor which sets a {{OutputCommitter}} in the
provided {{JobConf}} argument, like [~stefano.bortoli] suggested.
> HadoopOutFormat OutputCommitter is default to FileOutputCommiter
> ----------------------------------------------------------------
>
> Key: FLINK-2394
> URL: https://issues.apache.org/jira/browse/FLINK-2394
> Project: Flink
> Issue Type: Bug
> Components: Hadoop Compatibility
> Affects Versions: 0.9.0
> Reporter: Stefano Bortoli
>
> MongoOutputFormat does not write back in collection because the
> HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and
> is set as default to FileOutputCommitter. Therefore, on close and
> globalFinalize execution the commit does not happen and mongo collection
> stays untouched.
> A simple solution would be to:
> 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat
> that gets the OutputCommitter as a parameter
> 2 - change the outputCommitter field of HadoopOutputFormatBase to be a
> generic OutputCommitter
> 3 - remove the default assignment in the open() and finalizeGlobal to the
> outputCommitter to FileOutputCommitter(), or keep it as a default in case of
> no specific assignment.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)