akalash commented on a change in pull request #16653:
URL: https://github.com/apache/flink/pull/16653#discussion_r682620389
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1635,6 +1636,56 @@ public void testRestorePerformedFromInvoke() throws Exception {
assertThat(task.streamTask.restoreInvocationCount, is(1));
}
+ @Test
+ public void testQuiesceOfMailboxRightBeforeSubmittingActionViaTimerService() throws Exception {
+ // given: the stream task with configured handle async exception.
+ AtomicBoolean submitThroughputFail = new AtomicBoolean();
+ MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
+
+ final UnAvailableTestInputProcessor inputProcessor = new UnAvailableTestInputProcessor();
+ RunningTask<StreamTask<?, ?>> task =
+ runTask(
+ () ->
+ new MockStreamTaskBuilder(mockEnvironment)
+ .setHandleAsyncException(
+ (str, t) -> submitThroughputFail.set(true))
+ .setStreamInputProcessor(inputProcessor)
+ .build());
+
+ waitTaskIsRunning(task.streamTask, task.invocationFuture);
+
+ TimerService timerService = task.streamTask.systemTimerService;
+ MailboxExecutor mainMailboxExecutor =
+ task.streamTask.mailboxProcessor.getMainMailboxExecutor();
+
+ CountDownLatch stoppintMailboxLatch = new CountDownLatch(1);
+ timerService.registerTimer(
+ timerService.getCurrentProcessingTime(),
+ (time) -> {
+ stoppintMailboxLatch.await();
+ // The time to the start 'afterInvoke' inside of mailbox.
+ // 'afterInvoke' won't finish until this execution won't finish so it is
+ // impossible to wait on latch or something else.
+ Thread.sleep(5);
+ mainMailboxExecutor.submit(() -> {}, "test");
+ });
+
+ // when: Calling the quiesce for mailbox and finishing the timer service.
+ mainMailboxExecutor
+ .submit(
+ () -> {
+ stoppintMailboxLatch.countDown();
+ task.streamTask.afterInvoke();
+ },
+ "test")
+ .get();
+
+ // then: the exception handle wasn't invoked because the such situation is expected.
+ assertFalse(submitThroughputFail.get());
+
+ inputProcessor.availabilityProvider.getUnavailableToResetAvailable().complete(null);
Review comment:
Now it has a comment. In general, it is the correct way to finish the stream task.