becketqin commented on pull request #12510: URL: https://github.com/apache/flink/pull/12510#issuecomment-643563232
@StephanEwen Thanks for the comment. Please see the reply below:

#### `RecreateOnResetOperatorCoordinator` vs. having the logic in `OperatorCoordinatorHolder`

I actually moved back and forth a little bit on this. My initial motivation for having a separate class was exactly as you mentioned: the users can choose whether to recreate on reset or not. The downside is that users have one more thing to think about. Given that a global failure should be rare and performance is not the key consideration in that case, I was thinking that maybe we can just always recreate the coordinator. Therefore I moved the logic to `OperatorCoordinatorHolder`. But I don't have a strong opinion on this and would be happy to have a separate `RecreateOnResetOperatorCoordinator` class.

#### Regarding the exception in `QuieseableContext`

In general, I think the API should communicate the accurate result of an invocation back to the caller. That means if an event was not successfully sent, we should avoid swallowing the exception and pretending it has been sent. For example, the quiesced coordinator may have some follow-up interaction with an external system after successfully sending an event to a task. Not throwing an exception in this case might cause problems. Ideally we should throw a `QuiescedException` in this case so the users know that this coordinator should no longer perform any action that may cause side effects, not necessarily to Flink but maybe to some other systems.

#### In terms of the lock-free style coordination

What we are trying to guard against is the following race condition:

1. The user thread in the closing coordinator checks the quiesced flag and finds it is not set, so it proceeds. Right before it actually sends an event, the user thread is context switched out.
2. The quiescing thread sets the quiesced flag on the context and resets the valve. At this point the valve is reset and open to serve event sending from the new coordinator.
3. The user thread in the closing coordinator comes back and sends the event. Because the valve is open, that event will get in and be sent to the task.

So the in-progress thread counter is there to make sure that after `quiesce()` returns, no user thread is inside any of the context method invocations. The assumption here is that the event sending logic will finish super fast, i.e. it just puts the event into a queue in this case. So the quiescing thread can avoid the expensive context switch that would happen when using locks. From the quiescing thread's perspective, it basically does the following in order:

1. Block any further invocation on the context.
2. Wait for the ongoing invocations on the context to exit (this is very fast) in a spin-lock manner.
3. Clean up the context to serve the new coordinator.

We can actually just remove the entire `Thread.sleep(1)` part and make it a tight while loop. I put the sleep there for kind of a silly reason: in case there is only a single CPU core available in the entire system, spinning will waste one OS scheduler slot. I usually just use a tight while loop, and the code will jump out of that loop super quickly (e.g. after a few iterations) without an expensive context switch. In the past we have seen this lock-free trick give 100x performance compared with wait/notify synchronization. That being said, I guess this trick does not gain us much here, as we only have one thread sending events and the quiesce happens rarely. So wait/notify synchronization would probably be much more readable.

#### Why not synchronize on `failJob()`?

My first thought was that if the `failJob()` call went through right before the quiescence, that `failJob()` instruction would be sitting in the scheduler queue and the scheduler would still fail the job again later. So guarding against it only reduces the probability of failing the job twice, but does not completely prevent that from happening. But I agree it is confusing, and it is better to apply the same synchronization logic as we did for `sendEvent()`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: [email protected]
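For reference, the three quiescing steps from the lock-free section of the comment can be sketched roughly as follows. This is only an illustration under assumptions, not the code in the PR: `QuiesceSketch`, `QuiesceableSender`, and `delivered()` are hypothetical names, a `ConcurrentLinkedQueue` stands in for the event valve, and `Thread.yield()` is used in place of either the tight loop or `Thread.sleep(1)`. The key detail is that the sender increments the in-progress counter before it checks the flag, so the quiescing thread either sees the counter above zero or the sender sees the flag set.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class QuiesceSketch {

    /** Hypothetical exception telling the caller that the event was NOT sent. */
    static final class QuiescedException extends RuntimeException {
        QuiescedException() {
            super("context is quiesced; event was not sent");
        }
    }

    /** Hypothetical stand-in for a quiesceable coordinator context. */
    static final class QuiesceableSender {
        // Stand-in for the real event valve / queue.
        private final Queue<String> delivered = new ConcurrentLinkedQueue<>();
        private final AtomicInteger inProgress = new AtomicInteger();
        private volatile boolean quiesced = false;

        void sendEvent(String event) {
            // Announce the invocation BEFORE reading the flag. With the
            // volatile flag and the atomic counter, either quiesce() sees
            // inProgress > 0 and waits, or this thread sees quiesced == true.
            inProgress.incrementAndGet();
            try {
                if (quiesced) {
                    throw new QuiescedException();
                }
                delivered.add(event); // assumed to be very fast
            } finally {
                inProgress.decrementAndGet();
            }
        }

        void quiesce() {
            // Step 1: block any further invocation on the context.
            quiesced = true;
            // Step 2: spin until ongoing invocations have exited.
            while (inProgress.get() > 0) {
                Thread.yield();
            }
            // Step 3: the caller may now clean up and hand resources
            // to the new coordinator.
        }

        Queue<String> delivered() {
            return delivered;
        }
    }

    public static void main(String[] args) {
        QuiesceableSender sender = new QuiesceableSender();
        sender.sendEvent("e1");
        sender.quiesce();
        try {
            sender.sendEvent("e2");
        } catch (QuiescedException e) {
            System.out.println("rejected after quiesce: " + e.getMessage());
        }
        System.out.println("delivered: " + sender.delivered());
    }
}
```

A wait/notify or lock-based variant would replace the spin in `quiesce()` with blocking, which is likely easier to read when quiescing is rare, as the comment itself concludes.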
