[
https://issues.apache.org/jira/browse/FLINK-20159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17232969#comment-17232969
]
Sundaram Ananthanarayanan commented on FLINK-20159:
---------------------------------------------------
[~becket_qin] Sure, I'll submit a patch later today.
> [FLIP-27 source] FutureNotifier does not return a new future when
> Future::future() is invoked within the returned future's callback
> -----------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-20159
> URL: https://issues.apache.org/jira/browse/FLINK-20159
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.11.2
> Reporter: Sundaram Ananthanarayanan
> Priority: Minor
> Fix For: 1.12.0
>
>
> Here's the *problem*. FutureNotifier::future should return a new future every
> time the previous future has completed. That's the expectation. However, if
> the future is requested from within the completion callback of the previous
> future, it returns the existing, already-completed future instead of a new
> one. Depending on how the callback is implemented, this can result in
> infinite recursion. Here's an example:
>
> {code:java}
> // Consumer code
> void consumeDataOnce() {
>     // get the data from the producer and check if it was empty
>     Data data = producer.getData();
>     // if data was empty, then grab the future and attach a callback as below
>     if (data.isEmpty()) {
>         producer.getCompletableFuture().thenRun(() -> consumeDataOnce());
>     }
> }
> {code}
>
> In the above method, let's say the producer completed the future (the one
> returned by FutureNotifier::future) to notify the consumer that some data was
> available to be consumed. Now suppose the data returned from the producer
> turns out to be empty during the callback. The callback then gets the same,
> already-completed future back, so thenRun invokes consumeDataOnce again
> immediately, and the method recurses indefinitely.
>
> *Issue:* A close look at FutureNotifier::notifyComplete's implementation
> shows that the future is completed before futureRef is swapped with null.
> {code:java}
> public void notifyComplete() {
>     CompletableFuture<Void> future = futureRef.get();
>     // If there are multiple threads trying to complete the future, only the
>     // first one succeeds.
>     if (future != null && future.complete(null)) {
>         futureRef.compareAndSet(future, null);
>     }
> }
> {code}
> If we change the ordering so that futureRef is atomically swapped to null
> before the future is completed, then we can guarantee that the future returned
> by FutureNotifier::future will always be a new one if the previous one had
> completed.
>
> [~sewen] [~jqin] [~stevenz3wu]
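
For illustration, the reordering proposed in the description could look roughly like the sketch below. This is not the actual Flink patch: the class name SwapFirstFutureNotifier and the body of future() are assumptions made up for this example. Only the idea of clearing futureRef before completing the future comes from the issue text.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: a notifier that clears its reference before completing the future. */
class SwapFirstFutureNotifier {

    private final AtomicReference<CompletableFuture<Void>> futureRef = new AtomicReference<>();

    /** Returns the pending future, or installs and returns a new one if none is pending. */
    CompletableFuture<Void> future() {
        while (true) {
            CompletableFuture<Void> current = futureRef.get();
            if (current != null) {
                return current;
            }
            CompletableFuture<Void> fresh = new CompletableFuture<>();
            // Retry if another thread installed or cleared a future concurrently.
            if (futureRef.compareAndSet(null, fresh)) {
                return fresh;
            }
        }
    }

    /** Clears the reference first, then completes the future that was pending (if any). */
    void notifyComplete() {
        // getAndSet is atomic, so only one caller can obtain a given pending future.
        CompletableFuture<Void> pending = futureRef.getAndSet(null);
        if (pending != null) {
            // Callbacks run here; by now futureRef is already null, so a call to
            // future() from inside a callback creates a new future.
            pending.complete(null);
        }
    }
}
```

With this ordering, a callback attached via thenRun that calls future() again would receive a fresh, not yet completed future, so the consumeDataOnce pattern above would wait for the next notification instead of recursing.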