[ 
https://issues.apache.org/jira/browse/FLINK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15850521#comment-15850521
 ] 

ASF GitHub Bot commented on FLINK-2883:
---------------------------------------

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/3256

    [FLINK-2883] [docs] Add documentation to forbid key-modifying ReduceFunction

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 2883_add_documentation_to_forbid_keymodifying_reducefunction

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3256.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3256
    
----
commit 2a858819e2163a4c935521670d766ebaaba5b99d
Author: Greg Hogan <c...@greghogan.com>
Date:   2017-02-02T21:15:52Z

    [FLINK-2883] [docs] Add documentation to forbid key-modifying ReduceFunction

----


> Add documentation to forbid key-modifying ReduceFunction
> --------------------------------------------------------
>
>                 Key: FLINK-2883
>                 URL: https://issues.apache.org/jira/browse/FLINK-2883
>             Project: Flink
>          Issue Type: Task
>          Components: DataStream API, Documentation
>    Affects Versions: 0.10.0
>            Reporter: Till Rohrmann
>            Assignee: Greg Hogan
>
> If one uses a combinable reduce operation which also changes the key value of 
> the underlying data element, then the results of the reduce operation can 
> become wrong. The reason is that after the combine phase, another reduce 
> operator is executed which then reduces the elements based on the new key 
> values. This might not be so surprising if one explicitly declared one's 
> {{GroupReduceOperation}} as combinable. However, the {{ReduceFunction}} 
> conceals the fact that a combiner is used implicitly. Furthermore, the API 
> does not prevent the user from changing the key fields; enforcing that 
> restriction would avoid the problem.
> The following example program illustrates the problem:
> {code}
> import org.apache.flink.api.java.io.PrintingOutputFormat
> import org.apache.flink.api.scala._
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> env.setParallelism(1)
> val input = env.fromElements((1,2), (1,3), (2,3), (3,3), (3,4))
> val result = input.groupBy(0).reduce{
>   (left, right) =>
>     (left._1 + right._1, left._2 + right._2)
> }
> result.output(new PrintingOutputFormat[(Int, Int)]())
> env.execute()
> {code}
> The expected output is 
> {code}
> (2, 5)
> (2, 3)
> (6, 7)
> {code}
> However, the actual output is
> {code}
> (4, 8)
> (6, 7)
> {code}
> I think that the underlying problem is that associativity and commutativity 
> are not sufficient for a combinable reduce operation. Additionally, we need 
> to make sure that the key stays the same.
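The wrong result above can be reproduced without a Flink cluster. The sketch below is a plain-Scala simulation of the combine-then-reduce pipeline; the `reducePhase` helper is a hypothetical, simplified stand-in for Flink's per-group reduction (it assumes each phase groups its input by the first tuple field and reduces every group), not Flink's actual implementation:

```scala
object KeyModifyingReduceDemo {
  type Elem = (Int, Int)

  // Simplified model of one reduce phase: group by the first field (the key)
  // and fold each group with the user's reduce function. Sorting by key makes
  // the output order deterministic for this demo.
  def reducePhase(input: Seq[Elem], f: (Elem, Elem) => Elem): Seq[Elem] =
    input.groupBy(_._1).toSeq.sortBy(_._1).map { case (_, group) =>
      group.reduce(f)
    }

  val input = Seq((1, 2), (1, 3), (2, 3), (3, 3), (3, 4))

  // The key-modifying reduce function from the issue: it also sums the keys.
  val keyModifying: (Elem, Elem) => Elem =
    (l, r) => (l._1 + r._1, l._2 + r._2)

  // A key-preserving variant that only aggregates the value field.
  val keyPreserving: (Elem, Elem) => Elem =
    (l, r) => (l._1, l._2 + r._2)

  def main(args: Array[String]): Unit = {
    // Combine phase, then a second reduce over the combiner's output.
    val combined = reducePhase(input, keyModifying)
    println(combined)                                // List((2,5), (2,3), (6,7))
    println(reducePhase(combined, keyModifying))     // List((4,8), (6,7))

    // With the key left untouched, the second phase is a no-op and the
    // result is correct regardless of whether a combiner ran.
    val safe = reducePhase(input, keyPreserving)
    println(reducePhase(safe, keyPreserving))        // List((1,5), (2,3), (3,7))
  }
}
```

After the combine phase the two groups with keys 1 and 2 have both collapsed to elements with key 2, so the second reduce merges them into the spurious `(4,8)`, matching the actual output reported above.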



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
