pnowojski commented on pull request #16905:
URL: https://github.com/apache/flink/pull/16905#issuecomment-904506952


   I'm not sure this is the right way to go. There are a couple of issues.
   1. The main one is: do we want to establish a contract that the future returned
from `mainOperator.stop()` is always completed from the mailbox thread? If that
ever stopped being the case, the same issue would come back.
   2. Another one is that when the future from 1. is completed, you are
enqueuing another mailbox action (inside `triggerSourcesCheckpointAsync()`;
this could maybe be avoided). More importantly, I'm not sure it's guaranteed
to work as you expect. The future from `stop()` is completed in
`StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator`, more precisely
in `deferFinishOperatorToMailbox()`. Are you sure that the mailbox action you
enqueued in `triggerSourcesCheckpointAsync()` will be executed before we
return from `quiesceTimeServiceAndFinishOperator()`? I'm not sure. I think
that, if we are unlucky, it might only be executed in
`mailboxProcessor.drain();`. That means it currently works only because
`triggerSourcesCheckpointAsync()` enqueues a single mailbox action. If that
were split into two, the second action could be rejected because the mailbox
is already in `quiesce` mode.
   
   I'm not sure whether I'm right or wrong about the points above, but they give
me the feeling that this solution is very complicated and possibly fragile.
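The toy, single-threaded model below makes the fragility concern from point 2 concrete. `ToyMailbox` is hypothetical and heavily simplified. It signals a rejected mail by returning `false` instead of throwing, and it is not Flink's mailbox implementation. It only assumes, as described in point 2, that the mailbox is put into `quiesce` mode before the final `drain()`. An action enqueued earlier still runs during the drain, but any follow-up action that it tries to enqueue is rejected.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical, heavily simplified mailbox; not Flink's implementation. */
public class ToyMailbox {
    private final Queue<Runnable> mails = new ArrayDeque<>();
    private boolean quiesced = false;

    /** Enqueues a mail; returns false (rejects it) once the mailbox is quiesced. */
    public boolean put(Runnable mail) {
        if (quiesced) {
            return false;
        }
        mails.add(mail);
        return true;
    }

    /** From now on, new mails are rejected; already-enqueued mails can still run. */
    public void quiesce() {
        quiesced = true;
    }

    /** Runs every pending mail, in FIFO order. */
    public void drain() {
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
        }
    }

    /** Models a "checkpoint" split into two chained mails that only gets to run during drain(). */
    static List<String> twoStepCheckpointAfterQuiesce() {
        ToyMailbox mailbox = new ToyMailbox();
        List<String> log = new ArrayList<>();
        // Step 1 is enqueued before shutdown starts...
        mailbox.put(() -> {
            log.add("step 1 ran");
            // ...and enqueues step 2 only when it actually runs.
            boolean accepted = mailbox.put(() -> log.add("step 2 ran"));
            if (!accepted) {
                log.add("step 2 rejected");
            }
        });
        // Shutdown order assumed here: quiesce first, then drain what is left.
        mailbox.quiesce();
        mailbox.drain();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(twoStepCheckpointAfterQuiesce());
    }
}
```

Here `twoStepCheckpointAfterQuiesce()` returns `[step 1 ran, step 2 rejected]`. In this model, a checkpoint split into two chained mailbox actions loses its second half.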
   
   A counterproposal: the whole idea of `stop-with-savepoint --drain` was that it
should work exactly the same way as waiting for the final checkpoint after
sources finish normally. I think we already have code to handle exactly this
issue for the FLIP-147 final checkpoint. However, the problem might be that
it's hidden behind the feature toggle `areCheckpointsWithFinishedTasksEnabled`
in `StreamTask#afterInvoke`. There we keep running the mailbox loop and waiting
for the final checkpoint to complete (`StreamTask#finalCheckpointCompleted`).
So isn't the alternative solution as simple as adjusting the [if condition in
`afterInvoke`](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L830)
to keep the task waiting with `stop-with-savepoint --drain` as well, even if
FLIP-147 is disabled?
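Below is a hypothetical sketch of the proposed adjustment. The two boolean parameters stand in for the FLIP-147 toggle (`areCheckpointsWithFinishedTasksEnabled`) and for "a `stop-with-savepoint --drain` is in progress". How `StreamTask` would actually learn about the latter is an assumption that this sketch does not show. The loop is a toy stand-in for running the mailbox until `finalCheckpointCompleted` is done. None of the method names below are Flink's.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of the proposed afterInvoke() decision; names are illustrative, not Flink's. */
public class FinalCheckpointWait {

    /** Whether the task should keep its mailbox loop running until the final checkpoint completes. */
    static boolean shouldWaitForFinalCheckpoint(
            boolean checkpointsWithFinishedTasksEnabled, // stand-in for the FLIP-147 toggle
            boolean stopWithSavepointDrainRequested) { // stand-in for a pending --drain (assumed flag)
        // Proposed change: a pending --drain alone is enough, even with FLIP-147 disabled.
        return checkpointsWithFinishedTasksEnabled || stopWithSavepointDrainRequested;
    }

    /** Toy mailbox loop: iterates until the final-checkpoint future is done; returns the iteration count. */
    static int runUntilFinalCheckpoint(CompletableFuture<Void> finalCheckpointCompleted, int completeAfterSteps) {
        if (completeAfterSteps < 1) {
            throw new IllegalArgumentException("completeAfterSteps must be >= 1");
        }
        int steps = 0;
        while (!finalCheckpointCompleted.isDone()) {
            steps++; // one simulated mailbox iteration
            if (steps == completeAfterSteps) {
                // Simulates the checkpoint-complete notification being processed in the mailbox.
                finalCheckpointCompleted.complete(null);
            }
        }
        return steps;
    }

    public static void main(String[] args) {
        boolean waitWithDrainOnly = shouldWaitForFinalCheckpoint(false, true);
        System.out.println(waitWithDrainOnly);
        if (waitWithDrainOnly) {
            System.out.println(runUntilFinalCheckpoint(new CompletableFuture<>(), 3));
        }
    }
}
```

`main` prints `true` and then `3`. With the FLIP-147 toggle off, a pending `--drain` alone keeps the simulated task waiting for its final checkpoint. This reuses the existing wait instead of relying on mailbox ordering during shutdown.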

