This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 57845ce  [FLINK-20079][task] Initialize operator chain before upstream 
partition request
57845ce is described below

commit 57845ce2bf26562667e8e26d800e1b8340e9b4b3
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Nov 10 23:32:45 2020 +0100

    [FLINK-20079][task] Initialize operator chain before upstream partition 
request
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 56 +++++++++++-----------
 1 file changed, 27 insertions(+), 29 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e00ec13..a7fc22d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -476,41 +476,39 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        // both the following operations are protected by the 
lock
                        // so that we avoid race conditions in the case that 
initializeState()
                        // registers a timer, that fires before the open() is 
called.
-                       readRecoveredChannelState(); // WARN: should be done 
before operatorChain.initializeStateAndOpenOperators (see FLINK-19907)
+                       // WARN: should be done before 
operatorChain.initializeStateAndOpenOperators (see FLINK-19907)
+                       ChannelStateReader reader = 
getEnvironment().getTaskStateManager().getChannelStateReader();
+                       if (reader.hasChannelStates()) {
+                               ResultPartitionWriter[] writers = 
getEnvironment().getAllWriters();
+                               if (writers != null) {
+                                       for (ResultPartitionWriter writer : 
writers) {
+                                               
writer.readRecoveredState(reader);
+                                       }
+                               }
 
-                       
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
-               });
+                               
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
 
-               isRunning = true;
-       }
-
-       private void readRecoveredChannelState() throws IOException, 
InterruptedException {
-               ChannelStateReader reader = 
getEnvironment().getTaskStateManager().getChannelStateReader();
-               if (!reader.hasChannelStates()) {
-                       requestPartitions();
-                       return;
-               }
+                               // It would get possible benefits to recovery 
input side after output side, which guarantees the
+                               // output can request more floating buffers 
from global firstly.
+                               InputGate[] inputGates = 
getEnvironment().getAllInputGates();
+                               if (inputGates != null && inputGates.length > 
0) {
+                                       CompletableFuture[] futures = new 
CompletableFuture[inputGates.length];
+                                       for (int i = 0; i < inputGates.length; 
i++) {
+                                               futures[i] = 
inputGates[i].readRecoveredState(channelIOExecutor, reader);
+                                       }
 
-               ResultPartitionWriter[] writers = 
getEnvironment().getAllWriters();
-               if (writers != null) {
-                       for (ResultPartitionWriter writer : writers) {
-                               writer.readRecoveredState(reader);
-                       }
-               }
+                                       // Note that we must request partition 
after all the single gates finished recovery.
+                                       
CompletableFuture.allOf(futures).thenRun(() -> mainMailboxExecutor.execute(
+                                               this::requestPartitions, "Input 
gates request partitions"));
+                               }
 
-               // It would get possible benefits to recovery input side after 
output side, which guarantees the
-               // output can request more floating buffers from global firstly.
-               InputGate[] inputGates = getEnvironment().getAllInputGates();
-               if (inputGates != null && inputGates.length > 0) {
-                       CompletableFuture[] futures = new 
CompletableFuture[inputGates.length];
-                       for (int i = 0; i < inputGates.length; i++) {
-                               futures[i] = 
inputGates[i].readRecoveredState(channelIOExecutor, reader);
+                       } else {
+                               
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
+                               requestPartitions();
                        }
+               });
 
-                       // Note that we must request partition after all the 
single gates finished recovery.
-                       CompletableFuture.allOf(futures).thenRun(() -> 
mainMailboxExecutor.execute(
-                               this::requestPartitions, "Input gates request 
partitions"));
-               }
+               isRunning = true;
        }
 
        private void requestPartitions() throws IOException {

Reply via email to