GitHub user greghogan opened a pull request:
https://github.com/apache/flink/pull/3515
[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled
The initial fix for this ticket does not work on larger data sets.
A reduce function may return its left input, its right input, a new object,
or a locally reused object. The initial fix returned a locally reused object,
but that object reused fields from the input tuples.
The problem arises because ReduceDriver#run manages two values (reuse1 and
reuse2) while GatherSumApplyIteration.SumUDF returns a third, local value.
After the first grouping, value.f1 == reuse1.f1. Subsequent UDF calls may swap
value.f1 and reuse2.f1, which causes reuse1.f1 == reuse2.f1. After an odd
number of swaps, the next grouping reduces with reuse1 and reuse2 sharing a
field, and deserialization overwrites stored values.
The simple fix is to use and return only the provided inputs.
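The hazard can be illustrated with a small, self-contained sketch in plain Java. This is not the Flink code: Box, Rec, drive, aliasingSum, and inputOnlySum are hypothetical stand-ins, and the driver loop is a heavily simplified assumption about how a reduce driver alternates between two reuse objects. It only shows why a returned local object that borrows a field from an input can be overwritten by the next in-place deserialization, while returning one of the provided inputs is safe.

```java
import java.util.function.BinaryOperator;

public class ObjectReuseSketch {

    /** Hypothetical mutable field type (stand-in for a mutable value type). */
    static final class Box { long v; }

    /** Hypothetical record with one mutable field (stand-in for a tuple's f1). */
    static final class Rec { Box f1 = new Box(); }

    /** Simplified, assumed driver loop: one key group, two reuse objects. */
    static long drive(long[] input, BinaryOperator<Rec> udf) {
        Rec reuse1 = new Rec();
        Rec reuse2 = new Rec();
        reuse1.f1.v = input[0];            // "deserialize" the first record
        Rec value = reuse1;
        for (int i = 1; i < input.length; i++) {
            reuse2.f1.v = input[i];        // "deserialize" in place into reuse2's field
            value = udf.apply(value, reuse2);
            if (value == reuse2) {         // never read into the object the UDF returned
                Rec tmp = reuse1;
                reuse1 = reuse2;
                reuse2 = tmp;
            }
        }
        return value.f1.v;
    }

    /** Unsafe: returns a local object that borrows a field from an input. */
    static BinaryOperator<Rec> aliasingSum() {
        Rec local = new Rec();
        return (left, right) -> {
            right.f1.v += left.f1.v;
            local.f1 = right.f1;           // local and reuse2 now share one Box
            return local;                  // the identity check above cannot see this
        };
    }

    /** Safe: uses and returns only the provided inputs. */
    static BinaryOperator<Rec> inputOnlySum() {
        return (left, right) -> {
            right.f1.v += left.f1.v;
            return right;                  // the driver swaps its reuse objects
        };
    }

    public static void main(String[] args) {
        long[] input = {1, 2, 4};
        System.out.println(drive(input, aliasingSum()));   // prints 8: partial sum was overwritten
        System.out.println(drive(input, inputOnlySum()));  // prints 7: correct sum
    }
}
```

In this sketch the driver can protect an object the UDF returns by comparing references, but it has no way to detect that a third object shares a field with one of its reuse objects, which is why the sketch favors returning an input.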
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/greghogan/flink 5890b_gathersumapply_broken_when_object_reuse_enabled
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3515.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3515
----
commit c1bddcb7a59eff8ac1639deaa9fabac3073c6552
Author: Greg Hogan <[email protected]>
Date: 2017-03-10T21:44:27Z
[FLINK-5890] [gelly] GatherSumApply broken when object reuse enabled
----