[
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15344537#comment-15344537
]
ASF GitHub Bot commented on FLINK-3974:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2110#discussion_r68076977
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ---
@@ -306,8 +306,9 @@ public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
         @Override
         public void collect(StreamRecord<T> record) {
             try {
-                operator.setKeyContextElement1(record);
-                operator.processElement(record);
+                StreamRecord<T> shallowCopy = record.copy(record.getValue());
+                operator.setKeyContextElement1(shallowCopy);
+                operator.processElement(shallowCopy);
--- End diff --
Actually, I'm wondering whether `ChainingOutput` is the right place to
do this copying. Wouldn't it make more sense to do it in the
`BroadcastingOutputCollector`? Only when the chained data flow branches do we
have to make sure that every downstream operator gets its own copy of the
record. For simple chaining it should be correct to reuse the stream record.
So I would adapt the `collect` method of `BroadcastingOutputCollector` in the
following way:
```
public void collect(StreamRecord<T> record) {
    for (int i = 0; i < outputs.length - 1; i++) {
        StreamRecord<T> shallowCopy = record.copy(record.getValue());
        outputs[i].collect(shallowCopy);
    }
    outputs[outputs.length - 1].collect(record);
}
```
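
To make the idea easier to try out in isolation, here is a minimal, self-contained sketch of the same copy-all-but-the-last pattern. It does not use Flink: `Rec`, `broadcast` and `runTwoBranches` are hypothetical stand-ins invented for this illustration, and `Rec` only imitates the `copy`/`replace` behaviour discussed in this thread. Both branches mutate the wrapper they receive, yet each one still sees the original value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; these are NOT Flink's classes.
class BroadcastCopySketch {

    /** Minimal mutable record wrapper that imitates copy/replace semantics. */
    static final class Rec<T> {
        private T value;
        Rec(T value) { this.value = value; }
        T getValue() { return value; }

        /** Shallow copy: a new wrapper around the given value reference. */
        Rec<T> copy(T valueCopy) { return new Rec<>(valueCopy); }

        /** Mutates this wrapper in place and reinterprets its element type. */
        @SuppressWarnings("unchecked")
        <X> Rec<X> replace(X newValue) {
            this.value = (T) newValue;
            return (Rec<X>) this;
        }
    }

    /** Every output except the last gets its own wrapper; the last reuses the original. */
    static <T> void broadcast(List<Consumer<Rec<T>>> outputs, Rec<T> record) {
        for (int i = 0; i < outputs.size() - 1; i++) {
            outputs.get(i).accept(record.copy(record.getValue()));
        }
        outputs.get(outputs.size() - 1).accept(record);
    }

    /** Runs two branches that both mutate their wrapper; returns the value each one saw. */
    static List<Object> runTwoBranches(String input) {
        List<Object> seen = new ArrayList<>();
        List<Consumer<Rec<String>>> outputs = new ArrayList<>();
        outputs.add(r -> { seen.add(r.getValue()); r.replace(r.getValue().length()); });
        outputs.add(r -> { seen.add(r.getValue()); r.replace(r.getValue().toUpperCase()); });
        broadcast(outputs, new Rec<>(input));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runTwoBranches("abc")); // prints [abc, abc]
    }
}
```

Only the wrapper is copied, not the value, so this saves one allocation per record compared with copying for every output, and a non-branching chain pays nothing.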
> enableObjectReuse fails when an operator chains to multiple downstream
> operators
> --------------------------------------------------------------------------------
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.3
> Reporter: B Wyatt
> Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
> .map(MapFunction<A,B>...)
> .addSink(...);
> input
> .map(MapFunction<A,C>...)
> .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output<StreamRecord<A>>.collect}}
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which
> mutates the value stored in the StreamRecord<>.
> As a result, when the {{Output<StreamRecord<A>>.collect}} call passes the
> {{StreamRecord<A>}} to the second map operation it is actually a
> {{StreamRecord<B>}} and behaves as if the two map operations were serial
> instead of parallel.
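
The failure mode described in the issue can be imitated without Flink. The sketch below is only an illustration under stated assumptions: `Rec`, `map` and `secondBranchFails` are hypothetical names, and `Rec.replace` merely mimics a wrapper that is mutated in place. One shared wrapper is handed to two "parallel" maps, and the second map receives the first map's output instead of the original input.

```java
import java.util.function.Function;

// Hypothetical stand-ins for illustration; these are NOT Flink's classes.
class SharedRecordRepro {

    /** Minimal mutable record wrapper whose replace() mutates it in place. */
    static final class Rec<T> {
        private T value;
        Rec(T value) { this.value = value; }
        T getValue() { return value; }

        @SuppressWarnings("unchecked")
        <X> Rec<X> replace(X newValue) {
            this.value = (T) newValue;   // unchecked: no runtime type check here
            return (Rec<X>) this;
        }
    }

    /** A chained "map" that reuses the incoming wrapper for its result. */
    static <I, O> Rec<O> map(Rec<I> record, Function<I, O> fn) {
        return record.replace(fn.apply(record.getValue()));
    }

    /** Feeds ONE shared wrapper to two maps; returns true if the second one fails. */
    static boolean secondBranchFails() {
        Rec<String> shared = new Rec<>("abc");
        map(shared, String::length);            // A -> B: shared now holds an Integer
        try {
            map(shared, String::toUpperCase);   // A -> C: expects a String, gets an Integer
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second branch fails: " + secondBranchFails());
    }
}
```

Giving each branch its own wrapper, as discussed in the review comment, removes the shared mutable state that causes the cast to fail.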
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)