[ 
https://issues.apache.org/jira/browse/FLINK-19864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17233772#comment-17233772
 ] 

Kezhu Wang commented on FLINK-19864:
------------------------------------

[~AHeise] I think {{InputStatus}} is not enough. It only means that all input 
has been *fetched*. We need assertion happens-after all input processed. I 
think we can query {{MailboxProcessor.isDefaultActionUnavailable}} through 
{{MaiilboxExecutor}}. This should provide enough guarantee for assertion since 
{{controller.suspendDefaultAction()}} executes after all input processed in 
mailbox thread. All we need to do is changing 
{{StreamTaskTestHarness.waitForInputProcessing}}, it should similar to 
following code: 

{code:java}
public class StreamTaskTestHarness<OUT> {
        public void waitForInputProcessing() throws Exception {
                while (true) {
                        checkForErrorInTaskThread()
                        if (allInputConsumed()) {
                                break
                        }
                }

                // Wait for all currently available input has been processed.
                final MailboxProcessor mailboxProcessor = 
taskThread.task.mailboxProcessor;
                final MailboxExecutor mailboxExecutor = 
mailboxProcessor.getMainMailboxExecutor();
                while (taskThread.isAlive()) {
                        final AtomicBoolean allInputProcessed = new 
AtomicBoolean();
                        final CountDownLatch latch = new CountDownLatch(1);
                        try {
                                mailboxExecutor.execute(() -> {
                                        
allInputProcessed.set(mailboxProcessor.isDefaultActionUnavailable());
                                        latch.countDown();
                                }, 
"query-whether-processInput-has-suspend-itself");
                                // Mail could be dropped due to task exception.
                                latch.await(1, TimeUnit.SECONDS);
                        } catch (RejectedExecutionException ex) {
                                // Loop until task thread exit for possible 
task exception.
                        }

                        if (allInputProcessed.get()) {
                                break;
                        }

                        try {
                                Thread.sleep(1);
                        } catch (InterruptedException ignored) {}
                }

                Throwable error = taskThread.getError();
                if (error != null) {
                        throw new Exception("Exception in the task thread", 
error);
                }
        }
}
{code}

How about this approach ? I think it adheres what mailbox modeling encourage.

> TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but 
> was:<-9223372036854775808>"
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19864
>                 URL: https://issues.apache.org/jira/browse/FLINK-19864
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Tests
>    Affects Versions: 1.12.0
>            Reporter: Dian Fu
>            Priority: Major
>              Labels: test-stability
>             Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=7c61167f-30b3-5893-cc38-a9e3d057e392
> {code}
> 2020-10-28T22:40:44.2528420Z [ERROR] 
> testWatermarkMetrics(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest)
>  Time elapsed: 1.528 s <<< FAILURE! 2020-10-28T22:40:44.2529225Z 
> java.lang.AssertionError: expected:<1> but was:<-9223372036854775808> 
> 2020-10-28T22:40:44.2541228Z at org.junit.Assert.fail(Assert.java:88) 
> 2020-10-28T22:40:44.2542157Z at 
> org.junit.Assert.failNotEquals(Assert.java:834) 2020-10-28T22:40:44.2542954Z 
> at org.junit.Assert.assertEquals(Assert.java:645) 
> 2020-10-28T22:40:44.2543456Z at 
> org.junit.Assert.assertEquals(Assert.java:631) 2020-10-28T22:40:44.2544002Z 
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testWatermarkMetrics(TwoInputStreamTaskTest.java:540)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to