[ https://issues.apache.org/jira/browse/FLINK-19864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17233772#comment-17233772 ]
Kezhu Wang commented on FLINK-19864:
------------------------------------

[~AHeise] I think {{InputStatus}} is not enough: it only means that all input has been *fetched*. The assertion needs to happen after all input has been *processed*.

I think we can query {{MailboxProcessor.isDefaultActionUnavailable}} through the {{MailboxExecutor}}. This should give the assertion a sufficient guarantee, since {{controller.suspendDefaultAction()}} executes in the mailbox thread after all input has been processed.

All we need to do is change {{StreamTaskTestHarness.waitForInputProcessing}}; it should look similar to the following code:

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class StreamTaskTestHarness<OUT> {
    public void waitForInputProcessing() throws Exception {
        // Wait until all input has been fetched by the task.
        while (true) {
            checkForErrorInTaskThread();
            if (allInputConsumed()) {
                break;
            }
        }

        // Wait until all currently available input has been processed.
        final MailboxProcessor mailboxProcessor = taskThread.task.mailboxProcessor;
        final MailboxExecutor mailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
        while (taskThread.isAlive()) {
            final AtomicBoolean allInputProcessed = new AtomicBoolean();
            final CountDownLatch latch = new CountDownLatch(1);
            try {
                // The mail runs in the mailbox thread, so it observes the suspension state
                // only after the preceding processInput invocation has finished.
                mailboxExecutor.execute(() -> {
                    allInputProcessed.set(mailboxProcessor.isDefaultActionUnavailable());
                    latch.countDown();
                }, "query-whether-processInput-has-suspend-itself");
                // Mail could be dropped due to task exception.
                latch.await(1, TimeUnit.SECONDS);
            } catch (RejectedExecutionException ex) {
                // Loop until task thread exit for possible task exception.
            }
            if (allInputProcessed.get()) {
                break;
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {}
        }

        Throwable error = taskThread.getError();
        if (error != null) {
            throw new Exception("Exception in the task thread", error);
        }
    }
}
{code}

How about this approach? I think it adheres to what the mailbox model encourages.
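To illustrate why a query run inside the mailbox thread gives a stronger guarantee than checking that the input is empty, here is a minimal, self-contained sketch. It does not use any Flink classes: {{ToyTask}}, its queues, and its {{waitForInputProcessing}} method are hypothetical stand-ins for the task, the mailbox, and the harness method discussed above, and the 20 ms sleep merely simulates slow processing.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MailboxQuerySketch {

    /** Toy single-threaded task (hypothetical, not Flink): mails run before the default action. */
    static final class ToyTask implements Runnable {
        private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
        private final BlockingQueue<Long> input = new LinkedBlockingQueue<>();
        // The two fields below are only read and written by the task thread.
        private boolean defaultActionSuspended;
        private long lastProcessed = Long.MIN_VALUE;
        private volatile boolean running = true;

        void sendInput(long value) {
            input.add(value);
            // Resume the default action from inside the task thread.
            mailbox.add(() -> defaultActionSuspended = false);
        }

        void stop() {
            mailbox.add(() -> running = false);
        }

        /** Asks the task thread, via a mail, whether it has suspended itself; retries until it has. */
        long waitForInputProcessing() throws Exception {
            while (true) {
                CompletableFuture<Long> result = new CompletableFuture<>();
                mailbox.add(() -> {
                    if (defaultActionSuspended) {
                        result.complete(lastProcessed);
                    } else {
                        result.complete(null); // input may be fetched but not yet processed
                    }
                });
                Long value = result.get(1, TimeUnit.SECONDS);
                if (value != null) {
                    return value;
                }
                Thread.sleep(1);
            }
        }

        @Override
        public void run() {
            try {
                while (running) {
                    Runnable mail;
                    // Mails always run between two invocations of the default action.
                    while ((mail = mailbox.poll()) != null) {
                        mail.run();
                    }
                    if (!running) {
                        break;
                    }
                    if (defaultActionSuspended) {
                        mailbox.take().run(); // idle until the next mail arrives
                        continue;
                    }
                    Long value = input.poll(); // "fetched": the input queue may now look empty
                    if (value == null) {
                        defaultActionSuspended = true; // everything fetched so far was processed
                    } else {
                        Thread.sleep(20); // simulate slow processing
                        lastProcessed = value; // "processed"
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ToyTask task = new ToyTask();
        Thread thread = new Thread(task, "toy-task");
        thread.start();
        task.sendInput(1L);
        long observed = task.waitForInputProcessing();
        System.out.println("last processed = " + observed);
        task.stop();
        thread.join();
    }
}
```

In this sketch the resume mail is enqueued before the query mail, so the query can only see the suspended flag set after the element has been processed. Checking {{input.isEmpty()}} from the test thread instead could return true while the element is still inside the simulated processing step.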
> TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but was:<-9223372036854775808>"
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-19864
> URL: https://issues.apache.org/jira/browse/FLINK-19864
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, Tests
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Priority: Major
> Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=7c61167f-30b3-5893-cc38-a9e3d057e392
> {code}
> 2020-10-28T22:40:44.2528420Z [ERROR] testWatermarkMetrics(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest)  Time elapsed: 1.528 s  <<< FAILURE!
> 2020-10-28T22:40:44.2529225Z java.lang.AssertionError: expected:<1> but was:<-9223372036854775808>
> 2020-10-28T22:40:44.2541228Z 	at org.junit.Assert.fail(Assert.java:88)
> 2020-10-28T22:40:44.2542157Z 	at org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-10-28T22:40:44.2542954Z 	at org.junit.Assert.assertEquals(Assert.java:645)
> 2020-10-28T22:40:44.2543456Z 	at org.junit.Assert.assertEquals(Assert.java:631)
> 2020-10-28T22:40:44.2544002Z 	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testWatermarkMetrics(TwoInputStreamTaskTest.java:540)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)