This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fecd0fbdf43 [FLINK-26864] Fix performance regression from mailbox 
latency measurement
fecd0fbdf43 is described below

commit fecd0fbdf4349aba2e0487cd1527e4a061a0ffa8
Author: Sebastian Mattheis <[email protected]>
AuthorDate: Thu Apr 7 11:04:50 2022 +0200

    [FLINK-26864] Fix performance regression from mailbox latency measurement
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  49 ++-----
 .../tasks/mailbox/MailboxMetricsController.java    | 152 +++++++++++++++++++++
 .../runtime/tasks/mailbox/MailboxProcessor.java    |  49 ++++---
 .../streaming/runtime/tasks/StreamTaskTest.java    |   8 +-
 .../tasks/mailbox/TaskMailboxProcessorTest.java    |   4 +-
 5 files changed, 208 insertions(+), 54 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 5d43f0f1885..312f4f38eb3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -95,6 +95,7 @@ import 
org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
 import 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Suspension;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxMetricsController;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
 import org.apache.flink.streaming.runtime.tasks.mailbox.PeriodTimer;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
@@ -107,7 +108,6 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TernaryBoolean;
-import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.RunnableWithException;
@@ -275,9 +275,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
     protected final MailboxProcessor mailboxProcessor;
 
-    /** Mailbox metrics measurement is timer triggered with the given interval 
in milliseconds. */
-    protected int mailboxMetricsInterval = 1000;
-
     final MailboxExecutor mainMailboxExecutor;
 
     /** TODO it might be replaced by the global IO executor on TaskManager 
level future. */
@@ -380,16 +377,24 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         try {
             this.environment = environment;
             this.configuration = new 
StreamConfig(environment.getTaskConfiguration());
-            Counter numMailsProcessedCounter =
-                    
environment.getMetricGroup().getIOMetricGroup().getNumMailsProcessedCounter();
-            this.mailboxProcessor =
-                    new MailboxProcessor(
-                            this::processInput, mailbox, actionExecutor, 
numMailsProcessedCounter);
+
+            // Initialize mailbox metrics
+            MailboxMetricsController mailboxMetricsControl =
+                    new MailboxMetricsController(
+                            
environment.getMetricGroup().getIOMetricGroup().getMailboxLatency(),
+                            environment
+                                    .getMetricGroup()
+                                    .getIOMetricGroup()
+                                    .getNumMailsProcessedCounter());
             environment
                     .getMetricGroup()
                     .getIOMetricGroup()
                     .registerMailboxSizeSupplier(() -> mailbox.size());
 
+            this.mailboxProcessor =
+                    new MailboxProcessor(
+                            this::processInput, mailbox, actionExecutor, 
mailboxMetricsControl);
+
             // Should be closed last.
             resourceCloser.registerCloseable(mailboxProcessor);
 
@@ -469,6 +474,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
             Configuration taskManagerConf = 
environment.getTaskManagerInfo().getConfiguration();
 
             this.bufferDebloatPeriod = 
taskManagerConf.get(BUFFER_DEBLOAT_PERIOD).toMillis();
+            mailboxMetricsControl.setupLatencyMeasurement(systemTimerService, 
mainMailboxExecutor);
         } catch (Exception ex) {
             try {
                 resourceCloser.close();
@@ -761,8 +767,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
         scheduleBufferDebloater();
 
-        scheduleMailboxMetrics();
-
         // let the task do its work
         runMailboxLoop();
 
@@ -797,29 +801,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                 "Buffer size recalculation"));
     }
 
-    @VisibleForTesting
-    public void measureMailboxLatency() {
-        long startTime = SystemClock.getInstance().relativeTimeMillis();
-        mainMailboxExecutor.execute(
-                () -> {
-                    long endTime = 
SystemClock.getInstance().relativeTimeMillis();
-                    long latency = endTime - startTime;
-                    environment
-                            .getMetricGroup()
-                            .getIOMetricGroup()
-                            .getMailboxLatency()
-                            .update(latency);
-                    scheduleMailboxMetrics();
-                },
-                "Measure mailbox latency metric");
-    }
-
-    private void scheduleMailboxMetrics() {
-        systemTimerService.registerTimer(
-                systemTimerService.getCurrentProcessingTime() + 
mailboxMetricsInterval,
-                timestamp -> measureMailboxLatency());
-    }
-
     @VisibleForTesting
     void debloat() {
         for (IndexedInputGate inputGate : environment.getAllInputGates()) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxMetricsController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxMetricsController.java
new file mode 100644
index 00000000000..74c214c2182
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxMetricsController.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.streaming.runtime.tasks.TimerService;
+import org.apache.flink.util.clock.SystemClock;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Mailbox metrics controller class. The use of mailbox metrics, in particular 
scheduling latency
+ * measurements that require a {@link TimerService}, induces (cyclic) 
dependencies between {@link
+ * MailboxProcessor} and {@link 
org.apache.flink.streaming.runtime.tasks.StreamTask}. An instance of
+ * this class contains and gives control over these dependencies.
+ */
+@Internal
+public class MailboxMetricsController {
+    /** Default timer interval in milliseconds for triggering mailbox latency 
measurement. */
+    public final int defaultLatencyMeasurementInterval = 1000;
+
+    private final Histogram latencyHistogram;
+    private final Counter mailCounter;
+
+    @Nullable private TimerService timerService;
+    @Nullable private MailboxExecutor mailboxExecutor;
+    private int measurementInterval = defaultLatencyMeasurementInterval;
+    private boolean started = false;
+
+    /**
+     * Creates instance of {@link MailboxMetricsController} with references to 
metrics provided as
+     * parameters.
+     *
+     * @param latencyHistogram Histogram of mailbox latency measurements.
+     * @param mailCounter Counter for number of mails processed.
+     */
+    public MailboxMetricsController(Histogram latencyHistogram, Counter 
mailCounter) {
+        this.timerService = null;
+        this.mailboxExecutor = null;
+        this.latencyHistogram = latencyHistogram;
+        this.mailCounter = mailCounter;
+    }
+
+    /**
+     * Sets up latency measurement with required {@link TimerService} and 
{@link MailboxExecutor}.
+     *
+     * <p>Note: For each instance, latency measurement can be set up only once.
+     *
+     * @param timerService {@link TimerService} used for latency measurement.
+     * @param mailboxExecutor {@link MailboxExecutor} used for latency 
measurement.
+     */
+    public void setupLatencyMeasurement(
+            TimerService timerService, MailboxExecutor mailboxExecutor) {
+        checkState(
+                !isLatencyMeasurementSetup(),
+                "latency measurement has already been setup and cannot be 
setup twice");
+        this.timerService = timerService;
+        this.mailboxExecutor = mailboxExecutor;
+    }
+
+    /**
+     * Starts mailbox latency measurement. This requires setup of latency 
measurement via {@link
+     * MailboxMetricsController#setupLatencyMeasurement(TimerService, 
MailboxExecutor)}. Latency is
+     * measured through execution of a mail that is triggered by default in 
the interval defined by
+     * {@link MailboxMetricsController#defaultLatencyMeasurementInterval}.
+     *
+     * <p>Note: For each instance, latency measurement can be started only 
once.
+     */
+    public void startLatencyMeasurement() {
+        checkState(!isLatencyMeasurementStarted(), "latency measurement has 
already been started");
+        checkState(
+                isLatencyMeasurementSetup(),
+                "timer service and mailbox executor must be setup for latency 
measurement");
+        scheduleLatencyMeasurement();
+        started = true;
+    }
+
+    /**
+     * Indicates if latency measurement has been started.
+     *
+     * @return True if latency measurement has been started.
+     */
+    public boolean isLatencyMeasurementStarted() {
+        return started;
+    }
+
+    /**
+     * Indicates if latency measurement has been set up.
+     *
+     * @return True if latency measurement has been set up.
+     */
+    public boolean isLatencyMeasurementSetup() {
+        return this.timerService != null && this.mailboxExecutor != null;
+    }
+
+    /**
+     * Gets {@link Counter} for number of mails processed.
+     *
+     * @return {@link Counter} for number of mails processed.
+     */
+    public Counter getMailCounter() {
+        return this.mailCounter;
+    }
+
+    @VisibleForTesting
+    public void setLatencyMeasurementInterval(int measurementInterval) {
+        this.measurementInterval = measurementInterval;
+    }
+
+    @VisibleForTesting
+    public void measureMailboxLatency() {
+        assert mailboxExecutor != null;
+        long startTime = SystemClock.getInstance().relativeTimeMillis();
+        mailboxExecutor.execute(
+                () -> {
+                    long endTime = 
SystemClock.getInstance().relativeTimeMillis();
+                    long latency = endTime - startTime;
+                    latencyHistogram.update(latency);
+                    scheduleLatencyMeasurement();
+                },
+                "Measure mailbox latency metric");
+    }
+
+    private void scheduleLatencyMeasurement() {
+        assert timerService != null;
+        timerService.registerTimer(
+                timerService.getCurrentProcessingTime() + measurementInterval,
+                timestamp -> measureMailboxLatency());
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index faaea1a9d26..b100152e3da 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.runtime.tasks.mailbox;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import 
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MailboxClosedException;
 import org.apache.flink.util.ExceptionUtils;
@@ -101,8 +101,7 @@ public class MailboxProcessor implements Closeable {
 
     private final StreamTaskActionExecutor actionExecutor;
 
-    /** Counter that counts number of mails processed from mailbox. */
-    private final Counter numMailsProcessed;
+    private final MailboxMetricsController mailboxMetricsControl;
 
     @VisibleForTesting
     public MailboxProcessor() {
@@ -122,20 +121,25 @@ public class MailboxProcessor implements Closeable {
             MailboxDefaultAction mailboxDefaultAction,
             TaskMailbox mailbox,
             StreamTaskActionExecutor actionExecutor) {
-        this(mailboxDefaultAction, mailbox, actionExecutor, new 
SimpleCounter());
+        this(
+                mailboxDefaultAction,
+                mailbox,
+                actionExecutor,
+                new MailboxMetricsController(
+                        new DescriptiveStatisticsHistogram(10), new 
SimpleCounter()));
     }
 
     public MailboxProcessor(
             MailboxDefaultAction mailboxDefaultAction,
             TaskMailbox mailbox,
             StreamTaskActionExecutor actionExecutor,
-            Counter numMailsProcessed) {
+            MailboxMetricsController mailboxMetricsControl) {
         this.mailboxDefaultAction = 
Preconditions.checkNotNull(mailboxDefaultAction);
         this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
         this.mailbox = Preconditions.checkNotNull(mailbox);
         this.mailboxLoopRunning = true;
         this.suspendedDefaultAction = null;
-        this.numMailsProcessed = numMailsProcessed;
+        this.mailboxMetricsControl = mailboxMetricsControl;
     }
 
     public MailboxExecutor getMainMailboxExecutor() {
@@ -152,12 +156,13 @@ public class MailboxProcessor implements Closeable {
     }
 
     /**
-     * Returns attached {@link Counter} that counts number of mails processed.
+     * Gets {@link MailboxMetricsController} for control and access to mailbox 
metrics.
      *
-     * @return {@link Counter} that counts number of mails processed.
+     * @return {@link MailboxMetricsController}.
      */
-    public Counter getNumMailsProcessedCounter() {
-        return numMailsProcessed;
+    @VisibleForTesting
+    public MailboxMetricsController getMailboxMetricsControl() {
+        return this.mailboxMetricsControl;
     }
 
     /** Lifecycle method to close the mailbox for action submission. */
@@ -197,8 +202,7 @@ public class MailboxProcessor implements Closeable {
      */
     public void drain() throws Exception {
         for (final Mail mail : mailbox.drain()) {
-            mail.run();
-            numMailsProcessed.inc();
+            runMail(mail);
         }
     }
 
@@ -359,8 +363,8 @@ public class MailboxProcessor implements Closeable {
                 maybeMail = Optional.of(mailbox.take(MIN_PRIORITY));
             }
             maybePauseIdleTimer();
-            maybeMail.get().run();
-            numMailsProcessed.inc();
+
+            runMail(maybeMail.get());
 
             maybeRestartIdleTimer();
             processedSomething = true;
@@ -376,8 +380,7 @@ public class MailboxProcessor implements Closeable {
             if (processedMails++ == 0) {
                 maybePauseIdleTimer();
             }
-            maybeMail.get().run();
-            numMailsProcessed.inc();
+            runMail(maybeMail.get());
             if (singleStep) {
                 break;
             }
@@ -390,6 +393,20 @@ public class MailboxProcessor implements Closeable {
         }
     }
 
+    private void runMail(Mail mail) throws Exception {
+        mailboxMetricsControl.getMailCounter().inc();
+        mail.run();
+        if (!suspended) {
+            // start latency measurement on first mail that is not suspending 
mailbox execution,
+            // i.e., on first non-poison mail, otherwise latency measurement 
is not started to avoid
+            // overhead
+            if (!mailboxMetricsControl.isLatencyMeasurementStarted()
+                    && mailboxMetricsControl.isLatencyMeasurementSetup()) {
+                mailboxMetricsControl.startLatencyMeasurement();
+            }
+        }
+    }
+
     private void maybePauseIdleTimer() {
         if (suspendedDefaultAction != null && 
suspendedDefaultAction.suspensionTimer != null) {
             suspendedDefaultAction.suspensionTimer.markEnd();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6a2d576c8a6..f20938061fe 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1685,7 +1685,9 @@ public class StreamTaskTest extends TestLogger {
                             new StreamTask<Object, 
StreamOperator<Object>>(mockEnvironment) {
                                 @Override
                                 protected void init() {
-                                    this.mailboxMetricsInterval = 2;
+                                    this.mailboxProcessor
+                                            .getMailboxMetricsControl()
+                                            .setLatencyMeasurementInterval(2);
                                 }
 
                                 @Override
@@ -1741,12 +1743,12 @@ public class StreamTaskTest extends TestLogger {
                             .getIOMetricGroup()
                             .getMailboxSize();
             long startTime = SystemClock.getInstance().relativeTimeMillis();
-            harness.streamTask.measureMailboxLatency();
+            
harness.streamTask.mailboxProcessor.getMailboxMetricsControl().measureMailboxLatency();
             for (int i = 0; i < numMails; ++i) {
                 harness.streamTask.mainMailboxExecutor.execute(
                         () -> Thread.sleep(sleepTime), "add value");
             }
-            harness.streamTask.measureMailboxLatency();
+            
harness.streamTask.mailboxProcessor.getMailboxMetricsControl().measureMailboxLatency();
 
             assertThat(mailboxSizeMetric.getValue(), 
greaterThanOrEqualTo(numMails));
             assertThat(mailboxLatencyMetric.getCount(), equalTo(0L));
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
index d38242818fd..b7139e67632 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
@@ -122,7 +122,9 @@ public class TaskMailboxProcessorTest {
         mailboxProcessor.getMailboxExecutor(DEFAULT_PRIORITY).execute(() -> 
stop.set(true), "stop");
         mailboxThread.join();
         assertThat(counter.get(), greaterThan(0));
-        assertThat(mailboxProcessor.getNumMailsProcessedCounter().getCount(), 
greaterThan(0L));
+        assertThat(
+                
mailboxProcessor.getMailboxMetricsControl().getMailCounter().getCount(),
+                greaterThan(0L));
     }
 
     @Test

Reply via email to