[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15304062#comment-15304062 ]
B Wyatt commented on FLINK-3974:
--------------------------------
In the topologies I've been building, object reuse has been a pretty
significant CPU win. It would be a shame to lose that capability for topologies
where care is taken to maintain the application-level requirement of object
reuse: don't modify your inputs.
Disabling chaining is a fine option if this is not a bottleneck in your
topology. If unchained splits become the default, I'd like to see the ability
to chain them remain as an option. Sometimes splitting into multiple
chains/threads is good (CPU-heavy operators that benefit from parallelism).
Sometimes maintaining the chain to avoid the de/serialization costs is good
(CPU-light operators with high throughput).
The attached patch essentially gives each collector its own dedicated stream
record with a reference to the value. You are correct that it doesn't solve the
problem of mutable operator inputs, but that has always been the primary
concession of object reuse, and it doesn't seem like a bad requirement to put
on the application level. I don't know that there is a way to avoid the cost of
cloning *and* protect against operators mutating input in a language like Java.
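
To make the idea concrete, here is a minimal, self-contained Java sketch of the
failure and of the "one record per collector" approach. It is not the attached
patch and does not use Flink's classes: Record, Output, map, sharedBroadcast,
dedicatedBroadcast and run are hypothetical stand-ins, with String, Integer and
String playing the roles of A, B and C. Preallocating one record per output is
my reading of the description above, so treat that detail as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class BroadcastReuseSketch {
    // Hypothetical stand-in for a stream record: a mutable box around a value.
    static final class Record<T> {
        private Object value;
        Record(T value) { this.value = value; }
        @SuppressWarnings("unchecked")
        T getValue() { return (T) value; }
        // Swaps the value in place and hands back the same box under a new type.
        @SuppressWarnings("unchecked")
        <X> Record<X> replace(X newValue) { this.value = newValue; return (Record<X>) this; }
    }

    interface Output<T> { void collect(Record<T> record); }

    // A chained map operator: reuses the incoming record for its result.
    // It never modifies the input value itself, only the record that wraps it.
    static <I, O> Output<I> map(Function<I, O> fn, List<? super O> sink) {
        return record -> {
            O out = fn.apply(record.getValue());
            Record<O> reused = record.replace(out);
            sink.add(reused.getValue());
        };
    }

    // Problematic variant: every downstream output sees the same record.
    static <T> Output<T> sharedBroadcast(List<Output<T>> outputs) {
        return record -> {
            for (Output<T> out : outputs) {
                out.collect(record);
            }
        };
    }

    // Sketched fix: each output owns a record that is pointed at the same value.
    // The value is shared by reference, so nothing is cloned.
    static <T> Output<T> dedicatedBroadcast(List<Output<T>> outputs) {
        List<Record<T>> own = new ArrayList<>();
        for (int i = 0; i < outputs.size(); i++) {
            own.add(new Record<T>(null));
        }
        return record -> {
            for (int i = 0; i < outputs.size(); i++) {
                Record<T> mine = own.get(i).replace(record.getValue());
                outputs.get(i).collect(mine);
            }
        };
    }

    // Builds input -> {map to length, map to upper case} and pushes one element.
    static List<Object> run(boolean dedicated, String input) {
        List<Object> results = new ArrayList<>();
        List<Output<String>> outputs = new ArrayList<>();
        outputs.add(map((String s) -> Integer.valueOf(s.length()), results));
        outputs.add(map((String s) -> s.toUpperCase(), results));
        Output<String> broadcast =
                dedicated ? dedicatedBroadcast(outputs) : sharedBroadcast(outputs);
        broadcast.collect(new Record<>(input));
        return results;
    }

    public static void main(String[] args) {
        System.out.println("dedicated records: " + run(true, "hello"));
        try {
            run(false, "hello");
        } catch (ClassCastException e) {
            // The second map received the Integer written by the first map.
            System.out.println("shared record failed: " + e.getMessage());
        }
    }
}
```

In this sketch the shared variant fails in the second map with a
ClassCastException, because the first map has already replaced the String in
the shared record with an Integer. The dedicated variant lets both maps see the
original String. Neither variant protects the value itself: if an operator
mutated its input object, the other branch would still observe the change.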
> enableObjectReuse fails when an operator chains to multiple downstream
> operators
> --------------------------------------------------------------------------------
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.3
> Reporter: B Wyatt
> Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
> .map(MapFunction<A,B>...)
> .addSink(...);
> input
> .map(MapFunction<A,C>...)
> .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output<StreamRecord<A>>.collect}}
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which
> mutates the value stored in the StreamRecord<>.
> As a result, when the {{Output<StreamRecord<A>>.collect}} call passes the
> {{StreamRecord<A>}} to the second map operation it is actually a
> {{StreamRecord<B>}} and behaves as if the two map operations were serial
> instead of parallel.