pnowojski commented on a change in pull request #18991:
URL: https://github.com/apache/flink/pull/18991#discussion_r824689000



##########
File path: docs/content.zh/docs/ops/metrics.md
##########
@@ -1443,6 +1443,11 @@ Note that the metrics are only available via reporters.
       <td>The latency is the time that actions spend waiting in the task's mailbox before being processed. The metric is a statistic of the latency in milliseconds that is measured approximately once every second and includes the last 60 measurements.</td>
       <td>Histogram</td>
     </tr>
+    <tr>
+      <td>mailboxSize</td>

Review comment:
       `mailboxQueueSize`?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -175,6 +198,7 @@ public void close() {
     public void drain() throws Exception {
         for (final Mail mail : mailbox.drain()) {
             mail.run();
+            numMailsProcessed.inc();

Review comment:
       Nit: could you extract 
   ```
   mail.run();
   numMailsProcessed.inc();
   ``` 
   into a helper method to deduplicate the code?
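
The extraction suggested above could look roughly like the following. This is a minimal, self-contained sketch and not Flink's code: `MailRunnerSketch`, `SimpleCounter`, `runMail` and `processOne` are hypothetical names, and plain `Runnable` stands in for Flink's `Mail` (whose `run()` can throw a checked exception).

```java
import java.util.List;

class MailRunnerSketch {
    /** Minimal stand-in for a counter metric (assumption, not Flink's Counter). */
    static final class SimpleCounter {
        private long count;

        void inc() {
            count++;
        }

        long getCount() {
            return count;
        }
    }

    private final SimpleCounter numMailsProcessed = new SimpleCounter();

    /** The single place that runs a mail and counts it. */
    private void runMail(Runnable mail) {
        mail.run();
        numMailsProcessed.inc();
    }

    /** Mirrors the drain() loop from the diff, routed through the helper. */
    void drain(List<Runnable> pending) {
        for (Runnable mail : pending) {
            runMail(mail);
        }
        pending.clear();
    }

    /** A second, hypothetical call site that reuses the same helper. */
    void processOne(Runnable mail) {
        runMail(mail);
    }

    long processedCount() {
        return numMailsProcessed.getCount();
    }
}
```

With a single helper, a call site cannot run a mail and forget to increment the counter.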

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1763,6 +1765,66 @@ public long calculateThroughput() {
         }
     }
 
+    /**
+     * Tests mailbox metrics latency and queue size and verifies that (1) latency measurement is
+     * executed initially once and at least once triggered by timer, (2) latency mean value is
+     * greater than zero and (3) mailbox size is greater than zero for some time and eventually
+     * equals to zero.
+     *
+     * @throws Exception on {@link MockEnvironmentBuilder#build()} failure.
+     */
+    @Test
+    public void testMailboxMetrics() throws Exception {

Review comment:
       Are you sure this test is going to be stable? It looks like a 20 ms freeze of the metrics thread would cause it to fail. 
   
   Can you take a look at the buffer debloating tests, and the `StreamTask#debloat` method in particular? I think that with `StreamTaskMailboxTestHarnessBuilder` you would be able to implement a precise and reliable test. 
   
   1. Unlike the `StreamTask#debloat` method, your `StreamTask#measureMailboxLatency` would look like:
   ```
   void measureMailboxLatency() {
       // Capture the enqueue time outside the mail.
       long startTime = SystemClock.getInstance().relativeTimeMillis();
       mainMailboxExecutor.execute(
               () -> {
                   // Latency is the time the mail spent waiting in the mailbox.
                   long endTime = SystemClock.getInstance().relativeTimeMillis();
                   long latency = endTime - startTime;
                   environment
                           .getMetricGroup()
                           .getIOMetricGroup()
                           .getMailboxLatency()
                           .update(latency);
                   scheduleMailboxMetrics();
               },
               "Measure mailbox latency metric");
   }
   ```
   2. Enqueue 10 mails.
   3. Set up the test to sleep X ms (`20ms`?) per record.
   4. Call `StreamTask#measureMailboxLatency`.
   5. Assert `mailboxQueueSize == 11`.
   6. Call `testHarness.processAll()`.
   7. Assert `maxLatency >= X ms`.
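
To illustrate why the numbered steps above give a deterministic test, here is a toy model of them. It is a sketch under stated assumptions and does not use Flink's test harness: `MailboxLatencySketch`, `ManualClock`, `measureLatency` and `runScenario` are hypothetical names, the mailbox is a plain FIFO queue drained on the calling thread, and a manually advanced clock replaces real sleeping.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class MailboxLatencySketch {
    /** Manually advanced clock (assumption), so the scenario needs no real sleeping. */
    static final class ManualClock {
        private long nowMillis;

        long relativeTimeMillis() {
            return nowMillis;
        }

        void advance(long millis) {
            nowMillis += millis;
        }
    }

    final ManualClock clock = new ManualClock();
    final Queue<Runnable> mailbox = new ArrayDeque<>();
    long maxLatencyMillis = -1;

    void enqueue(Runnable mail) {
        mailbox.add(mail);
    }

    /** Records the enqueue time now; the latency is computed when the mail finally runs. */
    void measureLatency() {
        long startTime = clock.relativeTimeMillis();
        enqueue(() -> {
            long latency = clock.relativeTimeMillis() - startTime;
            maxLatencyMillis = Math.max(maxLatencyMillis, latency);
        });
    }

    /** Runs all queued mails in FIFO order on the calling thread. */
    void processAll() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    /** Walks through steps 2 to 6 and returns the latency that step 7 asserts on. */
    static long runScenario(long perMailMillis) {
        MailboxLatencySketch task = new MailboxLatencySketch();
        for (int i = 0; i < 10; i++) {
            // Each mail "takes" perMailMillis by advancing the manual clock.
            task.enqueue(() -> task.clock.advance(perMailMillis));
        }
        task.measureLatency();
        if (task.mailbox.size() != 11) {
            throw new IllegalStateException("expected 11 queued mails, got " + task.mailbox.size());
        }
        task.processAll();
        return task.maxLatencyMillis;
    }
}
```

Because nothing runs until `processAll()` is called, the queue size and the lower bound on the latency do not depend on thread scheduling. In this toy model the measurement mail waits behind all ten mails, so `runScenario(20)` reports their combined time.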

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
##########
@@ -117,6 +119,8 @@ public void runDefaultAction(Controller controller) throws Exception {
         MailboxProcessor mailboxProcessor = start(mailboxThread);
         mailboxProcessor.getMailboxExecutor(DEFAULT_PRIORITY).execute(() -> stop.set(true), "stop");
         mailboxThread.join();
+        Assert.assertTrue(counter.get() > 0);

Review comment:
       `assertEquals(1, counter.get())`?
   
   Note: I'm recommending both making the condition stricter and dropping the vague `assertTrue`, which would give a very unhelpful error message if the test fails.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
##########
@@ -117,6 +119,8 @@ public void runDefaultAction(Controller controller) throws Exception {
         MailboxProcessor mailboxProcessor = start(mailboxThread);
         mailboxProcessor.getMailboxExecutor(DEFAULT_PRIORITY).execute(() -> stop.set(true), "stop");
         mailboxThread.join();
+        Assert.assertTrue(counter.get() > 0);
+        Assert.assertTrue(mailboxProcessor.getNumMailsProcessedCounter().getCount() > 0);

Review comment:
       `assertGE(mailboxProcessor.getNumMailsProcessedCounter().getCount(), counter.get())`?
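
`assertGE` above reads as shorthand for a greater-or-equal assertion, not as the name of an existing method. A minimal sketch of such a helper follows, assuming nothing beyond the JDK; `AssertionMessageSketch` and `assertGreaterOrEqual` are hypothetical names. In a codebase that has Hamcrest on the test classpath, `assertThat(actual, greaterThanOrEqualTo(expected))` would be an alternative.

```java
class AssertionMessageSketch {
    /**
     * Hypothetical helper: fails with both values in the message, unlike
     * assertTrue(actual >= expectedMin), which reports no values at all.
     */
    static void assertGreaterOrEqual(String what, long actual, long expectedMin) {
        if (actual < expectedMin) {
            throw new AssertionError(
                    what + ": expected >= " + expectedMin + " but was " + actual);
        }
    }
}
```

The point is the failure message: it names the quantity and shows both numbers, which is the information a bare `assertTrue` drops.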
   





