pnowojski commented on a change in pull request #18991:
URL: https://github.com/apache/flink/pull/18991#discussion_r827989506



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1763,6 +1765,108 @@ public long calculateThroughput() {
         }
     }
 
+    /**
+     * Tests mailbox metrics latency and queue size and verifies that (1) latency measurement is
+     * executed initially once and at least once triggered by timer, (2) latency max value is
+     * greater than zero and (3) mailbox size is greater than zero for some time and eventually
+     * equals to zero.
+     *
+     * <p>Note: This test uses a timeout which, if exceeded, indicates that the minimum number of
+     * latency measurements has not reached the specified minimum within specified time.
+     *
+     * @throws Exception on {@link MockEnvironmentBuilder#build()} failure.
+     */
+    @Test(timeout = 4000)
+    public void testMailboxMetrics() throws Exception {
+        try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build()) {
+            Gauge<Integer> mailboxSizeMetric =
+                    mockEnvironment.getMetricGroup().getIOMetricGroup().getMailboxSize();
+            Histogram mailboxLatencyMetric =
+                    mockEnvironment.getMetricGroup().getIOMetricGroup().getMailboxLatency();
+            AtomicInteger maxMailboxSize = new AtomicInteger(-1);
+            final int minMeasurements = 2;
+            SupplierWithException<StreamTask, Exception> task =
+                    () ->
+                            new StreamTask<Object, StreamOperator<Object>>(mockEnvironment) {
+                                @Override
+                                protected void init() {
+                                    this.mailboxMetricsInterval = 2;
+                                }
+
+                                @Override
+                                protected void processInput(
+                                        MailboxDefaultAction.Controller controller)
+                                        throws Exception {
+                                    if (mailboxLatencyMetric.getCount() < minMeasurements) {
+                                        mailboxProcessor
+                                                .getMainMailboxExecutor()
+                                                .execute(() -> {}, "mail");
+                                        // The actual delay here is irrelevant for the test but
+                                        // delay should be at least once 10 ms to reach a measurable
+                                        // delay >~ 8 ms.
+                                        Thread.sleep(mailboxLatencyMetric.getCount() == 0 ? 10 : 1);
+                                    } else {
+                                        controller.suspendDefaultAction();
+                                        mailboxProcessor.suspend();
+                                    }
+                                    maxMailboxSize.set(
+                                            Math.max(
+                                                    maxMailboxSize.get(),
+                                                    mailboxSizeMetric.getValue()));
+                                }
+                            };
+
+            runTask(task::get).waitForTaskCompletion(false);
+
+            assertThat(
+                    mailboxLatencyMetric.getCount(),
+                    greaterThanOrEqualTo(new Long(minMeasurements)));
+            assertThat(mailboxLatencyMetric.getStatistics().getMax(), greaterThan(0L));
+            assertThat(maxMailboxSize.get(), greaterThan(0));
+            assertThat(mailboxSizeMetric.getValue(), equalTo(0));
+        }
+    }
+
+    @Test
+    public void testMailboxMetricsWithTestHarness() throws Exception {
+        final int numMails = 10;
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .setupOutputForSingletonOperatorChain(
+                                new TestBoundedOneInputStreamOperator());
+        try (StreamTaskMailboxTestHarness<Integer> harness = builder.build()) {
+            Histogram mailboxLatencyMetric =
+                    harness.streamTask
+                            .getEnvironment()
+                            .getMetricGroup()
+                            .getIOMetricGroup()
+                            .getMailboxLatency();
+            Gauge<Integer> mailboxSizeMetric =
+                    harness.streamTask
+                            .getEnvironment()
+                            .getMetricGroup()
+                            .getIOMetricGroup()
+                            .getMailboxSize();
+
+            harness.streamTask.measureMailboxLatency();
+            for (int i = 0; i < numMails; ++i) {
+                harness.streamTask.mainMailboxExecutor.execute(() -> Thread.sleep(5), "add value");
+            }
+            harness.streamTask.measureMailboxLatency();
+
+            assertThat(mailboxSizeMetric.getValue(), greaterThanOrEqualTo(numMails));
+            assertThat(mailboxLatencyMetric.getCount(), equalTo(0L));
+
+            harness.processAll();
+
+            assertThat(mailboxSizeMetric.getValue(), equalTo(0));
+            assertThat(mailboxLatencyMetric.getCount(), equalTo(2L));
+            assertThat(mailboxLatencyMetric.getStatistics().getMax(), greaterThan(0L));

Review comment:
       Could this be tightened to `greaterThan(5 * numMails)`, and also checked to be 
less than the time elapsed between `harness.streamTask.measureMailboxLatency();` 
and `harness.processAll();`?
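
To illustrate the kind of bounds meant here, below is a minimal sketch in plain Java. It is a toy model under stated assumptions, not Flink code: the class `MailboxLatencyBoundsSketch` and its methods are hypothetical, a plain `ArrayDeque` stands in for the mailbox, and the lower bound relies on `Thread.sleep` lasting at least the requested time.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model only: a plain queue stands in for the task mailbox (hypothetical names). */
public class MailboxLatencyBoundsSketch {

    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private long maxLatencyMs = -1;

    /** Enqueues a mail that records how long it waited before being run. */
    void measureLatency() {
        final long enqueuedNanos = System.nanoTime();
        mailbox.add(() -> {
            long waitedMs = (System.nanoTime() - enqueuedNanos) / 1_000_000;
            maxLatencyMs = Math.max(maxLatencyMs, waitedMs);
        });
    }

    /** Enqueues a mail that sleeps for the given number of milliseconds. */
    void addSleepingMail(long sleepMs) {
        mailbox.add(() -> {
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    /** Runs all queued mails in FIFO order. */
    void processAll() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    /** Replays the scenario of the test and returns {maxLatencyMs, elapsedMs}. */
    static long[] run(int numMails, long sleepMs) {
        MailboxLatencyBoundsSketch task = new MailboxLatencyBoundsSketch();
        long startNanos = System.nanoTime();
        task.measureLatency();
        for (int i = 0; i < numMails; i++) {
            task.addSleepingMail(sleepMs);
        }
        task.measureLatency();
        task.processAll();
        long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;
        return new long[] {task.maxLatencyMs, elapsedMs};
    }

    public static void main(String[] args) {
        long[] result = run(10, 5);
        // Lower bound: the second latency mail waits behind all sleeping mails.
        boolean lowerOk = result[0] >= 5L * 10;
        // Upper bound: no mail can wait longer than the whole run took.
        boolean upperOk = result[0] <= result[1];
        System.out.println("lower bound holds: " + lowerOk + ", upper bound holds: " + upperOk);
    }
}
```

In the real test the same two checks would presumably be written with matchers against `mailboxLatencyMetric.getStatistics().getMax()`, with the start timestamp taken just before the first `measureMailboxLatency()` call.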

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1763,6 +1765,108 @@ public long calculateThroughput() {
         }
     }
 
+    /**
+     * Tests mailbox metrics latency and queue size and verifies that (1) latency measurement is
+     * executed initially once and at least once triggered by timer, (2) latency max value is
+     * greater than zero and (3) mailbox size is greater than zero for some time and eventually
+     * equals to zero.
+     *
+     * <p>Note: This test uses a timeout which, if exceeded, indicates that the minimum number of
+     * latency measurements has not reached the specified minimum within specified time.
+     *
+     * @throws Exception on {@link MockEnvironmentBuilder#build()} failure.
+     */
+    @Test(timeout = 4000)

Review comment:
       I would remove this timeout. Our general practice is to avoid timeouts 
inside tests. If there is a deadlock/livelock, it's actually better for the mvn 
watchdog to detect it and kill the build, printing a thread dump.
   
   Apart from that, 4s is most likely too short. We have seen occasional 10-15 sec 
freezes in the CI.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -175,6 +198,7 @@ public void close() {
     public void drain() throws Exception {
         for (final Mail mail : mailbox.drain()) {
             mail.run();
+            numMailsProcessed.inc();

Review comment:
       nit: bump, have you missed this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
