[ https://issues.apache.org/jira/browse/FLINK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14966537#comment-14966537 ]
Till Rohrmann commented on FLINK-2883:
--------------------------------------

Yes, documenting it sounds like a reasonable solution. This is probably also more of a corner case.

> Combinable reduce produces wrong result
> ---------------------------------------
>
>                 Key: FLINK-2883
>                 URL: https://issues.apache.org/jira/browse/FLINK-2883
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 0.10
>            Reporter: Till Rohrmann
>
> If one uses a combinable reduce operation that also changes the key value of
> the underlying data elements, then the result of the reduce operation can be
> wrong. The reason is that after the combine phase, another reduce operator is
> executed, which reduces the elements based on the new key values. This might
> not be so surprising if one explicitly declared one's
> {{GroupReduceOperation}} as combinable. However, the {{ReduceFunction}}
> conceals the fact that a combiner is used implicitly. Furthermore, the API
> does not prevent the user from changing the key fields, which would avoid
> the problem.
> The following example program illustrates the problem:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(1)
>
> val input = env.fromElements((1, 2), (1, 3), (2, 3), (3, 3), (3, 4))
>
> val result = input.groupBy(0).reduce {
>   (left, right) => (left._1 + right._1, left._2 + right._2)
> }
>
> result.output(new PrintingOutputFormat[(Int, Int)]())
> env.execute()
> {code}
> The expected output is
> {code}
> (2, 5)
> (2, 3)
> (6, 7)
> {code}
> However, the actual output is
> {code}
> (4, 8)
> (6, 7)
> {code}
> I think the underlying problem is that associativity and commutativity are
> not sufficient for a combinable reduce operation. Additionally, we also need
> to make sure that the key stays the same.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
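To see why the key change produces the wrong result, the combiner semantics can be simulated in plain Scala without Flink. This is only an illustrative sketch under the assumption that the combine phase applies the same reduce function per group and the final reduce phase then re-groups the combined records by their (now changed) key values; `reduceFn` and `reduceByKey` are hypothetical helpers, not Flink API.

```scala
// The reduce function from the issue: it sums both fields, so it also
// changes the grouping key (field 0).
def reduceFn(l: (Int, Int), r: (Int, Int)): (Int, Int) =
  (l._1 + r._1, l._2 + r._2)

// One reduce pass: group by field 0, then reduce each group.
def reduceByKey(data: Seq[(Int, Int)]): Seq[(Int, Int)] =
  data.groupBy(_._1).values.map(_.reduce(reduceFn)).toSeq

val input = Seq((1, 2), (1, 3), (2, 3), (3, 3), (3, 4))

// What the user expects: a single logical reduce per ORIGINAL key.
val expected = reduceByKey(input).sorted
println(expected) // List((2,3), (2,5), (6,7))

// What the implicit combiner produces: a combine pass followed by a
// final reduce pass. The combine outputs (2,5) and (2,3) now share
// key 2, so the final pass merges them into (4,8).
val actual = reduceByKey(reduceByKey(input)).sorted
println(actual) // List((4,8), (6,7))
```

Running both passes reproduces the wrong `(4, 8)` record from the issue, while a single pass yields the expected per-key sums; this is exactly the merge-by-new-key effect the description attributes to the second reduce operator.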