[ https://issues.apache.org/jira/browse/FLINK-2394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14712936#comment-14712936 ]

ASF GitHub Bot commented on FLINK-2394:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1056#discussion_r37969330
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala ---
    @@ -18,11 +18,17 @@
     package org.apache.flink.api.scala.hadoop.mapred
     
     import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
    -import org.apache.hadoop.mapred.{JobConf, OutputFormat}
    +import org.apache.hadoop.mapred.{OutputCommitter, JobConf, OutputFormat}
     
     class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)
       extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
     
    +  def this(mapredOutputFormat: OutputFormat[K, V], outputCommitterClass: Class[OutputCommitter],
    +           job: JobConf) {
    --- End diff --
    
    Fixed. 
    I'd propose to add this to the Scala checkstyle, if we want to enforce it.


> HadoopOutputFormat OutputCommitter defaults to FileOutputCommitter
> ------------------------------------------------------------------
>
>                 Key: FLINK-2394
>                 URL: https://issues.apache.org/jira/browse/FLINK-2394
>             Project: Flink
>          Issue Type: Bug
>          Components: Hadoop Compatibility
>    Affects Versions: 0.9.0
>            Reporter: Stefano Bortoli
>            Assignee: Fabian Hueske
>             Fix For: 0.10, 0.9.1
>
>
> MongoOutputFormat does not write back to the collection because the 
> HadoopOutputFormat wrapper does not allow setting the MongoOutputCommitter; 
> the committer always defaults to FileOutputCommitter. As a result, the commit 
> does not happen when close and finalizeGlobal execute, and the Mongo 
> collection stays untouched. 
> A simple solution would be to:
> 1 - add a constructor to HadoopOutputFormatBase and HadoopOutputFormat 
> that takes the OutputCommitter as a parameter
> 2 - change the outputCommitter field of HadoopOutputFormatBase to the 
> generic OutputCommitter type
> 3 - remove the default assignment of FileOutputCommitter() to the 
> outputCommitter in open() and finalizeGlobal, or keep it as a default when 
> no specific committer is assigned.
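
The three steps proposed in the issue description can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual Flink or Hadoop code: Committer, FileCommitter, MongoCommitter, OutputFormatWrapper and CommitterSketch are hypothetical stand-ins for OutputCommitter, FileOutputCommitter, MongoOutputCommitter and HadoopOutputFormatBase, and the String return value exists only to make the behaviour visible.

```scala
// Stand-in types for illustration only; these are NOT the Hadoop or Flink classes.
trait Committer {
  def commitJob(): String
}

class FileCommitter extends Committer {
  def commitJob(): String = "file commit"
}

class MongoCommitter extends Committer {
  def commitJob(): String = "mongo commit"
}

// Hypothetical wrapper. The committer can be passed in by the caller (step 1)
// and is typed as the general Committer trait (step 2).
class OutputFormatWrapper(customCommitter: Option[Committer]) {

  // Auxiliary constructor for callers that do not supply a committer.
  def this() = this(None)

  // Step 3: fall back to the file-based committer only when none was given.
  private val committer: Committer = customCommitter.getOrElse(new FileCommitter)

  // Models the global finalize step, which delegates the job commit.
  def finalizeGlobal(): String = committer.commitJob()
}

object CommitterSketch {
  def main(args: Array[String]): Unit = {
    println(new OutputFormatWrapper().finalizeGlobal())
    println(new OutputFormatWrapper(Some(new MongoCommitter)).finalizeGlobal())
  }
}
```

Running CommitterSketch prints "file commit" for the wrapper built without a committer and "mongo commit" for the one given a MongoCommitter. Keeping the file-based committer as a fallback corresponds to the second option in step 3, so existing callers would keep their current behaviour.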



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
