akalash commented on a change in pull request #15221:
URL: https://github.com/apache/flink/pull/15221#discussion_r594542655
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -172,6 +172,7 @@ public void drain() throws Exception {
/** Runs the mailbox processing loop. This is where the main work is done. */
public void runMailboxLoop() throws Exception {
+ mailboxLoopRunning = true;
Review comment:
I'm not sure it is legal to reset the flag to true here. I couldn't find a
reason why MailboxProcessor cannot be restarted, but I may be missing
something.
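To illustrate what resetting the flag on entry makes possible, here is a minimal sketch. `RestartableLoopSketch` and all of its members are hypothetical names, not the real MailboxProcessor API. Unlike the real loop, it is single-threaded and simply returns when the mailbox is empty instead of blocking.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a mailbox loop that can be run more than once.
final class RestartableLoopSketch {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private boolean loopRunning;
    int processed;

    void submit(Runnable mail) {
        mailbox.add(mail);
    }

    // Asks the current run to stop after the mail that is being processed.
    void stop() {
        loopRunning = false;
    }

    // Resets the flag on entry, so an earlier stop() does not block a new run.
    void runLoop() {
        loopRunning = true;
        while (loopRunning && !mailbox.isEmpty()) {
            mailbox.poll().run();
            processed++;
        }
    }

    public static void main(String[] args) {
        RestartableLoopSketch loop = new RestartableLoopSketch();
        loop.submit(loop::stop); // ends the first run
        loop.submit(() -> {});   // left over for the second run
        loop.runLoop();
        System.out.println(loop.processed); // prints 1
        loop.runLoop();
        System.out.println(loop.processed); // prints 2
    }
}
```

Without the reset at the top of `runLoop()`, the second call would return immediately, which is the behavior this change is meant to avoid.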
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -528,70 +533,90 @@ protected void beforeInvoke() throws Exception {
init();
// save the work of reloading state, etc, if the task is already canceled
- if (canceled) {
- throw new CancelTaskException();
- }
+ ensureNotCanceled();
// -------- Invoke --------
LOG.debug("Invoking {}", getName());
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
- actionExecutor.runThrowing(
- () -> {
- SequentialChannelStateReader reader =
- getEnvironment()
- .getTaskStateManager()
- .getSequentialChannelStateReader();
- reader.readOutputData(
- getEnvironment().getAllWriters(),
- !configuration.isGraphContainingLoops());
-
- operatorChain.initializeStateAndOpenOperators(
- createStreamTaskStateInitializer());
-
- channelIOExecutor.execute(
- () -> {
- try {
- reader.readInputData(getEnvironment().getAllInputGates());
- } catch (Exception e) {
- asyncExceptionHandler.handleAsyncException(
- "Unable to read channel state", e);
- }
- });
+ CompletableFuture<Void> allGatesRecoveredFuture =
+ actionExecutor.call(
+ () -> {
+ SequentialChannelStateReader reader =
+ getEnvironment()
+ .getTaskStateManager()
+ .getSequentialChannelStateReader();
+ reader.readOutputData(
+ getEnvironment().getAllWriters(),
+ !configuration.isGraphContainingLoops());
+
+ operatorChain.initializeStateAndOpenOperators(
+ createStreamTaskStateInitializer());
+
+ return asyncRestoreInputGatesData(reader);
+ });
+
+ // Run mailbox until all gates will be recovered.
+ runMailboxLoop();
+
+ ensureNotCanceled();
+
+ if (!allGatesRecoveredFuture.isDone()) {
+ throw new Exception("Mailbox loop interrupted before recovery was finished.");
Review comment:
It is not clear to me which exception type should be thrown here.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobDetailsTest.java
##########
@@ -46,7 +46,7 @@ public void testJobDetailsMarshalling() throws JsonProcessingException {
9L,
JobStatus.RUNNING,
8L,
- new int[] {1, 3, 3, 7, 4, 2, 7, 3, 3},
+ new int[] {1, 3, 3, 4, 7, 4, 2, 7, 3, 3},
Review comment:
I have no idea how to choose the right number, so I just picked a random
one. If there is specific logic behind these values, please share it with me.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -528,70 +533,90 @@ protected void beforeInvoke() throws Exception {
init();
// save the work of reloading state, etc, if the task is already canceled
- if (canceled) {
- throw new CancelTaskException();
- }
+ ensureNotCanceled();
// -------- Invoke --------
LOG.debug("Invoking {}", getName());
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
- actionExecutor.runThrowing(
- () -> {
- SequentialChannelStateReader reader =
- getEnvironment()
- .getTaskStateManager()
- .getSequentialChannelStateReader();
- reader.readOutputData(
- getEnvironment().getAllWriters(),
- !configuration.isGraphContainingLoops());
-
- operatorChain.initializeStateAndOpenOperators(
- createStreamTaskStateInitializer());
-
- channelIOExecutor.execute(
- () -> {
- try {
- reader.readInputData(getEnvironment().getAllInputGates());
- } catch (Exception e) {
- asyncExceptionHandler.handleAsyncException(
- "Unable to read channel state", e);
- }
- });
+ CompletableFuture<Void> allGatesRecoveredFuture =
+ actionExecutor.call(
+ () -> {
+ SequentialChannelStateReader reader =
+ getEnvironment()
+ .getTaskStateManager()
+ .getSequentialChannelStateReader();
+ reader.readOutputData(
+ getEnvironment().getAllWriters(),
+ !configuration.isGraphContainingLoops());
+
+ operatorChain.initializeStateAndOpenOperators(
+ createStreamTaskStateInitializer());
+
+ return asyncRestoreInputGatesData(reader);
+ });
+
+ // Run mailbox until all gates will be recovered.
+ runMailboxLoop();
+
+ ensureNotCanceled();
+
+ if (!allGatesRecoveredFuture.isDone()) {
+ throw new Exception("Mailbox loop interrupted before recovery was finished.");
+ }
- for (InputGate inputGate : getEnvironment().getAllInputGates()) {
- inputGate
- .getStateConsumedFuture()
- .thenRun(
- () ->
- mainMailboxExecutor.execute(
- inputGate::requestPartitions,
- "Input gate request partitions"));
+ restored = true;
+ }
+
+ private CompletableFuture<Void> asyncRestoreInputGatesData(
+ SequentialChannelStateReader reader) {
+ IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
+ channelIOExecutor.execute(
+ () -> {
+ try {
+ reader.readInputData(inputGates);
+ } catch (Exception e) {
+ asyncExceptionHandler.handleAsyncException(
+ "Unable to read channel state", e);
}
});
- isRunning = true;
+ List<CompletableFuture<?>> recoveredFutures = new ArrayList<>(inputGates.length);
+ for (InputGate inputGate : inputGates) {
+ recoveredFutures.add(inputGate.getStateConsumedFuture());
+
+ inputGate
+ .getStateConsumedFuture()
+ .thenRun(
+ () ->
+ mainMailboxExecutor.execute(
+ inputGate::requestPartitions,
+ "Input gate request partitions"));
+ }
+
+ return CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]))
+ .thenRun(mailboxProcessor::allActionsCompleted);
}
@Override
public final void invoke() throws Exception {
- try {
- beforeInvoke();
+ if (!restored) {
+ throw new Exception("Could not perform the task because the state wasn't restored.");
Review comment:
The same question about the exception type applies here.
##########
File path: flink-runtime-web/src/test/resources/rest_api_v1.snapshot
##########
@@ -740,7 +740,7 @@
},
"status" : {
"type" : "string",
- "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
+ "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RECOVERING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
Review comment:
It is not fully clear to me how I should handle the new state here. As
you can see, I just changed the expected output in the test, but I don't think
that is all that needs to be done. Should I declare somewhere that the API has
changed? Or should I bump some version or something similar?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -1148,10 +1158,22 @@ private void maybeReleasePartitionsAndSendCancelRpcCall(
}
}
+ boolean switchToRecovering() {
+ if (switchTo(DEPLOYING, RECOVERING)) {
+ sendPartitionInfos();
Review comment:
Please note that I moved 'sendPartitionInfos' from switchToRunning to
switchToRecovering. There are no visible changes right now because the
RECOVERING state does nothing yet. In the future, however, the RECOVERING
state will run the mailbox loop for a short period until recovery is finished,
and only then will the RUNNING state start the mailbox loop again until all
actions are processed. So we need to decide which state is the right one for
sending the partition infos. In my opinion, given the current semantics, the
RECOVERING state is the right choice.
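A minimal sketch of the ordering I have in mind. `ExecutionStateSketch`, its reduced `State` enum, and the `partitionInfosSent` counter are hypothetical stand-ins, not the real Execution class. It also assumes that RUNNING is only reachable from RECOVERING, which the real code may not enforce.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the transitions; not the real Execution implementation.
final class ExecutionStateSketch {
    enum State { DEPLOYING, RECOVERING, RUNNING }

    private final AtomicReference<State> state = new AtomicReference<>(State.DEPLOYING);
    int partitionInfosSent; // stand-in for the effect of sendPartitionInfos()

    private boolean switchTo(State expected, State target) {
        return state.compareAndSet(expected, target);
    }

    // Partition infos go out as soon as the task leaves DEPLOYING.
    boolean switchToRecovering() {
        if (switchTo(State.DEPLOYING, State.RECOVERING)) {
            partitionInfosSent++;
            return true;
        }
        return false;
    }

    // Assumption: entering RUNNING no longer sends anything.
    boolean switchToRunning() {
        return switchTo(State.RECOVERING, State.RUNNING);
    }

    State current() {
        return state.get();
    }

    public static void main(String[] args) {
        ExecutionStateSketch execution = new ExecutionStateSketch();
        System.out.println(execution.switchToRunning());    // prints false
        System.out.println(execution.switchToRecovering()); // prints true
        System.out.println(execution.switchToRunning());    // prints true
        System.out.println(execution.partitionInfosSent);   // prints 1
    }
}
```

With this ordering the partition infos are sent exactly once, before any recovery work in the mailbox loop could depend on them.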
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -528,70 +533,90 @@ protected void beforeInvoke() throws Exception {
init();
// save the work of reloading state, etc, if the task is already canceled
- if (canceled) {
- throw new CancelTaskException();
- }
+ ensureNotCanceled();
// -------- Invoke --------
LOG.debug("Invoking {}", getName());
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
- actionExecutor.runThrowing(
- () -> {
- SequentialChannelStateReader reader =
- getEnvironment()
- .getTaskStateManager()
- .getSequentialChannelStateReader();
- reader.readOutputData(
- getEnvironment().getAllWriters(),
- !configuration.isGraphContainingLoops());
-
- operatorChain.initializeStateAndOpenOperators(
- createStreamTaskStateInitializer());
-
- channelIOExecutor.execute(
- () -> {
- try {
- reader.readInputData(getEnvironment().getAllInputGates());
- } catch (Exception e) {
- asyncExceptionHandler.handleAsyncException(
- "Unable to read channel state", e);
- }
- });
+ CompletableFuture<Void> allGatesRecoveredFuture =
+ actionExecutor.call(
+ () -> {
+ SequentialChannelStateReader reader =
+ getEnvironment()
+ .getTaskStateManager()
+ .getSequentialChannelStateReader();
+ reader.readOutputData(
+ getEnvironment().getAllWriters(),
+ !configuration.isGraphContainingLoops());
+
+ operatorChain.initializeStateAndOpenOperators(
+ createStreamTaskStateInitializer());
+
+ return asyncRestoreInputGatesData(reader);
+ });
+
+ // Run mailbox until all gates will be recovered.
+ runMailboxLoop();
+
+ ensureNotCanceled();
+
+ if (!allGatesRecoveredFuture.isDone()) {
+ throw new Exception("Mailbox loop interrupted before recovery was finished.");
+ }
- for (InputGate inputGate : getEnvironment().getAllInputGates()) {
- inputGate
- .getStateConsumedFuture()
- .thenRun(
- () ->
- mainMailboxExecutor.execute(
- inputGate::requestPartitions,
- "Input gate request partitions"));
+ restored = true;
+ }
+
+ private CompletableFuture<Void> asyncRestoreInputGatesData(
+ SequentialChannelStateReader reader) {
+ IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
+ channelIOExecutor.execute(
+ () -> {
+ try {
+ reader.readInputData(inputGates);
+ } catch (Exception e) {
+ asyncExceptionHandler.handleAsyncException(
+ "Unable to read channel state", e);
}
});
- isRunning = true;
+ List<CompletableFuture<?>> recoveredFutures = new ArrayList<>(inputGates.length);
+ for (InputGate inputGate : inputGates) {
+ recoveredFutures.add(inputGate.getStateConsumedFuture());
+
+ inputGate
+ .getStateConsumedFuture()
+ .thenRun(
+ () ->
+ mainMailboxExecutor.execute(
+ inputGate::requestPartitions,
+ "Input gate request partitions"));
+ }
+
+ return CompletableFuture.allOf(recoveredFutures.toArray(new CompletableFuture[0]))
+ .thenRun(mailboxProcessor::allActionsCompleted);
Review comment:
According to https://issues.apache.org/jira/browse/FLINK-19385, I
suspect that this compound future may never complete in some cases. So
perhaps there should be another way to detect when recovery is finished.
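The concern follows from how `CompletableFuture.allOf` behaves: the combined future stays incomplete for as long as any single input is incomplete. A minimal sketch, where `AllOfSketch` and the two plain futures are hypothetical stand-ins for the per-gate state-consumed futures:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo; the futures stand in for inputGate.getStateConsumedFuture().
final class AllOfSketch {
    // Combines the futures the same way the diff does, minus the mailbox callback.
    static CompletableFuture<Void> allRecovered(List<CompletableFuture<?>> gateFutures) {
        return CompletableFuture.allOf(gateFutures.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> gate0 = new CompletableFuture<>();
        CompletableFuture<Void> gate1 = new CompletableFuture<>();
        List<CompletableFuture<?>> gateFutures = new ArrayList<>();
        gateFutures.add(gate0);
        gateFutures.add(gate1);

        CompletableFuture<Void> all = allRecovered(gateFutures);

        gate0.complete(null);
        System.out.println(all.isDone()); // prints false: gate1 is still pending

        gate1.complete(null);
        System.out.println(all.isDone()); // prints true
    }
}
```

If one gate never completes its future, `allActionsCompleted` would never be triggered and the recovery loop would keep running.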
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]