This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8332e9bbe0a3b490a85585367a82e87d9044aae9 Author: Chesnay Schepler <[email protected]> AuthorDate: Fri Dec 17 12:22:38 2021 +0100 [FLINK-23976][metrics] Add additional job status metrics --- docs/content.zh/docs/ops/metrics.md | 33 ++- docs/content/docs/ops/metrics.md | 43 +++- .../shortcodes/generated/metric_configuration.html | 6 + .../apache/flink/configuration/MetricOptions.java | 87 +++++++ .../executiongraph/metrics/RestartTimeGauge.java | 80 ------- .../flink/runtime/scheduler/SchedulerBase.java | 114 ++++++++- .../scheduler/adaptive/AdaptiveScheduler.java | 22 +- .../metrics/RestartTimeGaugeTest.java | 84 ------- .../jobmaster/slotpool/SlotPoolTestUtils.java | 8 + .../runtime/scheduler/DefaultSchedulerTest.java | 100 ++++++++ .../flink/runtime/scheduler/SchedulerBaseTest.java | 254 +++++++++++++++++++++ .../scheduler/adaptive/AdaptiveSchedulerTest.java | 19 +- 12 files changed, 655 insertions(+), 195 deletions(-) diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md index dfd961a..2b8a5fd 100644 --- a/docs/content.zh/docs/ops/metrics.md +++ b/docs/content.zh/docs/ops/metrics.md @@ -1066,6 +1066,9 @@ Metrics related to data exchange between task executors using netty network comm ### Availability +The metrics in this table are available for each of the following job states: INITIALIZING, CREATED, RUNNING, RESTARTING, CANCELLING, FAILING. +Whether these metrics are reported depends on the [metrics.job.status.enable]({{< ref "docs/deployment/config" >}}#metrics-job-status-enable) setting. 
+ <table class="table table-bordered"> <thead> <tr> @@ -1077,12 +1080,36 @@ Metrics related to data exchange between task executors using netty network comm </thead> <tbody> <tr> - <th rowspan="5"><strong>Job (only available on JobManager)</strong></th> - <td>restartingTime</td> - <td>The time it took to restart the job, or how long the current restart has been in progress (in milliseconds).</td> + <th rowspan="3"><strong>Job (only available on JobManager)</strong></th> + <td>&lt;jobStatus&gt;State</td> + <td>For a given state, return 1 if the job is currently in that state, otherwise return 0.</td> + <td>Gauge</td> + </tr> + <tr> + <td>&lt;jobStatus&gt;Time</td> + <td>For a given state, if the job is currently in that state, return the time (in milliseconds) since the job transitioned into that state, otherwise return 0.</td> + <td>Gauge</td> + </tr> + <tr> + <td>&lt;jobStatus&gt;TimeTotal</td> + <td>For a given state, return how much time (in milliseconds) the job has spent in that state in total.</td> <td>Gauge</td> </tr> + </tbody> +</table> + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 18%">Scope</th> + <th class="text-left" style="width: 26%">Metrics</th> + <th class="text-left" style="width: 48%">Description</th> + <th class="text-left" style="width: 8%">Type</th> + </tr> + </thead> + <tbody> <tr> + <th rowspan="4"><strong>Job (only available on JobManager)</strong></th> <td>uptime</td> <td> The time that the job has been running without interruption. diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md index e6a09ea..88a9bd7 100644 --- a/docs/content/docs/ops/metrics.md +++ b/docs/content/docs/ops/metrics.md @@ -1066,6 +1066,9 @@ Metrics related to data exchange between task executors using netty network comm ### Availability +The metrics in this table are available for each of the following job states: INITIALIZING, CREATED, RUNNING, RESTARTING, CANCELLING, FAILING.
+Whether these metrics are reported depends on the [metrics.job.status.enable]({{< ref "docs/deployment/config" >}}#metrics-job-status-enable) setting. + <table class="table table-bordered"> <thead> <tr> @@ -1077,25 +1080,43 @@ Metrics related to data exchange between task executors using netty network comm </thead> <tbody> <tr> - <th rowspan="5"><strong>Job (only available on JobManager)</strong></th> - <td>restartingTime</td> - <td>The time it took to restart the job, or how long the current restart has been in progress (in milliseconds).</td> + <th rowspan="3"><strong>Job (only available on JobManager)</strong></th> + <td>&lt;jobStatus&gt;State</td> + <td>For a given state, return 1 if the job is currently in that state, otherwise return 0.</td> + <td>Gauge</td> + </tr> + <tr> + <td>&lt;jobStatus&gt;Time</td> + <td>For a given state, if the job is currently in that state, return the time (in milliseconds) since the job transitioned into that state, otherwise return 0.</td> + <td>Gauge</td> + </tr> + <tr> + <td>&lt;jobStatus&gt;TimeTotal</td> + <td>For a given state, return how much time (in milliseconds) the job has spent in that state in total.</td> <td>Gauge</td> </tr> + </tbody> +</table> + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 18%">Scope</th> + <th class="text-left" style="width: 26%">Metrics</th> + <th class="text-left" style="width: 48%">Description</th> + <th class="text-left" style="width: 8%">Type</th> + </tr> + </thead> + <tbody> <tr> + <th rowspan="4"><strong>Job (only available on JobManager)</strong></th> <td>uptime</td> - <td> - The time that the job has been running without interruption. - <p>Returns -1 for completed jobs (in milliseconds).</p> - </td> + <td><span class="label label-danger">Attention:</span> deprecated, use <b>runningTime</b>.</td> <td>Gauge</td> </tr> <tr> <td>downtime</td> - <td> - For jobs currently in a failing/recovering situation, the time elapsed during this outage.
- <p>Returns 0 for running jobs and -1 for completed jobs (in milliseconds).</p> - </td> + <td><span class="label label-danger">Attention:</span> deprecated, use <b>restartingTime</b>, <b>cancellingTime</b>, or <b>failingTime</b>.</td> <td>Gauge</td> </tr> <tr> diff --git a/docs/layouts/shortcodes/generated/metric_configuration.html b/docs/layouts/shortcodes/generated/metric_configuration.html index b906b38..c0fbc86 100644 --- a/docs/layouts/shortcodes/generated/metric_configuration.html +++ b/docs/layouts/shortcodes/generated/metric_configuration.html @@ -27,6 +27,12 @@ <td>The thread priority used for Flink's internal metric query service. The thread is created by Akka's thread pool executor. The range of the priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing this value may bring the main Flink components down.</td> </tr> <tr> + <td><h5>metrics.job.status.enable</h5></td> + <td style="word-wrap: break-word;">CURRENT_TIME</td> + <td><p>List&lt;Enum&gt;</p></td> + <td>The selection of job status metrics that should be reported.<br /><br />Possible values:<ul><li>"STATE": For a given state, return 1 if the job is currently in that state, otherwise return 0.</li><li>"CURRENT_TIME": For a given state, if the job is currently in that state, return the time since the job transitioned into that state, otherwise return 0.</li><li>"TOTAL_TIME": For a given state, return how much time the job has spent in that state in total.</li></ul></td> + </tr> + <tr> <td><h5>metrics.latency.granularity</h5></td> <td style="word-wrap: break-word;">"operator"</td> <td>String</td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java index 0ba3cf5..86f787d6 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java @@ -21,8 +21,11 @@ package
org.apache.flink.configuration; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.docs.Documentation; import org.apache.flink.configuration.description.Description; +import org.apache.flink.configuration.description.InlineElement; +import org.apache.flink.configuration.description.TextElement; import java.time.Duration; +import java.util.List; import static org.apache.flink.configuration.ConfigOptions.key; import static org.apache.flink.configuration.description.TextElement.text; @@ -215,5 +218,89 @@ public class MetricOptions { + "faster updating metrics. Increase this value if the metric fetcher causes too much load. Setting this value to 0 " + "disables the metric fetching completely."); + /** Controls which job status metrics will be exposed. */ + public static final ConfigOption<List<JobStatusMetrics>> JOB_STATUS_METRICS = + key("metrics.job.status.enable") + .enumType(JobStatusMetrics.class) + .asList() + .defaultValues(JobStatusMetrics.CURRENT_TIME) + .withDescription( + "The selection of job status metrics that should be reported."); + + /** Enum describing the different kinds of job status metrics. */ + public enum JobStatusMetrics implements DescribedEnum { + STATE( + "For a given state, return 1 if the job is currently in that state, otherwise return 0."), + CURRENT_TIME( + "For a given state, if the job is currently in that state, return the time since the job transitioned into that state, otherwise return 0."), + TOTAL_TIME( + "For a given state, return how much time the job has spent in that state in total."), + ; + + private final String description; + + JobStatusMetrics(String description) { + this.description = description; + } + + @Override + public InlineElement getDescription() { + return TextElement.text(description); + } + } + + /** Describes which job status metrics have been enabled. 
*/ + public static final class JobStatusMetricsSettings { + + private final boolean stateMetricsEnabled; + private final boolean currentTimeMetricsEnabled; + private final boolean totalTimeMetricsEnabled; + + private JobStatusMetricsSettings( + boolean stateMetricsEnabled, + boolean currentTimeMetricsEnabled, + boolean totalTimeMetricsEnabled) { + this.stateMetricsEnabled = stateMetricsEnabled; + this.currentTimeMetricsEnabled = currentTimeMetricsEnabled; + this.totalTimeMetricsEnabled = totalTimeMetricsEnabled; + } + + public boolean isStateMetricsEnabled() { + return stateMetricsEnabled; + } + + public boolean isCurrentTimeMetricsEnabled() { + return currentTimeMetricsEnabled; + } + + public boolean isTotalTimeMetricsEnabled() { + return totalTimeMetricsEnabled; + } + + public static JobStatusMetricsSettings fromConfiguration(Configuration configuration) { + final List<JobStatusMetrics> jobStatusMetrics = configuration.get(JOB_STATUS_METRICS); + boolean stateMetricsEnabled = false; + boolean currentTimeMetricsEnabled = false; + boolean totalTimeMetricsEnabled = false; + + for (JobStatusMetrics jobStatusMetric : jobStatusMetrics) { + switch (jobStatusMetric) { + case STATE: + stateMetricsEnabled = true; + break; + case CURRENT_TIME: + currentTimeMetricsEnabled = true; + break; + case TOTAL_TIME: + totalTimeMetricsEnabled = true; + break; + } + } + + return new JobStatusMetricsSettings( + stateMetricsEnabled, currentTimeMetricsEnabled, totalTimeMetricsEnabled); + } + } + private MetricOptions() {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java deleted file mode 100644 index 6840f7e..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph.metrics; - -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.runtime.executiongraph.JobStatusProvider; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Gauge which returns the last restarting time. - * - * <p>Restarting time is the time between {@link JobStatus#RESTARTING} and {@link - * JobStatus#RUNNING}, or a terminal state if {@link JobStatus#RUNNING} was not reached. - * - * <p>If the job has not yet reached either of these states, then the time is measured since - * reaching {@link JobStatus#RESTARTING}. If it is still the initial job execution, then the gauge - * will return 0. 
- */ -public class RestartTimeGauge implements Gauge<Long> { - - public static final String METRIC_NAME = "restartingTime"; - - // ------------------------------------------------------------------------ - - private final JobStatusProvider jobStatusProvider; - - public RestartTimeGauge(JobStatusProvider jobStatusProvider) { - this.jobStatusProvider = checkNotNull(jobStatusProvider); - } - - // ------------------------------------------------------------------------ - - @Override - public Long getValue() { - final JobStatus status = jobStatusProvider.getState(); - - final long restartingTimestamp = jobStatusProvider.getStatusTimestamp(JobStatus.RESTARTING); - - final long switchToRunningTimestamp; - final long lastRestartTime; - - if (restartingTimestamp <= 0) { - // we haven't yet restarted our job - return 0L; - } else if ((switchToRunningTimestamp = - jobStatusProvider.getStatusTimestamp(JobStatus.RUNNING)) - >= restartingTimestamp) { - // we have transitioned to RUNNING since the last restart - lastRestartTime = switchToRunningTimestamp - restartingTimestamp; - } else if (status.isTerminalState()) { - // since the last restart we've switched to a terminal state without touching - // the RUNNING state (e.g. 
failing from RESTARTING) - lastRestartTime = jobStatusProvider.getStatusTimestamp(status) - restartingTimestamp; - } else { - // we're still somewhere between RESTARTING and RUNNING - lastRestartTime = System.currentTimeMillis() - restartingTimestamp; - } - - // we guard this with 'Math.max' to avoid negative timestamps when clocks re-sync - return Math.max(lastRestartTime, 0); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index db92b7c..10c063c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; @@ -55,7 +56,6 @@ import org.apache.flink.runtime.executiongraph.JobStatusProvider; import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker; import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge; -import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge; import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -92,6 +92,8 @@ import org.apache.flink.runtime.util.IntArrayList; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.IterableUtils; +import 
org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.FutureUtils; import org.slf4j.Logger; @@ -105,6 +107,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; @@ -112,6 +115,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -156,6 +160,8 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling private final ExecutionGraphFactory executionGraphFactory; + private final MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings; + public SchedulerBase( final Logger log, final JobGraph jobGraph, @@ -223,6 +229,9 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling this.exceptionHistory = new BoundedFIFOQueue<>( jobMasterConfiguration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE)); + + this.jobStatusMetricsSettings = + MetricOptions.JobStatusMetricsSettings.fromConfiguration(jobMasterConfiguration); } private void shutDownCheckpointServices(JobStatus jobStatus) { @@ -599,7 +608,13 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling @Override public final void startScheduling() { mainThreadExecutor.assertRunningInMainThread(); - registerJobMetrics(jobManagerJobMetricGroup, executionGraph, this::getNumberOfRestarts); + registerJobMetrics( + jobManagerJobMetricGroup, + executionGraph, + this::getNumberOfRestarts, + executionGraph::registerJobStatusListener, + executionGraph.getStatusTimestamp(JobStatus.INITIALIZING), + jobStatusMetricsSettings); 
operatorCoordinatorHandler.startAllOperatorCoordinators(); startSchedulingInternal(); } @@ -607,12 +622,103 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling public static void registerJobMetrics( MetricGroup metrics, JobStatusProvider jobStatusProvider, - Gauge<Long> numberOfRestarts) { - metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(jobStatusProvider)); + Gauge<Long> numberOfRestarts, + Consumer<JobStatusListener> jobStatusListenerRegistrar, + long initializationTimestamp, + MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings) { metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(jobStatusProvider)); metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(jobStatusProvider)); metrics.gauge(MetricNames.NUM_RESTARTS, numberOfRestarts); metrics.gauge(MetricNames.FULL_RESTARTS, numberOfRestarts); + + jobStatusListenerRegistrar.accept( + new JobStatusMetrics(metrics, initializationTimestamp, jobStatusMetricsSettings)); + } + + @VisibleForTesting + static class JobStatusMetrics implements JobStatusListener { + + private JobStatus currentStatus = JobStatus.INITIALIZING; + private long currentStatusTimestamp; + private final long[] cumulativeStatusTimes; + + public JobStatusMetrics( + MetricGroup metricGroup, + long initializationTimestamp, + MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings) { + + currentStatus = JobStatus.INITIALIZING; + currentStatusTimestamp = initializationTimestamp; + cumulativeStatusTimes = new long[JobStatus.values().length]; + + for (JobStatus jobStatus : JobStatus.values()) { + if (!jobStatus.isTerminalState() && jobStatus != JobStatus.RECONCILING) { + + if (jobStatusMetricsSettings.isStateMetricsEnabled()) { + metricGroup.gauge( + getStateMetricName(jobStatus), createStateMetric(jobStatus)); + } + + if (jobStatusMetricsSettings.isCurrentTimeMetricsEnabled()) { + metricGroup.gauge( + getCurrentTimeMetricName(jobStatus), + createCurrentTimeMetric(jobStatus, 
SystemClock.getInstance())); + } + + if (jobStatusMetricsSettings.isTotalTimeMetricsEnabled()) { + metricGroup.gauge( + getTotalTimeMetricName(jobStatus), + createTotalTimeMetric(jobStatus, SystemClock.getInstance())); + } + } + } + } + + @VisibleForTesting + Gauge<Long> createStateMetric(JobStatus jobStatus) { + return () -> currentStatus == jobStatus ? 1L : 0L; + } + + @VisibleForTesting + Gauge<Long> createCurrentTimeMetric(JobStatus jobStatus, Clock clock) { + return () -> + currentStatus == jobStatus + ? Math.max(clock.absoluteTimeMillis() - currentStatusTimestamp, 0) + : 0; + } + + @VisibleForTesting + Gauge<Long> createTotalTimeMetric(JobStatus jobStatus, Clock clock) { + return () -> + currentStatus == jobStatus + ? cumulativeStatusTimes[jobStatus.ordinal()] + + Math.max( + clock.absoluteTimeMillis() - currentStatusTimestamp, 0) + : cumulativeStatusTimes[jobStatus.ordinal()]; + } + + @VisibleForTesting + static String getStateMetricName(JobStatus jobStatus) { + return jobStatus.name().toLowerCase(Locale.ROOT) + "State"; + } + + @VisibleForTesting + static String getCurrentTimeMetricName(JobStatus jobStatus) { + return jobStatus.name().toLowerCase(Locale.ROOT) + "Time"; + } + + @VisibleForTesting + static String getTotalTimeMetricName(JobStatus jobStatus) { + return jobStatus.name().toLowerCase(Locale.ROOT) + "TimeTotal"; + } + + @Override + public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) { + cumulativeStatusTimes[currentStatus.ordinal()] += timestamp - currentStatusTimestamp; + + currentStatus = newJobStatus; + currentStatusTimestamp = timestamp; + } } protected abstract void startSchedulingInternal(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 06588bc..db6c2a17 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.runtime.JobException; @@ -115,8 +116,9 @@ import javax.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; import java.time.Duration; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -189,6 +191,7 @@ public class AdaptiveScheduler private final Duration resourceStabilizationTimeout; private final ExecutionGraphFactory executionGraphFactory; + private final JobStatusStore jobStatusStore; private State state = new Created(this, LOG); @@ -258,9 +261,7 @@ public class AdaptiveScheduler this.componentMainThreadExecutor = mainThreadExecutor; - final JobStatusStore jobStatusStore = new JobStatusStore(initializationTimestamp); - this.jobStatusListeners = - Arrays.asList(Preconditions.checkNotNull(jobStatusListener), jobStatusStore); + this.jobStatusStore = new JobStatusStore(initializationTimestamp); this.scaleUpController = new ReactiveScaleUpController(configuration); @@ -270,8 +271,19 @@ public class AdaptiveScheduler this.executionGraphFactory = executionGraphFactory; + final Collection<JobStatusListener> tmpJobStatusListeners = new ArrayList<>(); + tmpJobStatusListeners.add(Preconditions.checkNotNull(jobStatusListener)); + tmpJobStatusListeners.add(jobStatusStore); + 
SchedulerBase.registerJobMetrics( - jobManagerJobMetricGroup, jobStatusStore, () -> (long) numRestarts); + jobManagerJobMetricGroup, + jobStatusStore, + () -> (long) numRestarts, + tmpJobStatusListeners::add, + initializationTimestamp, + MetricOptions.JobStatusMetricsSettings.fromConfiguration(configuration)); + + jobStatusListeners = Collections.unmodifiableCollection(tmpJobStatusListeners); } private static void assertPreconditions(JobGraph jobGraph) throws RuntimeException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGaugeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGaugeTest.java deleted file mode 100644 index a927508..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGaugeTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.executiongraph.metrics; - -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.executiongraph.TestingJobStatusProvider; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -/** Tests for {@link RestartTimeGauge}. */ -public class RestartTimeGaugeTest extends TestLogger { - - @Test - public void testNotRestarted() { - final RestartTimeGauge gauge = - new RestartTimeGauge(new TestingJobStatusProvider(JobStatus.RUNNING, -1)); - assertThat(gauge.getValue(), is(0L)); - } - - @Test - public void testInRestarting() { - final Map<JobStatus, Long> statusTimestampMap = new HashMap<>(); - statusTimestampMap.put(JobStatus.RESTARTING, 1L); - - final RestartTimeGauge gauge = - new RestartTimeGauge( - new TestingJobStatusProvider( - JobStatus.RESTARTING, - status -> statusTimestampMap.getOrDefault(status, -1L))); - assertThat(gauge.getValue(), greaterThan(0L)); - } - - @Test - public void testRunningAfterRestarting() { - final Map<JobStatus, Long> statusTimestampMap = new HashMap<>(); - statusTimestampMap.put(JobStatus.RESTARTING, 123L); - statusTimestampMap.put(JobStatus.RUNNING, 234L); - - final RestartTimeGauge gauge = - new RestartTimeGauge( - new TestingJobStatusProvider( - JobStatus.RUNNING, - status -> statusTimestampMap.getOrDefault(status, -1L))); - assertThat(gauge.getValue(), is(111L)); - } - - @Test - public void testFailedAfterRestarting() { - final Map<JobStatus, Long> statusTimestampMap = new HashMap<>(); - statusTimestampMap.put(JobStatus.RESTARTING, 123L); - statusTimestampMap.put(JobStatus.FAILED, 456L); - - final RestartTimeGauge gauge = - new RestartTimeGauge( - new TestingJobStatusProvider( - JobStatus.FAILED, - status -> statusTimestampMap.getOrDefault(status, -1L))); - 
assertThat(gauge.getValue(), is(333L)); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java index f83e7cd..95a6f22 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java @@ -61,4 +61,12 @@ public final class SlotPoolTestUtils { return slotPool.offerSlots( slotOffers, new LocalTaskManagerLocation(), taskManagerGateway, 0); } + + @Nonnull + public static Collection<SlotOffer> offerSlots( + SlotPool slotPool, + Collection<SlotOffer> slotOffers, + TaskManagerGateway taskManagerGateway) { + return slotPool.offerSlots(new LocalTaskManagerLocation(), taskManagerGateway, slotOffers); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 46742dc..a63a752 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -19,11 +19,14 @@ package org.apache.flink.runtime.scheduler; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.testutils.ScheduledTask; +import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; @@ -33,6 +36,7 @@ import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.execution.ExecutionState; @@ -60,6 +64,15 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; +import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder; +import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.TestingMetricRegistry; +import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest; import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryMatcher; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; @@ -74,6 +87,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import 
org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -88,11 +102,13 @@ import org.hamcrest.collection.IsIterableContainingInOrder; import org.hamcrest.collection.IsIterableWithSize; import org.hamcrest.core.Is; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -112,6 +128,8 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements; +import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator; @@ -1378,6 +1396,88 @@ public class DefaultSchedulerTest extends TestLogger { } @Test + public void testStatusMetrics() throws Exception { + // running time acts as a stand-in for generic status time metrics + final CompletableFuture<Gauge<Long>> runningTimeMetricFuture = new CompletableFuture<>(); + final MetricRegistry metricRegistry = + TestingMetricRegistry.builder() + .setRegisterConsumer( + (metric, name, group) -> { + switch (name) { + case "runningTimeTotal": + runningTimeMetricFuture.complete((Gauge<Long>) metric); + break; + } + }) + .build(); + + final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); + final JobVertex onlyJobVertex = 
getOnlyJobVertex(jobGraph); + + final Configuration configuration = new Configuration(); + configuration.set( + MetricOptions.JOB_STATUS_METRICS, + Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME)); + + final ComponentMainThreadExecutor singleThreadMainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( + scheduledExecutorService); + + final Time slotTimeout = Time.milliseconds(5L); + final SlotPool slotPool = + new DeclarativeSlotPoolBridgeBuilder() + .setBatchSlotTimeout(slotTimeout) + .buildAndStart(singleThreadMainThreadExecutor); + final PhysicalSlotProvider slotProvider = + new PhysicalSlotProviderImpl( + LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool); + + final DefaultScheduler scheduler = + createSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor) + .setJobMasterConfiguration(configuration) + .setJobManagerJobMetricGroup( + JobManagerMetricGroup.createJobManagerMetricGroup( + metricRegistry, "localhost") + .addJob(new JobID(), "jobName")) + .setExecutionSlotAllocatorFactory( + SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory( + slotProvider, slotTimeout)) + .build(); + + final AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway taskManagerGateway = + new AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway(1); + + taskManagerGateway.setCancelConsumer( + executionAttemptId -> { + singleThreadMainThreadExecutor.execute( + () -> + scheduler.updateTaskExecutionState( + new TaskExecutionState( + executionAttemptId, ExecutionState.CANCELED))); + }); + + singleThreadMainThreadExecutor.execute( + () -> { + scheduler.startScheduling(); + + offerSlots( + slotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)), + taskManagerGateway); + }); + + // wait for the first task submission + taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5)); + + // sleep a bit to ensure uptime is > 0 + Thread.sleep(10L); + + 
final Gauge<Long> runningTimeGauge = runningTimeMetricFuture.get(); + Assert.assertThat(runningTimeGauge.getValue(), greaterThan(0L)); + } + + @Test public void testDeploymentWaitForProducedPartitionRegistration() { shuffleMaster.setAutoCompleteRegistration(false); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseTest.java new file mode 100644 index 0000000..1584f56 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup; +import org.apache.flink.util.clock.ManualClock; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Arrays; +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.Map; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +class SchedulerBaseTest { + + @Test + void testStateMetric() { + final SchedulerBase.JobStatusMetrics jobStatusMetrics = + new SchedulerBase.JobStatusMetrics( + new UnregisteredMetricsGroup(), + 0L, + enable( + MetricOptions.JobStatusMetrics.STATE, + MetricOptions.JobStatusMetrics.CURRENT_TIME, + MetricOptions.JobStatusMetrics.TOTAL_TIME)); + + final Gauge<Long> metric = jobStatusMetrics.createStateMetric(JobStatus.RUNNING); + + assertThat(metric.getValue()).isEqualTo(0L); + jobStatusMetrics.jobStatusChanges(new JobID(), JobStatus.RUNNING, 1L); + assertThat(metric.getValue()).isEqualTo(1L); + jobStatusMetrics.jobStatusChanges(new JobID(), JobStatus.RESTARTING, 2L); + assertThat(metric.getValue()).isEqualTo(0L); + } + + @Test + void testCurrentTimeMetric() { + final SchedulerBase.JobStatusMetrics jobStatusMetrics = + new SchedulerBase.JobStatusMetrics( + new UnregisteredMetricsGroup(), + 0L, + enable( + MetricOptions.JobStatusMetrics.STATE, + MetricOptions.JobStatusMetrics.CURRENT_TIME, + MetricOptions.JobStatusMetrics.TOTAL_TIME)); + + final ManualClock clock = new ManualClock(); + final Gauge<Long> metric = + jobStatusMetrics.createCurrentTimeMetric(JobStatus.RUNNING, clock); + + 
assertThat(metric.getValue()).isEqualTo(0L); + jobStatusMetrics.jobStatusChanges(new JobID(), JobStatus.RUNNING, 1L); + clock.advanceTime(Duration.ofMillis(11)); + assertThat(metric.getValue()).isEqualTo(10L); + jobStatusMetrics.jobStatusChanges(new JobID(), JobStatus.RESTARTING, 15L); + assertThat(metric.getValue()).isEqualTo(0L); + } + + @Test + void testTotalTimeMetric() { + final SchedulerBase.JobStatusMetrics jobStatusMetrics = + new SchedulerBase.JobStatusMetrics( + new UnregisteredMetricsGroup(), + 0L, + enable( + MetricOptions.JobStatusMetrics.STATE, + MetricOptions.JobStatusMetrics.CURRENT_TIME, + MetricOptions.JobStatusMetrics.TOTAL_TIME)); + + final ManualClock clock = new ManualClock(0); + final Gauge<Long> metric = jobStatusMetrics.createTotalTimeMetric(JobStatus.RUNNING, clock); + + assertThat(metric.getValue()).isEqualTo(0L); + + jobStatusMetrics.jobStatusChanges( + new JobID(), JobStatus.RUNNING, clock.absoluteTimeMillis()); + + clock.advanceTime(Duration.ofMillis(10)); + assertThat(metric.getValue()).isEqualTo(10L); + + jobStatusMetrics.jobStatusChanges( + new JobID(), JobStatus.RESTARTING, clock.absoluteTimeMillis()); + + clock.advanceTime(Duration.ofMillis(4)); + assertThat(metric.getValue()).isEqualTo(10L); + + jobStatusMetrics.jobStatusChanges( + new JobID(), JobStatus.RUNNING, clock.absoluteTimeMillis()); + + clock.advanceTime(Duration.ofMillis(1)); + assertThat(metric.getValue()).isEqualTo(11L); + } + + @Test + void testStatusSelection() { + final InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup(); + + final SchedulerBase.JobStatusMetrics jobStatusMetrics = + new SchedulerBase.JobStatusMetrics( + metricGroup, 0L, enable(MetricOptions.JobStatusMetrics.STATE)); + final Map<JobStatus, StatusMetricSet> registeredMetrics = extractMetrics(metricGroup); + + for (JobStatus value : JobStatus.values()) { + if (value.isTerminalState() || value == JobStatus.RECONCILING) { + 
assertThat(registeredMetrics).doesNotContainKey(value); + } else { + assertThat(registeredMetrics).containsKey(value); + } + } + } + + @Test + void testEnableStateMetrics() { + testMetricSelection(MetricOptions.JobStatusMetrics.STATE); + } + + @Test + void testEnableCurrentTimeMetrics() { + testMetricSelection(MetricOptions.JobStatusMetrics.CURRENT_TIME); + } + + @Test + void testEnableTotalTimeMetrics() { + testMetricSelection(MetricOptions.JobStatusMetrics.TOTAL_TIME); + } + + @Test + void testEnableMultipleMetrics() { + testMetricSelection( + MetricOptions.JobStatusMetrics.CURRENT_TIME, + MetricOptions.JobStatusMetrics.TOTAL_TIME); + } + + private static void testMetricSelection(MetricOptions.JobStatusMetrics... selectedMetrics) { + final EnumSet<MetricOptions.JobStatusMetrics> selectedMetricsSet = + EnumSet.noneOf(MetricOptions.JobStatusMetrics.class); + Arrays.stream(selectedMetrics).forEach(selectedMetricsSet::add); + + final InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup(); + + final SchedulerBase.JobStatusMetrics jobStatusMetrics = + new SchedulerBase.JobStatusMetrics(metricGroup, 1L, enable(selectedMetrics)); + final Map<JobStatus, StatusMetricSet> registeredMetrics = extractMetrics(metricGroup); + + for (StatusMetricSet metrics : registeredMetrics.values()) { + assertThat(metrics.getState().isPresent()) + .isEqualTo(selectedMetricsSet.contains(MetricOptions.JobStatusMetrics.STATE)); + assertThat(metrics.getCurrentTime().isPresent()) + .isEqualTo( + selectedMetricsSet.contains( + MetricOptions.JobStatusMetrics.CURRENT_TIME)); + assertThat(metrics.getTotalTime().isPresent()) + .isEqualTo( + selectedMetricsSet.contains(MetricOptions.JobStatusMetrics.TOTAL_TIME)); + } + } + + private static MetricOptions.JobStatusMetricsSettings enable( + MetricOptions.JobStatusMetrics... 
enabledMetrics) { + final Configuration configuration = new Configuration(); + + configuration.set(MetricOptions.JOB_STATUS_METRICS, Arrays.asList(enabledMetrics)); + + return MetricOptions.JobStatusMetricsSettings.fromConfiguration(configuration); + } + + private static Map<JobStatus, StatusMetricSet> extractMetrics( + InterceptingOperatorMetricGroup metrics) { + final Map<JobStatus, StatusMetricSet> extractedMetrics = new EnumMap<>(JobStatus.class); + + for (JobStatus jobStatus : JobStatus.values()) { + final StatusMetricSet statusMetricSet = + new StatusMetricSet( + (Gauge<Long>) + metrics.get( + SchedulerBase.JobStatusMetrics.getStateMetricName( + jobStatus)), + (Gauge<Long>) + metrics.get( + SchedulerBase.JobStatusMetrics.getCurrentTimeMetricName( + jobStatus)), + (Gauge<Long>) + metrics.get( + SchedulerBase.JobStatusMetrics.getTotalTimeMetricName( + jobStatus))); + if (statusMetricSet.getState().isPresent() + || statusMetricSet.getCurrentTime().isPresent() + || statusMetricSet.getTotalTime().isPresent()) { + extractedMetrics.put(jobStatus, statusMetricSet); + } + } + + return extractedMetrics; + } + + private static class StatusMetricSet { + + @Nullable private final Gauge<Long> state; + @Nullable private final Gauge<Long> currentTime; + @Nullable private final Gauge<Long> totalTime; + + private StatusMetricSet( + @Nullable Gauge<Long> state, + @Nullable Gauge<Long> currentTime, + @Nullable Gauge<Long> totalTime) { + this.state = state; + this.currentTime = currentTime; + this.totalTime = totalTime; + } + + @Nullable + public Optional<Gauge<Long>> getState() { + return Optional.ofNullable(state); + } + + @Nullable + public Optional<Gauge<Long>> getCurrentTime() { + return Optional.ofNullable(currentTime); + } + + @Nullable + public Optional<Gauge<Long>> getTotalTime() { + return Optional.ofNullable(totalTime); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 2926316..3ca32d9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.checkpoint.CheckpointException; @@ -45,7 +46,6 @@ import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy; import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy; import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge; -import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge; import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -93,6 +93,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; @@ -496,8 +497,8 @@ public class AdaptiveSchedulerTest extends TestLogger { public void testStatusMetrics() throws Exception { final CompletableFuture<UpTimeGauge> upTimeMetricFuture = new CompletableFuture<>(); final CompletableFuture<DownTimeGauge> downTimeMetricFuture = new CompletableFuture<>(); - final 
CompletableFuture<RestartTimeGauge> restartTimeMetricFuture = - new CompletableFuture<>(); + // restartingTime acts as a stand-in for generic status time metrics + final CompletableFuture<Gauge<Long>> restartTimeMetricFuture = new CompletableFuture<>(); final MetricRegistry metricRegistry = TestingMetricRegistry.builder() .setRegisterConsumer( @@ -509,9 +510,8 @@ public class AdaptiveSchedulerTest extends TestLogger { case DownTimeGauge.METRIC_NAME: downTimeMetricFuture.complete((DownTimeGauge) metric); break; - case RestartTimeGauge.METRIC_NAME: - restartTimeMetricFuture.complete( - (RestartTimeGauge) metric); + case "restartingTimeTotal": + restartTimeMetricFuture.complete((Gauge<Long>) metric); break; } }) @@ -525,6 +525,9 @@ public class AdaptiveSchedulerTest extends TestLogger { final Configuration configuration = new Configuration(); configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1); configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(10L)); + configuration.set( + MetricOptions.JOB_STATUS_METRICS, + Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME)); final AdaptiveScheduler scheduler = new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor) @@ -538,7 +541,7 @@ public class AdaptiveSchedulerTest extends TestLogger { final UpTimeGauge upTimeGauge = upTimeMetricFuture.get(); final DownTimeGauge downTimeGauge = downTimeMetricFuture.get(); - final RestartTimeGauge restartTimeGauge = restartTimeMetricFuture.get(); + final Gauge<Long> restartTimeGauge = restartTimeMetricFuture.get(); final SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM); @@ -1202,7 +1205,7 @@ public class AdaptiveSchedulerTest extends TestLogger { * A {@link SimpleAckingTaskManagerGateway} that buffers all the task submissions into a * blocking queue, allowing one to wait for an arbitrary number of submissions. 
*/ - private static class SubmissionBufferingTaskManagerGateway + public static class SubmissionBufferingTaskManagerGateway extends SimpleAckingTaskManagerGateway { final BlockingQueue<TaskDeploymentDescriptor> submittedTasks;
