StephanEwen commented on pull request #13574:
URL: https://github.com/apache/flink/pull/13574#issuecomment-718205400
Thanks for all the thoughts and input. This is a very important discussion,
and it is critical to get right for good behavior in the long term.
@becketqin We are probably close to being on the same page, so let me
summarize my understanding:
- The `SplitEnumerator` should treat methods as synchronous: if a call
succeeds, then the method succeeded. Calls may be blocking. During a blocking
method call, the enumerator cannot do anything else (it is single-threaded),
but this does no harm. Users are encouraged to use the `callAsync()`
tool, but nothing breaks if they don't. This is important because it keeps
things simple for users.
- From the runtime perspective, calls on the operator coordinator (the
`SourceCoordinator`) must be non-blocking. If they succeed, the scheduler
assumes the operation succeeded. In most cases it does not really matter
whether it actually succeeded, because most methods either just report
information (subtask failed) or trigger an explicitly async operation
(snapshot state, which returns a future). `close()` might be the only notable
exception.
- The `SourceCoordinator` internally dispatches all calls to the executor,
which in turn calls the `SplitEnumerator` methods. So the `SplitEnumerator`
methods can already be synchronous without a problem. If they fail, the
exception is caught in the invoking executor runnable, and the
`SourceCoordinator` generally handles it by calling `failJob()`.
- So far, this works without users needing to worry about the asynchrony.
In most cases, they can simply throw an exception from a method call, and
that fails the job.
- There is a case for allowing the `SplitEnumerator` to trigger `failJob()`
based on the result of an explicit `context.callAsync()`, but, if I see it
correctly, mainly for that purpose and not for general method calls.
- The `close()` method remains tricky, because the scheduler does not
support asynchronously closing or resetting the execution graph. That means
the close method of the `SourceCoordinator` must not block. It cannot be made
async on the Operator Coordinator layer either (not without a bigger
refactoring involving the scheduler).
I think @becketqin's original idea works here: spawn a thread for the
closing so that the `coordinator.close()` call completes immediately. If the
closing thread is still alive after some time X (e.g., 1 minute), fail the
job to avoid lingering resource leaks.
Again, the user implementing the `SplitEnumerator` sees nothing of that
model: close is still synchronous, and it is all hidden in the
`SourceCoordinator`.
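A minimal, self-contained sketch of the dispatch model from the bullets above, in plain Java so it runs without Flink on the classpath. `SimpleEnumerator`, `DispatchingCoordinator`, `DispatchDemo`, and the `failJob` callback are hypothetical stand-ins, not Flink's `SplitEnumerator` or `SourceCoordinator` API. Coordinator methods only enqueue work on a single-threaded executor and return, the enumerator method runs synchronously on that thread, and any exception it throws is caught and routed to `failJob`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical user-facing interface: a plain synchronous method that may block or throw.
interface SimpleEnumerator {
    void handleSplitRequest(int subtaskId) throws Exception;
}

// Hypothetical coordinator: each call only enqueues work and returns immediately.
final class DispatchingCoordinator {
    private interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Exactly one thread, so enumerator methods never run concurrently.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final SimpleEnumerator enumerator;
    private final Consumer<Throwable> failJob; // hypothetical stand-in for the job-failure hook

    DispatchingCoordinator(SimpleEnumerator enumerator, Consumer<Throwable> failJob) {
        this.enumerator = enumerator;
        this.failJob = failJob;
    }

    // Non-blocking for the caller (the scheduler, in the real system).
    void handleSplitRequest(int subtaskId) {
        runInExecutor(() -> enumerator.handleSplitRequest(subtaskId));
    }

    private void runInExecutor(ThrowingRunnable action) {
        executor.execute(() -> {
            try {
                action.run(); // synchronous enumerator call on the coordinator thread
            } catch (Throwable t) {
                failJob.accept(t); // any exception fails the job
            }
        });
    }

    // Demo helper: let queued calls finish, then stop the thread.
    void shutdownAndWait() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class DispatchDemo {
    static Throwable run(boolean enumeratorThrows) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        DispatchingCoordinator coordinator = new DispatchingCoordinator(subtaskId -> {
            if (enumeratorThrows) {
                throw new IllegalStateException("no splits for subtask " + subtaskId);
            }
        }, failure::set);
        coordinator.handleSplitRequest(3); // returns without waiting for the enumerator
        coordinator.shutdownAndWait();
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints "null"
        System.out.println(run(true));  // prints "java.lang.IllegalStateException: no splits for subtask 3"
    }
}
```

Because there is a single executor thread, the user can write enumerator methods as ordinary blocking code; the only cost of blocking is that other queued calls wait.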
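A sketch of how an explicit `context.callAsync()` could hand a failure decision back to the enumerator. `SketchContext` and `CallAsyncDemo` are hypothetical and only loosely modeled on the idea in the comment, not Flink's actual context class: the callable runs on a worker pool, its result or error is handed back to the single coordinator thread, and the handler either calls `failJob()` explicitly or throws, which fails the job like any other call.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

// Hypothetical, simplified enumerator context (not Flink's actual class).
final class SketchContext {
    private final ExecutorService coordinatorThread = Executors.newSingleThreadExecutor();
    private final ExecutorService workerPool = Executors.newFixedThreadPool(2);
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    // Runs the callable on a worker thread, then the handler back on the coordinator thread.
    <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        workerPool.execute(() -> {
            T result = null;
            Throwable error = null;
            try {
                result = callable.call();
            } catch (Throwable t) {
                error = t;
            }
            final T r = result;
            final Throwable e = error;
            coordinatorThread.execute(() -> {
                try {
                    handler.accept(r, e);
                } catch (Throwable t) {
                    failJob(t); // a throwing handler fails the job, like any other call
                }
            });
        });
    }

    // Hypothetical failure hook the enumerator may call from a handler; first cause wins.
    void failJob(Throwable cause) {
        failure.compareAndSet(null, cause);
    }

    Throwable failureCause() {
        return failure.get();
    }

    // Demo helper: drain workers first, since they enqueue handlers on the coordinator thread.
    void shutdownAndWait() {
        try {
            workerPool.shutdown();
            workerPool.awaitTermination(10, TimeUnit.SECONDS);
            coordinatorThread.shutdown();
            coordinatorThread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class CallAsyncDemo {
    static Throwable discoverSplits(boolean listingFails) {
        SketchContext context = new SketchContext();
        context.callAsync(
                () -> {
                    // Slow I/O runs on a worker thread, not the coordinator thread.
                    if (listingFails) {
                        throw new IOException("listing failed");
                    }
                    return List.of("split-0", "split-1");
                },
                (splits, error) -> {
                    if (error != null) {
                        context.failJob(error); // the enumerator decides this error is fatal
                    }
                    // otherwise: assign `splits` to readers (omitted)
                });
        context.shutdownAndWait();
        return context.failureCause();
    }

    public static void main(String[] args) {
        System.out.println(discoverSplits(false)); // prints "null"
        System.out.println(discoverSplits(true));  // prints "java.io.IOException: listing failed"
    }
}
```

The design keeps the handler on the coordinator thread, so it can touch enumerator state without locks; only the callable runs in parallel.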
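A sketch of the non-blocking close idea, assuming the enumerator's close is exposed as an `AutoCloseable` and the job-failure hook is a `Consumer<Throwable>`; `NonBlockingCloser` and `CloseDemo` are hypothetical names, not Flink classes. `closeAsync` spawns a closer thread and returns immediately, and a watchdog thread waits up to the timeout and fails the job if the closer is still alive.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical helper: makes a potentially blocking close non-blocking for the caller.
final class NonBlockingCloser {
    // Returns right away. `timeout` must be positive (Thread.join(0) would wait forever).
    static Thread closeAsync(AutoCloseable enumerator, Duration timeout, Consumer<Throwable> failJob) {
        Thread closer = new Thread(() -> {
            try {
                enumerator.close(); // user code stays synchronous
            } catch (Throwable t) {
                failJob.accept(t);
            }
        }, "enumerator-closer");
        closer.setDaemon(true); // a stuck close must not keep the JVM alive
        closer.start();

        Thread watchdog = new Thread(() -> {
            try {
                closer.join(timeout.toMillis());
            } catch (InterruptedException e) {
                return; // watchdog cancelled
            }
            if (closer.isAlive()) {
                failJob.accept(new TimeoutException("enumerator close did not finish within " + timeout));
            }
        }, "enumerator-close-watchdog");
        watchdog.setDaemon(true);
        watchdog.start();
        return watchdog; // returned only so callers (e.g. tests) can wait for the verdict
    }
}

final class CloseDemo {
    static Throwable closeWith(long closeMillis, long timeoutMillis) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        AutoCloseable enumerator = () -> Thread.sleep(closeMillis); // simulated slow close
        Thread watchdog = NonBlockingCloser.closeAsync(
                enumerator, Duration.ofMillis(timeoutMillis),
                cause -> failure.compareAndSet(null, cause)); // first failure wins
        try {
            watchdog.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(closeWith(10, 5_000)); // fast close: no failure
        System.out.println(closeWith(2_000, 50)); // stuck close: TimeoutException
    }
}
```

The demo uses millisecond timeouts only to keep it fast; the comment suggests a timeout on the order of one minute.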
----------------------------------------------------------------
This is an automated message from the Apache Git Service.