[ https://issues.apache.org/jira/browse/FLINK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14966510#comment-14966510 ]
Aljoscha Krettek commented on FLINK-2883:
-----------------------------------------
I don't think this can be solved in our API because we don't have a separation
between what is key and what is not key in the user data type. (Spark, for
example, can do it because it has a clear key/value model.) Users just
have to follow the rules if they want correct results.
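Aljoscha's point about a key/value model can be sketched in plain Scala. This is a hypothetical illustration, not Spark's or Flink's API: `reduceByKey` and `KeyValueReduceSketch` are made-up names, and the helper only models a per-partition combine followed by a merge of the partial results. Because the user function has type `(V, V) => V`, it never receives the key, so the key-summing lambda from the issue below cannot even be expressed, and combining stays safe for any associative, commutative `f`.

```scala
// Hypothetical sketch (not Spark's or Flink's API): in a key/value model the
// reduce function only receives values, so it cannot alter the grouping key.
object KeyValueReduceSketch {

  // Combine locally within each partition, then merge the partial results.
  // Because `f` has type (V, V) => V, the key K is carried by the framework.
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]])(f: (V, V) => V): Map[K, V] = {
    // Combine phase: pre-aggregate the values of each key inside each partition.
    val combined: Seq[Map[K, V]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
    }
    // Reduce phase: merge the partial results; the keys are still the original keys.
    combined.flatten.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
  }

  def main(args: Array[String]): Unit = {
    // Keys 1 and 3 appear in both partitions, so the merge phase has real work to do.
    val partitions = Seq(Seq((1, 2), (3, 3)), Seq((1, 3), (2, 3), (3, 4)))
    println(reduceByKey(partitions)(_ + _))
  }
}
```

Whatever the partitioning, each key ends up with the sum of its values (here 5, 3 and 7 for keys 1, 2 and 3), because the combiner has no way to produce a record under a different key.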
> Combinable reduce produces wrong result
> ---------------------------------------
>
> Key: FLINK-2883
> URL: https://issues.apache.org/jira/browse/FLINK-2883
> Project: Flink
> Issue Type: Bug
> Affects Versions: 0.10
> Reporter: Till Rohrmann
>
> If one uses a combinable reduce operation that also changes the key value of
> the underlying data element, then the results of the reduce operation can
> be wrong. The reason is that after the combine phase, another reduce
> operator is executed, which then reduces the elements based on the new key
> values. This might not be so surprising if one explicitly defined one's
> {{GroupReduceOperation}} as combinable. However, the {{ReduceFunction}}
> conceals the fact that a combiner is used implicitly. Furthermore, the API
> does not prevent the user from changing the key fields; enforcing this could
> solve the problem.
> The following example program illustrates the problem:
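> {code}
> import org.apache.flink.api.scala._
> import org.apache.flink.api.java.io.PrintingOutputFormat
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(1)
> val input = env.fromElements((1, 2), (1, 3), (2, 3), (3, 3), (3, 4))
> // reduce() on a grouped DataSet is executed with an implicit combiner
> val result = input.groupBy(0).reduce {
>   (left, right) =>
>     // Both fields are summed, so the key (field 0) changes
>     (left._1 + right._1, left._2 + right._2)
> }
> // The element type is (Int, Int), not Int
> result.output(new PrintingOutputFormat[(Int, Int)]())
> env.execute()
> {code}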
> The expected output is:
> {code}
> (2, 5)
> (2, 3)
> (6, 7)
> {code}
> However, the actual output is:
> {code}
> (4, 8)
> (6, 7)
> {code}
> I think that the underlying problem is that associativity and commutativity
> are not sufficient for a combinable reduce operation. We additionally
> need to make sure that the key stays the same.
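The mechanism described in the issue can be reproduced without Flink. The sketch below is a plain-Scala simulation, not Flink's actual runtime: `CombineThenReduceSketch`, `singlePhase` and `combineThenReduce` are hypothetical names. It assumes, as in the example, parallelism 1 and a combiner that sees each whole group, so the final reduce simply regroups the combined records by field 0, i.e. by the new keys.

```scala
// Plain-Scala simulation (no Flink) of the two-phase execution described in the issue.
object CombineThenReduceSketch {
  type Rec = (Int, Int)

  // Group by field 0 and reduce each group exactly once: the semantics the user expects.
  def singlePhase(input: Seq[Rec], f: (Rec, Rec) => Rec): Seq[Rec] =
    input.groupBy(_._1).values.map(_.reduce(f)).toSeq.sorted

  // Combine phase groups by field 0 and reduces; the final reduce phase then
  // regroups the combined records by field 0 again, i.e. by the *new* keys.
  def combineThenReduce(input: Seq[Rec], f: (Rec, Rec) => Rec): Seq[Rec] =
    singlePhase(singlePhase(input, f), f)

  def main(args: Array[String]): Unit = {
    val input = Seq((1, 2), (1, 3), (2, 3), (3, 3), (3, 4))
    // The function from the issue: it sums the key field as well.
    val changesKey: (Rec, Rec) => Rec = (l, r) => (l._1 + r._1, l._2 + r._2)
    // A key-preserving variant: only the value field is aggregated.
    val keepsKey: (Rec, Rec) => Rec = (l, r) => (l._1, l._2 + r._2)

    println(singlePhase(input, changesKey))
    println(combineThenReduce(input, changesKey))
    println(singlePhase(input, keepsKey))
    println(combineThenReduce(input, keepsKey))
  }
}
```

With the key-changing function the two paths disagree: the single pass yields (2,3), (2,5), (6,7), the output the issue expects, while combine-then-reduce yields (4,8), (6,7), the output it reports. With the key-preserving function both paths yield (1,5), (2,3), (3,7), which is the extra condition the issue asks for.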
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)