[ https://issues.apache.org/jira/browse/FLINK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-2883:
---------------------------------
    Description: 
If one uses a combinable reduce operation which also changes the key value of 
the underlying data element, then the results of the reduce operation can 
become wrong. The reason is that after the combine phase, another reduce 
operator is executed which then reduces the elements based on the new key 
values. This might not be so surprising if one has explicitly declared one's 
{{GroupReduceOperation}} as combinable. However, the {{ReduceFunction}} 
conceals the fact that a combiner is used implicitly. Furthermore, the API does 
not prevent the user from changing the key fields, although preventing this 
would solve the problem.

The following example program illustrates the problem:

{code}
import org.apache.flink.api.scala._
import org.apache.flink.api.java.io.PrintingOutputFormat

val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val input = env.fromElements((1, 2), (1, 3), (2, 3), (3, 3), (3, 4))

// The reduce function sums both fields and thereby also changes the key (field 0)
val result = input.groupBy(0).reduce {
  (left, right) =>
    (left._1 + right._1, left._2 + right._2)
}

result.output(new PrintingOutputFormat[(Int, Int)]())

env.execute()
{code}

The expected output is:
{code}
(2, 5)
(2, 3)
(6, 7)
{code}

However, the actual output is:
{code}
(4, 8)
(6, 7)
{code}
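
To make the mechanism concrete, here is a plain-Scala sketch (an illustration of the assumed execution order, not Flink code) of how a combine phase followed by a second reduce on the already modified keys yields exactly this output:

{code}
// Assumed execution order, for illustration only: the combiner first reduces
// per original key, then the downstream reduce groups the combined elements
// by field 0 again -- which the user function has already overwritten.
val combined = Seq((1 + 1, 2 + 3), (2, 3), (3 + 3, 3 + 4))  // (2,5), (2,3), (6,7)

val secondReduce = combined
  .groupBy(_._1)   // (2,5) and (2,3) now collide on the new key 2
  .values
  .map(_.reduce((l, r) => (l._1 + r._1, l._2 + r._2)))

// secondReduce contains (4,8) and (6,7), i.e. the observed (wrong) result
{code}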

I think that the underlying problem is that associativity and commutativity are 
not sufficient for a combinable reduce operation. Additionally, we also need to 
make sure that the key stays the same.
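
For comparison, a reduce function that leaves the key field untouched should not be affected, because combiner output and final reduce then agree on the grouping. A minimal sketch, assuming the same {{input}} as in the example above:

{code}
// Key-preserving variant (sketch): field 0, the grouping key, is left unchanged,
// so re-grouping after the combine phase cannot merge elements of different groups.
val safeResult = input.groupBy(0).reduce {
  (left, right) =>
    (left._1, left._2 + right._2)
}
{code}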

> Combinable reduce produces wrong result
> ---------------------------------------
>
>                 Key: FLINK-2883
>                 URL: https://issues.apache.org/jira/browse/FLINK-2883
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 0.10
>            Reporter: Till Rohrmann
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
