[ https://issues.apache.org/jira/browse/FLINK-19864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17233174#comment-17233174 ]
Kezhu Wang commented on FLINK-19864:
------------------------------------

How about using a combination of {{Thread.isAlive}} and {{TaskMailbox.isMailboxThreadBlocked}} to replace the thread state check? The changes would be similar to the following code snippets:

{code:java}
public interface TaskMailbox {
    /**
     * Checks whether the mailbox thread is blocked waiting for new mails.
     *
     * @return true if the mailbox thread is blocked waiting for new mails.
     */
    @VisibleForTesting
    boolean isMailboxThreadBlocked();
}

public class TaskMailboxImpl implements TaskMailbox {
    @Override
    public boolean isMailboxThreadBlocked() {
        final ReentrantLock lock = this.lock;
        // hasWaiters requires the calling thread to hold the lock.
        lock.lock();
        try {
            return lock.hasWaiters(notEmpty);
        } finally {
            lock.unlock();
        }
    }
}

public class StreamTaskTestHarness<OUT> {
    public void waitForInputProcessing() throws Exception {
        while (true) {
            checkForErrorInTaskThread();
            if (allInputConsumed()) {
                break;
            }
        }

        // Wait for the Task Thread to be blocked in mailbox loop, this should be the case if
        // it cannot notifyNonEmpty more input before reaching here, i.e. all currently available
        // input has been processed.
        while (taskThread.isAlive()) {
            if (taskThread.task.isMailboxLoopBlocked()) {
                break;
            }

            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {}
        }

        Throwable error = taskThread.getError();
        if (error != null) {
            throw new Exception("Exception in the task thread", error);
        }
    }
}
{code}

This should be equivalent to the version that checks the thread state, but it is tied specifically to the mailbox loop: it reports "blocked" only when the thread is waiting for new mail, not when it is waiting on some unrelated lock or sleep.
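To make the idea concrete outside of Flink, here is a minimal standalone sketch of the same pattern. {{MiniMailbox}} and {{MailboxBlockedDemo}} are hypothetical names made up for this illustration and are not Flink classes; the sketch only assumes that the mailbox guards its queue with a {{ReentrantLock}} and a {{notEmpty}} condition, as in the snippet above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a mailbox; NOT Flink's TaskMailboxImpl.
class MiniMailbox {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Deque<String> queue = new ArrayDeque<>();

    void put(String mail) {
        lock.lock();
        try {
            queue.addLast(mail);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    String take() throws InterruptedException {
        lock.lock();
        try {
            // Loop to guard against wake-ups while the queue is still empty.
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            return queue.removeFirst();
        } finally {
            lock.unlock();
        }
    }

    // True only if some thread is currently waiting on notEmpty.
    // ReentrantLock.hasWaiters must be called while holding the lock.
    boolean isMailboxThreadBlocked() {
        lock.lock();
        try {
            return lock.hasWaiters(notEmpty);
        } finally {
            lock.unlock();
        }
    }
}

class MailboxBlockedDemo {
    public static void main(String[] args) throws Exception {
        MiniMailbox mailbox = new MiniMailbox();
        Thread consumer = new Thread(() -> {
            try {
                mailbox.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Same shape as the harness loop: stop when the thread dies
        // or when it is parked waiting for new mail.
        while (consumer.isAlive() && !mailbox.isMailboxThreadBlocked()) {
            Thread.sleep(1);
        }
        System.out.println("blocked: " + mailbox.isMailboxThreadBlocked());

        mailbox.put("mail");
        consumer.join();
        System.out.println("blocked after join: " + mailbox.isMailboxThreadBlocked());
    }
}
```

Unlike a {{Thread.State}} check, this cannot mistake a thread that is briefly contending for an unrelated monitor, or sleeping, for one that has run out of mail.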
> TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but was:<-9223372036854775808>"
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19864
>                 URL: https://issues.apache.org/jira/browse/FLINK-19864
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Tests
>    Affects Versions: 1.12.0
>            Reporter: Dian Fu
>            Priority: Major
>              Labels: test-stability
>             Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=7c61167f-30b3-5893-cc38-a9e3d057e392
> {code}
> 2020-10-28T22:40:44.2528420Z [ERROR] testWatermarkMetrics(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest) Time elapsed: 1.528 s <<< FAILURE!
> 2020-10-28T22:40:44.2529225Z java.lang.AssertionError: expected:<1> but was:<-9223372036854775808>
> 2020-10-28T22:40:44.2541228Z     at org.junit.Assert.fail(Assert.java:88)
> 2020-10-28T22:40:44.2542157Z     at org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-10-28T22:40:44.2542954Z     at org.junit.Assert.assertEquals(Assert.java:645)
> 2020-10-28T22:40:44.2543456Z     at org.junit.Assert.assertEquals(Assert.java:631)
> 2020-10-28T22:40:44.2544002Z     at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testWatermarkMetrics(TwoInputStreamTaskTest.java:540)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)