[ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16828869#comment-16828869
 ] 

Jark Wu commented on FLINK-12351:
---------------------------------

Hi [~aitozi], I think fixing the bug in AsyncWaitOperator and enabling object 
reuse at the operator level are two orthogonal problems. We can create another 
JIRA to discuss the operator-level object reuse problem.

So far, the only operator I have found to be affected is the AsyncWaitOperator, 
because it doesn't deep copy the input record before putting it into its heap 
buffer (a Java ArrayDeque).
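
To illustrate the problem, here is a minimal, self-contained sketch that does 
not use any Flink classes. The names {{ObjectReuseDemo}}, {{Record}}, {{run}} and 
{{copyBeforeBuffering}} are hypothetical stand-ins, not the actual operator code: 
{{Record}} plays the role of a reused input element, and the ArrayDeque plays the 
role of the operator's buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseDemo {

    /** Hypothetical mutable record standing in for a reused input element. */
    static final class Record {
        int value;

        Record(int value) {
            this.value = value;
        }

        /** Deep copy; trivial here because the record holds a single primitive. */
        Record copy() {
            return new Record(value);
        }
    }

    /**
     * Simulates an upstream that delivers three inputs (1, 2, 3) through ONE
     * reused instance, buffers them in an ArrayDeque, then drains the buffer.
     */
    static List<Integer> run(boolean copyBeforeBuffering) {
        ArrayDeque<Record> queue = new ArrayDeque<>();
        Record reused = new Record(0);

        for (int i = 1; i <= 3; i++) {
            reused.value = i; // upstream overwrites the same object
            queue.add(copyBeforeBuffering ? reused.copy() : reused);
        }

        List<Integer> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll().value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [3, 3, 3] - every entry aliases the same object
        System.out.println(run(true));  // [1, 2, 3] - each entry is an independent copy
    }
}
```

Without the copy, all buffered entries point to the same instance, so only the 
last value survives. Copying before buffering keeps each entry independent, at 
the cost of one extra allocation per record.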

IMO, whether or not object reuse is enabled, the AsyncWaitOperator should 
output the same result, because it is framework code, not user code.

Hi [~till.rohrmann], what do you think about this? If you don't object, I can 
create a PR for this.

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-12351
>                 URL: https://issues.apache.org/jira/browse/FLINK-12351
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Jark Wu
>            Priority: Major
>             Fix For: 1.9.0
>
>
> Currently, AsyncWaitOperator puts the input StreamElement directly into 
> {{StreamElementQueue}}. But when object reuse is enabled, the StreamElement 
> is reused, which means the element in {{StreamElementQueue}} will be 
> modified. As a result, the output of AsyncWaitOperator might be wrong.
> An easy way to fix this might be to deep copy the input StreamElement when 
> object reuse is enabled, like this: 
> https://github.com/apache/flink/blob/blink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L209



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
