rkhachatryan commented on a change in pull request #10345:
[FLINK-12484][runtime] synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r353589776
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1433,35 +1423,16 @@ protected void init() {}
@Override
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
- holder = new LockHolder(getCheckpointLock(), latch);
- holder.start();
- latch.await();
-
- // we are at the point where cancelling can happen
- syncLatch.trigger();
-
- // just put this to sleep until it is interrupted
- try {
- Thread.sleep(100000000);
- } catch (InterruptedException ignored) {
- // restore interruption state
- Thread.currentThread().interrupt();
+ syncLatch.trigger(); // signal that the task can be cancelled now
+ while (task.getExecutionState() == ExecutionState.RUNNING) { // wait for the containing task to be terminated from the outside
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ LOG.debug("interrupted while waiting for state transition", e);
+ Thread.currentThread().interrupt();
+ }
Review comment:
The `TaskCanceller` thread started from `Task` cancels the `StreamTask`, which
ultimately sends a mail to stop the `MailboxProcessor` loop.
But once every mail execution was synchronized, `MailboxProcessor` could no
longer execute that mail: the lock was held by the test thread that simulates
an "ill-behaving" operator.
That thread was never interrupted, because the interrupt is only sent after
the loop breaks.
So I removed lock acquisition for this and 3 other "internal" mails. This
matches the original logic and avoids the performance cost that an
alternative such as making `mailboxLoopRunning` `volatile` would add.
To do this, I now send those mails directly from `MailboxProcessor` instead of
going through `MailboxExecutor#executeFirst` (which I removed, as this was its
only use).
We should probably also review the order of tear-down actions in `StreamTask`
to make it more resilient to such scenarios.
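
To make the failure mode concrete, here is a minimal, self-contained sketch of
the situation described above. It is not Flink code: `MailboxLockSketch`,
`Loop`, `Mail`, `sendStop` and `stopsWhileLockHeld` are hypothetical names, and
the plain `Object` lock only stands in for the checkpoint lock. It assumes a
single loop thread that is the only reader and writer of `running`, which is
why the flag does not need to be `volatile`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

final class MailboxLockSketch {

    /** A mail is an action plus a flag marking it as "internal" (e.g. stop the loop). */
    static final class Mail {
        final Runnable action;
        final boolean internal;

        Mail(Runnable action, boolean internal) {
            this.action = action;
            this.internal = internal;
        }
    }

    /** Hypothetical stand-in for a mailbox loop; not Flink's MailboxProcessor. */
    static final class Loop implements Runnable {
        private final Object lock;               // plays the role of the checkpoint lock
        private final boolean lockInternalMails; // true = synchronize every mail
        private final BlockingQueue<Mail> mailbox = new LinkedBlockingQueue<>();
        private boolean running = true;          // read and written by the loop thread only

        Loop(Object lock, boolean lockInternalMails) {
            this.lock = lock;
            this.lockInternalMails = lockInternalMails;
        }

        /** Called from another thread (the "canceller") to ask the loop to stop. */
        void sendStop() {
            mailbox.add(new Mail(() -> running = false, true));
        }

        @Override
        public void run() {
            try {
                while (running) {
                    Mail mail = mailbox.take();
                    if (mail.internal && !lockInternalMails) {
                        mail.action.run();       // internal mail: no lock needed
                    } else {
                        synchronized (lock) {    // blocks while another thread holds the lock
                            mail.action.run();
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns true if the loop stops within 500 ms while another thread holds the lock. */
    static boolean stopsWhileLockHeld(boolean lockInternalMails) {
        Object lock = new Object();
        CountDownLatch lockTaken = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // Simulates the "ill-behaving" operator thread that keeps holding the lock.
        Thread holder = new Thread(() -> {
            synchronized (lock) {
                lockTaken.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        holder.setDaemon(true);

        Loop loop = new Loop(lock, lockInternalMails);
        Thread loopThread = new Thread(loop);
        loopThread.setDaemon(true);

        try {
            holder.start();
            lockTaken.await();                   // the lock is now held elsewhere
            loopThread.start();
            loop.sendStop();
            loopThread.join(500);
            boolean stopped = !loopThread.isAlive();
            release.countDown();                 // let the holder go so both threads finish
            holder.join();
            loopThread.join();
            return stopped;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

On an unloaded machine, `MailboxLockSketch.stopsWhileLockHeld(true)` should
return `false` (the stop mail is stuck behind the lock) and
`stopsWhileLockHeld(false)` should return `true`. The 500 ms timeout is an
arbitrary choice for the sketch.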