Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2156#discussion_r68566231
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java ---
    @@ -89,7 +89,7 @@ public void collect(IT record) {
                numRecordsIn.inc();
                try {
                        if (base == null) {
    -                           base = objectReuseEnabled ? record : serializer.copy(record);
    +                           base = serializer.copy(record);
                        } else {
                                base = objectReuseEnabled ? reducer.reduce(base, record) : serializer.copy(reducer.reduce(base, record));
    --- End diff --
    
    We fixed this in FLINK-3340 for non-chained reduce drivers (where the
    driver chooses the object to deserialize into), but for chained drivers
    we cannot prevent one UDF from overwriting an object emitted by a
    previous UDF, which is why the first record is now copied even when
    object reuse is enabled. If you look in {{OverwriteObjects.java}} you
    will see {{testReduce}} fail.
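
To illustrate the overwrite problem described above, here is a minimal, self-contained sketch. It is not Flink code: `Counter`, `Accumulator`, and `run` are hypothetical stand-ins for a mutable record, the `collect()` logic in the diff, and an upstream UDF that reuses one output object. The reducer is assumed to return a new object on every call.

```java
// Hypothetical sketch, not the Flink implementation.
final class ChainedReduceSketch {

    /** Hypothetical mutable record type that an upstream UDF may reuse. */
    static final class Counter {
        int value;

        Counter(int value) {
            this.value = value;
        }

        /** Stands in for serializer.copy(record). */
        Counter copy() {
            return new Counter(value);
        }
    }

    /** Simplified stand-in for the collect() logic shown in the diff. */
    static final class Accumulator {
        private final boolean copyFirstRecord;
        private Counter base;

        Accumulator(boolean copyFirstRecord) {
            this.copyFirstRecord = copyFirstRecord;
        }

        void collect(Counter record) {
            if (base == null) {
                // Old behavior with object reuse: keep the caller's reference.
                // New behavior: always take a private copy.
                base = copyFirstRecord ? record.copy() : record;
            } else {
                // Assumed reducer: sums the two values into a new object.
                base = new Counter(base.value + record.value);
            }
        }

        int result() {
            return base.value;
        }
    }

    /** Upstream "UDF" that emits the same Counter instance for every input. */
    static int run(int[] inputs, boolean copyFirstRecord) {
        Accumulator acc = new Accumulator(copyFirstRecord);
        Counter reused = new Counter(0);
        for (int v : inputs) {
            reused.value = v; // overwrites the previously emitted object
            acc.collect(reused);
        }
        return acc.result();
    }

    public static void main(String[] args) {
        int[] inputs = {1, 2, 3};
        System.out.println("without copy: " + run(inputs, false)); // prints 7
        System.out.println("with copy:    " + run(inputs, true));  // prints 6
    }
}
```

Without the copy, `base` aliases the upstream object, so the second record overwrites the first before the reduce runs (2 + 2 + 3 = 7 instead of 1 + 2 + 3 = 6).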

