This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 57845ce [FLINK-20079][task] Initialize operator chain before upstream
partition request
57845ce is described below
commit 57845ce2bf26562667e8e26d800e1b8340e9b4b3
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Nov 10 23:32:45 2020 +0100
[FLINK-20079][task] Initialize operator chain before upstream partition
request
---
.../flink/streaming/runtime/tasks/StreamTask.java | 56 +++++++++++-----------
1 file changed, 27 insertions(+), 29 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e00ec13..a7fc22d 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -476,41 +476,39 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
// both the following operations are protected by the
lock
// so that we avoid race conditions in the case that
initializeState()
// registers a timer, that fires before the open() is
called.
- readRecoveredChannelState(); // WARN: should be done
before operatorChain.initializeStateAndOpenOperators (see FLINK-19907)
+ // WARN: should be done before
operatorChain.initializeStateAndOpenOperators (see FLINK-19907)
+ ChannelStateReader reader =
getEnvironment().getTaskStateManager().getChannelStateReader();
+ if (reader.hasChannelStates()) {
+ ResultPartitionWriter[] writers =
getEnvironment().getAllWriters();
+ if (writers != null) {
+ for (ResultPartitionWriter writer :
writers) {
+
writer.readRecoveredState(reader);
+ }
+ }
-
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
- });
+
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
- isRunning = true;
- }
-
- private void readRecoveredChannelState() throws IOException,
InterruptedException {
- ChannelStateReader reader =
getEnvironment().getTaskStateManager().getChannelStateReader();
- if (!reader.hasChannelStates()) {
- requestPartitions();
- return;
- }
+ // It would get possible benefits to recovery
input side after output side, which guarantees the
+ // output can request more floating buffers
from global firstly.
+ InputGate[] inputGates =
getEnvironment().getAllInputGates();
+ if (inputGates != null && inputGates.length >
0) {
+ CompletableFuture[] futures = new
CompletableFuture[inputGates.length];
+ for (int i = 0; i < inputGates.length;
i++) {
+ futures[i] =
inputGates[i].readRecoveredState(channelIOExecutor, reader);
+ }
- ResultPartitionWriter[] writers =
getEnvironment().getAllWriters();
- if (writers != null) {
- for (ResultPartitionWriter writer : writers) {
- writer.readRecoveredState(reader);
- }
- }
+ // Note that we must request partition
after all the single gates finished recovery.
+
CompletableFuture.allOf(futures).thenRun(() -> mainMailboxExecutor.execute(
+ this::requestPartitions, "Input
gates request partitions"));
+ }
- // It would get possible benefits to recovery input side after
output side, which guarantees the
- // output can request more floating buffers from global firstly.
- InputGate[] inputGates = getEnvironment().getAllInputGates();
- if (inputGates != null && inputGates.length > 0) {
- CompletableFuture[] futures = new
CompletableFuture[inputGates.length];
- for (int i = 0; i < inputGates.length; i++) {
- futures[i] =
inputGates[i].readRecoveredState(channelIOExecutor, reader);
+ } else {
+
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
+ requestPartitions();
}
+ });
- // Note that we must request partition after all the
single gates finished recovery.
- CompletableFuture.allOf(futures).thenRun(() ->
mainMailboxExecutor.execute(
- this::requestPartitions, "Input gates request
partitions"));
- }
+ isRunning = true;
}
private void requestPartitions() throws IOException {