[ https://issues.apache.org/jira/browse/BEAM-11251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-11251:
---------------------------------
Priority: P3 (was: P2)
> Unfair lock/monitor in UnboundedSourceWrapper can starve checkpointing
> ----------------------------------------------------------------------
>
> Key: BEAM-11251
> URL: https://issues.apache.org/jira/browse/BEAM-11251
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Aljoscha Krettek
> Priority: P3
> Labels: stale-P2
> Attachments:
> 0001-BEAM-11251-Don-t-chain-sources-to-avoid-checkpoint-s.patch
>
>
> From my email on this Flink ML thread:
> https://lists.apache.org/thread.html/rca7eb85183ac17be478b4d6c59c5ca348077689e5ba4a42c87b71bf4%40%3Cuser.flink.apache.org%3E:
> We need the synchronized block in the source because the calls to
> {{reader.advance()}} (via the invoker) and {{reader.getCurrent()}} (via
> {{emitElement()}}) must be atomic with respect to state. We must not advance
> the reader state, fail to emit that record, and still checkpoint the new reader
> state. The monitor ensures that no checkpoint can happen between those two
> calls.
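A minimal sketch of the invariant the monitor protects, in plain Java. `ToyReader` and `ToySource` are hypothetical stand-ins, not Beam's `UnboundedSourceWrapper` or `UnboundedReader`: the reader's position plays the role of the state a checkpoint would persist, and both the emit path and the snapshot synchronize on the same lock object. As a result, a snapshot can never observe a reader that has advanced past a record that was not yet emitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an unbounded reader; position is the state a checkpoint persists. */
final class ToyReader {
    private final List<String> data;
    private int position = -1; // index of the current record, -1 before the first advance()

    ToyReader(List<String> data) {
        this.data = data;
    }

    boolean advance() {
        if (position + 1 >= data.size()) {
            return false;
        }
        position++;
        return true;
    }

    String getCurrent() {
        return data.get(position);
    }

    int checkpointMark() {
        return position;
    }
}

/** Hypothetical source loop, not Beam's UnboundedSourceWrapper. */
final class ToySource {
    final Object checkpointLock = new Object();
    final List<String> emitted = new ArrayList<>(); // guarded by checkpointLock
    private final ToyReader reader;

    ToySource(ToyReader reader) {
        this.reader = reader;
    }

    /** advance() and emit form one critical section, so no checkpoint can fall between them. */
    boolean emitNext() {
        synchronized (checkpointLock) {
            if (!reader.advance()) {
                return false;
            }
            emitted.add(reader.getCurrent());
            return true;
        }
    }

    /** A checkpoint takes the same lock before reading the reader's state. */
    int snapshot() {
        synchronized (checkpointLock) {
            return reader.checkpointMark();
        }
    }

    /** True if every record up to the checkpointable position has been emitted. */
    boolean invariantHolds() {
        synchronized (checkpointLock) {
            return emitted.size() == reader.checkpointMark() + 1;
        }
    }
}
```

If `emitNext()` released the lock between `advance()` and `emitted.add(...)`, a concurrent `snapshot()` could record a position whose record was never emitted, which is exactly the inconsistency described here.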
> The basic problem now is that checkpointing can be starved because the
> monitor/lock is not fair. This could be solved by using a fair lock, but that
> would require changing Flink proper to use a fair lock instead of a
> monitor/synchronized. I don't see this as an immediate solution.
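Java `synchronized` monitors make no fairness guarantee, so a thread that releases a monitor and immediately re-enters it in a tight loop can keep winning against a waiting checkpoint thread. The following is a minimal sketch of the fair-lock alternative, assuming the checkpoint lock could be a `java.util.concurrent.locks.ReentrantLock` constructed with `fair = true` (which, per this discussion, Flink does not currently offer). `FairLockSource` is hypothetical, and `busyWork()` only simulates per-record processing.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical source whose checkpoint lock is a fair ReentrantLock instead of a monitor. */
final class FairLockSource {
    // fair = true: under contention, lock() favors the longest-waiting thread,
    // so the emit loop cannot immediately barge back in ahead of a waiting checkpoint.
    final ReentrantLock checkpointLock = new ReentrantLock(true);
    final AtomicBoolean running = new AtomicBoolean(true);
    private long position = 0; // guarded by checkpointLock

    /** Tight emit loop that re-acquires the lock right after releasing it. */
    void runLoop() {
        while (running.get()) {
            checkpointLock.lock();
            try {
                position++; // stands in for advance() + emitElement()
                busyWork();
            } finally {
                checkpointLock.unlock();
            }
        }
    }

    /** Checkpoint: queues for the same lock and reads the state once it is granted. */
    long snapshot() {
        checkpointLock.lock();
        try {
            return position;
        } finally {
            checkpointLock.unlock();
        }
    }

    /** Simulated per-record work performed while the lock is held. */
    private static void busyWork() {
        long deadline = System.nanoTime() + TimeUnit.MICROSECONDS.toNanos(50);
        while (System.nanoTime() < deadline) {
            Thread.onSpinWait();
        }
    }
}
```

Fairness is not free: the `ReentrantLock` Javadoc warns that fair locks can show noticeably lower throughput under contention, and the untimed `tryLock()` does not honor the fairness setting.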
> One thing that exacerbates this problem is that too many things are happening
> "under" the synchronized block. All the transforms before a
> shuffle/rebalance/keyBy are chained to the source, which means that they are
> invoked from the {{emitElement()}} call and therefore run while the monitor is held.
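To see why chaining enlarges the critical section, consider a toy model in which a chained operator is just a method call made from inside the source's emit path. `ChainedSource` and its `chain` helper are hypothetical and only approximate Flink's operator chaining. They show that every chained function runs on the source thread while the checkpoint lock is still held, so its cost adds directly to the time a checkpoint may have to wait.

```java
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical model of operator chaining: chained operators are ordinary method calls
 * made from inside the source's emit path, so they run on the source thread and inside
 * the checkpoint lock.
 */
final class ChainedSource {
    final Object checkpointLock = new Object();
    private final Consumer<String> chainedOutput;
    private int next = 0;

    ChainedSource(Consumer<String> chainedOutput) {
        this.chainedOutput = chainedOutput;
    }

    void emitNext() {
        synchronized (checkpointLock) {
            String record = "record-" + next++; // stands in for advance() + getCurrent()
            chainedOutput.accept(record);       // the whole chain runs here, lock still held
        }
    }

    /** Composes a chained map followed by a chained sink, as a fused operator chain would. */
    static Consumer<String> chain(Function<String, String> map, Consumer<String> sink) {
        return record -> sink.accept(map.apply(record));
    }
}
```

With a chain such as `ChainedSource.chain(String::toUpperCase, out::add)`, `Thread.holdsLock(src.checkpointLock)` is `true` inside both the map and the sink, so an expensive chained transform keeps the checkpoint waiting for its full duration.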
> A possible mitigation would be to disable chaining globally by inserting a
> {{flinkStreamEnv.disableOperatorChaining()}} in [1].
> A more surgical version would be to disable chaining only for sources, but
> this can also have an impact on performance, since without chaining there is
> potentially more serialization between tasks/operators.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)