This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8b0fd4126269613f5c1d3064ea8e5dd464a3ebce
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Sep 17 17:48:10 2020 +0200

    [FLINK-18906][task] Fix processSingleStep semantic
    
    Previously, a processSingleStep call could be ignored, in which case it
    would not process mail actions.
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  5 +++
 .../runtime/tasks/mailbox/MailboxProcessor.java    | 48 +++++++++++++++-------
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  8 ++--
 .../tasks/StreamTaskMailboxTestHarness.java        | 17 +++++---
 ...treamTaskMultipleInputSelectiveReadingTest.java | 12 +++---
 5 files changed, 61 insertions(+), 29 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a7fc22d..46e7ef1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -560,6 +560,11 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                return mailboxProcessor.runMailboxStep();
        }
 
+       @VisibleForTesting
+       public boolean isMailboxLoopRunning() {
+               return mailboxProcessor.isMailboxLoopRunning();
+       }
+
        private void runMailboxLoop() throws Exception {
                mailboxProcessor.runMailboxLoop();
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index 2180971..6476cc7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -178,17 +178,27 @@ public class MailboxProcessor implements Closeable {
 
                final MailboxController defaultActionContext = new 
MailboxController(this);
 
-               while (runMailboxStep(localMailbox, defaultActionContext)) {
+               while (isMailboxLoopRunning()) {
+                       // The blocking `processMail` call will not return 
until default action is available.
+                       processMail(localMailbox, false);
+                       if (isMailboxLoopRunning()) {
+                               
mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is 
acquired inside default action as needed
+                       }
                }
        }
 
+       /**
+        * Execute a single (as small as possible) step of the mailbox.
+        *
+        * @return true if something was processed.
+        */
+       @VisibleForTesting
        public boolean runMailboxStep() throws Exception {
-               return runMailboxStep(mailbox, new MailboxController(this));
-       }
-
-       private boolean runMailboxStep(TaskMailbox localMailbox, 
MailboxController defaultActionContext) throws Exception {
-               if (processMail(localMailbox)) {
-                       
mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is 
acquired inside default action as needed
+               if (processMail(mailbox, true)) {
+                       return true;
+               }
+               if (!isDefaultActionUnavailable() && isMailboxLoopRunning()) {
+                       mailboxDefaultAction.runDefaultAction(new 
MailboxController(this));
                        return true;
                }
                return false;
@@ -249,25 +259,34 @@ public class MailboxProcessor implements Closeable {
        }
 
        /**
-        * This helper method handles all special actions from the mailbox. It 
returns true if the mailbox loop should
-        * continue running, false if it should stop. In the current design, 
this method also evaluates all control flag
-        * changes. This keeps the hot path in {@link #runMailboxLoop()} free 
from any other flag checking, at the cost
+        * This helper method handles all special actions from the mailbox.
+        * In the current design, this method also evaluates all control flag 
changes.
+        * This keeps the hot path in {@link #runMailboxLoop()} free from any 
other flag checking, at the cost
         * that all flag changes must make sure that the mailbox signals 
mailbox#hasMail.
+        *
+        * @return true if a mail has been processed.
         */
-       private boolean processMail(TaskMailbox mailbox) throws Exception {
-
+       private boolean processMail(TaskMailbox mailbox, boolean singleStep) 
throws Exception {
                // Doing this check is an optimization to only have a volatile 
read in the expected hot path, locks are only
                // acquired after this point.
                if (!mailbox.createBatch()) {
                        // We can also directly return true because all changes 
to #isMailboxLoopRunning must be connected to
                        // mailbox.hasMail() == true.
-                       return true;
+                       return false;
                }
 
+               boolean processed = false;
                // Take mails in a non-blockingly and execute them.
                Optional<Mail> maybeMail;
                while (isMailboxLoopRunning() && (maybeMail = 
mailbox.tryTakeFromBatch()).isPresent()) {
                        maybeMail.get().run();
+                       processed = true;
+                       if (singleStep) {
+                               break;
+                       }
+               }
+               if (singleStep) {
+                       return processed;
                }
 
                // If the default action is currently not available, we can run 
a blocking mailbox execution until the default
@@ -280,9 +299,10 @@ public class MailboxProcessor implements Closeable {
                                idleTime.markEvent(System.currentTimeMillis() - 
start);
                        }
                        maybeMail.get().run();
+                       processed = true;
                }
 
-               return isMailboxLoopRunning();
+               return processed;
        }
 
        /**
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index 51bb156..0780b51 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -296,14 +296,14 @@ public class MultipleInputStreamTaskTest {
                try {
                        testHarness.processElement(new 
StreamRecord<>("Hello-1"), 0);
                        testHarness.endInput(0);
-                       testHarness.process();
+                       testHarness.processWhileAvailable();
 
                        testHarness.processElement(new 
StreamRecord<>("Hello-2"), 1);
                        testHarness.processElement(new 
StreamRecord<>("Hello-3"), 2);
                        testHarness.endInput(1);
-                       testHarness.process();
+                       testHarness.processWhileAvailable();
                        testHarness.endInput(2);
-                       testHarness.process();
+                       testHarness.processWhileAvailable();
                        assertEquals(
                                true,
                                
testHarness.getStreamTask().getInputOutputJointFuture(InputStatus.NOTHING_AVAILABLE).isDone());
@@ -347,7 +347,7 @@ public class MultipleInputStreamTaskTest {
                        testHarness.processElement(new StreamRecord<>("0"), 2);
                        testHarness.processElement(new StreamRecord<>("1"), 2);
 
-                       testHarness.process();
+                       testHarness.processWhileAvailable();
 
                        // We do not know which of the input will be picked 
first, but we are expecting them
                        // to alternate
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
index c6d3fb8..a8761ff 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
@@ -107,16 +107,16 @@ public class StreamTaskMailboxTestHarness<OUT> implements 
AutoCloseable {
 
        private void maybeProcess() throws Exception {
                if (autoProcess) {
-                       process();
+                       processWhileAvailable();
                }
        }
 
-       public void process() throws Exception {
-               while (processSingleStep()) {
+       public void processWhileAvailable() throws Exception {
+               while (processIfAvailable()) {
                }
        }
 
-       public boolean processSingleStep() throws Exception {
+       public boolean processIfAvailable() throws Exception {
                if (streamTask.inputProcessor.isAvailable() && 
streamTask.mailboxProcessor.isMailboxLoopRunning()) {
                        streamTask.runMailboxStep();
                        return true;
@@ -124,6 +124,12 @@ public class StreamTaskMailboxTestHarness<OUT> implements 
AutoCloseable {
                return false;
        }
 
+       public void processSingleStep() throws Exception {
+               if (streamTask.mailboxProcessor.isMailboxLoopRunning()) {
+                       streamTask.runMailboxStep();
+               }
+       }
+
        public void endInput() {
                for (int i = 0; i < inputGates.length; i++) {
                        endInput(i);
@@ -139,7 +145,8 @@ public class StreamTaskMailboxTestHarness<OUT> implements 
AutoCloseable {
 
        public void waitForTaskCompletion() throws Exception {
                endInput();
-               while (streamTask.runMailboxStep()) {
+               while (streamTask.isMailboxLoopRunning()) {
+                       streamTask.runMailboxStep();
                }
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
index b51a701..53ea3c2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
@@ -157,7 +157,7 @@ public class StreamTaskMultipleInputSelectiveReadingTest {
                        testHarness.endInput();
 
                        if (!autoProcess) {
-                               testHarness.process();
+                               testHarness.processWhileAvailable();
                        }
                        testHarness.waitForTaskCompletion();
 
@@ -187,7 +187,7 @@ public class StreamTaskMultipleInputSelectiveReadingTest {
 
                        testHarness.setAutoProcess(false);
                        // StreamMultipleInputProcessor starts with all inputs 
available. Let it rotate and refresh properly.
-                       testHarness.processSingleStep();
+                       testHarness.processIfAvailable();
                        assertTrue(testHarness.getOutput().isEmpty());
 
                        testHarness.processElement(new 
StreamRecord<>("NOT_SELECTED"), 0);
@@ -197,16 +197,16 @@ public class StreamTaskMultipleInputSelectiveReadingTest {
                        testHarness.processElement(new StreamRecord<>("3"), 1);
                        testHarness.processElement(new StreamRecord<>("4"), 1);
 
-                       testHarness.processSingleStep();
+                       testHarness.processIfAvailable();
                        expectedOutput.add(new StreamRecord<>("[2]: 1"));
-                       testHarness.processSingleStep();
+                       testHarness.processIfAvailable();
                        expectedOutput.add(new StreamRecord<>("[2]: 2"));
                        assertThat(testHarness.getOutput(), 
contains(expectedOutput.toArray()));
 
                        // InputGate 2 was not available in previous steps, so 
let's check if we are not starving it
                        testHarness.processElement(new StreamRecord<>("1"), 2);
-                       testHarness.processSingleStep();
-                       testHarness.processSingleStep();
+                       testHarness.processIfAvailable();
+                       testHarness.processIfAvailable();
 
                        // One of those processing single step should pick up 
InputGate 2, however it's not
                        // important which one. We just must avoid starvation.

Reply via email to