This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 32c2ca7  [FLINK-26279] Add mailbox metrics
32c2ca7 is described below

commit 32c2ca72bef69fba7451be4a25c367cded8f0405
Author: Sebastian Mattheis <[email protected]>
AuthorDate: Fri Mar 4 13:12:33 2022 +0100

    [FLINK-26279] Add mailbox metrics
    
    Add metrics for mailbox latency, throughput and queue size to TaskIOMetricGroup.
---
 docs/content.zh/docs/ops/metrics.md                |  17 +++-
 docs/content/docs/ops/metrics.md                   |  17 +++-
 .../metrics/DescriptiveStatisticsHistogram.java    |   3 +-
 .../apache/flink/runtime/metrics/MetricNames.java  |   4 +
 .../runtime/metrics/groups/TaskIOMetricGroup.java  |  55 +++++++++++
 .../flink/streaming/runtime/tasks/StreamTask.java  |  39 +++++++-
 .../runtime/tasks/mailbox/MailboxProcessor.java    |  27 +++++
 .../runtime/tasks/mailbox/TaskMailbox.java         |   8 ++
 .../runtime/tasks/mailbox/TaskMailboxImpl.java     |   2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 109 +++++++++++++++++++++
 .../tasks/mailbox/TaskMailboxProcessorTest.java    |   6 ++
 11 files changed, 281 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md
index 30dc10f..2f94cc4 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1333,7 +1333,7 @@ Note that the metrics are only available via reporters.
       <td>Histogram</td>
     </tr>
     <tr>
-      <th rowspan="20"><strong>Task</strong></th>
+      <th rowspan="23"><strong>Task</strong></th>
       <td>numBytesInLocal</td>
       <td><span class="label label-danger">Attention:</span> deprecated, use 
<a href="{{< ref "docs/ops/metrics" >}}#default-shuffle-service">Default 
shuffle service metrics</a>.</td>
       <td>Counter</td>
@@ -1434,6 +1434,21 @@ Note that the metrics are only available via reporters.
       <td>Gauge</td>
     </tr>
     <tr>
+      <td>mailboxMailsPerSecond</td>
+      <td>The number of actions processed from the task's mailbox per second 
which includes all actions, e.g., checkpointing, timer, or cancellation 
actions.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
+      <td>mailboxLatencyMs</td>
+      <td>The latency is the time that actions spend waiting in the task's 
mailbox before being processed. The metric is a statistic of the latency in 
milliseconds that is measured approximately once every second and includes the 
last 60 measurements.</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>mailboxQueueSize</td>
+      <td>The number of actions in the task's mailbox that are waiting to be 
processed.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
       <td rowspan="2"><strong>Task (only if buffer debloating is enabled and 
in non-source tasks)</strong></td>
       <td>estimatedTimeToConsumeBuffersMs</td>
       <td>The estimated time (in milliseconds) by the buffer debloater to 
consume all of the buffered data in the network exchange preceding this task. 
This value is calculated by approximated amount of the in-flight data and 
calculated throughput.</td>
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index a06f62a..15fd6c8 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1326,7 +1326,7 @@ Note that the metrics are only available via reporters.
       <td>Histogram</td>
     </tr>
     <tr>
-      <th rowspan="20"><strong>Task</strong></th>
+      <th rowspan="23"><strong>Task</strong></th>
       <td>numBytesInLocal</td>
       <td><span class="label label-danger">Attention:</span> deprecated, use 
<a href="{{< ref "docs/ops/metrics" >}}#default-shuffle-service">Default 
shuffle service metrics</a>.</td>
       <td>Counter</td>
@@ -1427,6 +1427,21 @@ Note that the metrics are only available via reporters.
       <td>Gauge</td>
     </tr>
     <tr>
+      <td>mailboxMailsPerSecond</td>
+      <td>The number of actions processed from the task's mailbox per second 
which includes all actions, e.g., checkpointing, timer, or cancellation 
actions.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
+      <td>mailboxLatencyMs</td>
+      <td>The latency is the time that actions spend waiting in the task's 
mailbox before being processed. The metric is a statistic of the latency in 
milliseconds that is measured approximately once every second and includes the 
last 60 measurements.</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>mailboxQueueSize</td>
+      <td>The number of actions in the task's mailbox that are waiting to be 
processed.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
       <td rowspan="2"><strong>Task (only if buffer debloating is enabled and 
in non-source tasks)</strong></td>
       <td>estimatedTimeToConsumeBuffersMs</td>
       <td>The estimated time (in milliseconds) by the buffer debloater to 
consume all of the buffered data in the network exchange preceding this task. 
This value is calculated by approximated amount of the in-flight data and 
calculated throughput.</td>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
index 35f49e2..61914fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/DescriptiveStatisticsHistogram.java
@@ -28,8 +28,7 @@ import java.io.Serializable;
  * The {@link DescriptiveStatisticsHistogram} use a DescriptiveStatistics 
{@link
  * DescriptiveStatistics} as a Flink {@link Histogram}.
  */
-public class DescriptiveStatisticsHistogram
-        implements org.apache.flink.metrics.Histogram, Serializable {
+public class DescriptiveStatisticsHistogram implements Histogram, Serializable 
{
     private static final long serialVersionUID = 1L;
 
     private final CircularDoubleArray descriptiveStatistics;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
index ee922d1..762c667 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java
@@ -102,4 +102,8 @@ public class MetricNames {
 
     // FLIP-182 (watermark alignment)
     public static final String WATERMARK_ALIGNMENT_DRIFT = 
"watermarkAlignmentDrift";
+
+    public static final String MAILBOX_THROUGHPUT = "mailboxMailsPerSecond";
+    public static final String MAILBOX_LATENCY = "mailboxLatencyMs";
+    public static final String MAILBOX_SIZE = "mailboxQueueSize";
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index b976b99..0b580d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -20,11 +20,13 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.TimerGauge;
 
@@ -44,6 +46,7 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
     private final SumCounter numRecordsIn;
     private final SumCounter numRecordsOut;
     private final Counter numBuffersOut;
+    private final Counter numMailsProcessed;
 
     private final Meter numBytesInRate;
     private final Meter numBytesOutRate;
@@ -57,6 +60,9 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
     private final TimerGauge hardBackPressuredTimePerSecond;
     private final Gauge<Long> maxSoftBackPressuredTime;
     private final Gauge<Long> maxHardBackPressuredTime;
+    private final Meter mailboxThroughput;
+    private final Histogram mailboxLatency;
+    private final SizeGauge mailboxSize;
 
     private volatile boolean busyTimeEnabled;
 
@@ -100,6 +106,13 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
                         
hardBackPressuredTimePerSecond::getMaxSingleMeasurement);
 
         this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, 
this::getBusyTimePerSecond);
+
+        this.numMailsProcessed = new SimpleCounter();
+        this.mailboxThroughput =
+                meter(MetricNames.MAILBOX_THROUGHPUT, new 
MeterView(numMailsProcessed));
+        this.mailboxLatency =
+                histogram(MetricNames.MAILBOX_LATENCY, new 
DescriptiveStatisticsHistogram(60));
+        this.mailboxSize = gauge(MetricNames.MAILBOX_SIZE, new SizeGauge());
     }
 
     public IOMetrics createSnapshot() {
@@ -135,6 +148,10 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
         return numBuffersOut;
     }
 
+    public Counter getNumMailsProcessedCounter() {
+        return numMailsProcessed;
+    }
+
     public TimerGauge getIdleTimeMsPerSecond() {
         return idleTimePerSecond;
     }
@@ -161,6 +178,18 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
         return busyTimeEnabled ? 1000.0 - Math.min(busyTime, 1000.0) : 
Double.NaN;
     }
 
+    public Meter getMailboxThroughput() {
+        return mailboxThroughput;
+    }
+
+    public Histogram getMailboxLatency() {
+        return mailboxLatency;
+    }
+
+    public Gauge<Integer> getMailboxSize() {
+        return mailboxSize;
+    }
+
     // 
============================================================================================
     // Metric Reuse
     // 
============================================================================================
@@ -177,6 +206,10 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
         this.numBytesProducedOfPartitions.put(resultPartitionId, 
numBytesProducedCounter);
     }
 
+    public void registerMailboxSizeSupplier(SizeGauge.SizeSupplier<Integer> 
supplier) {
+        this.mailboxSize.registerSupplier(supplier);
+    }
+
     /**
      * A {@link SimpleCounter} that can contain other {@link Counter}s. A call 
to {@link
      * SumCounter#getCount()} returns the sum of this counters and all 
contained counters.
@@ -199,4 +232,26 @@ public class TaskIOMetricGroup extends 
ProxyMetricGroup<TaskMetricGroup> {
             return sum;
         }
     }
+
+    private static class SizeGauge implements Gauge<Integer> {
+        private SizeSupplier<Integer> supplier;
+
+        @FunctionalInterface
+        public interface SizeSupplier<R> {
+            R get();
+        }
+
+        public void registerSupplier(SizeSupplier<Integer> supplier) {
+            this.supplier = supplier;
+        }
+
+        @Override
+        public Integer getValue() {
+            if (supplier != null) {
+                return supplier.get();
+            } else {
+                return 0; // return "assumed" empty queue size
+            }
+        }
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ec0e334..5d43f0f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -107,6 +107,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TernaryBoolean;
+import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.RunnableWithException;
@@ -274,6 +275,9 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
     protected final MailboxProcessor mailboxProcessor;
 
+    /** Mailbox metrics measurement is timer triggered with the given interval 
in milliseconds. */
+    protected int mailboxMetricsInterval = 1000;
+
     final MailboxExecutor mainMailboxExecutor;
 
     /** TODO it might be replaced by the global IO executor on TaskManager 
level future. */
@@ -376,8 +380,16 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         try {
             this.environment = environment;
             this.configuration = new 
StreamConfig(environment.getTaskConfiguration());
+            Counter numMailsProcessedCounter =
+                    
environment.getMetricGroup().getIOMetricGroup().getNumMailsProcessedCounter();
             this.mailboxProcessor =
-                    new MailboxProcessor(this::processInput, mailbox, 
actionExecutor);
+                    new MailboxProcessor(
+                            this::processInput, mailbox, actionExecutor, 
numMailsProcessedCounter);
+            environment
+                    .getMetricGroup()
+                    .getIOMetricGroup()
+                    .registerMailboxSizeSupplier(() -> mailbox.size());
+
             // Should be closed last.
             resourceCloser.registerCloseable(mailboxProcessor);
 
@@ -749,6 +761,8 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
         scheduleBufferDebloater();
 
+        scheduleMailboxMetrics();
+
         // let the task do its work
         runMailboxLoop();
 
@@ -784,6 +798,29 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
     }
 
     @VisibleForTesting
+    public void measureMailboxLatency() {
+        long startTime = SystemClock.getInstance().relativeTimeMillis();
+        mainMailboxExecutor.execute(
+                () -> {
+                    long endTime = 
SystemClock.getInstance().relativeTimeMillis();
+                    long latency = endTime - startTime;
+                    environment
+                            .getMetricGroup()
+                            .getIOMetricGroup()
+                            .getMailboxLatency()
+                            .update(latency);
+                    scheduleMailboxMetrics();
+                },
+                "Measure mailbox latency metric");
+    }
+
+    private void scheduleMailboxMetrics() {
+        systemTimerService.registerTimer(
+                systemTimerService.getCurrentProcessingTime() + 
mailboxMetricsInterval,
+                timestamp -> measureMailboxLatency());
+    }
+
+    @VisibleForTesting
     void debloat() {
         for (IndexedInputGate inputGate : environment.getAllInputGates()) {
             inputGate.triggerDebloating();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index a401620..b621b22 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.tasks.mailbox;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MailboxClosedException;
 import org.apache.flink.util.ExceptionUtils;
@@ -99,6 +101,9 @@ public class MailboxProcessor implements Closeable {
 
     private final StreamTaskActionExecutor actionExecutor;
 
+    /** Counter that counts number of mails processed from mailbox. */
+    private final Counter numMailsProcessed;
+
     @VisibleForTesting
     public MailboxProcessor() {
         this(MailboxDefaultAction.Controller::suspendDefaultAction);
@@ -117,11 +122,20 @@ public class MailboxProcessor implements Closeable {
             MailboxDefaultAction mailboxDefaultAction,
             TaskMailbox mailbox,
             StreamTaskActionExecutor actionExecutor) {
+        this(mailboxDefaultAction, mailbox, actionExecutor, new 
SimpleCounter());
+    }
+
+    public MailboxProcessor(
+            MailboxDefaultAction mailboxDefaultAction,
+            TaskMailbox mailbox,
+            StreamTaskActionExecutor actionExecutor,
+            Counter numMailsProcessed) {
         this.mailboxDefaultAction = 
Preconditions.checkNotNull(mailboxDefaultAction);
         this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
         this.mailbox = Preconditions.checkNotNull(mailbox);
         this.mailboxLoopRunning = true;
         this.suspendedDefaultAction = null;
+        this.numMailsProcessed = numMailsProcessed;
     }
 
     public MailboxExecutor getMainMailboxExecutor() {
@@ -137,6 +151,15 @@ public class MailboxProcessor implements Closeable {
         return new MailboxExecutorImpl(mailbox, priority, actionExecutor, 
this);
     }
 
+    /**
+     * Returns attached {@link Counter} that counts number of mails processed.
+     *
+     * @return {@link Counter} that counts number of mails processed.
+     */
+    public Counter getNumMailsProcessedCounter() {
+        return numMailsProcessed;
+    }
+
     /** Lifecycle method to close the mailbox for action submission. */
     public void prepareClose() {
         mailbox.quiesce();
@@ -175,6 +198,7 @@ public class MailboxProcessor implements Closeable {
     public void drain() throws Exception {
         for (final Mail mail : mailbox.drain()) {
             mail.run();
+            numMailsProcessed.inc();
         }
     }
 
@@ -336,6 +360,8 @@ public class MailboxProcessor implements Closeable {
             }
             maybePauseIdleTimer();
             maybeMail.get().run();
+            numMailsProcessed.inc();
+
             maybeRestartIdleTimer();
             processedSomething = true;
         }
@@ -351,6 +377,7 @@ public class MailboxProcessor implements Closeable {
                 maybePauseIdleTimer();
             }
             maybeMail.get().run();
+            numMailsProcessed.inc();
             if (singleStep) {
                 break;
             }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
index b6e29dc..d2c5f35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java
@@ -228,6 +228,14 @@ public interface TaskMailbox {
     State getState();
 
     /**
+     * Returns the current number of mails in this mailbox. (This includes 
mails in the batch not
+     * processed yet.)
+     *
+     * @return number of mails in the mailbox.
+     */
+    int size();
+
+    /**
      * Runs the given code exclusively on this mailbox. No synchronized 
operations can be run
      * concurrently to the given runnable (e.g., {@link #put(Mail)} or 
modifying lifecycle methods).
      *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
index 5f98501..00d1922 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxImpl.java
@@ -96,7 +96,7 @@ public class TaskMailboxImpl implements TaskMailbox {
         return !batch.isEmpty() || hasNewMail;
     }
 
-    @VisibleForTesting
+    @Override
     public int size() {
         final ReentrantLock lock = this.lock;
         lock.lock();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 45be29f..fb9921d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -31,6 +31,8 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -191,6 +193,7 @@ import static 
org.apache.flink.runtime.state.CheckpointStorageLocationReference.
 import static 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MAX_PRIORITY;
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.CoreMatchers.both;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
@@ -200,6 +203,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1763,6 +1767,111 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
+    /**
+     * Tests mailbox metrics latency and queue size and verifies that (1) 
latency measurement is
+     * executed initially once and at least once triggered by timer, (2) 
latency max value is
+     * greater than zero and (3) mailbox size is greater than zero for some 
time and eventually
+     * equals to zero.
+     *
+     * @throws Exception on {@link MockEnvironmentBuilder#build()} failure.
+     */
+    @Test
+    public void testMailboxMetricsScheduling() throws Exception {
+        try (MockEnvironment mockEnvironment = new 
MockEnvironmentBuilder().build()) {
+            Gauge<Integer> mailboxSizeMetric =
+                    
mockEnvironment.getMetricGroup().getIOMetricGroup().getMailboxSize();
+            Histogram mailboxLatencyMetric =
+                    
mockEnvironment.getMetricGroup().getIOMetricGroup().getMailboxLatency();
+            AtomicInteger maxMailboxSize = new AtomicInteger(-1);
+            final int minMeasurements = 2;
+            SupplierWithException<StreamTask, Exception> task =
+                    () ->
+                            new StreamTask<Object, 
StreamOperator<Object>>(mockEnvironment) {
+                                @Override
+                                protected void init() {
+                                    this.mailboxMetricsInterval = 2;
+                                }
+
+                                @Override
+                                protected void processInput(
+                                        MailboxDefaultAction.Controller 
controller)
+                                        throws Exception {
+                                    if (mailboxLatencyMetric.getCount() < 
minMeasurements) {
+                                        mailboxProcessor
+                                                .getMainMailboxExecutor()
+                                                .execute(() -> {}, "mail");
+                                        // The actual delay here is irrelevant 
for the test but
+                                        // delay should be at least once 10 ms 
to reach a measurable
+                                        // delay >~ 8 ms.
+                                        
Thread.sleep(mailboxLatencyMetric.getCount() == 0 ? 10 : 1);
+                                    } else {
+                                        controller.suspendDefaultAction();
+                                        mailboxProcessor.suspend();
+                                    }
+                                    maxMailboxSize.set(
+                                            Math.max(
+                                                    maxMailboxSize.get(),
+                                                    
mailboxSizeMetric.getValue()));
+                                }
+                            };
+
+            runTask(task::get).waitForTaskCompletion(false);
+
+            assertThat(
+                    mailboxLatencyMetric.getCount(),
+                    greaterThanOrEqualTo(new Long(minMeasurements)));
+            assertThat(mailboxLatencyMetric.getStatistics().getMax(), 
greaterThan(0L));
+            assertThat(maxMailboxSize.get(), greaterThan(0));
+            assertThat(mailboxSizeMetric.getValue(), equalTo(0));
+        }
+    }
+
+    @Test
+    public void testMailboxMetricsMeasurement() throws Exception {
+        final int numMails = 10, sleepTime = 5;
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
+                        .setupOutputForSingletonOperatorChain(
+                                new TestBoundedOneInputStreamOperator());
+        try (StreamTaskMailboxTestHarness<Integer> harness = builder.build()) {
+            Histogram mailboxLatencyMetric =
+                    harness.streamTask
+                            .getEnvironment()
+                            .getMetricGroup()
+                            .getIOMetricGroup()
+                            .getMailboxLatency();
+            Gauge<Integer> mailboxSizeMetric =
+                    harness.streamTask
+                            .getEnvironment()
+                            .getMetricGroup()
+                            .getIOMetricGroup()
+                            .getMailboxSize();
+            long startTime = SystemClock.getInstance().relativeTimeMillis();
+            harness.streamTask.measureMailboxLatency();
+            for (int i = 0; i < numMails; ++i) {
+                harness.streamTask.mainMailboxExecutor.execute(
+                        () -> Thread.sleep(sleepTime), "add value");
+            }
+            harness.streamTask.measureMailboxLatency();
+
+            assertThat(mailboxSizeMetric.getValue(), 
greaterThanOrEqualTo(numMails));
+            assertThat(mailboxLatencyMetric.getCount(), equalTo(0L));
+
+            harness.processAll();
+            long endTime = SystemClock.getInstance().relativeTimeMillis();
+
+            assertThat(mailboxSizeMetric.getValue(), equalTo(0));
+            assertThat(mailboxLatencyMetric.getCount(), equalTo(2L));
+            assertThat(
+                    mailboxLatencyMetric.getStatistics().getMax(),
+                    is(
+                            both(greaterThanOrEqualTo(new Long(sleepTime * 
numMails)))
+                                    .and(lessThanOrEqualTo(endTime - 
startTime))));
+        }
+    }
+
     private int getCurrentBufferSize(InputGate inputGate) {
         return getTestChannel(inputGate, 0).getCurrentBufferSize();
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
index ad165ce..d382428 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
@@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -102,10 +104,12 @@ public class TaskMailboxProcessorTest {
     @Test
     public void testRunDefaultActionAndMails() throws Exception {
         AtomicBoolean stop = new AtomicBoolean(false);
+        AtomicInteger counter = new AtomicInteger();
         MailboxThread mailboxThread =
                 new MailboxThread() {
                     @Override
                     public void runDefaultAction(Controller controller) throws 
Exception {
+                        counter.incrementAndGet();
                         if (stop.get()) {
                             controller.allActionsCompleted();
                         } else {
@@ -117,6 +121,8 @@ public class TaskMailboxProcessorTest {
         MailboxProcessor mailboxProcessor = start(mailboxThread);
         mailboxProcessor.getMailboxExecutor(DEFAULT_PRIORITY).execute(() -> 
stop.set(true), "stop");
         mailboxThread.join();
+        assertThat(counter.get(), greaterThan(0));
+        assertThat(mailboxProcessor.getNumMailsProcessedCounter().getCount(), 
greaterThan(0L));
     }
 
     @Test

Reply via email to