[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15346141#comment-15346141
 ] 

ASF GitHub Bot commented on FLINK-3974:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2110
  
    Thanks for the thorough review, @tillrohrmann!
    
    Your points are valid, and I may have to change this PR, but let me first 
explain my reasoning.
    
    The shallow copy is performed in the one place that all code paths have to 
go through because it is the point right before control is passed to the 
operator. Putting it in a different place would mean placing it in 
`BroadcastingOutputCollector`, as you mentioned, as well as in 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java,
 which is used when the user does a split()/select() operation 
(`DataStream.split()`). The number of places where we have to put this might 
evolve in the future.
    
    Also, putting it in `BroadcastingOutputCollector` and `DirectedOutput` 
would mean that we always do two copies per record for the common case of 
having object-copying enabled (which is the default).
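
    To make the failure mode and the effect of the shallow copy concrete, here 
is a minimal, self-contained sketch. It is not Flink code: `Record`, 
`MapOperator`, and `broadcast` are hypothetical stand-ins for `StreamRecord`, a 
chained map operator, and the loop over downstream operators. Strings are used 
instead of the types A, B, and C, so the bug shows up as a wrongly nested 
result rather than as a `ClassCastException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ChainedReuseSketch {

    /** Toy stand-in for StreamRecord: a mutable holder whose value can be replaced in place. */
    static final class Record {
        Object value;

        Record(Object value) {
            this.value = value;
        }

        /** Mutates this record in place, like the replace call described in the issue. */
        Record replace(Object newValue) {
            this.value = newValue;
            return this;
        }

        /** Shallow copy: a new holder that points at the same value object. */
        Record shallowCopy() {
            return new Record(value);
        }
    }

    /** Toy chained map operator that reuses the incoming record for its output. */
    static final class MapOperator {
        final Function<Object, Object> fn;
        final List<Object> sink = new ArrayList<>();

        MapOperator(Function<Object, Object> fn) {
            this.fn = fn;
        }

        void processElement(Record record) {
            // Overwrites the value held by the record that was passed in.
            sink.add(record.replace(fn.apply(record.value)).value);
        }
    }

    /** Sends one input to two parallel map operators, optionally copying the record per operator. */
    static List<Object> broadcast(String input, boolean copyPerOperator) {
        MapOperator toB = new MapOperator(v -> "B(" + v + ")");
        MapOperator toC = new MapOperator(v -> "C(" + v + ")");
        Record record = new Record(input);
        for (MapOperator op : List.of(toB, toC)) {
            // The copy happens right before control is passed to the operator.
            op.processElement(copyPerOperator ? record.shallowCopy() : record);
        }
        return List.of(toB.sink.get(0), toC.sink.get(0));
    }

    public static void main(String[] args) {
        System.out.println(broadcast("a", false)); // [B(a), C(B(a))]: second map sees the first map's output
        System.out.println(broadcast("a", true));  // [B(a), C(a)]: each map sees the original input
    }
}
```

    Without the copy, the second operator receives the record already mutated 
by the first, so the two maps behave as if they were chained serially. With a 
shallow copy per operator, each one gets its own holder and the original value 
is left untouched.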
    
    About the ITCase: I also don't like having it in there, because we are 
approaching the 2h mark on Travis, but I think it is justified in this case. 
The test verifies that the whole system works correctly when the user uses a 
certain feature (now that I think about it, I would also add a test for 
split()/select()). 


> enableObjectReuse fails when an operator chains to multiple downstream 
> operators
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-3974
>                 URL: https://issues.apache.org/jira/browse/FLINK-3974
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.0.3
>            Reporter: B Wyatt
>         Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
>     .map(MapFunction<A,B>...)
>     .addSink(...);
> input
>     .map(MapFunction<A,C>...)
>     .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of 
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output<StreamRecord<A>>.collect}} 
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which 
> mutates the value stored in the StreamRecord<>.  
> As a result, when the {{Output<StreamRecord<A>>.collect}} call passes the 
> {{StreamRecord<A>}} to the second map operation it is actually a 
> {{StreamRecord<B>}} and behaves as if the two map operations were serial 
> instead of parallel.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
