akalash commented on a change in pull request #16653:
URL: https://github.com/apache/flink/pull/16653#discussion_r682620389



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1635,6 +1636,56 @@ public void testRestorePerformedFromInvoke() throws 
Exception {
         assertThat(task.streamTask.restoreInvocationCount, is(1));
     }
 
+    @Test
+    public void 
testQuiesceOfMailboxRightBeforeSubmittingActionViaTimerService() throws 
Exception {
+        // given: the stream task with configured handle async exception.
+        AtomicBoolean submitThroughputFail = new AtomicBoolean();
+        MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
+
+        final UnAvailableTestInputProcessor inputProcessor = new 
UnAvailableTestInputProcessor();
+        RunningTask<StreamTask<?, ?>> task =
+                runTask(
+                        () ->
+                                new MockStreamTaskBuilder(mockEnvironment)
+                                        .setHandleAsyncException(
+                                                (str, t) -> 
submitThroughputFail.set(true))
+                                        
.setStreamInputProcessor(inputProcessor)
+                                        .build());
+
+        waitTaskIsRunning(task.streamTask, task.invocationFuture);
+
+        TimerService timerService = task.streamTask.systemTimerService;
+        MailboxExecutor mainMailboxExecutor =
+                task.streamTask.mailboxProcessor.getMainMailboxExecutor();
+
+        CountDownLatch stoppintMailboxLatch = new CountDownLatch(1);
+        timerService.registerTimer(
+                timerService.getCurrentProcessingTime(),
+                (time) -> {
+                    stoppintMailboxLatch.await();
+                    // The time to the start 'afterInvoke' inside of mailbox.
+                    // 'afterInvoke' won't finish until this execution won't 
finish so it is
+                    // impossible to wait on latch or something else.
+                    Thread.sleep(5);
+                    mainMailboxExecutor.submit(() -> {}, "test");
+                });
+
+        // when: Calling the quiesce for mailbox and finishing the timer 
service.
+        mainMailboxExecutor
+                .submit(
+                        () -> {
+                            stoppintMailboxLatch.countDown();
+                            task.streamTask.afterInvoke();
+                        },
+                        "test")
+                .get();
+
+        // then: the exception handle wasn't invoked because the such 
situation is expected.
+        assertFalse(submitThroughputFail.get());
+
+        
inputProcessor.availabilityProvider.getUnavailableToResetAvailable().complete(null);

Review comment:
       Now it has a comment. In general, it is the correct way to finish the 
stream task




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to