StephanEwen commented on pull request #12510:
URL: https://github.com/apache/flink/pull/12510#issuecomment-643396266
Thanks for the patch! I like the general direction, but am unsure about a
few details.
The earlier version of the code kept the re-creation logic in a separate
`RecreateOnResetOperatorCoordinator` class. In hindsight, I think that was
the better design.
- The `OperatorCoordinatorHolder` already has a fairly complex job
(threading model, checkpoint guarantees, and in the future probably
execution versioning). I think it would be good to keep additional
responsibilities out of this class. "Recreate on reset" fits well into a
separate class (as in the earlier version).
- This also lets us make the behavior optional. Coordinator implementations
that want it can wrap themselves in the
`RecreateOnResetOperatorCoordinator`; all others stay as they are.
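A minimal sketch of the opt-in wrapper idea. It assumes a hypothetical, simplified `Coordinator` interface and a `Supplier` factory; neither `Coordinator` nor `RecreateOnResetCoordinator` is Flink's actual `OperatorCoordinator` API, and the close/create/reset/start order on reset is an assumption for illustration.

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for a coordinator; NOT Flink's OperatorCoordinator API.
interface Coordinator {
    void start();
    void close();
    void resetToCheckpoint(byte[] checkpointData);
}

// Hypothetical decorator: keeps the "recreate on reset" concern out of the holder
// and out of the coordinator itself. Only coordinators that opt in get wrapped.
final class RecreateOnResetCoordinator implements Coordinator {
    private final Supplier<Coordinator> factory;
    private Coordinator current;

    RecreateOnResetCoordinator(Supplier<Coordinator> factory) {
        this.factory = factory;
        this.current = factory.get();
    }

    @Override
    public void start() {
        current.start();
    }

    @Override
    public void close() {
        current.close();
    }

    @Override
    public void resetToCheckpoint(byte[] checkpointData) {
        // Assumed order: dispose of the old instance, build a fresh one,
        // restore its state from the checkpoint, then start it.
        current.close();
        current = factory.get();
        current.resetToCheckpoint(checkpointData);
        current.start();
    }

    // Exposes the wrapped instance (for inspection only).
    Coordinator current() {
        return current;
    }
}
```

Coordinators that do not want this behavior are simply used unwrapped, so the holder never needs to know about re-creation.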
Does the "quiesced context" have to throw exceptions? In other quiescing
implementations, the components turn their methods into no-ops instead, so
there are no unexpected exceptions during closing/recreation.
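A minimal sketch of the no-op style of quiescing. `QuiescingSender` is a hypothetical class whose "delivery" only records events in a list; after `quiesce()`, `send()` reports the event as dropped instead of throwing. Holding the lock during `send()` is harmless here only because nothing outside the class is called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical component whose methods become no-ops once quiesced.
final class QuiescingSender {
    private final List<String> delivered = new ArrayList<>();
    private boolean quiesced;

    // Returns false if the event was dropped; never throws because of quiescing.
    synchronized boolean send(String event) {
        if (quiesced) {
            return false;
        }
        delivered.add(event);
        return true;
    }

    synchronized void quiesce() {
        quiesced = true;
    }

    synchronized List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```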
The quiesced context counts in-progress message sends.
- Is the purpose here to avoid locks because of the danger of deadlocks?
- I think the same mechanism we use in `sendEvent()` is also needed in
`failJob()`. Both are interactions with outside components.
- Could we avoid the `Thread.sleep(1)` statement?
The code below is a suggestion for a quiesced context that does not throw an
exception and uses wait/notify instead of sleeping.
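```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;

class QuiesceableContext implements OperatorCoordinator.Context {

    private final OperatorCoordinator.Context context;
    private final Object lock = new Object();
    private int callsInProgress;  // guarded by lock
    private boolean quiesced;     // guarded by lock

    QuiesceableContext(OperatorCoordinator.Context context) {
        this.context = context;
    }

    @Override
    public CompletableFuture<Acknowledge> sendEvent(
            OperatorEvent evt,
            int targetSubtask) throws TaskNotRunningException {
        // once quiesced, drop the event silently instead of throwing
        if (enterAndCheckQuiesced()) {
            return CompletableFuture.completedFuture(null);
        }
        try {
            return context.sendEvent(evt, targetSubtask);
        } finally {
            exit();
        }
    }

    @Override
    public void failJob(Throwable cause) {
        // same guard as sendEvent(): also an interaction with an outside component
        if (enterAndCheckQuiesced()) {
            return;
        }
        try {
            context.failJob(cause);
        } finally {
            exit();
        }
    }

    // remaining Context methods simply delegate to 'context' (omitted)

    private boolean enterAndCheckQuiesced() {
        synchronized (lock) {
            if (quiesced) {
                return true;
            }
            callsInProgress++;
            return false;
        }
    }

    private void exit() {
        synchronized (lock) {
            callsInProgress--;
            if (callsInProgress == 0) {
                lock.notifyAll();
            }
        }
    }

    // rejects new calls, then blocks until all in-flight calls have returned
    private void quiesce() throws InterruptedException {
        synchronized (lock) {
            quiesced = true;
            while (callsInProgress > 0) {
                lock.wait();
            }
        }
    }
}
```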
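To try the counting plus wait/notify scheme in isolation, here is a Flink-independent sketch; `QuiesceGate` and `runGuarded()` are hypothetical names. The guarded action runs outside the lock, which is what avoids the deadlock risk of calling into other components while holding it, and `quiesce()` waits on the monitor rather than polling with `Thread.sleep()`.

```java
// Hypothetical, Flink-independent version of the quiesce gate suggested above.
final class QuiesceGate {
    private final Object lock = new Object();
    private int callsInProgress;  // guarded by lock
    private boolean quiesced;     // guarded by lock

    // Runs the action unless quiesced; returns false if it was skipped (a no-op, not an exception).
    boolean runGuarded(Runnable action) {
        synchronized (lock) {
            if (quiesced) {
                return false;
            }
            callsInProgress++;
        }
        try {
            action.run();  // executed WITHOUT holding the lock
            return true;
        } finally {
            synchronized (lock) {
                if (--callsInProgress == 0) {
                    lock.notifyAll();  // wake up a pending quiesce()
                }
            }
        }
    }

    // Rejects new calls, then blocks until all in-flight calls have returned.
    void quiesce() throws InterruptedException {
        synchronized (lock) {
            quiesced = true;
            while (callsInProgress > 0) {
                lock.wait();  // loop also guards against spurious wakeups
            }
        }
    }
}
```

Because the in-flight call exits through the same monitor that `quiesce()` waits on, everything that call did is visible to the thread returning from `quiesce()`.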
----------------------------------------------------------------
This is an automated message from the Apache Git Service.