[ https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16828869#comment-16828869 ]
Jark Wu commented on FLINK-12351:
---------------------------------

Hi [~aitozi],

I think fixing the bug in AsyncWaitOperator and enabling object reuse at the operator level are two orthogonal problems. We can create another JIRA to discuss operator-level object reuse.

So far, I have only found AsyncWaitOperator to be affected, because it doesn't deep copy the input record before putting it into its heap buffer (a Java ArrayDeque). IMO, whether or not object reuse is enabled, AsyncWaitOperator should produce the same output, because it is framework code, not user code.

Hi [~till.rohrmann], what do you think about this? If you don't object, I can create a PR for this.

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-12351
>                 URL: https://issues.apache.org/jira/browse/FLINK-12351
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Jark Wu
>            Priority: Major
>             Fix For: 1.9.0
>
>
> Currently, AsyncWaitOperator puts the input StreamElement directly into
> {{StreamElementQueue}}. But when object reuse is enabled, the StreamElement
> is reused, which means the element in {{StreamElementQueue}} will be
> modified. As a result, the output of AsyncWaitOperator might be wrong.
> An easy way to fix this might be to deep copy the input StreamElement when
> object reuse is enabled, like this:
> https://github.com/apache/flink/blob/blink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L209

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
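
For illustration, the aliasing problem described in the issue can be reproduced outside Flink with a plain ArrayDeque. This is only a minimal sketch, not the AsyncWaitOperator code: the `Record` class, its `copy()` method, and `bufferThenRead` are hypothetical stand-ins for a reused StreamElement and the operator's buffering step.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ObjectReuseDemo {

    // Hypothetical mutable record standing in for a reused StreamElement.
    static final class Record {
        int value;

        Record(int value) {
            this.value = value;
        }

        // Deep copy; trivial here because the record holds a single primitive.
        Record copy() {
            return new Record(value);
        }
    }

    // Buffers one record, lets the "upstream" overwrite the shared instance,
    // then returns the value seen when reading the buffered record back.
    static int bufferThenRead(boolean deepCopy) {
        Deque<Record> queue = new ArrayDeque<>();
        Record reused = new Record(1);

        // Without a copy, the queue holds a reference to the reused instance.
        queue.add(deepCopy ? reused.copy() : reused);

        // With object reuse, the same instance is refilled for the next record.
        reused.value = 2;

        return queue.peek().value;
    }

    public static void main(String[] args) {
        System.out.println("without copy: " + bufferThenRead(false)); // prints 2
        System.out.println("with copy:    " + bufferThenRead(true));  // prints 1
    }
}
```

Without the copy, the buffered entry silently changes from 1 to 2, which is the kind of wrong output the issue describes; copying before enqueueing keeps the buffered value stable regardless of the reuse setting.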