[https://issues.apache.org/jira/browse/FLINK-5890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15906198#comment-15906198]
ASF GitHub Bot commented on FLINK-5890:
---------------------------------------
GitHub user greghogan opened a pull request:
https://github.com/apache/flink/pull/3515
[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled
The initial fix for this ticket does not work on larger data sets.
A reduce function may return the left input, the right input, a new object,
or a locally reused object. The trouble with the initial fix was that the
returned local object reused fields from the input tuples.
The problem is that ReduceDriver#run manages two values (reuse1 and reuse2)
while GatherSumApplyIteration.SumUdf returns a third, local value. After the
first grouping, value.f1 == reuse1.f1. Subsequent UDF calls may swap value.f1
and reuse2.f1, which causes reuse1.f1 == reuse2.f1. After an odd number of
swaps, the next grouping reduces with reuse1 and reuse2 sharing a field, and
deserializing the next record overwrites the stored values.
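The aliasing can be reproduced outside Flink with a deliberately simplified model. In this sketch, `MutableLong`, `Pair`, and `deserializeInto` are hypothetical stand-ins (for a reusable value type, for `Tuple2`, and for the serializer writing into a reused record); the sum function is a `max` that returns one of its inputs. It does not use Flink classes and does not reproduce ReduceDriver exactly.

```java
import java.util.function.BinaryOperator;

// Hypothetical model of the bug; not Flink code.
public class AliasingDemo {

    // Stand-in for a mutable, reusable value type.
    static final class MutableLong {
        long value;
        MutableLong(long value) { this.value = value; }
    }

    // Stand-in for Tuple2<K, M>.
    static final class Pair {
        String f0;
        MutableLong f1;
        Pair(String f0, MutableLong f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Buggy pattern: the returned Pair is new, but its f1 may be arg1.f1
    // itself when the sum function returns one of its inputs.
    static Pair buggyReduce(Pair arg0, Pair arg1, BinaryOperator<MutableLong> sum) {
        return new Pair(arg0.f0, sum.apply(arg0.f1, arg1.f1));
    }

    // Stand-in for deserializing the next record into a reused object:
    // with object reuse, the existing field object is written in place.
    static void deserializeInto(Pair reuse, String key, long value) {
        reuse.f0 = key;
        reuse.f1.value = value;
    }

    public static void main(String[] args) {
        BinaryOperator<MutableLong> max = (a, b) -> a.value >= b.value ? a : b;

        Pair reuse1 = new Pair("k", new MutableLong(3));
        Pair reuse2 = new Pair("k", new MutableLong(7));

        Pair result = buggyReduce(reuse1, reuse2, max);
        System.out.println("before: " + result.f1.value); // prints "before: 7"

        // The next record is read into reuse2, which still shares its f1
        // with the stored result, so the result is silently overwritten.
        deserializeInto(reuse2, "k", 1);
        System.out.println("after:  " + result.f1.value); // prints "after:  1"
    }
}
```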
The simple fix is to use and return only the provided inputs.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/greghogan/flink 5890b_gathersumapply_broken_when_object_reuse_enabled
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3515.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3515
----
commit c1bddcb7a59eff8ac1639deaa9fabac3073c6552
Author: Greg Hogan
Date: 2017-03-10T21:44:27Z
[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled
----
> GatherSumApply broken when object reuse enabled
> -----------------------------------------------
>
> Key: FLINK-5890
> URL: https://issues.apache.org/jira/browse/FLINK-5890
> Project: Flink
> Issue Type: Bug
> Components: Gelly
> Affects Versions: 1.3.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.3.0
>
>
> {{GatherSumApplyIteration.SumUdf.reduce}} can store a value from {{arg1}} in
> the new {{Tuple2}}, where it can then be overwritten by {{ReduceDriver}}. When
> this happens we need to swap {{arg0.f1}} and {{arg1.f1}} (as {{ReduceDriver}}
> does for the returned results).
> {code}
> @Override
> public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
>     K key = arg0.f0;
>     M result = this.sumFunction.sum(arg0.f1, arg1.f1);
>     return new Tuple2<>(key, result);
> }
> {code}
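The swap proposed in the issue, which also follows the rule of using and returning only the provided inputs, can be sketched with the same kind of hypothetical stand-ins (`MutableLong` for a reusable value type, `Pair` for `Tuple2`, and a `max` sum function; none of these are Flink classes). This illustrates the idea and is not presented as the merged patch.

```java
import java.util.function.BinaryOperator;

// Hypothetical sketch of the swap-based fix; not Flink code.
public class SwapFixDemo {

    // Stand-in for a mutable, reusable value type.
    static final class MutableLong {
        long value;
        MutableLong(long value) { this.value = value; }
    }

    // Stand-in for Tuple2<K, M>.
    static final class Pair {
        String f0;
        MutableLong f1;
        Pair(String f0, MutableLong f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Use and return only the provided inputs. If the sum function returned
    // arg1's field, swap the f1 fields so that arg0 owns the result and arg1
    // holds an object the returned record does not reference.
    static Pair fixedReduce(Pair arg0, Pair arg1, BinaryOperator<MutableLong> sum) {
        MutableLong result = sum.apply(arg0.f1, arg1.f1);
        if (result == arg1.f1) {
            MutableLong tmp = arg1.f1;
            arg1.f1 = arg0.f1;
            arg0.f1 = tmp;
        } else {
            arg0.f1 = result;
        }
        return arg0;
    }

    public static void main(String[] args) {
        BinaryOperator<MutableLong> max = (a, b) -> a.value >= b.value ? a : b;

        Pair reuse1 = new Pair("k", new MutableLong(3));
        Pair reuse2 = new Pair("k", new MutableLong(7));

        Pair result = fixedReduce(reuse1, reuse2, max);

        // Reading the next record into reuse2 no longer touches the result.
        reuse2.f0 = "k";
        reuse2.f1.value = 1;

        System.out.println(result.f1.value); // prints 7
        System.out.println(result == reuse1 && result.f1 != reuse2.f1); // prints true
    }
}
```

Because the returned object is always one of the two records the caller passed in, and the two records never end up sharing an `f1`, reusing the other record for the next deserialization cannot overwrite the stored sum in this model.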
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)