This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fb2d690  [FLINK-23558][streaming] Ignoring RejectedExecutionException 
during s… (#16653)
fb2d690 is described below

commit fb2d690f2c91c459e3a8ae1fa4b9c84ea4198956
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Thu Aug 5 05:10:41 2021 +0200

    [FLINK-23558][streaming] Ignoring RejectedExecutionException during s… 
(#16653)
    
    * [FLINK-23558][streaming] Ignoring RejectedExecutionException during 
submitting the throughput calculation
    
    * [hotfix] Clarification of execution thread of actionExecutor in javadoc
---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 17 +++-----
 .../streaming/runtime/tasks/StreamTaskTest.java    | 51 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 12 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 858b868..e68df16 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -180,7 +180,8 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
     /**
      * All actions outside of the task {@link #mailboxProcessor mailbox} (i.e. 
performed by another
      * thread) must be executed through this executor to ensure that we don't 
have concurrent method
-     * calls that void consistent checkpoints.
+     * calls that void consistent checkpoints. The execution will always be 
performed in the task
+     * thread.
      *
      * <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link
      * StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor
@@ -685,7 +686,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
         afterInvoke();
     }
 
-    private void throughputCalculationSetup() {
+    void throughputCalculationSetup() {
         systemTimerService.registerTimer(
                 systemTimerService.getCurrentProcessingTime()
                         + 
getTaskConfiguration().get(AUTOMATIC_BUFFER_ADJUSTMENT_PERIOD),
@@ -741,9 +742,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
         LOG.debug("Finished task {}", getName());
         getCompletionFuture().exceptionally(unused -> null).join();
 
-        final CompletableFuture<Void> timersFinishedFuture = new 
CompletableFuture<>();
-        final CompletableFuture<Void> systemTimersFinishedFuture = new 
CompletableFuture<>();
-
         // close all operators in a chain effect way
         operatorChain.finishOperators(actionExecutor);
         finishedOperators = true;
@@ -775,10 +773,9 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
         // at the same time, this makes sure that during any "regular" exit 
where still
         actionExecutor.runThrowing(
                 () -> {
-
                     // make sure no new timers can come
-                    FutureUtils.forward(timerService.quiesce(), 
timersFinishedFuture);
-                    FutureUtils.forward(systemTimerService.quiesce(), 
systemTimersFinishedFuture);
+                    timerService.quiesce().get();
+                    systemTimerService.quiesce().get();
 
                     // let mailbox execution reject all new letters from this 
point
                     mailboxProcessor.prepareClose();
@@ -796,10 +793,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>> extends Ab
                     isRunning = false;
                 });
 
-        // make sure all timers finish
-        timersFinishedFuture.get();
-        systemTimersFinishedFuture.get();
-
         LOG.debug("Closed operators for task {}", getName());
 
         // make sure all buffered data is flushed
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 15c77f8..b9741e1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1637,6 +1637,57 @@ public class StreamTaskTest extends TestLogger {
     }
 
     @Test
+    public void 
testQuiesceOfMailboxRightBeforeSubmittingActionViaTimerService() throws 
Exception {
+        // given: a stream task with a configured async exception handler.
+        AtomicBoolean submitThroughputFail = new AtomicBoolean();
+        MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
+
+        final UnAvailableTestInputProcessor inputProcessor = new 
UnAvailableTestInputProcessor();
+        RunningTask<StreamTask<?, ?>> task =
+                runTask(
+                        () ->
+                                new MockStreamTaskBuilder(mockEnvironment)
+                                        .setHandleAsyncException(
+                                                (str, t) -> 
submitThroughputFail.set(true))
+                                        
.setStreamInputProcessor(inputProcessor)
+                                        .build());
+
+        waitTaskIsRunning(task.streamTask, task.invocationFuture);
+
+        TimerService timerService = task.streamTask.systemTimerService;
+        MailboxExecutor mainMailboxExecutor =
+                task.streamTask.mailboxProcessor.getMainMailboxExecutor();
+
+        CountDownLatch stoppingMailboxLatch = new CountDownLatch(1);
+        timerService.registerTimer(
+                timerService.getCurrentProcessingTime(),
+                (time) -> {
+                    stoppingMailboxLatch.await();
+                    // Give 'afterInvoke' some time to start inside of the mailbox.
+                    // 'afterInvoke' won't finish until this callback finishes, so it is
+                    // impossible to wait on a latch or anything else here.
+                    Thread.sleep(5);
+                    mainMailboxExecutor.submit(() -> {}, "test");
+                });
+
+        // when: Calling the quiesce for mailbox and finishing the timer 
service.
+        mainMailboxExecutor
+                .submit(
+                        () -> {
+                            stoppingMailboxLatch.countDown();
+                            task.streamTask.afterInvoke();
+                        },
+                        "test")
+                .get();
+
+        // then: the exception handler wasn't invoked because such a situation is expected.
+        assertFalse(submitThroughputFail.get());
+
+        // Correctly shutdown the stream task to avoid hanging.
+        
inputProcessor.availabilityProvider.getUnavailableToResetAvailable().complete(null);
+    }
+
+    @Test
     public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws 
Exception {
         // given: Configured SourceStreamTask with source which fails on 
checkpoint.
         Configuration taskManagerConfig = new Configuration();

Reply via email to