AHeise commented on a change in pull request #14573:
URL: https://github.com/apache/flink/pull/14573#discussion_r553285345
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1256,54 +1309,97 @@ public void testProcessWithUnAvailableOutput() throws
Exception {
final RunnableWithException completeFutureTask =
() -> {
- Thread.sleep(sleepTime + 1);
assertEquals(1, inputProcessor.currentNumProcessCalls);
assertTrue(task.mailboxProcessor.isDefaultActionUnavailable());
environment.getWriter(1).getAvailableFuture().complete(null);
};
+ waitingThread =
+ new WaitingThread(
+ executor,
+ completeFutureTask,
+ sleepTimeInsideMail,
+ sleepTimeOutsideMail);
+ // Make sure WaitingThread is started after Task starts processing.
executor.submit(
- () -> {
- executor.submit(
- completeFutureTask,
- "This task will complete the future to resume
process input action.");
- },
+ waitingThread::start,
"This task will submit another task to execute after
processing input once.");
+ long startTs = System.currentTimeMillis();
TaskIOMetricGroup ioMetricGroup =
task.getEnvironment().getMetricGroup().getIOMetricGroup();
task.invoke();
+ long totalDuration = System.currentTimeMillis() - startTs;
+ assertThat(
+ ioMetricGroup.getBackPressuredTimePerSecond().getCount(),
+ Matchers.greaterThanOrEqualTo(sleepTimeOutsideMail));
assertThat(
ioMetricGroup.getBackPressuredTimePerSecond().getCount(),
- Matchers.greaterThanOrEqualTo(sleepTime));
+ Matchers.lessThanOrEqualTo(totalDuration -
sleepTimeInsideMail));
assertThat(ioMetricGroup.getIdleTimeMsPerSecond().getCount(),
is(0L));
assertEquals(numberOfProcessCalls,
inputProcessor.currentNumProcessCalls);
+ } finally {
+ if (waitingThread != null) {
Review comment:
I was thinking after `task.invoke()` but I guess this is working as well.
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1241,10 +1241,63 @@ public void testProcessWithAvailableOutput() throws
Exception {
}
}
+ /**
+ * In this weird construct, we are:
+ *
+ * <ul>
+ * <li>1. We start a thread, which will...
+ * <li>2. ... sleep for X ms, and enqueue another mail, that will...
+ * <li>3. ... sleep for Y ms, and make the output available again
+ * </ul>
+ *
+ * <p>2nd step is to check that back pressure or idle counter is at least
X. In the last 3rd
+ * step, we test whether this counter was paused for the duration of
processing mails.
+ */
+ private static class WaitingThread extends Thread {
+ private final MailboxExecutor executor;
+ private final RunnableWithException resumeTask;
+ private final long sleepTimeInsideMail;
+ private final long sleepTimeOutsideMail;
+
+ @Nullable private Exception asyncException;
+
+ public WaitingThread(
+ MailboxExecutor executor,
+ RunnableWithException resumeTask,
+ long sleepTimeInsideMail,
+ long sleepTimeOutsideMail) {
+ this.executor = executor;
+ this.resumeTask = resumeTask;
+ this.sleepTimeInsideMail = sleepTimeInsideMail;
+ this.sleepTimeOutsideMail = sleepTimeOutsideMail;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(sleepTimeOutsideMail);
+ } catch (InterruptedException e) {
+ asyncException = e;
+ }
+ executor.submit(
+ () -> {
+ if (asyncException != null) {
Review comment:
Is this working visibility-wise? `asyncException` should probably be
volatile.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]