This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8b0fd4126269613f5c1d3064ea8e5dd464a3ebce Author: Piotr Nowojski <[email protected]> AuthorDate: Thu Sep 17 17:48:10 2020 +0200 [FLINK-18906][task] Fix processSingleStep semantic Previously processSingleStep call could be ignored, and wouldn't process mail actions --- .../flink/streaming/runtime/tasks/StreamTask.java | 5 +++ .../runtime/tasks/mailbox/MailboxProcessor.java | 48 +++++++++++++++------- .../runtime/tasks/MultipleInputStreamTaskTest.java | 8 ++-- .../tasks/StreamTaskMailboxTestHarness.java | 17 +++++--- ...treamTaskMultipleInputSelectiveReadingTest.java | 12 +++--- 5 files changed, 61 insertions(+), 29 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index a7fc22d..46e7ef1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -560,6 +560,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> return mailboxProcessor.runMailboxStep(); } + @VisibleForTesting + public boolean isMailboxLoopRunning() { + return mailboxProcessor.isMailboxLoopRunning(); + } + private void runMailboxLoop() throws Exception { mailboxProcessor.runMailboxLoop(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java index 2180971..6476cc7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java @@ -178,17 +178,27 @@ public class MailboxProcessor implements Closeable { final MailboxController defaultActionContext = new MailboxController(this); - while (runMailboxStep(localMailbox, defaultActionContext)) { + while (isMailboxLoopRunning()) { + // The blocking `processMail` call will not return until default action is available. + processMail(localMailbox, false); + if (isMailboxLoopRunning()) { + mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed + } } } + /** + * Execute a single (as small as possible) step of the mailbox. + * + * @return true if something was processed. + */ + @VisibleForTesting public boolean runMailboxStep() throws Exception { - return runMailboxStep(mailbox, new MailboxController(this)); - } - - private boolean runMailboxStep(TaskMailbox localMailbox, MailboxController defaultActionContext) throws Exception { - if (processMail(localMailbox)) { - mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed + if (processMail(mailbox, true)) { + return true; + } + if (!isDefaultActionUnavailable() && isMailboxLoopRunning()) { + mailboxDefaultAction.runDefaultAction(new MailboxController(this)); return true; } return false; @@ -249,25 +259,34 @@ public class MailboxProcessor implements Closeable { } /** - * This helper method handles all special actions from the mailbox. It returns true if the mailbox loop should - * continue running, false if it should stop. In the current design, this method also evaluates all control flag - * changes. This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost + * This helper method handles all special actions from the mailbox. + * In the current design, this method also evaluates all control flag changes. + * This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost * that all flag changes must make sure that the mailbox signals mailbox#hasMail. + * + * @return true if a mail has been processed. */ - private boolean processMail(TaskMailbox mailbox) throws Exception { - + private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception { // Doing this check is an optimization to only have a volatile read in the expected hot path, locks are only // acquired after this point. if (!mailbox.createBatch()) { // We can also directly return true because all changes to #isMailboxLoopRunning must be connected to // mailbox.hasMail() == true. - return true; + return false; } + boolean processed = false; // Take mails in a non-blockingly and execute them. Optional<Mail> maybeMail; while (isMailboxLoopRunning() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) { maybeMail.get().run(); + processed = true; + if (singleStep) { + break; + } + } + if (singleStep) { + return processed; } // If the default action is currently not available, we can run a blocking mailbox execution until the default @@ -280,9 +299,10 @@ public class MailboxProcessor implements Closeable { idleTime.markEvent(System.currentTimeMillis() - start); } maybeMail.get().run(); + processed = true; } - return isMailboxLoopRunning(); + return processed; } /** diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java index 51bb156..0780b51 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java @@ -296,14 +296,14 @@ public class MultipleInputStreamTaskTest { try { testHarness.processElement(new StreamRecord<>("Hello-1"), 0); testHarness.endInput(0); - testHarness.process(); + testHarness.processWhileAvailable(); testHarness.processElement(new StreamRecord<>("Hello-2"), 1); testHarness.processElement(new StreamRecord<>("Hello-3"), 2); testHarness.endInput(1); - testHarness.process(); + testHarness.processWhileAvailable(); testHarness.endInput(2); - testHarness.process(); + testHarness.processWhileAvailable(); assertEquals( true, testHarness.getStreamTask().getInputOutputJointFuture(InputStatus.NOTHING_AVAILABLE).isDone()); @@ -347,7 +347,7 @@ public class MultipleInputStreamTaskTest { testHarness.processElement(new StreamRecord<>("0"), 2); testHarness.processElement(new StreamRecord<>("1"), 2); - testHarness.process(); + testHarness.processWhileAvailable(); // We do not know which of the input will be picked first, but we are expecting them // to alternate diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java index c6d3fb8..a8761ff 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java @@ -107,16 +107,16 @@ public class StreamTaskMailboxTestHarness<OUT> implements AutoCloseable { private void maybeProcess() throws Exception { if (autoProcess) { - process(); + processWhileAvailable(); } } - public void process() throws Exception { - while (processSingleStep()) { + public void processWhileAvailable() throws Exception { + while (processIfAvailable()) { } } - public boolean processSingleStep() throws Exception { + public boolean processIfAvailable() throws Exception { if (streamTask.inputProcessor.isAvailable() && streamTask.mailboxProcessor.isMailboxLoopRunning()) { streamTask.runMailboxStep(); return true; @@ -124,6 +124,12 @@ public class StreamTaskMailboxTestHarness<OUT> implements AutoCloseable { return false; } + public void processSingleStep() throws Exception { + if (streamTask.mailboxProcessor.isMailboxLoopRunning()) { + streamTask.runMailboxStep(); + } + } + public void endInput() { for (int i = 0; i < inputGates.length; i++) { endInput(i); @@ -139,7 +145,8 @@ public class StreamTaskMailboxTestHarness<OUT> implements AutoCloseable { public void waitForTaskCompletion() throws Exception { endInput(); - while (streamTask.runMailboxStep()) { + while (streamTask.isMailboxLoopRunning()) { + streamTask.runMailboxStep(); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java index b51a701..53ea3c2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java @@ -157,7 +157,7 @@ public class StreamTaskMultipleInputSelectiveReadingTest { testHarness.endInput(); if (!autoProcess) { - testHarness.process(); + testHarness.processWhileAvailable(); } testHarness.waitForTaskCompletion(); @@ -187,7 +187,7 @@ public class StreamTaskMultipleInputSelectiveReadingTest { testHarness.setAutoProcess(false); // StreamMultipleInputProcessor starts with all inputs available. Let it rotate and refresh properly. - testHarness.processSingleStep(); + testHarness.processIfAvailable(); assertTrue(testHarness.getOutput().isEmpty()); testHarness.processElement(new StreamRecord<>("NOT_SELECTED"), 0); @@ -197,16 +197,16 @@ public class StreamTaskMultipleInputSelectiveReadingTest { testHarness.processElement(new StreamRecord<>("3"), 1); testHarness.processElement(new StreamRecord<>("4"), 1); - testHarness.processSingleStep(); + testHarness.processIfAvailable(); expectedOutput.add(new StreamRecord<>("[2]: 1")); - testHarness.processSingleStep(); + testHarness.processIfAvailable(); expectedOutput.add(new StreamRecord<>("[2]: 2")); assertThat(testHarness.getOutput(), contains(expectedOutput.toArray())); // InputGate 2 was not available in previous steps, so let's check if we are not starving it testHarness.processElement(new StreamRecord<>("1"), 2); - testHarness.processSingleStep(); - testHarness.processSingleStep(); + testHarness.processIfAvailable(); + testHarness.processIfAvailable(); // One of those processing single step should pick up InputGate 2, however it's not // important which one. We just must avoid starvation.
