[ https://issues.apache.org/jira/browse/FLINK-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350900#comment-15350900 ]
ASF GitHub Bot commented on FLINK-4113:
---------------------------------------
Github user greghogan commented on a diff in the pull request:
https://github.com/apache/flink/pull/2156#discussion_r68566231
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java ---
@@ -89,7 +89,7 @@ public void collect(IT record) {
 		numRecordsIn.inc();
 		try {
 			if (base == null) {
-				base = objectReuseEnabled ? record : serializer.copy(record);
+				base = serializer.copy(record);
 			} else {
 				base = objectReuseEnabled ? reducer.reduce(base, record) : serializer.copy(reducer.reduce(base, record));
--- End diff --
We fixed this in FLINK-3340 for non-chained reduce drivers (where the
driver chooses the object to deserialize into), but for chained drivers we
cannot prevent one UDF from overwriting an object from a previous UDF. If you
look in {{OverwriteObjects.java}}, you will see {{testReduce}} fail.
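A minimal, self-contained sketch in plain Java (not Flink's API) of the hazard described above: an upstream function that reuses a single mutable object for every record it emits, and a downstream consumer that keeps the first record either by reference or by copy. `Box`, `emitReusing`, and `firstSeen` are hypothetical names used only for illustration.

```java
import java.util.function.Consumer;

public class ReusedObjectHazard {

    // Hypothetical mutable record type standing in for a Flink record.
    static final class Box {
        int value;

        Box(int value) {
            this.value = value;
        }

        Box copy() {
            return new Box(value);
        }
    }

    // Hypothetical upstream "UDF" that emits the same Box instance for every
    // record, overwriting its field before each emit (object reuse).
    static void emitReusing(int[] values, Consumer<Box> downstream) {
        Box reused = new Box(0);
        for (int v : values) {
            reused.value = v;
            downstream.accept(reused);
        }
    }

    // Downstream consumer that remembers the first record it receives,
    // either by keeping the reference or by taking a copy.
    static int firstSeen(int[] values, boolean copyFirst) {
        Box[] first = new Box[1];
        emitReusing(values, record -> {
            if (first[0] == null) {
                first[0] = copyFirst ? record.copy() : record;
            }
        });
        return first[0].value;
    }

    public static void main(String[] args) {
        int[] values = {3, 4, 5};
        System.out.println(firstSeen(values, false)); // 5: the kept reference was overwritten
        System.out.println(firstSeen(values, true));  // 3: the copy is unaffected
    }
}
```

Keeping the reference returns whatever the upstream wrote last, which is why the downstream side has to copy when it cannot control what the upstream does with the object.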
> Always copy first value in ChainedAllReduceDriver
> -------------------------------------------------
>
> Key: FLINK-4113
> URL: https://issues.apache.org/jira/browse/FLINK-4113
> Project: Flink
> Issue Type: Bug
> Components: Local Runtime
> Affects Versions: 1.1.0, 1.0.4
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Priority: Critical
> Fix For: 1.1.0, 1.0.4
>
>
> {{ChainedAllReduceDriver.collect}} must copy the first record even when
> object reuse is enabled; otherwise {{base}} may later point to the same
> object as {{record}}.
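The rule in the issue can be sketched with a simplified, hypothetical stand-in for the driver's `collect` method. `AllReduceCollector` and `sumWithReusedInput` are not Flink classes or methods: a `UnaryOperator` plays the role of `serializer.copy`, a `BinaryOperator` plays the role of `reducer.reduce`, and the sample reducer is assumed to add into and return its first argument.

```java
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

// Hypothetical, simplified stand-in for ChainedAllReduceDriver.collect (not Flink's API).
public class AllReduceCollector<T> {

    private final UnaryOperator<T> copier;     // plays the role of serializer.copy
    private final BinaryOperator<T> reducer;   // plays the role of reducer.reduce
    private final boolean objectReuseEnabled;
    private T base;                            // running aggregate; null until the first record

    public AllReduceCollector(UnaryOperator<T> copier, BinaryOperator<T> reducer, boolean objectReuseEnabled) {
        this.copier = copier;
        this.reducer = reducer;
        this.objectReuseEnabled = objectReuseEnabled;
    }

    public void collect(T record) {
        if (base == null) {
            // Copy the first record even with object reuse enabled: the caller
            // may overwrite 'record' before the next call.
            base = copier.apply(record);
        } else {
            T reduced = reducer.apply(base, record);
            base = objectReuseEnabled ? reduced : copier.apply(reduced);
        }
    }

    public T result() {
        return base;
    }

    // Feeds the values through a single reused int[1] record, as an
    // object-reusing upstream operator might, and returns the sum.
    static int sumWithReusedInput(boolean objectReuseEnabled, int... values) {
        UnaryOperator<int[]> copy = a -> a.clone();
        // Assumed reducer: adds into and returns its first argument.
        BinaryOperator<int[]> addInto = (a, b) -> {
            a[0] += b[0];
            return a;
        };
        AllReduceCollector<int[]> sum = new AllReduceCollector<>(copy, addInto, objectReuseEnabled);
        int[] reused = new int[1];
        for (int v : values) {
            reused[0] = v;
            sum.collect(reused);
        }
        return sum.result()[0];
    }

    public static void main(String[] args) {
        System.out.println(sumWithReusedInput(true, 3, 4, 5));  // 12
        System.out.println(sumWithReusedInput(false, 3, 4, 5)); // 12
    }
}
```

With the pre-fix line `base = objectReuseEnabled ? record : serializer.copy(record)`, the object-reuse run would leave `base` aliased to `reused`, so each later write would overwrite the running aggregate and this input would sum to 10 instead of 12.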