[
https://issues.apache.org/jira/browse/FLINK-19864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17233772#comment-17233772
]
Kezhu Wang edited comment on FLINK-19864 at 11/19/20, 1:28 PM:
---------------------------------------------------------------
[~AHeise] -I think {{InputStatus}} is not enough. It only means that all input
has been *fetched*; we need the assertion to happen after all input has been processed.- I
think we can query {{MailboxProcessor.isDefaultActionUnavailable}} through the
{{MailboxExecutor}}. This should give the assertion enough of a guarantee, since
{{controller.suspendDefaultAction()}} executes in the mailbox thread after all
input has been processed. All we need to do is change
{{StreamTaskTestHarness.waitForInputProcessing}}; it could look similar to the
following code:
{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class StreamTaskTestHarness<OUT> {

    public void waitForInputProcessing() throws Exception {
        // Wait until all input has been consumed by the task.
        while (true) {
            checkForErrorInTaskThread();
            if (allInputConsumed()) {
                break;
            }
        }

        // Wait until all currently available input has been processed, i.e. until the
        // default action (processInput) has suspended itself in the mailbox thread.
        final MailboxProcessor mailboxProcessor = taskThread.task.mailboxProcessor;
        final MailboxExecutor mailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
        while (taskThread.isAlive()) {
            final AtomicBoolean allInputProcessed = new AtomicBoolean();
            final CountDownLatch latch = new CountDownLatch(1);
            try {
                mailboxExecutor.execute(
                        () -> {
                            allInputProcessed.set(mailboxProcessor.isDefaultActionUnavailable());
                            latch.countDown();
                        },
                        "query-whether-processInput-has-suspended-itself");
                // The mail could be dropped due to a task exception, so do not wait forever.
                latch.await(1, TimeUnit.SECONDS);
            } catch (RejectedExecutionException ex) {
                // The mailbox may be closed because of a task exception;
                // loop until the task thread exits.
            }
            if (allInputProcessed.get()) {
                break;
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }

        Throwable error = taskThread.getError();
        if (error != null) {
            throw new Exception("Exception in the task thread", error);
        }
    }
}
{code}
How about this approach? I think it adheres to what the mailbox model encourages.
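To make the ordering argument concrete, here is a small self-contained toy model. It is not Flink code: {{ToyMailbox}}, {{ProbeMailboxDemo}} and all their members are hypothetical. A single thread runs mails with priority and otherwise runs a "default action" that consumes one record per step and suspends itself once the input is drained, loosely mirroring {{processInput}} and {{suspendDefaultAction()}}. The test thread enqueues a probe mail and waits on a latch. Because the probe runs on the mailbox thread and {{countDown()}} happens-before a successful {{await()}}, every write the mailbox thread made before the probe (here {{lastWatermark}}) is visible to the test once the probe reports that the default action is suspended.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical toy mailbox loop; NOT Flink's MailboxProcessor. */
class ToyMailbox implements Runnable {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();
    private final Queue<Long> input = new ConcurrentLinkedQueue<>();

    // Written only by the mailbox thread; read by other threads only after a probe.
    private boolean defaultActionSuspended = false;
    private boolean running = true;
    private long lastWatermark = Long.MIN_VALUE;

    ToyMailbox(List<Long> watermarks) {
        input.addAll(watermarks);
    }

    @Override
    public void run() {
        try {
            while (running) {
                Runnable mail = mails.poll();
                if (mail != null) {
                    mail.run(); // mails take priority over the default action
                } else if (!defaultActionSuspended) {
                    Long wm = input.poll();
                    if (wm == null) {
                        defaultActionSuspended = true; // analogous to suspendDefaultAction()
                    } else {
                        lastWatermark = wm; // "process" one record
                    }
                } else {
                    mails.take().run(); // idle: block until the next mail arrives
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Test side: send probe mails until the default action has suspended itself. */
    boolean waitForInputProcessing(long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            AtomicBoolean suspended = new AtomicBoolean();
            CountDownLatch latch = new CountDownLatch(1);
            mails.add(() -> {
                suspended.set(defaultActionSuspended);
                latch.countDown();
            });
            if (latch.await(1, TimeUnit.SECONDS) && suspended.get()) {
                return true;
            }
            Thread.sleep(1);
        }
        return false;
    }

    /** Safe only after waitForInputProcessing() returned true (the latch orders the read). */
    long lastWatermark() {
        return lastWatermark;
    }

    /** Stops the loop from inside the mailbox thread. */
    void stop() {
        mails.add(() -> running = false);
    }
}

class ProbeMailboxDemo {
    public static void main(String[] args) throws Exception {
        ToyMailbox box = new ToyMailbox(List.of(1L, 2L, 3L));
        Thread mailboxThread = new Thread(box, "toy-mailbox");
        mailboxThread.start();
        boolean done = box.waitForInputProcessing(5_000);
        System.out.println(done + " " + box.lastWatermark()); // prints "true 3"
        box.stop();
        mailboxThread.join();
    }
}
```

Running {{ProbeMailboxDemo}} prints {{true 3}}: the probe only reports success once the default action has suspended itself, which in this model happens strictly after the last record was processed. Probes that arrive earlier simply see {{false}} and are retried, like the loop in the proposed {{waitForInputProcessing}}.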
> TwoInputStreamTaskTest.testWatermarkMetrics failed with "expected:<1> but was:<-9223372036854775808>"
> -----------------------------------------------------------------------------------------------------
>
> Key: FLINK-19864
> URL: https://issues.apache.org/jira/browse/FLINK-19864
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, Tests
> Affects Versions: 1.12.0
> Reporter: Dian Fu
> Priority: Major
> Labels: test-stability
> Fix For: 1.12.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8541&view=logs&j=77a9d8e1-d610-59b3-fc2a-4766541e0e33&t=7c61167f-30b3-5893-cc38-a9e3d057e392
> {code}
> 2020-10-28T22:40:44.2528420Z [ERROR] testWatermarkMetrics(org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest) Time elapsed: 1.528 s <<< FAILURE!
> 2020-10-28T22:40:44.2529225Z java.lang.AssertionError: expected:<1> but was:<-9223372036854775808>
> 2020-10-28T22:40:44.2541228Z     at org.junit.Assert.fail(Assert.java:88)
> 2020-10-28T22:40:44.2542157Z     at org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-10-28T22:40:44.2542954Z     at org.junit.Assert.assertEquals(Assert.java:645)
> 2020-10-28T22:40:44.2543456Z     at org.junit.Assert.assertEquals(Assert.java:631)
> 2020-10-28T22:40:44.2544002Z     at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.testWatermarkMetrics(TwoInputStreamTaskTest.java:540)
> {code}
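The "actual" value in the failure, -9223372036854775808, is exactly {{Long.MIN_VALUE}}. Assuming the watermark gauge starts at {{Long.MIN_VALUE}} before any watermark is recorded (an assumption, not checked against the Flink sources here), this is consistent with the assertion reading the gauge before the watermark input had been processed. The class name below is hypothetical; it only checks the constant:

```java
class WatermarkSentinelCheck {
    public static void main(String[] args) {
        // The "was" value reported by the failed assertion.
        long actual = -9223372036854775808L;
        System.out.println(actual == Long.MIN_VALUE); // prints "true"
    }
}
```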
--
This message was sent by Atlassian Jira
(v8.3.4#803005)