[ 
https://issues.apache.org/jira/browse/FLINK-19864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17233174#comment-17233174
 ] 

Kezhu Wang commented on FLINK-19864:
------------------------------------

How about using a combination of {{Thread.isAlive}} and 
{{TaskMailbox.isMailboxThreadBlocked}} to replace the thread state check? 
The changes would look similar to the following code snippets:
{code:java}
public interface TaskMailbox {
        /**
         * Check whether mailbox thread is blocking on waiting for new mails.
         *
         * @return true if mailbox thread is blocking on waiting for new mails.
         */
        @VisibleForTesting
        boolean isMailboxThreadBlocked();
}

public class TaskMailboxImpl implements TaskMailbox {
        @Override
        public boolean isMailboxThreadBlocked() {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                        return lock.hasWaiters(notEmpty);
                } finally {
                        lock.unlock();
                }
        }
}

public class StreamTaskTestHarness<OUT> {
        public void waitForInputProcessing() throws Exception {
                while (true) {
                        checkForErrorInTaskThread();
                        if (allInputConsumed()) {
                                break;
                        }
                }

                // Wait for the Task Thread to be blocked in mailbox loop, this should be the case if
                // it cannot notifyNonEmpty more input before reaching here, i.e. all currently available
                // input has been processed.
                while (taskThread.isAlive()) {
                        if (taskThread.task.isMailboxLoopBlocked()) {
                                break;
                        }

                        try {
                                Thread.sleep(1);
                        } catch (InterruptedException ignored) {}
                }

                Throwable error = taskThread.getError();
                if (error != null) {
                        throw new Exception("Exception in the task thread", error);
                }
        }
}
{code}

This should be equivalent to the thread-state-checking version, but it relies 
on the mailbox loop's own state instead of the JVM thread state.
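
To illustrate the idea outside of Flink, here is a minimal, self-contained sketch. {{BlockedMailboxSketch}} and its {{put}}/{{take}} methods are hypothetical stand-ins, not Flink's {{TaskMailboxImpl}}; only {{ReentrantLock.hasWaiters(Condition)}} and {{Thread.isAlive}} are real JDK calls. It assumes the consumer thread waits on a single {{notEmpty}} condition, as in the snippet above.

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a mailbox; NOT Flink's TaskMailboxImpl. */
final class BlockedMailboxSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final ArrayDeque<Runnable> queue = new ArrayDeque<>();

    /** Enqueues a mail and wakes up a waiting consumer, if any. */
    void put(Runnable mail) {
        lock.lock();
        try {
            queue.addLast(mail);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Blocks on the notEmpty condition until a mail is available. */
    Runnable take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            return queue.removeFirst();
        } finally {
            lock.unlock();
        }
    }

    /** True if some thread is currently waiting on notEmpty. */
    boolean isMailboxThreadBlocked() {
        // hasWaiters(Condition) must be called while holding the lock.
        lock.lock();
        try {
            return lock.hasWaiters(notEmpty);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        BlockedMailboxSketch mailbox = new BlockedMailboxSketch();
        Thread consumer = new Thread(() -> {
            try {
                mailbox.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Same shape as the proposed wait loop: poll until the consumer
        // is blocked waiting for mail, or has terminated.
        while (consumer.isAlive() && !mailbox.isMailboxThreadBlocked()) {
            Thread.sleep(1);
        }
        System.out.println("blocked: " + mailbox.isMailboxThreadBlocked());

        mailbox.put(() -> {});
        consumer.join();
        System.out.println("alive after mail: " + consumer.isAlive());
    }
}
```

Unlike a check on the JVM thread state, this only reports true when the thread is parked on the mailbox's own condition, so blocking on some unrelated lock or sleep is not mistaken for "waiting for new mail".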

> TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but 
> was:<-9223372036854775808>"
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19864
>                 URL: https://issues.apache.org/jira/browse/FLINK-19864
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Tests
>    Affects Versions: 1.12.0
>            Reporter: Dian Fu
>            Priority: Major
>              Labels: test-stability
>             Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=7c61167f-30b3-5893-cc38-a9e3d057e392
> {code}
> 2020-10-28T22:40:44.2528420Z [ERROR] testWatermarkMetrics(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest)  Time elapsed: 1.528 s <<< FAILURE!
> 2020-10-28T22:40:44.2529225Z java.lang.AssertionError: expected:<1> but was:<-9223372036854775808>
> 2020-10-28T22:40:44.2541228Z at org.junit.Assert.fail(Assert.java:88)
> 2020-10-28T22:40:44.2542157Z at org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-10-28T22:40:44.2542954Z at org.junit.Assert.assertEquals(Assert.java:645)
> 2020-10-28T22:40:44.2543456Z at org.junit.Assert.assertEquals(Assert.java:631)
> 2020-10-28T22:40:44.2544002Z at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testWatermarkMetrics(TwoInputStreamTaskTest.java:540)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
