[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14639545#comment-14639545 ]
Fabian Hueske commented on FLINK-2394:
--------------------------------------

Hadoop handles {{OutputCommitters}} differently in its two APIs ({{mapred}} and {{mapreduce}}).

- For the newer {{mapreduce}} API, things are quite simple. {{org.apache.hadoop.mapreduce.OutputFormat}} has an abstract method {{getOutputCommitter(TaskAttemptContext context)}}, which provides the output committer that must be used. So updating the Flink {{mapreduce.HadoopOutputFormatBase}} class to get the {{OutputCommitter}} directly from the {{OutputFormat}}, instead of creating a {{FileOutputCommitter}}, should be easy.
- For the older {{mapred}} API, the {{OutputFormat}} interface does not define a {{getOutputCommitter()}} method. Instead, the {{JobConf}} has a {{getOutputCommitter()}} method, which returns a {{FileOutputCommitter}} if nothing else is defined. So we could update the {{mapred.HadoopOutputFormatBase}} to get the {{OutputCommitter}} from the {{JobConf}}. However, this would require users to manually set a different {{OutputCommitter}} in the {{JobConf}}, which is not really intuitive. We could offer an additional constructor that sets an {{OutputCommitter}} in the provided {{JobConf}} argument, as [~stefano.bortoli] suggested.

> HadoopOutFormat OutputCommitter is default to FileOutputCommiter
> ----------------------------------------------------------------
>
>                 Key: FLINK-2394
>                 URL: https://issues.apache.org/jira/browse/FLINK-2394
>             Project: Flink
>          Issue Type: Bug
>          Components: Hadoop Compatibility
>    Affects Versions: 0.9.0
>            Reporter: Stefano Bortoli
>
> MongoOutputFormat does not write back in collection because the
> HadoopOutputFormat wrapper does not allow to set the MongoOutputCommiter and
> is set as default to FileOutputCommitter. Therefore, on close and
> globalFinalize execution the commit does not happen and mongo collection
> stays untouched.
> A simple solution would be to:
> 1 - create a constructor of HadoopOutputFormatBase and HadoopOutputFormat
> that gets the OutputCommitter as a parameter
> 2 - change the outputCommitter field of HadoopOutputFormatBase to be a
> generic OutputCommitter
> 3 - remove the default assignment in the open() and finalizeGlobal to the
> outputCommitter to FileOutputCommitter(), or keep it as a default in case of
> no specific assignment.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
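
To make the two lookup strategies from the comment concrete, here is a minimal, self-contained Java sketch. It deliberately does not use Hadoop or Flink: {{Committer}}, {{NewApiFormat}}, {{Conf}} and {{OldApiWrapper}} are hypothetical stand-ins for {{OutputCommitter}}, the {{mapreduce}} {{OutputFormat}}, {{JobConf}} and the {{mapred}} wrapper, and the stand-in {{Conf}} stores a committer instance purely for brevity. It only illustrates where the committer would come from in each case, not how the actual fix should look.

```java
public class CommitterLookupSketch {

    // Stand-in for an output committer; only its name is modelled.
    interface Committer {
        String name();
    }

    static final class FileCommitter implements Committer {
        public String name() { return "FileOutputCommitter"; }
    }

    static final class MongoCommitter implements Committer {
        public String name() { return "MongoOutputCommitter"; }
    }

    // mapreduce style: the output format itself says which committer to use.
    interface NewApiFormat {
        Committer getOutputCommitter();
    }

    // mapred style: the job configuration holds the committer and
    // falls back to a file committer if nothing else was set.
    static final class Conf {
        private Committer committer;

        void setOutputCommitter(Committer committer) {
            this.committer = committer;
        }

        Committer getOutputCommitter() {
            return committer != null ? committer : new FileCommitter();
        }
    }

    // Wrapper for the newer API: ask the format instead of hard-coding a default.
    static Committer resolveNewApi(NewApiFormat format) {
        return format.getOutputCommitter();
    }

    // Wrapper for the older API: ask the configuration. The second
    // constructor models the proposed "additional constructor" that
    // sets the committer in the provided configuration.
    static final class OldApiWrapper {
        private final Conf conf;

        OldApiWrapper(Conf conf) {
            this.conf = conf;
        }

        OldApiWrapper(Conf conf, Committer committer) {
            conf.setOutputCommitter(committer);
            this.conf = conf;
        }

        Committer resolveCommitter() {
            return conf.getOutputCommitter();
        }
    }

    public static void main(String[] args) {
        // Newer API: the committer comes from the format.
        System.out.println(resolveNewApi(() -> new MongoCommitter()).name());
        // Older API, nothing configured: the file committer is the fallback.
        System.out.println(new OldApiWrapper(new Conf()).resolveCommitter().name());
        // Older API, committer passed through the extra constructor.
        System.out.println(new OldApiWrapper(new Conf(), new MongoCommitter()).resolveCommitter().name());
    }
}
```

In this model, the extra constructor spares the user from touching the configuration by hand, which is the usability concern raised for the {{mapred}} case.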