becketqin commented on pull request #12510:
URL: https://github.com/apache/flink/pull/12510#issuecomment-643563232


   @StephanEwen Thanks for the comment. Please see the reply below:
   
   #### `RecreateOnResetOperatorCoordinator` vs. having the logic in 
`OperatorCoordinatorHolder`
   I actually went back and forth on this. My initial motivation for having a 
separate class was exactly what you mentioned: users can choose whether to 
recreate on reset. The downside is that users have one more thing to think 
about. Given that global failures should be rare and performance is not the 
key consideration in that case, I was thinking that maybe we can just always 
recreate the coordinator. Therefore I moved the logic to 
`OperatorCoordinatorHolder`. But I don't have a strong opinion on this and 
would be happy to have a separate `RecreateOnResetOperatorCoordinator` class.
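   
   To make the separate-class option concrete, here is a minimal sketch of a 
recreate-on-reset wrapper. The names `CoordinatorSketch` and 
`RecreateOnResetSketch` and the use of a `Supplier` as the factory are 
assumptions for illustration only, not the actual Flink interfaces.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a coordinator; only close() matters here.
interface CoordinatorSketch {
    void close();
}

// Wrapper that throws away the old coordinator on reset and builds a new one.
final class RecreateOnResetSketch {
    private final Supplier<CoordinatorSketch> factory;
    private CoordinatorSketch current;

    RecreateOnResetSketch(Supplier<CoordinatorSketch> factory) {
        this.factory = factory;
        this.current = factory.get();
    }

    CoordinatorSketch current() {
        return current;
    }

    // Close the old instance first, then create a fresh one from the factory.
    void reset() {
        current.close();
        current = factory.get();
    }
}
```

   A coordinator that does not want to be recreated would simply not be 
wrapped, which is the extra choice users would have to think about.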
   
   #### Regarding the exception in `QuieseableContext`.
   In general, I think the API should report the accurate result of an 
invocation back to the caller. That means if an event was not successfully 
sent, we should avoid swallowing the exception and pretending it was sent. For 
example, the quiesced coordinator may have some follow-up interaction with an 
external system after successfully sending an event to a task. Not throwing an 
exception in this case might cause problems. Ideally we should throw a 
`QuiescedException` in this case so that users know this coordinator should no 
longer perform any action that may cause side effects, not necessarily on 
Flink but possibly on other systems.
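   
   A minimal sketch of that behavior, assuming a hypothetical unchecked 
`QuiescedException` and a simplified context that records events in a list 
instead of sending them to tasks (neither is the actual Flink code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical exception type; the proposal only names it, so the shape is assumed.
class QuiescedException extends IllegalStateException {
    QuiescedException(String message) {
        super(message);
    }
}

// Simplified context: "sending" just appends to a list.
final class ThrowingContextSketch {
    private volatile boolean quiesced;
    private final List<String> sent = new ArrayList<>();

    void sendEvent(String event) {
        if (quiesced) {
            // Tell the caller the event was NOT sent instead of silently dropping it.
            throw new QuiescedException("context is quiesced, event dropped: " + event);
        }
        sent.add(event);
    }

    void quiesce() {
        quiesced = true;
    }

    List<String> sentEvents() {
        return sent;
    }
}
```

   A caller that catches `QuiescedException` can then skip any follow-up 
interaction with an external system.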
   
   #### In terms of the lock-free style coordination.
   What we are trying to guard against is the following race condition. 
   1. The user thread in the closing coordinator checks the quiesced flag and 
finds it is not set, so it proceeds. Right before it actually sends an event, 
the user thread is context switched out.
   2. The quiescing thread sets the quiesced flag on the context and resets 
the valve. At this point the valve is reset and open to serve event sending 
from the new coordinator.
   3. The user thread in the closing coordinator comes back and sends the 
event. Because the valve is open, that event gets in and is sent to the task.
   
   So the in-progress thread counter here is to make sure that after 
`quiesce()` has returned, no user thread is inside any context method 
invocation. The assumption is that the event sending logic finishes very 
quickly, i.e. it only puts the event into a queue in this case. So the 
quiescing thread can avoid the expensive context switch that would happen when 
using locks. From the quiescing thread's perspective, it basically does the 
following in order:
   1. Block any further invocation on the context.
   2. Wait for the ongoing invocations on the context to exit (this is very 
fast) in a spin-lock fashion.
   3. Clean up the context to serve the new coordinator.
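   
   The three steps can be sketched roughly as follows. This is a simplified 
illustration, not the code in the PR: the class name `QuiesceGuardSketch` and 
the `invoke(Runnable)` method are hypothetical, and `Thread.onSpinWait()` 
requires Java 9 or later (`Thread.yield()` could be used instead).

```java
import java.util.concurrent.atomic.AtomicInteger;

final class QuiesceGuardSketch {
    // Number of user threads currently inside a context method.
    private final AtomicInteger inProgress = new AtomicInteger();
    private volatile boolean quiesced;

    // Runs the action unless the context is quiesced; returns whether it ran.
    boolean invoke(Runnable action) {
        // Register BEFORE checking the flag, so quiesce() either sees this
        // thread in the counter or this thread sees the flag already set.
        inProgress.incrementAndGet();
        try {
            if (quiesced) {
                return false;
            }
            action.run();
            return true;
        } finally {
            inProgress.decrementAndGet();
        }
    }

    void quiesce() {
        // Step 1: block any further invocation.
        quiesced = true;
        // Step 2: spin until the ongoing invocations have exited.
        while (inProgress.get() > 0) {
            Thread.onSpinWait();
        }
        // Step 3 (cleaning up for the new coordinator) would follow here.
    }
}
```

   The ordering matters: if the user thread checked the flag first and 
incremented the counter afterwards, the race described above would still be 
possible.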
   
   We can actually just remove the entire `Thread.sleep(1)` part and make it a 
tight while loop. I put the sleep there for a somewhat silly reason: if only a 
single CPU core is available in the entire system, spinning wastes one OS 
scheduler slot. I usually just use a tight while loop, and the code jumps out 
of that loop very quickly (e.g. after a few iterations) without an expensive 
context switch.
   
   In the past we have seen this lock-free trick perform about 100x better 
than wait/notify synchronization. That being said, I guess this trick does not 
gain us much here, as we only have one thread sending events and the quiesce 
happens rarely. So wait/notify synchronization would probably be much more 
readable.
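   
   For comparison, a wait/notify version of the same guard might look like the 
sketch below. The class name `WaitNotifyGuardSketch` and the 
`invoke(Runnable)` method are again hypothetical, chosen only for 
illustration.

```java
final class WaitNotifyGuardSketch {
    private final Object lock = new Object();
    private int inProgress;   // guarded by lock
    private boolean quiesced; // guarded by lock

    // Runs the action unless the context is quiesced; returns whether it ran.
    boolean invoke(Runnable action) {
        synchronized (lock) {
            if (quiesced) {
                return false;
            }
            inProgress++;
        }
        try {
            action.run();
            return true;
        } finally {
            synchronized (lock) {
                inProgress--;
                if (inProgress == 0) {
                    lock.notifyAll(); // wake up a waiting quiesce()
                }
            }
        }
    }

    void quiesce() {
        boolean interrupted = false;
        synchronized (lock) {
            quiesced = true;
            // Wait until every ongoing invocation has exited.
            while (inProgress > 0) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    interrupted = true; // keep waiting, restore the flag afterwards
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
```

   Here the flag check and the counter update happen under one lock, so the 
ordering is easier to reason about, at the cost of possible blocking in 
`quiesce()`.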
   
   #### Why not synchronize on `failJob()`
   My first thought was that if the `failJob()` call went through right before 
the quiescence, that `failJob()` instruction would be sitting in the scheduler 
queue and the scheduler would still fail the job again later. So guarding 
against it only reduces the probability of failing the job twice, but does not 
completely prevent that from happening.
   
   But I agree it is confusing, and it is better to apply the same 
synchronization logic as we did for `sendEvent()`.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

