akalash commented on a change in pull request #15375:
URL: https://github.com/apache/flink/pull/15375#discussion_r604760303
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -535,41 +545,82 @@ protected void beforeInvoke() throws Exception {
// -------- Invoke --------
LOG.debug("Invoking {}", getName());
+ SuspendableMailboxLoop suspendableLoop = mailboxProcessor.getSuspendableLoop();
+
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
- actionExecutor.runThrowing(
- () -> {
- SequentialChannelStateReader reader =
- getEnvironment()
- .getTaskStateManager()
- .getSequentialChannelStateReader();
- reader.readOutputData(
- getEnvironment().getAllWriters(),
- !configuration.isGraphContainingLoops());
-
- operatorChain.initializeStateAndOpenOperators(
- createStreamTaskStateInitializer());
-
- channelIOExecutor.execute(
- () -> {
- try {
- reader.readInputData(getEnvironment().getAllInputGates());
- } catch (Exception e) {
- asyncExceptionHandler.handleAsyncException(
- "Unable to read channel state", e);
- }
- });
+ CompletableFuture<Void> allGatesRecoveredFuture =
+ actionExecutor.call(
+ () -> {
+ SequentialChannelStateReader reader =
+ getEnvironment()
+ .getTaskStateManager()
+ .getSequentialChannelStateReader();
+ reader.readOutputData(
+ getEnvironment().getAllWriters(),
+ !configuration.isGraphContainingLoops());
+
+ operatorChain.initializeStateAndOpenOperators(
+ createStreamTaskStateInitializer());
+
+ IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
+ channelIOExecutor.execute(
+ () -> {
+ try {
+ reader.readInputData(inputGates);
+ } catch (Exception e) {
+ asyncExceptionHandler.handleAsyncException(
+ "Unable to read channel state", e);
+ }
+ });
+
+ List<CompletableFuture<?>> recoveredFutures =
+ new ArrayList<>(inputGates.length);
+ for (InputGate inputGate : inputGates) {
+ recoveredFutures.add(inputGate.getStateConsumedFuture());
+
+ inputGate
+ .getStateConsumedFuture()
+ .thenRun(
+ () ->
+ mainMailboxExecutor.execute(
+ inputGate::requestPartitions,
+ "Input gate request partitions"));
+ }
- for (InputGate inputGate : getEnvironment().getAllInputGates()) {
- inputGate
- .getStateConsumedFuture()
- .thenRun(
- () ->
- mainMailboxExecutor.execute(
- inputGate::requestPartitions,
- "Input gate request partitions"));
- }
- });
+ return gatesRecoveredFuture(suspendableLoop, recoveredFutures);
+ });
+
+ // Run the mailbox until all gates are recovered.
+ do {
+ suspendableLoop.run();
+ } while (isMailboxLoopRunning() && !allGatesRecoveredFuture.isDone());
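The pattern in this hunk (one "state consumed" future per input gate, combined into a single future that the task thread waits on while it keeps processing mail) can be sketched in isolation. This is a minimal sketch, assuming plain `CompletableFuture.allOf` as the combinator and a single-threaded `ArrayDeque` as the mailbox; `ToyGate` and `allGatesRecovered` are hypothetical stand-ins, not Flink classes, and whatever the real `gatesRecoveredFuture` does with `suspendableLoop` is not modelled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy types; they mimic the shape of the diff, not Flink's real classes.
final class GateRecoverySketch {

    // Stand-in for an input gate: only the "state consumed" future and the follow-up action matter.
    static final class ToyGate {
        final CompletableFuture<Void> stateConsumed = new CompletableFuture<>();
        boolean partitionsRequested = false;

        void requestPartitions() {
            partitionsRequested = true;
        }
    }

    // Registers per-gate callbacks and returns a future that completes once every gate is recovered.
    static CompletableFuture<Void> allGatesRecovered(List<ToyGate> gates, Queue<Runnable> mailbox) {
        List<CompletableFuture<?>> recovered = new ArrayList<>(gates.size());
        for (ToyGate gate : gates) {
            recovered.add(gate.stateConsumed);
            // As in the diff: once a gate's state is consumed, enqueue a mail that requests partitions.
            gate.stateConsumed.thenRun(() -> mailbox.add(gate::requestPartitions));
        }
        return CompletableFuture.allOf(recovered.toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        Queue<Runnable> mailbox = new ArrayDeque<>();
        List<ToyGate> gates = List.of(new ToyGate(), new ToyGate());
        CompletableFuture<Void> all = allGatesRecovered(gates, mailbox);

        gates.get(0).stateConsumed.complete(null);
        System.out.println(all.isDone()); // false: the second gate is still recovering
        gates.get(1).stateConsumed.complete(null);
        System.out.println(all.isDone()); // true

        // Drain the mailbox on the "task thread"; each mail requests partitions for one gate.
        while (!mailbox.isEmpty()) {
            mailbox.poll().run();
        }
        System.out.println(gates.get(0).partitionsRequested && gates.get(1).partitionsRequested); // true
    }
}
```

Because `complete(null)` is called on the same thread that registered the callbacks, the `thenRun` actions (and the `allOf` bookkeeping) run synchronously inside `complete`, which keeps the toy deterministic.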
Review comment:
I actually agree that we could do something like this:
```
do {
    mailboxProcessor.runMailboxStep(/* singleStep= */ true);
} while (isMailboxLoopRunning() && !allGatesRecoveredFuture.isDone());
```
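To make the exit condition of the proposed loop concrete, here is a minimal sketch of single-step pumping, assuming a toy processor whose hypothetical `runSingleStep()` runs at most one mail (it is not Flink's `MailboxProcessor`). Because the condition is re-checked after every mail, the loop stops as soon as `allGatesRecoveredFuture` completes and leaves any remaining mails for the main loop.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical toy; runSingleStep() is NOT Flink's MailboxProcessor API.
final class SingleStepLoopSketch {

    static final class ToyMailboxProcessor {
        final Queue<Runnable> mails = new ArrayDeque<>();
        boolean running = true;

        // Runs at most one mail; returns false if there was nothing to run.
        boolean runSingleStep() {
            Runnable mail = mails.poll();
            if (mail == null) {
                return false;
            }
            mail.run();
            return true;
        }
    }

    // Pumps one mail per iteration and re-checks the exit condition after each one.
    static int runUntilDone(ToyMailboxProcessor processor, CompletableFuture<?> done) {
        int steps = 0;
        do {
            if (!processor.runSingleStep()) {
                break; // toy-only guard: a real mailbox would block waiting for new mail instead
            }
            steps++;
        } while (processor.running && !done.isDone());
        return steps;
    }

    public static void main(String[] args) {
        ToyMailboxProcessor processor = new ToyMailboxProcessor();
        CompletableFuture<Void> allGatesRecovered = new CompletableFuture<>();
        processor.mails.add(() -> {});                               // unrelated mail
        processor.mails.add(() -> allGatesRecovered.complete(null)); // "last gate recovered"
        processor.mails.add(() -> {});                               // left for the main loop
        System.out.println(runUntilDone(processor, allGatesRecovered)); // 2
        System.out.println(processor.mails.size());                     // 1
    }
}
```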
But I am not fully sure about **singleStep = false**. Couldn't it potentially lead to
an infinite loop in processMailsWhenDefaultActionUnavailable? For example: the
default action is unavailable as long as the task is not running, but the task is
not running because we are waiting for at least one default action to run after
allGatesRecoveredFuture has completed. Or does my example not make sense?
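The concern can be modelled as a circular wait. In this toy sketch (all names hypothetical, not Flink APIs), a non-single-step call keeps processing mail until the default action becomes available. Under the assumption that the default action only becomes available once the task is running, and that the task is only marked running after the recovery loop exits, the call never returns on its own even though recovery finished during the first mail. The `maxIterations` bound exists only so the demo terminates.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

// Hypothetical toy model of the circular wait described above; these names are NOT Flink APIs.
final class CircularWaitSketch {

    // Models a non-single-step call: keep running mails until the default action becomes available.
    // maxIterations is a toy guard; in the scenario in question the call would spin or block forever.
    static boolean runUntilDefaultActionAvailable(
            Queue<Runnable> mails, BooleanSupplier defaultActionAvailable, int maxIterations) {
        for (int i = 0; i < maxIterations; i++) {
            if (defaultActionAvailable.getAsBoolean()) {
                return true;
            }
            Runnable mail = mails.poll();
            if (mail != null) {
                mail.run();
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Queue<Runnable> mails = new ArrayDeque<>();
        CompletableFuture<Void> allGatesRecovered = new CompletableFuture<>();
        // Assumption: the task only becomes "running" after the recovery loop has exited,
        // and the default action is only available once the task is running.
        boolean[] taskRunning = {false};
        mails.add(() -> allGatesRecovered.complete(null));

        boolean returned = runUntilDefaultActionAvailable(mails, () -> taskRunning[0], 1_000);
        System.out.println(allGatesRecovered.isDone()); // true: recovery finished during the first mail
        System.out.println(returned);                   // false: the call never returned on its own
    }
}
```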
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.