akalash commented on a change in pull request #16556:
URL: https://github.com/apache/flink/pull/16556#discussion_r678133730
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1727,6 +1731,99 @@ protected void cancelTask() {
assertTrue(OpenFailingOperator.wasClosed);
}
+ @Test
+ public void testMailboxNotifyThroughputMeterAboutIdleTime() throws Exception {
+ final long sleepTimeOutsideMail = 42;
+ final long sleepTimeInsideMail = 44;
+ int incomingDataSize = 10_000;
+
+ @Nullable WaitingThread waitingThread = null;
+ try (final MockEnvironment environment = setupEnvironment(true, true)) {
+ final UnAvailableTestInputProcessor inputProcessor =
+ new UnAvailableTestInputProcessor();
+ final StreamTask task =
+ new MockStreamTaskBuilder(environment)
+ .setStreamInputProcessor(inputProcessor)
+ .build();
+ TaskIOMetricGroup ioMetricGroup =
+ task.getEnvironment().getMetricGroup().getIOMetricGroup();
+ ThroughputCalculator throughputCalculator = environment.getThroughputMeter();
+
+ final MailboxExecutor executor = task.mailboxProcessor.getMainMailboxExecutor();
+ final RunnableWithException completeFutureTask =
+ () ->
+ inputProcessor
+ .availabilityProvider
+ .getUnavailableToResetAvailable()
+ .complete(null);
+
+ waitingThread =
+ new WaitingThread(
+ executor,
+ completeFutureTask,
+ sleepTimeInsideMail,
+ sleepTimeOutsideMail,
+ ioMetricGroup.getIdleTimeMsPerSecond());
+ // Make sure WaitingThread is started after Task starts processing.
+ executor.submit(
+ waitingThread::start,
+ "Start WaitingThread after Task starts processing input.");
+
+ long startTs = System.currentTimeMillis();
+ throughputCalculator.calculateThroughput();
+ throughputCalculator.incomingDataSize(incomingDataSize);
+ task.invoke();
+ long resultThroughput = throughputCalculator.calculateThroughput();
+ long totalDuration = System.currentTimeMillis() - startTs;
+
+ assertThat(
+ resultThroughput,
+ greaterThanOrEqualTo(
+ incomingDataSize * 1000 / (totalDuration - sleepTimeOutsideMail)));
+ } finally {
+ if (waitingThread != null) {
+ waitingThread.join();
+ }
+ }
+ }
+
+ @Test
+ public void testThroughputSchedulerStartsOnInvoke() throws Exception {
+ // given: Throughput meter which finishes the task when calculateThroughput was called.
+ UnAvailableTestInputProcessor inputProcessor = new UnAvailableTestInputProcessor();
+ AtomicInteger desirableNumberOfCalculation = new AtomicInteger(2);
+ try (MockEnvironment mockEnvironment =
+ new MockEnvironmentBuilder()
+ .setTaskConfiguration(
+ new Configuration().set(IN_FLIGHT_DATA_ADJUSTMENT_PERIOD, 1))
+ .setThroughputMeter(
+ new ThroughputCalculator(SystemClock.getInstance(), 10) {
+ @Override
+ public long calculateThroughput() {
+ if (desirableNumberOfCalculation.decrementAndGet() == 0) {
+ inputProcessor
+ .availabilityProvider
+ .getUnavailableToResetAvailable()
+ .complete(null);
+ }
+ return super.calculateThroughput();
+ }
+ })
+ .build()) {
+
+ StreamTask task =
+ new MockStreamTaskBuilder(mockEnvironment)
+ .setStreamInputProcessor(inputProcessor)
+ .build();
+
+ // when: Starting the task.
+ task.invoke();
+
+ // then: The task successfully finishes because throughput calculation was called from
+ // scheduler which leads to the finish of the task
+ }
+ }
Review comment:
This test checks that the throughput calculation is started
automatically (just to be sure that the scheduler is configured). The
calculation inside `testMailboxNotifyThroughputMeterAboutIdleTime`, by
contrast, is triggered manually in order to get a more predictable value.
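
To illustrate why a manual trigger gives a more predictable value, here is a
minimal sketch of an idle-aware throughput computation. It is not Flink's
`ThroughputCalculator`; the class `IdleAwareThroughputSketch`, the method
`addIdleTime` and the explicit `windowMillis` argument are all hypothetical
names made up for this example. It only mirrors the arithmetic asserted in
the test above: data size times 1000, divided by the non-idle duration.

```java
// Hypothetical sketch, NOT the Flink ThroughputCalculator API.
final class IdleAwareThroughputSketch {
    private long dataSize;   // bytes received in the current window
    private long idleMillis; // time in the window spent waiting for input

    void incomingDataSize(long bytes) {
        dataSize += bytes;
    }

    void addIdleTime(long millis) {
        idleMillis += millis;
    }

    // Returns bytes per second over the non-idle part of the window,
    // then resets the counters for the next window.
    long calculateThroughput(long windowMillis) {
        // Guard against division by zero when the whole window was idle.
        long activeMillis = Math.max(1, windowMillis - idleMillis);
        long result = dataSize * 1000 / activeMillis;
        dataSize = 0;
        idleMillis = 0;
        return result;
    }
}
```

When the caller decides where the window starts and ends, the only inputs are
the data size and the measured idle time, so a lower bound on the result can
be asserted. If a background scheduler closed the window at an arbitrary
moment, the data could be split across several windows and the bound would
no longer be reliable.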