[
https://issues.apache.org/jira/browse/FLINK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Greg Hogan updated FLINK-2883:
------------------------------
Fix Version/s: 1.2.1
1.3.0
> Add documentation to forbid key-modifying ReduceFunction
> --------------------------------------------------------
>
> Key: FLINK-2883
> URL: https://issues.apache.org/jira/browse/FLINK-2883
> Project: Flink
> Issue Type: Task
> Components: DataStream API, Documentation
> Affects Versions: 0.10.0
> Reporter: Till Rohrmann
> Assignee: Greg Hogan
> Fix For: 1.3.0, 1.2.1
>
>
> If one uses a combinable reduce operation which also changes the key value of
> the underlying data element, then the results of the reduce operation can
> become wrong. The reason is that after the combine phase, another reduce
> operator is executed which will then reduce the elements based on the new key
> values. This might not be so surprising if one has explicitly declared one's
> {{GroupReduceOperation}} as combinable. However, the {{ReduceFunction}}
> conceals the fact that a combiner is used implicitly. Furthermore, the API
> does not prevent the user from modifying the key fields, although forbidding
> this would avoid the problem.
> The following example program illustrates the problem:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(1)
> val input = env.fromElements((1,2), (1,3), (2,3), (3,3), (3,4))
> val result = input.groupBy(0).reduce {
>   (left, right) => (left._1 + right._1, left._2 + right._2)
> }
> result.output(new PrintingOutputFormat[(Int, Int)]())
> env.execute()
> {code}
> The expected output is
> {code}
> (2, 5)
> (2, 3)
> (6, 7)
> {code}
> However, the actual output is
> {code}
> (4, 8)
> (6, 7)
> {code}
> I think that the underlying problem is that associativity and commutativity
> are not sufficient for a combinable reduce operation; additionally, we need
> to make sure that the key stays the same.
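The effect can be reproduced without Flink. The following plain-Scala sketch (object and helper names are made up for illustration) runs the key-modifying reduce once per key group, as the user would expect, and then again the way a combine phase followed by a final reduce would, re-grouping on the new key values:

```scala
// Self-contained sketch (plain Scala collections, no Flink) of why a
// key-modifying reduce function gives wrong results once a combiner runs.
object KeyModifyingReduceDemo {
  type Elem = (Int, Int)

  // The key-modifying reduce from the example above: it also sums field 0,
  // which is the grouping key.
  val keyModifyingReduce: (Elem, Elem) => Elem =
    (l, r) => (l._1 + r._1, l._2 + r._2)

  // Reduce all elements sharing the same key (field 0), as one reduce
  // phase over one partition would.
  def reduceByKey(data: Seq[Elem]): Seq[Elem] =
    data.groupBy(_._1).values.map(_.reduce(keyModifyingReduce)).toSeq

  // A single reduce phase: the result the user expects.
  def singlePhase(input: Seq[Elem]): Set[Elem] =
    reduceByKey(input).toSet

  // Combine phase first, then a final reduce that groups on the NEW keys
  // produced by the combiner.
  def combineThenReduce(input: Seq[Elem]): Set[Elem] =
    reduceByKey(reduceByKey(input)).toSet

  def main(args: Array[String]): Unit = {
    val input = Seq((1, 2), (1, 3), (2, 3), (3, 3), (3, 4))
    println(singlePhase(input))       // Set((2,5), (2,3), (6,7)): expected
    println(combineThenReduce(input)) // Set((4,8), (6,7)): the wrong result
  }
}
```

In the second pass, the combiner output (2,5) now carries key 2 and is merged with the original key-2 group's result (2,3), producing (4,8), which matches the wrong output reported above.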
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)