[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15304062#comment-15304062 ]

B Wyatt commented on FLINK-3974:
--------------------------------

In the topologies I've been building, object reuse has been a significant CPU 
win. It would be a shame to lose that capability for topologies where care is 
taken to maintain the application-level requirement of object reuse: don't 
modify your inputs.
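To make the "don't modify your inputs" requirement concrete, here is a minimal plain-Java sketch. It uses no Flink classes; `ReuseContract`, `Event`, and `Branch` are hypothetical names. One shared object is handed to two consumers without a copy, roughly as happens with object reuse inside a chain. A consumer that only reads its input leaves the second consumer unaffected; one that overwrites its input in place changes what the second consumer sees.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative plain Java, no Flink classes. "Reuse" here means one mutable
// object is handed to every consumer without a defensive copy.
public class ReuseContract {

    // A mutable event standing in for a reused input object.
    static final class Event {
        long value;
        Event(long value) { this.value = value; }
    }

    // A downstream consumer of the shared event.
    interface Branch { long accept(Event e); }

    // Well-behaved: reads its input and returns a new result.
    static final Branch DOUBLER = e -> e.value * 2;

    // Breaks the contract: overwrites its input in place.
    static final Branch MUTATING_DOUBLER = e -> { e.value *= 2; return e.value; };

    // Reports the value it sees.
    static final Branch READER = e -> e.value;

    // Hands the same Event to each branch in order, with no copying.
    static List<Long> fanOut(Event shared, Branch... branches) {
        List<Long> results = new ArrayList<>();
        for (Branch b : branches) {
            results.add(b.accept(shared));
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(fanOut(new Event(21), DOUBLER, READER));          // [42, 21]
        System.out.println(fanOut(new Event(21), MUTATING_DOUBLER, READER)); // [42, 42]
    }
}
```

Only the mutating branch corrupts the reader's view; the read-only branch is safe to share without cloning.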

Disabling chaining is a fine option if this is not a bottleneck in your 
topology. If unchained splits become the default, I'd like the ability to 
chain them to remain as an option. Sometimes splitting into multiple 
chains/threads is better: CPU-heavy operators benefit from the parallelism. 
Sometimes keeping the chain to avoid de/serialization costs is better: 
CPU-light operators with high throughput.

The attached patch essentially gives each collector its own dedicated stream 
record holding a reference to the value. You are correct that it doesn't solve 
the problem of mutable operator inputs, but that has always been the primary 
concession of object reuse, and it doesn't seem like a bad requirement to put 
on the application level. I don't know of a way to avoid the cost of cloning 
*and* protect against operators mutating their input in a language like Java.
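A simplified, hypothetical model of the idea behind the patch follows. `Record`, `Operator`, and `BroadcastingOutput` are stand-ins written for illustration; they are not Flink's `StreamRecord`/`Output` classes and not the attached patch itself. Each downstream operator gets its own wrapper, so a `replace` in one branch no longer changes what the next branch receives, while the value object itself is still shared rather than cloned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model: these are NOT Flink's StreamRecord/Output
// classes, and this is not the attached patch, only the idea it describes.
public class PerOutputRecord {

    // Stand-in for a stream record: a mutable wrapper around one value.
    static final class Record<T> {
        private Object value;
        Record(Object value) { this.value = value; }

        @SuppressWarnings("unchecked")
        T getValue() { return (T) value; }

        // Like StreamRecord.replace: reuses this wrapper for a new value/type.
        @SuppressWarnings("unchecked")
        <X> Record<X> replace(X newValue) {
            this.value = newValue;
            return (Record<X>) this;
        }
    }

    // A chained downstream operator consuming records of type IN.
    interface Operator<IN> { void processElement(Record<IN> record); }

    // Fan-out that keeps one dedicated wrapper per downstream operator.
    // All wrappers reference the same value object: nothing is cloned.
    static final class BroadcastingOutput<T> {
        private final List<Operator<T>> operators;
        private final List<Record<T>> records = new ArrayList<>();

        BroadcastingOutput(List<Operator<T>> operators) {
            this.operators = operators;
            for (int i = 0; i < operators.size(); i++) {
                records.add(new Record<T>(null));
            }
        }

        void collect(T value) {
            for (int i = 0; i < operators.size(); i++) {
                // Re-point operator i's own wrapper at the shared value.
                Record<T> record = records.get(i).replace(value);
                operators.get(i).processElement(record);
            }
        }
    }

    // Two branches over A (String): one maps to B (Integer), one to C (String).
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Operator<String> toLength = rec ->
                seen.add("len=" + rec.replace(rec.getValue().length()).getValue());
        Operator<String> toUpper = rec ->
                seen.add("upper=" + rec.getValue().toUpperCase());
        new BroadcastingOutput<>(Arrays.asList(toLength, toUpper)).collect("flink");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [len=5, upper=FLINK]
    }
}
```

The wrappers only isolate the *record*. If `toLength` mutated a mutable value object instead of replacing it, `toUpper` would still see the change, which is the application-level requirement that object reuse leaves in place.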

> enableObjectReuse fails when an operator chains to multiple downstream 
> operators
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-3974
>                 URL: https://issues.apache.org/jira/browse/FLINK-3974
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.0.3
>            Reporter: B Wyatt
>         Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
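> {code:java}
> // Object reuse is enabled on the ExecutionConfig (enableObjectReuse()).
> DataStream<A> input = ...
> // Branch 1: A -> B, chained to input
> input
>     .map(MapFunction<A,B>...)
>     .addSink(...);
> // Branch 2: A -> C, also chained to input
> input
>     .map(MapFunction<A,C>...)
>     .addSink(...);
> {code}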
> With enableObjectReuse(), this topology throws 
> {{java.lang.ClassCastException: B cannot be cast to A}}.
> It looks like the input operator calls {{Output<StreamRecord<A>>.collect}}, 
> which loops over the downstream operators and passes the same record to each.
> However, the first map operation calls {{StreamRecord<>.replace}}, which 
> mutates the value stored in that shared {{StreamRecord<>}}.
> As a result, when {{Output<StreamRecord<A>>.collect}} passes the 
> {{StreamRecord<A>}} to the second map operation, it is actually a 
> {{StreamRecord<B>}}, and the two map operations behave as if they were serial 
> instead of parallel.
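The failure can be reproduced outside Flink with a simplified, hypothetical model. `Record`, `Operator`, and `collect` here are stand-ins, not Flink's actual classes. A single record wrapper is shared by two chained branches; the first branch's `replace` rewrites it from a `String` (playing A) to an `Integer` (playing B), and the second branch then fails when it reads the value back as a `String`, the analogue of "B cannot be cast to A".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the failure: these are NOT Flink's
// StreamRecord/Output classes, just enough to show the mechanism.
public class SharedRecordBug {

    // Stand-in for a stream record: a mutable wrapper around one value.
    static final class Record<T> {
        private Object value;
        Record(Object value) { this.value = value; }

        @SuppressWarnings("unchecked")
        T getValue() { return (T) value; }

        // Like StreamRecord.replace: mutates THIS wrapper in place.
        @SuppressWarnings("unchecked")
        <X> Record<X> replace(X newValue) {
            this.value = newValue;
            return (Record<X>) this;
        }
    }

    interface Operator<IN> { void processElement(Record<IN> record); }

    // Buggy fan-out: the same Record<T> instance goes to every operator.
    static <T> void collect(Record<T> shared, List<Operator<T>> operators) {
        for (Operator<T> op : operators) {
            op.processElement(shared);
        }
    }

    // Returns what each branch produced, plus the exception type if one occurred.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        // Branch 1: A (String) -> B (Integer); replace() rewrites the shared wrapper.
        Operator<String> toLength = rec ->
                events.add("len=" + rec.replace(rec.getValue().length()).getValue());
        // Branch 2: A (String) -> C (String); expects the original A but gets B.
        Operator<String> toUpper = rec ->
                events.add("upper=" + rec.getValue().toUpperCase());
        Record<String> record = new Record<>("flink");
        try {
            collect(record, Arrays.asList(toLength, toUpper));
        } catch (ClassCastException e) {
            events.add(e.getClass().getSimpleName());
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [len=5, ClassCastException]
    }
}
```

Because of erasure, the bad cast surfaces only where the second branch uses the value as a `String`, not where `replace` stored the `Integer`.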



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
