[
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15346141#comment-15346141
]
ASF GitHub Bot commented on FLINK-3974:
---------------------------------------
Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/2110
Thanks for the thorough review, @tillrohrmann!
Your points are valid, and I may have to change this PR, but let me first
explain my reasoning.
The shallow copy is performed in the one place that all code paths have to
go through because it is the point right before control is passed to the
operator. Putting it in a different place would mean placing it in
`BroadcastingOutputCollector`, as you mentioned, as well as in
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java,
which is used when the user does a split()/select() operation
(`DataStream.split()`). The number of places where we have to put this might
evolve in the future.
Also, putting it in `BroadcastingOutputCollector` and `DirectedOutput`
would mean that we always do two copies per record for the common case of
having object-copying enabled (which is the default).
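To make the argument concrete, here is a minimal sketch in plain Java. It
does not use any Flink classes: `Record`, `MapOperator`, and `run` are
hypothetical stand-ins that only mimic how a reused record wrapper behaves
when one upstream operator chains to two downstream operators. It assumes
that each operator overwrites the wrapper it receives, and shows that a
shallow copy made right before control passes to each operator is enough
to keep the second operator from seeing the first operator's output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; these are not Flink classes.
final class ObjectReuseSketch {

    /** Mutable wrapper around a value, loosely modelled on a reusable stream record. */
    static final class Record<T> {
        private T value;

        Record(T value) { this.value = value; }

        T getValue() { return value; }

        /** Overwrites the wrapped value in place and returns the same wrapper, retyped. */
        @SuppressWarnings("unchecked")
        <X> Record<X> replace(X newValue) {
            this.value = (T) newValue;
            return (Record<X>) this;
        }

        /** Shallow copy: a new wrapper that points at the same value object. */
        Record<T> copy() { return new Record<>(value); }
    }

    /** A chained map operator that reuses the incoming wrapper for its result. */
    static final class MapOperator<I, O> {
        private final Class<I> inputType;
        private final Function<I, O> fn;
        final List<O> emitted = new ArrayList<>();

        MapOperator(Class<I> inputType, Function<I, O> fn) {
            this.inputType = inputType;
            this.fn = fn;
        }

        void processElement(Record<I> record) {
            // Explicit runtime check; throws ClassCastException if the wrapper was overwritten.
            I in = inputType.cast(record.getValue());
            emitted.add(record.replace(fn.apply(in)).getValue());
        }
    }

    /** Sends one input record to two parallel map operators, with or without a shallow copy. */
    static List<Long> run(boolean shallowCopy) {
        MapOperator<Integer, String> toB = new MapOperator<>(Integer.class, i -> "B" + i);
        MapOperator<Integer, Long> toC = new MapOperator<>(Integer.class, i -> i * 10L);
        Record<Integer> input = new Record<>(4);

        // The copy happens at the single point right before each operator is invoked.
        toB.processElement(shallowCopy ? input.copy() : input);
        toC.processElement(shallowCopy ? input.copy() : input);
        return toC.emitted;
    }

    public static void main(String[] args) {
        System.out.println("with shallow copy: " + run(true));
        try {
            run(false);
        } catch (ClassCastException e) {
            System.out.println("without copy: " + e.getMessage());
        }
    }
}
```

In this sketch the copy lives in exactly one spot, so adding another
fan-out path would not require another copy site. Whether that carries
over to the real operator chain depends on details the sketch leaves out.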
About the ITCase: I also don't like having it in there because we are
approaching the 2h mark on Travis, but I think in this case it's valid. This
test really verifies that the whole system works correctly when the user uses a
certain feature (I would also add a test for split()/select(), now that I
think about it).
> enableObjectReuse fails when an operator chains to multiple downstream
> operators
> --------------------------------------------------------------------------------
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.3
> Reporter: B Wyatt
> Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
> .map(MapFunction<A,B>...)
> .addSink(...);
> input
> .map(MapFunction<A,C>...)
> .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output<StreamRecord<A>>.collect}}
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which
> mutates the value stored in the StreamRecord<>.
> As a result, when the {{Output<StreamRecord<A>>.collect}} call passes the
> {{StreamRecord<A>}} to the second map operation it is actually a
> {{StreamRecord<B>}} and behaves as if the two map operations were serial
> instead of parallel.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)