[ https://issues.apache.org/jira/browse/FLINK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14966537#comment-14966537 ]

Till Rohrmann commented on FLINK-2883:
--------------------------------------

Yes, documenting it sounds like a reasonable solution. This is probably also 
more of a corner case. 

> Combinable reduce produces wrong result
> ---------------------------------------
>
>                 Key: FLINK-2883
>                 URL: https://issues.apache.org/jira/browse/FLINK-2883
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 0.10
>            Reporter: Till Rohrmann
>
> If one uses a combinable reduce operation that also changes the key value of
> the underlying data element, the results of the reduce operation can be
> wrong. The reason is that after the combine phase, another reduce operator
> is executed, which then reduces the elements based on the new key values.
> This might not be so surprising if one had explicitly declared one's
> {{GroupReduceOperation}} as combinable. However, the {{ReduceFunction}}
> conceals the fact that a combiner is used implicitly. Furthermore, the API
> does not prevent the user from changing the key fields; enforcing this would
> solve the problem.
> The following example program illustrates the problem:
> {code}
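> import org.apache.flink.api.java.io.PrintingOutputFormat
> import org.apache.flink.api.scala._
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(1)
> val input = env.fromElements((1, 2), (1, 3), (2, 3), (3, 3), (3, 4))
> // The reduce function also sums the key field (field 0)
> val result = input.groupBy(0).reduce {
>   (left, right) =>
>     (left._1 + right._1, left._2 + right._2)
> }
> // The elements are (Int, Int) tuples, so the output format is typed accordingly
> result.output(new PrintingOutputFormat[(Int, Int)]())
> env.execute()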
> {code}
> The expected output is:
> {code}
> (2, 5)
> (2, 3)
> (6, 7)
> {code}
> However, the actual output is:
> {code}
> (4, 8)
> (6, 7)
> {code}
> I think that the underlying problem is that associativity and commutativity
> are not sufficient for a combinable reduce operation. Additionally, we need
> to make sure that the reduce function does not change the key.
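
The two-phase execution described in the issue can be modelled with plain Scala collections, without Flink. This is a sketch under simplifying assumptions: a single partition (as with {{env.setParallelism(1)}}), and a combiner that fully reduces each key group before the final reduce (a real Flink combiner may emit several partial results per key). The names {{CombinableReduceSketch}}, {{reduceByKey}}, {{singlePhase}} and {{combineThenReduce}} are hypothetical and not part of Flink's API. The sketch reproduces both outputs listed in the issue and shows that a key-preserving reduce function yields the same result in both modes.

```scala
// Plain-Scala model of a combinable reduce (no Flink dependency).
// Assumption: the combiner fully reduces each key group before the final
// reduce; real Flink combiners may emit several partial results per key.
object CombinableReduceSketch {
  type Rec = (Int, Int)

  // Group records by field 0, reduce each group, and order groups by key.
  def reduceByKey(records: Seq[Rec], f: (Rec, Rec) => Rec): Seq[Rec] =
    records.groupBy(_._1).toSeq.sortBy(_._1).map { case (_, group) => group.reduce(f) }

  // Intended semantics: group the input once by its original keys.
  def singlePhase(input: Seq[Rec], f: (Rec, Rec) => Rec): Seq[Rec] =
    reduceByKey(input, f)

  // Two-phase execution: combine, then a final reduce that regroups the
  // combined records by whatever value field 0 holds after combining.
  def combineThenReduce(input: Seq[Rec], f: (Rec, Rec) => Rec): Seq[Rec] =
    reduceByKey(reduceByKey(input, f), f)

  val input: Seq[Rec] = Seq((1, 2), (1, 3), (2, 3), (3, 3), (3, 4))

  // The reduce function from the issue: it also sums the key field.
  val keyChanging: (Rec, Rec) => Rec = (l, r) => (l._1 + r._1, l._2 + r._2)

  // A key-preserving variant: the key of the left record is kept as-is.
  val keyPreserving: (Rec, Rec) => Rec = (l, r) => (l._1, l._2 + r._2)

  def main(args: Array[String]): Unit = {
    def show(rs: Seq[Rec]): String = rs.mkString(", ")
    println(show(singlePhase(input, keyChanging)))         // (2,5), (2,3), (6,7)
    println(show(combineThenReduce(input, keyChanging)))   // (4,8), (6,7)
    println(show(singlePhase(input, keyPreserving)))       // (1,5), (2,3), (3,7)
    println(show(combineThenReduce(input, keyPreserving))) // (1,5), (2,3), (3,7)
  }
}
```

Keeping field 0 unchanged, as {{keyPreserving}} does, is the extra condition proposed in the last paragraph of the issue: the final reduce then sees the same groups as the combiner, so combining cannot alter the result.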



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
