damccorm opened a new issue, #20576:
URL: https://github.com/apache/beam/issues/20576

   From my email on this Flink mailing list thread: 
https://lists.apache.org/thread.html/rca7eb85183ac17be478b4d6c59c5ca348077689e5ba4a42c87b71bf4%40%3Cuser.flink.apache.org%3E:
   
   We need the synchronized block in the source because the call to 
`reader.advance()` (via the invoker) and `reader.getCurrent()` (via 
`emitElement()`) must be atomic with respect to state. We must never advance the 
reader, fail to emit that record, and still checkpoint the new reader state. 
The monitor ensures that no checkpoint can happen between those two calls.
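
   The invariant can be sketched in plain Java as follows. This is only an 
illustration: `CheckpointLockSketch`, `advanceAndEmit()` and `snapshot()` are 
hypothetical names, not the actual Beam or Flink classes, and an `Iterator` 
stands in for the reader.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the source wrapper; not the real Beam/Flink code.
class CheckpointLockSketch {
    private final Object checkpointLock = new Object();
    private final Iterator<String> reader;
    private int readerPosition = 0;                       // state a checkpoint would snapshot
    private final List<String> emitted = new ArrayList<>();

    CheckpointLockSketch(List<String> input) {
        this.reader = input.iterator();
    }

    /** Advances the reader and emits the record as one step under the monitor. */
    boolean advanceAndEmit() {
        synchronized (checkpointLock) {
            if (!reader.hasNext()) {
                return false;
            }
            String record = reader.next();                // "advance"
            readerPosition++;
            emitted.add(record);                          // "emit"
            return true;
        }
    }

    /** A checkpoint takes the same monitor, so it never observes a half-done step. */
    int[] snapshot() {
        synchronized (checkpointLock) {
            return new int[] {readerPosition, emitted.size()};
        }
    }
}
```

   Because both methods synchronize on the same object, a snapshot always sees 
a reader position that matches the number of emitted records.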
   
   The basic problem is that checkpointing can starve because the 
monitor/lock is not fair: the source thread can keep reacquiring it while the 
checkpoint waits. This could be solved by using a fair lock, but that would 
require changing Flink itself to use a fair lock instead of a 
monitor/synchronized. I don't see this as an immediate solution.
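
   For reference, this is roughly what a fair lock looks like in plain Java. 
It is a sketch under the assumption that the lock could be swapped freely, 
which, as noted above, is not the case in Flink today; `FairLockSketch` and 
its methods are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; Flink's checkpoint lock is a plain monitor.
class FairLockSketch {
    // Passing true requests a fair ordering policy: under contention the
    // lock favors the longest-waiting thread.
    private final ReentrantLock lock = new ReentrantLock(true);
    private long position = 0;

    /** Source side: advance and emit while holding the lock. */
    void advanceAndEmit() {
        lock.lock();
        try {
            position++;
        } finally {
            lock.unlock();
        }
    }

    /** Checkpoint side: with a fair lock a waiting checkpoint is not overtaken indefinitely. */
    long checkpoint() {
        lock.lock();
        try {
            return position;
        } finally {
            lock.unlock();
        }
    }

    boolean isFair() {
        return lock.isFair();
    }
}
```

   Fair locks generally have lower throughput than unfair ones, so this would 
be a trade-off even if the change were possible.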
   
   One thing that exacerbates this problem is that too many things are 
happening "under" the synchronized block. All the transforms before a 
shuffle/rebalance/keyBy are chained to the source, which means that they are 
invoked from the `emitElement()` call.
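
   The effect of chaining on the lock can be illustrated with a small 
plain-Java sketch. All names here (`ChainingSketch`, `emit()`, the queue used 
as a channel) are hypothetical and only model the behaviour described above; 
they are not Flink APIs.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical model of chained vs. unchained emission; not Flink code.
class ChainingSketch {
    static final Object checkpointLock = new Object();

    /** Emits one record under the lock, like the source loop described above. */
    static void emit(String record, Consumer<String> output) {
        synchronized (checkpointLock) {
            output.accept(record);
        }
    }

    /** Chained: the downstream transform runs inside emit(), i.e. under the lock. */
    static boolean chainedRunsUnderLock() {
        boolean[] held = new boolean[1];
        emit("a", r -> held[0] = Thread.holdsLock(checkpointLock));
        return held[0];
    }

    /** Unchained: emit() only enqueues; the transform runs later, outside the lock. */
    static boolean unchainedRunsUnderLock() {
        Queue<String> channel = new ConcurrentLinkedQueue<>();
        emit("a", channel::add);
        channel.poll();                                   // downstream side of the handoff
        return Thread.holdsLock(checkpointLock);
    }
}
```

   In the chained case every downstream transform lengthens the time the 
monitor is held, which leaves fewer opportunities for a checkpoint to acquire it.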
   
   A possible mitigation would be to disable chaining globally by inserting a 
`flinkStreamEnv.disableOperatorChaining()` in [1].
   
   A more surgical version would be to disable chaining only for sources, but 
this can still hurt performance, since without chaining we potentially have 
more serialization between tasks/operators.
   
   Imported from Jira 
[BEAM-11251](https://issues.apache.org/jira/browse/BEAM-11251). Original Jira 
may contain additional context.
   Reported by: aljoscha.

