This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new fecd0fbdf43 [FLINK-26864] Fix performance regression from mailbox
latency measurement
fecd0fbdf43 is described below
commit fecd0fbdf4349aba2e0487cd1527e4a061a0ffa8
Author: Sebastian Mattheis <[email protected]>
AuthorDate: Thu Apr 7 11:04:50 2022 +0200
[FLINK-26864] Fix performance regression from mailbox latency measurement
---
.../flink/streaming/runtime/tasks/StreamTask.java | 49 ++-----
.../tasks/mailbox/MailboxMetricsController.java | 152 +++++++++++++++++++++
.../runtime/tasks/mailbox/MailboxProcessor.java | 49 ++++---
.../streaming/runtime/tasks/StreamTaskTest.java | 8 +-
.../tasks/mailbox/TaskMailboxProcessorTest.java | 4 +-
5 files changed, 208 insertions(+), 54 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 5d43f0f1885..312f4f38eb3 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -95,6 +95,7 @@ import
org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Suspension;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
+import
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxMetricsController;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.streaming.runtime.tasks.mailbox.PeriodTimer;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
@@ -107,7 +108,6 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;
-import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.RunnableWithException;
@@ -275,9 +275,6 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
protected final MailboxProcessor mailboxProcessor;
- /** Mailbox metrics measurement is timer triggered with the given interval
in milliseconds. */
- protected int mailboxMetricsInterval = 1000;
-
final MailboxExecutor mainMailboxExecutor;
/** TODO it might be replaced by the global IO executor on TaskManager
level future. */
@@ -380,16 +377,24 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
try {
this.environment = environment;
this.configuration = new
StreamConfig(environment.getTaskConfiguration());
- Counter numMailsProcessedCounter =
-
environment.getMetricGroup().getIOMetricGroup().getNumMailsProcessedCounter();
- this.mailboxProcessor =
- new MailboxProcessor(
- this::processInput, mailbox, actionExecutor,
numMailsProcessedCounter);
+
+ // Initialize mailbox metrics
+ MailboxMetricsController mailboxMetricsControl =
+ new MailboxMetricsController(
+
environment.getMetricGroup().getIOMetricGroup().getMailboxLatency(),
+ environment
+ .getMetricGroup()
+ .getIOMetricGroup()
+ .getNumMailsProcessedCounter());
environment
.getMetricGroup()
.getIOMetricGroup()
.registerMailboxSizeSupplier(() -> mailbox.size());
+ this.mailboxProcessor =
+ new MailboxProcessor(
+ this::processInput, mailbox, actionExecutor,
mailboxMetricsControl);
+
// Should be closed last.
resourceCloser.registerCloseable(mailboxProcessor);
@@ -469,6 +474,7 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
Configuration taskManagerConf =
environment.getTaskManagerInfo().getConfiguration();
this.bufferDebloatPeriod =
taskManagerConf.get(BUFFER_DEBLOAT_PERIOD).toMillis();
+ mailboxMetricsControl.setupLatencyMeasurement(systemTimerService,
mainMailboxExecutor);
} catch (Exception ex) {
try {
resourceCloser.close();
@@ -761,8 +767,6 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
scheduleBufferDebloater();
- scheduleMailboxMetrics();
-
// let the task do its work
runMailboxLoop();
@@ -797,29 +801,6 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
"Buffer size recalculation"));
}
- @VisibleForTesting
- public void measureMailboxLatency() {
- long startTime = SystemClock.getInstance().relativeTimeMillis();
- mainMailboxExecutor.execute(
- () -> {
- long endTime =
SystemClock.getInstance().relativeTimeMillis();
- long latency = endTime - startTime;
- environment
- .getMetricGroup()
- .getIOMetricGroup()
- .getMailboxLatency()
- .update(latency);
- scheduleMailboxMetrics();
- },
- "Measure mailbox latency metric");
- }
-
- private void scheduleMailboxMetrics() {
- systemTimerService.registerTimer(
- systemTimerService.getCurrentProcessingTime() +
mailboxMetricsInterval,
- timestamp -> measureMailboxLatency());
- }
-
@VisibleForTesting
void debloat() {
for (IndexedInputGate inputGate : environment.getAllInputGates()) {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxMetricsController.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxMetricsController.java
new file mode 100644
index 00000000000..74c214c2182
--- /dev/null
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxMetricsController.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.streaming.runtime.tasks.TimerService;
+import org.apache.flink.util.clock.SystemClock;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Mailbox metrics controller class. The use of mailbox metrics, in particular
scheduling latency
+ * measurements that require a {@link TimerService}, induces (cyclic)
dependencies between {@link
+ * MailboxProcessor} and {@link
org.apache.flink.streaming.runtime.tasks.StreamTask}. An instance of
+ * this class contains and gives control over these dependencies.
+ */
+@Internal
+public class MailboxMetricsController {
+ /** Default timer interval in milliseconds for triggering mailbox latency
measurement. */
+ public final int defaultLatencyMeasurementInterval = 1000;
+
+ private final Histogram latencyHistogram;
+ private final Counter mailCounter;
+
+ @Nullable private TimerService timerService;
+ @Nullable private MailboxExecutor mailboxExecutor;
+ private int measurementInterval = defaultLatencyMeasurementInterval;
+ private boolean started = false;
+
+ /**
+ * Creates instance of {@link MailboxMetricsController} with references to
metrics provided as
+ * parameters.
+ *
+ * @param latencyHistogram Histogram of mailbox latency measurements.
+ * @param mailCounter Counter for number of mails processed.
+ */
+ public MailboxMetricsController(Histogram latencyHistogram, Counter
mailCounter) {
+ this.timerService = null;
+ this.mailboxExecutor = null;
+ this.latencyHistogram = latencyHistogram;
+ this.mailCounter = mailCounter;
+ }
+
+ /**
+ * Sets up latency measurement with required {@link TimerService} and
{@link MailboxExecutor}.
+ *
+ * <p>Note: For each instance, latency measurement can be set up only once.
+ *
+ * @param timerService {@link TimerService} used for latency measurement.
+ * @param mailboxExecutor {@link MailboxExecutor} used for latency
measurement.
+ */
+ public void setupLatencyMeasurement(
+ TimerService timerService, MailboxExecutor mailboxExecutor) {
+ checkState(
+ !isLatencyMeasurementSetup(),
+ "latency measurement has already been setup and cannot be
setup twice");
+ this.timerService = timerService;
+ this.mailboxExecutor = mailboxExecutor;
+ }
+
+ /**
+ * Starts mailbox latency measurement. This requires setup of latency
measurement via {@link
+ * MailboxMetricsController#setupLatencyMeasurement(TimerService,
MailboxExecutor)}. Latency is
+ * measured through execution of a mail that is triggered by default in
the interval defined by
+ * {@link MailboxMetricsController#defaultLatencyMeasurementInterval}.
+ *
+ * <p>Note: For each instance, latency measurement can be started only
once.
+ */
+ public void startLatencyMeasurement() {
+ checkState(!isLatencyMeasurementStarted(), "latency measurement has
already been started");
+ checkState(
+ isLatencyMeasurementSetup(),
+ "timer service and mailbox executor must be setup for latency
measurement");
+ scheduleLatencyMeasurement();
+ started = true;
+ }
+
+ /**
+ * Indicates if latency measurement has been started.
+ *
+ * @return True if latency measurement has been started.
+ */
+ public boolean isLatencyMeasurementStarted() {
+ return started;
+ }
+
+ /**
+ * Indicates if latency measurement has been set up.
+ *
+ * @return True if latency measurement has been set up.
+ */
+ public boolean isLatencyMeasurementSetup() {
+ return this.timerService != null && this.mailboxExecutor != null;
+ }
+
+ /**
+ * Gets {@link Counter} for number of mails processed.
+ *
+ * @return {@link Counter} for number of mails processed.
+ */
+ public Counter getMailCounter() {
+ return this.mailCounter;
+ }
+
+ @VisibleForTesting
+ public void setLatencyMeasurementInterval(int measurementInterval) {
+ this.measurementInterval = measurementInterval;
+ }
+
+ @VisibleForTesting
+ public void measureMailboxLatency() {
+ assert mailboxExecutor != null;
+ long startTime = SystemClock.getInstance().relativeTimeMillis();
+ mailboxExecutor.execute(
+ () -> {
+ long endTime =
SystemClock.getInstance().relativeTimeMillis();
+ long latency = endTime - startTime;
+ latencyHistogram.update(latency);
+ scheduleLatencyMeasurement();
+ },
+ "Measure mailbox latency metric");
+ }
+
+ private void scheduleLatencyMeasurement() {
+ assert timerService != null;
+ timerService.registerTimer(
+ timerService.getCurrentProcessingTime() + measurementInterval,
+ timestamp -> measureMailboxLatency());
+ }
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index faaea1a9d26..b100152e3da 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -20,8 +20,8 @@ package org.apache.flink.streaming.runtime.tasks.mailbox;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import
org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MailboxClosedException;
import org.apache.flink.util.ExceptionUtils;
@@ -101,8 +101,7 @@ public class MailboxProcessor implements Closeable {
private final StreamTaskActionExecutor actionExecutor;
- /** Counter that counts number of mails processed from mailbox. */
- private final Counter numMailsProcessed;
+ private final MailboxMetricsController mailboxMetricsControl;
@VisibleForTesting
public MailboxProcessor() {
@@ -122,20 +121,25 @@ public class MailboxProcessor implements Closeable {
MailboxDefaultAction mailboxDefaultAction,
TaskMailbox mailbox,
StreamTaskActionExecutor actionExecutor) {
- this(mailboxDefaultAction, mailbox, actionExecutor, new
SimpleCounter());
+ this(
+ mailboxDefaultAction,
+ mailbox,
+ actionExecutor,
+ new MailboxMetricsController(
+ new DescriptiveStatisticsHistogram(10), new
SimpleCounter()));
}
public MailboxProcessor(
MailboxDefaultAction mailboxDefaultAction,
TaskMailbox mailbox,
StreamTaskActionExecutor actionExecutor,
- Counter numMailsProcessed) {
+ MailboxMetricsController mailboxMetricsControl) {
this.mailboxDefaultAction =
Preconditions.checkNotNull(mailboxDefaultAction);
this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
this.mailbox = Preconditions.checkNotNull(mailbox);
this.mailboxLoopRunning = true;
this.suspendedDefaultAction = null;
- this.numMailsProcessed = numMailsProcessed;
+ this.mailboxMetricsControl = mailboxMetricsControl;
}
public MailboxExecutor getMainMailboxExecutor() {
@@ -152,12 +156,13 @@ public class MailboxProcessor implements Closeable {
}
/**
- * Returns attached {@link Counter} that counts number of mails processed.
+ * Gets {@link MailboxMetricsController} for control and access to mailbox
metrics.
*
- * @return {@link Counter} that counts number of mails processed.
+ * @return {@link MailboxMetricsController}.
*/
- public Counter getNumMailsProcessedCounter() {
- return numMailsProcessed;
+ @VisibleForTesting
+ public MailboxMetricsController getMailboxMetricsControl() {
+ return this.mailboxMetricsControl;
}
/** Lifecycle method to close the mailbox for action submission. */
@@ -197,8 +202,7 @@ public class MailboxProcessor implements Closeable {
*/
public void drain() throws Exception {
for (final Mail mail : mailbox.drain()) {
- mail.run();
- numMailsProcessed.inc();
+ runMail(mail);
}
}
@@ -359,8 +363,8 @@ public class MailboxProcessor implements Closeable {
maybeMail = Optional.of(mailbox.take(MIN_PRIORITY));
}
maybePauseIdleTimer();
- maybeMail.get().run();
- numMailsProcessed.inc();
+
+ runMail(maybeMail.get());
maybeRestartIdleTimer();
processedSomething = true;
@@ -376,8 +380,7 @@ public class MailboxProcessor implements Closeable {
if (processedMails++ == 0) {
maybePauseIdleTimer();
}
- maybeMail.get().run();
- numMailsProcessed.inc();
+ runMail(maybeMail.get());
if (singleStep) {
break;
}
@@ -390,6 +393,20 @@ public class MailboxProcessor implements Closeable {
}
}
+ private void runMail(Mail mail) throws Exception {
+ mailboxMetricsControl.getMailCounter().inc();
+ mail.run();
+ if (!suspended) {
+ // start latency measurement on first mail that is not suspending
mailbox execution,
+ // i.e., on first non-poison mail, otherwise latency measurement
is not started to avoid
+ // overhead
+ if (!mailboxMetricsControl.isLatencyMeasurementStarted()
+ && mailboxMetricsControl.isLatencyMeasurementSetup()) {
+ mailboxMetricsControl.startLatencyMeasurement();
+ }
+ }
+ }
+
private void maybePauseIdleTimer() {
if (suspendedDefaultAction != null &&
suspendedDefaultAction.suspensionTimer != null) {
suspendedDefaultAction.suspensionTimer.markEnd();
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 6a2d576c8a6..f20938061fe 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1685,7 +1685,9 @@ public class StreamTaskTest extends TestLogger {
new StreamTask<Object,
StreamOperator<Object>>(mockEnvironment) {
@Override
protected void init() {
- this.mailboxMetricsInterval = 2;
+ this.mailboxProcessor
+ .getMailboxMetricsControl()
+ .setLatencyMeasurementInterval(2);
}
@Override
@@ -1741,12 +1743,12 @@ public class StreamTaskTest extends TestLogger {
.getIOMetricGroup()
.getMailboxSize();
long startTime = SystemClock.getInstance().relativeTimeMillis();
- harness.streamTask.measureMailboxLatency();
+
harness.streamTask.mailboxProcessor.getMailboxMetricsControl().measureMailboxLatency();
for (int i = 0; i < numMails; ++i) {
harness.streamTask.mainMailboxExecutor.execute(
() -> Thread.sleep(sleepTime), "add value");
}
- harness.streamTask.measureMailboxLatency();
+
harness.streamTask.mailboxProcessor.getMailboxMetricsControl().measureMailboxLatency();
assertThat(mailboxSizeMetric.getValue(),
greaterThanOrEqualTo(numMails));
assertThat(mailboxLatencyMetric.getCount(), equalTo(0L));
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
index d38242818fd..b7139e67632 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java
@@ -122,7 +122,9 @@ public class TaskMailboxProcessorTest {
mailboxProcessor.getMailboxExecutor(DEFAULT_PRIORITY).execute(() ->
stop.set(true), "stop");
mailboxThread.join();
assertThat(counter.get(), greaterThan(0));
- assertThat(mailboxProcessor.getNumMailsProcessedCounter().getCount(),
greaterThan(0L));
+ assertThat(
+
mailboxProcessor.getMailboxMetricsControl().getMailCounter().getCount(),
+ greaterThan(0L));
}
@Test