[ 
https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906198#comment-15906198
 ] 

ASF GitHub Bot commented on FLINK-5890:
---------------------------------------

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/3515

    [FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled

    The initial fix for this ticket is not working on larger data sets.
    
    Reduce supports returning the left input, right input, a new object, or
    a locally reused object. The trouble with the initial fix was that the
    returned local object was reusing fields from the input tuples.
    
    The problem is that ReduceDriver#run manages two values (reuse1 and
    reuse2) while GatherSumApplyIteration.SumUdf returns a third, local
    value. After the first grouping, value.f1 == reuse1.f1. Subsequent UDF
    calls may swap value.f1 and reuse2.f1, which causes reuse1.f1 ==
    reuse2.f1. After an odd number of swaps, the next grouping reduces with
    reuse1 and reuse2 sharing a field, and deserialization overwrites
    stored values.
    
    The simple fix is to only use and return the provided inputs.
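
As a rough illustration of "only use and return the provided inputs", the sketch below reduces into arg0 and returns it, swapping fields when the sum function hands back arg1's field so the two inputs never share a field object. It is not taken from the patch: the Pair class is a hypothetical stand-in for Tuple2, and a BinaryOperator stands in for the sum function.

```java
import java.util.function.BinaryOperator;

public class ReuseSafeReduce {
    /** Hypothetical mutable pair standing in for Flink's Tuple2. */
    static final class Pair<K, M> {
        K f0;
        M f1;

        Pair(K f0, M f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    /**
     * Reduces into arg0 and returns it, so no third object is created.
     * If sum returns arg1's field, the two fields are swapped so that
     * arg0 and arg1 never end up referencing the same field object.
     */
    static <K, M> Pair<K, M> reduce(Pair<K, M> arg0, Pair<K, M> arg1, BinaryOperator<M> sum) {
        M result = sum.apply(arg0.f1, arg1.f1);
        if (result == arg1.f1) {
            M tmp = arg0.f1;
            arg0.f1 = arg1.f1;
            arg1.f1 = tmp;
        } else {
            arg0.f1 = result;
        }
        return arg0;
    }

    public static void main(String[] args) {
        Pair<String, long[]> left = new Pair<>("k", new long[] {5});
        Pair<String, long[]> right = new Pair<>("k", new long[] {3});

        // A "sum" that returns one of its inputs (the smaller one).
        Pair<String, long[]> out = reduce(left, right, (a, b) -> a[0] <= b[0] ? a : b);

        System.out.println(out == left);          // the left input is returned
        System.out.println(out.f1[0]);            // reduced value
        System.out.println(left.f1 != right.f1);  // inputs do not share a field
    }
}
```

The swap branch matters only when the sum function returns one of its arguments instead of a fresh object; otherwise the result is simply stored in arg0.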

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 5890b_gathersumapply_broken_when_object_reuse_enabled

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3515.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3515
    
----
commit c1bddcb7a59eff8ac1639deaa9fabac3073c6552
Author: Greg Hogan <[email protected]>
Date:   2017-03-10T21:44:27Z

    [FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled
    
    The initial fix for this ticket is not working on larger data sets.
    
    Reduce supports returning the left input, right input, a new object, or
    a locally reused object. The trouble with the initial fix was that the
    returned local object was reusing fields from the input tuples.
    
    The problem is that ReduceDriver#run manages two values (reuse1 and
    reuse2) while GatherSumApplyIteration.SumUdf returns a third, local
    value. After the first grouping, value.f1 == reuse1.f1. Subsequent UDF
    calls may swap value.f1 and reuse2.f1, which causes reuse1.f1 ==
    reuse2.f1. After an odd number of swaps, the next grouping reduces with
    reuse1 and reuse2 sharing a field, and deserialization overwrites
    stored values.
    
    The simple fix is to only use and return the provided inputs.

----


> GatherSumApply broken when object reuse enabled
> -----------------------------------------------
>
>                 Key: FLINK-5890
>                 URL: https://issues.apache.org/jira/browse/FLINK-5890
>             Project: Flink
>          Issue Type: Bug
>          Components: Gelly
>    Affects Versions: 1.3.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>             Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in 
> the new {{Tuple2}} which can be overwritten in {{ReduceDriver}}. We need to 
> swap {{arg0.f1}} and {{arg1.f1}} when this happens (as done in 
> {{ReduceDriver}} for the returned results).
> {code}
>       @Override
>       public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
>               K key = arg0.f0;
>               M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>               return new Tuple2<>(key, result);
>       }
> {code}
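
The hazard in the quoted description can be reproduced in isolation. The sketch below assumes a mutable field type and a deserializer that writes into the existing field object in place. Box, Pair and readInto are hypothetical stand-ins, not Flink classes, and the reduce uses a min-style function that returns one of its inputs.

```java
public class SharedFieldHazard {
    /** Hypothetical mutable value type, standing in for a reusable Flink value. */
    static final class Box {
        long v;

        Box(long v) {
            this.v = v;
        }
    }

    /** Hypothetical stand-in for Tuple2<K, M>. */
    static final class Pair {
        long f0;
        Box f1;

        Pair(long f0, Box f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    /** Mimics deserializing the next record into a reused object: fields are overwritten in place. */
    static void readInto(Pair reuse, long key, long value) {
        reuse.f0 = key;
        reuse.f1.v = value;
    }

    /** Same shape as the quoted reduce: a new pair that may hold a field taken from arg1. */
    static Pair reduceIntoNewPair(Pair arg0, Pair arg1) {
        Box result = arg0.f1.v <= arg1.f1.v ? arg0.f1 : arg1.f1;
        return new Pair(arg0.f0, result);
    }

    public static void main(String[] args) {
        Pair reuse1 = new Pair(0, new Box(0));
        Pair reuse2 = new Pair(0, new Box(0));
        readInto(reuse1, 1, 5);
        readInto(reuse2, 1, 3);

        // The partial result now references reuse2's field object.
        Pair value = reduceIntoNewPair(reuse1, reuse2);
        System.out.println(value.f1.v); // prints 3

        // The next record is read into reuse2 and silently changes the partial result.
        readInto(reuse2, 1, 9);
        System.out.println(value.f1.v); // prints 9
    }
}
```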



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
