This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8332e9bbe0a3b490a85585367a82e87d9044aae9
Author: Chesnay Schepler <[email protected]>
AuthorDate: Fri Dec 17 12:22:38 2021 +0100

    [FLINK-23976][metrics] Add additional job status metrics
---
 docs/content.zh/docs/ops/metrics.md                |  33 ++-
 docs/content/docs/ops/metrics.md                   |  43 +++-
 .../shortcodes/generated/metric_configuration.html |   6 +
 .../apache/flink/configuration/MetricOptions.java  |  87 +++++++
 .../executiongraph/metrics/RestartTimeGauge.java   |  80 -------
 .../flink/runtime/scheduler/SchedulerBase.java     | 114 ++++++++-
 .../scheduler/adaptive/AdaptiveScheduler.java      |  22 +-
 .../metrics/RestartTimeGaugeTest.java              |  84 -------
 .../jobmaster/slotpool/SlotPoolTestUtils.java      |   8 +
 .../runtime/scheduler/DefaultSchedulerTest.java    | 100 ++++++++
 .../flink/runtime/scheduler/SchedulerBaseTest.java | 254 +++++++++++++++++++++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |  19 +-
 12 files changed, 655 insertions(+), 195 deletions(-)

diff --git a/docs/content.zh/docs/ops/metrics.md 
b/docs/content.zh/docs/ops/metrics.md
index dfd961a..2b8a5fd 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1066,6 +1066,9 @@ Metrics related to data exchange between task executors 
using netty network comm
 
 ### Availability
 
+The metrics in this table are available for each of the following job states: 
INITIALIZING, CREATED, RUNNING, RESTARTING, CANCELLING, FAILING.
+Whether these metrics are reported depends on the 
[metrics.job.status.enable]({{< ref "docs/deployment/config" 
>}}#metrics-job-status-enable) setting.
+
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -1077,12 +1080,36 @@ Metrics related to data exchange between task executors 
using netty network comm
   </thead>
   <tbody>
     <tr>
-      <th rowspan="5"><strong>Job (only available on JobManager)</strong></th>
-      <td>restartingTime</td>
-      <td>The time it took to restart the job, or how long the current restart 
has been in progress (in milliseconds).</td>
+      <th rowspan="3"><strong>Job (only available on JobManager)</strong></th>
+      <td>&lt;jobStatus&gt;State</td>
+      <td>For a given state, return 1 if the job is currently in that state, 
otherwise return 0.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>&lt;jobStatus&gt;Time</td>
+      <td>For a given state, if the job is currently in that state, return the 
time (in milliseconds) since the job transitioned into that state, otherwise 
return 0.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>&lt;jobStatus&gt;TimeTotal</td>
+      <td>For a given state, return how much time (in milliseconds) the job 
has spent in that state in total.</td>
       <td>Gauge</td>
     </tr>
+  </tbody>
+</table>
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 18%">Scope</th>
+      <th class="text-left" style="width: 26%">Metrics</th>
+      <th class="text-left" style="width: 48%">Description</th>
+      <th class="text-left" style="width: 8%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
     <tr>
+      <th rowspan="4"><strong>Job (only available on JobManager)</strong></th>
       <td>uptime</td>
       <td>
         The time that the job has been running without interruption.
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index e6a09ea..88a9bd7 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1066,6 +1066,9 @@ Metrics related to data exchange between task executors 
using netty network comm
 
 ### Availability
 
+The metrics in this table are available for each of the following job states: 
INITIALIZING, CREATED, RUNNING, RESTARTING, CANCELLING, FAILING.
+Whether these metrics are reported depends on the 
[metrics.job.status.enable]({{< ref "docs/deployment/config" 
>}}#metrics-job-status-enable) setting.
+
 <table class="table table-bordered">
   <thead>
     <tr>
@@ -1077,25 +1080,43 @@ Metrics related to data exchange between task executors 
using netty network comm
   </thead>
   <tbody>
     <tr>
-      <th rowspan="5"><strong>Job (only available on JobManager)</strong></th>
-      <td>restartingTime</td>
-      <td>The time it took to restart the job, or how long the current restart 
has been in progress (in milliseconds).</td>
+      <th rowspan="3"><strong>Job (only available on JobManager)</strong></th>
+      <td>&lt;jobStatus&gt;State</td>
+      <td>For a given state, return 1 if the job is currently in that state, 
otherwise return 0.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>&lt;jobStatus&gt;Time</td>
+      <td>For a given state, if the job is currently in that state, return the 
time (in milliseconds) since the job transitioned into that state, otherwise 
return 0.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>&lt;jobStatus&gt;TimeTotal</td>
+      <td>For a given state, return how much time (in milliseconds) the job 
has spent in that state in total.</td>
       <td>Gauge</td>
     </tr>
+  </tbody>
+</table>
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 18%">Scope</th>
+      <th class="text-left" style="width: 26%">Metrics</th>
+      <th class="text-left" style="width: 48%">Description</th>
+      <th class="text-left" style="width: 8%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
     <tr>
+      <th rowspan="4"><strong>Job (only available on JobManager)</strong></th>
       <td>uptime</td>
-      <td>
-        The time that the job has been running without interruption.
-        <p>Returns -1 for completed jobs (in milliseconds).</p>
-      </td>
+      <td><span class="label label-danger">Attention:</span> deprecated, use 
<b>runningTime</b>.</td>
       <td>Gauge</td>
     </tr>
     <tr>
       <td>downtime</td>
-      <td>
-        For jobs currently in a failing/recovering situation, the time elapsed 
during this outage.
-        <p>Returns 0 for running jobs and -1 for completed jobs (in 
milliseconds).</p>
-      </td>
+      <td><span class="label label-danger">Attention:</span> deprecated, use 
<b>restartingTime</b>, <b>cancellingTime</b>, <b>failingTime</b>.</td>
       <td>Gauge</td>
     </tr>
     <tr>
diff --git a/docs/layouts/shortcodes/generated/metric_configuration.html 
b/docs/layouts/shortcodes/generated/metric_configuration.html
index b906b38..c0fbc86 100644
--- a/docs/layouts/shortcodes/generated/metric_configuration.html
+++ b/docs/layouts/shortcodes/generated/metric_configuration.html
@@ -27,6 +27,12 @@
             <td>The thread priority used for Flink's internal metric query 
service. The thread is created by Akka's thread pool executor. The range of the 
priority is from 1 (MIN_PRIORITY) to 10 (MAX_PRIORITY). Warning, increasing 
this value may bring the main Flink components down.</td>
         </tr>
         <tr>
+            <td><h5>metrics.job.status.enable</h5></td>
+            <td style="word-wrap: break-word;">CURRENT_TIME</td>
+            <td><p>List&lt;Enum&gt;</p></td>
+            <td>The selection of job status metrics that should be 
reported.<br /><br />Possible values:<ul><li>"STATE": For a given state, return 
1 if the job is currently in that state, otherwise return 
0.</li><li>"CURRENT_TIME": For a given state, if the job is currently in that 
state, return the time since the job transitioned into that state, otherwise 
return 0.</li><li>"TOTAL_TIME": For a given state, return how much time the job 
has spent in that state in total.</li></ul></td>
+        </tr>
+        <tr>
             <td><h5>metrics.latency.granularity</h5></td>
             <td style="word-wrap: break-word;">"operator"</td>
             <td>String</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 0ba3cf5..86f787d6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -21,8 +21,11 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.InlineElement;
+import org.apache.flink.configuration.description.TextElement;
 
 import java.time.Duration;
+import java.util.List;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 import static org.apache.flink.configuration.description.TextElement.text;
@@ -215,5 +218,89 @@ public class MetricOptions {
                                     + "faster updating metrics. Increase this 
value if the metric fetcher causes too much load. Setting this value to 0 "
                                     + "disables the metric fetching 
completely.");
 
+    /** Controls which job status metrics will be exposed. */
+    public static final ConfigOption<List<JobStatusMetrics>> 
JOB_STATUS_METRICS =
+            key("metrics.job.status.enable")
+                    .enumType(JobStatusMetrics.class)
+                    .asList()
+                    .defaultValues(JobStatusMetrics.CURRENT_TIME)
+                    .withDescription(
+                            "The selection of job status metrics that should 
be reported.");
+
+    /** Enum describing the different kinds of job status metrics. */
+    public enum JobStatusMetrics implements DescribedEnum {
+        STATE(
+                "For a given state, return 1 if the job is currently in that 
state, otherwise return 0."),
+        CURRENT_TIME(
+                "For a given state, if the job is currently in that state, 
return the time since the job transitioned into that state, otherwise return 
0."),
+        TOTAL_TIME(
+                "For a given state, return how much time the job has spent in 
that state in total."),
+        ;
+
+        private final String description;
+
+        JobStatusMetrics(String description) {
+            this.description = description;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return TextElement.text(description);
+        }
+    }
+
+    /** Describes which job status metrics have been enabled. */
+    public static final class JobStatusMetricsSettings {
+
+        private final boolean stateMetricsEnabled;
+        private final boolean currentTimeMetricsEnabled;
+        private final boolean totalTimeMetricsEnabled;
+
+        private JobStatusMetricsSettings(
+                boolean stateMetricsEnabled,
+                boolean currentTimeMetricsEnabled,
+                boolean totalTimeMetricsEnabled) {
+            this.stateMetricsEnabled = stateMetricsEnabled;
+            this.currentTimeMetricsEnabled = currentTimeMetricsEnabled;
+            this.totalTimeMetricsEnabled = totalTimeMetricsEnabled;
+        }
+
+        public boolean isStateMetricsEnabled() {
+            return stateMetricsEnabled;
+        }
+
+        public boolean isCurrentTimeMetricsEnabled() {
+            return currentTimeMetricsEnabled;
+        }
+
+        public boolean isTotalTimeMetricsEnabled() {
+            return totalTimeMetricsEnabled;
+        }
+
+        public static JobStatusMetricsSettings fromConfiguration(Configuration 
configuration) {
+            final List<JobStatusMetrics> jobStatusMetrics = 
configuration.get(JOB_STATUS_METRICS);
+            boolean stateMetricsEnabled = false;
+            boolean currentTimeMetricsEnabled = false;
+            boolean totalTimeMetricsEnabled = false;
+
+            for (JobStatusMetrics jobStatusMetric : jobStatusMetrics) {
+                switch (jobStatusMetric) {
+                    case STATE:
+                        stateMetricsEnabled = true;
+                        break;
+                    case CURRENT_TIME:
+                        currentTimeMetricsEnabled = true;
+                        break;
+                    case TOTAL_TIME:
+                        totalTimeMetricsEnabled = true;
+                        break;
+                }
+            }
+
+            return new JobStatusMetricsSettings(
+                    stateMetricsEnabled, currentTimeMetricsEnabled, 
totalTimeMetricsEnabled);
+        }
+    }
+
     private MetricOptions() {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
deleted file mode 100644
index 6840f7e..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph.metrics;
-
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.runtime.executiongraph.JobStatusProvider;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Gauge which returns the last restarting time.
- *
- * <p>Restarting time is the time between {@link JobStatus#RESTARTING} and 
{@link
- * JobStatus#RUNNING}, or a terminal state if {@link JobStatus#RUNNING} was 
not reached.
- *
- * <p>If the job has not yet reached either of these states, then the time is 
measured since
- * reaching {@link JobStatus#RESTARTING}. If it is still the initial job 
execution, then the gauge
- * will return 0.
- */
-public class RestartTimeGauge implements Gauge<Long> {
-
-    public static final String METRIC_NAME = "restartingTime";
-
-    // ------------------------------------------------------------------------
-
-    private final JobStatusProvider jobStatusProvider;
-
-    public RestartTimeGauge(JobStatusProvider jobStatusProvider) {
-        this.jobStatusProvider = checkNotNull(jobStatusProvider);
-    }
-
-    // ------------------------------------------------------------------------
-
-    @Override
-    public Long getValue() {
-        final JobStatus status = jobStatusProvider.getState();
-
-        final long restartingTimestamp = 
jobStatusProvider.getStatusTimestamp(JobStatus.RESTARTING);
-
-        final long switchToRunningTimestamp;
-        final long lastRestartTime;
-
-        if (restartingTimestamp <= 0) {
-            // we haven't yet restarted our job
-            return 0L;
-        } else if ((switchToRunningTimestamp =
-                        
jobStatusProvider.getStatusTimestamp(JobStatus.RUNNING))
-                >= restartingTimestamp) {
-            // we have transitioned to RUNNING since the last restart
-            lastRestartTime = switchToRunningTimestamp - restartingTimestamp;
-        } else if (status.isTerminalState()) {
-            // since the last restart we've switched to a terminal state 
without touching
-            // the RUNNING state (e.g. failing from RESTARTING)
-            lastRestartTime = jobStatusProvider.getStatusTimestamp(status) - 
restartingTimestamp;
-        } else {
-            // we're still somewhere between RESTARTING and RUNNING
-            lastRestartTime = System.currentTimeMillis() - restartingTimestamp;
-        }
-
-        // we guard this with 'Math.max' to avoid negative timestamps when 
clocks re-sync
-        return Math.max(lastRestartTime, 0);
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index db92b7c..10c063c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
@@ -55,7 +56,6 @@ import 
org.apache.flink.runtime.executiongraph.JobStatusProvider;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
 import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
-import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -92,6 +92,8 @@ import org.apache.flink.runtime.util.IntArrayList;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.slf4j.Logger;
@@ -105,6 +107,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
@@ -112,6 +115,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -156,6 +160,8 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
 
     private final ExecutionGraphFactory executionGraphFactory;
 
+    private final MetricOptions.JobStatusMetricsSettings 
jobStatusMetricsSettings;
+
     public SchedulerBase(
             final Logger log,
             final JobGraph jobGraph,
@@ -223,6 +229,9 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
         this.exceptionHistory =
                 new BoundedFIFOQueue<>(
                         
jobMasterConfiguration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
+
+        this.jobStatusMetricsSettings =
+                
MetricOptions.JobStatusMetricsSettings.fromConfiguration(jobMasterConfiguration);
     }
 
     private void shutDownCheckpointServices(JobStatus jobStatus) {
@@ -599,7 +608,13 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
     @Override
     public final void startScheduling() {
         mainThreadExecutor.assertRunningInMainThread();
-        registerJobMetrics(jobManagerJobMetricGroup, executionGraph, 
this::getNumberOfRestarts);
+        registerJobMetrics(
+                jobManagerJobMetricGroup,
+                executionGraph,
+                this::getNumberOfRestarts,
+                executionGraph::registerJobStatusListener,
+                executionGraph.getStatusTimestamp(JobStatus.INITIALIZING),
+                jobStatusMetricsSettings);
         operatorCoordinatorHandler.startAllOperatorCoordinators();
         startSchedulingInternal();
     }
@@ -607,12 +622,103 @@ public abstract class SchedulerBase implements 
SchedulerNG, CheckpointScheduling
     public static void registerJobMetrics(
             MetricGroup metrics,
             JobStatusProvider jobStatusProvider,
-            Gauge<Long> numberOfRestarts) {
-        metrics.gauge(RestartTimeGauge.METRIC_NAME, new 
RestartTimeGauge(jobStatusProvider));
+            Gauge<Long> numberOfRestarts,
+            Consumer<JobStatusListener> jobStatusListenerRegistrar,
+            long initializationTimestamp,
+            MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings) {
         metrics.gauge(DownTimeGauge.METRIC_NAME, new 
DownTimeGauge(jobStatusProvider));
         metrics.gauge(UpTimeGauge.METRIC_NAME, new 
UpTimeGauge(jobStatusProvider));
         metrics.gauge(MetricNames.NUM_RESTARTS, numberOfRestarts);
         metrics.gauge(MetricNames.FULL_RESTARTS, numberOfRestarts);
+
+        jobStatusListenerRegistrar.accept(
+                new JobStatusMetrics(metrics, initializationTimestamp, 
jobStatusMetricsSettings));
+    }
+
+    @VisibleForTesting
+    static class JobStatusMetrics implements JobStatusListener {
+
+        private JobStatus currentStatus = JobStatus.INITIALIZING;
+        private long currentStatusTimestamp;
+        private final long[] cumulativeStatusTimes;
+
+        public JobStatusMetrics(
+                MetricGroup metricGroup,
+                long initializationTimestamp,
+                MetricOptions.JobStatusMetricsSettings 
jobStatusMetricsSettings) {
+
+            currentStatus = JobStatus.INITIALIZING;
+            currentStatusTimestamp = initializationTimestamp;
+            cumulativeStatusTimes = new long[JobStatus.values().length];
+
+            for (JobStatus jobStatus : JobStatus.values()) {
+                if (!jobStatus.isTerminalState() && jobStatus != 
JobStatus.RECONCILING) {
+
+                    if (jobStatusMetricsSettings.isStateMetricsEnabled()) {
+                        metricGroup.gauge(
+                                getStateMetricName(jobStatus), 
createStateMetric(jobStatus));
+                    }
+
+                    if 
(jobStatusMetricsSettings.isCurrentTimeMetricsEnabled()) {
+                        metricGroup.gauge(
+                                getCurrentTimeMetricName(jobStatus),
+                                createCurrentTimeMetric(jobStatus, 
SystemClock.getInstance()));
+                    }
+
+                    if (jobStatusMetricsSettings.isTotalTimeMetricsEnabled()) {
+                        metricGroup.gauge(
+                                getTotalTimeMetricName(jobStatus),
+                                createTotalTimeMetric(jobStatus, 
SystemClock.getInstance()));
+                    }
+                }
+            }
+        }
+
+        @VisibleForTesting
+        Gauge<Long> createStateMetric(JobStatus jobStatus) {
+            return () -> currentStatus == jobStatus ? 1L : 0L;
+        }
+
+        @VisibleForTesting
+        Gauge<Long> createCurrentTimeMetric(JobStatus jobStatus, Clock clock) {
+            return () ->
+                    currentStatus == jobStatus
+                            ? Math.max(clock.absoluteTimeMillis() - 
currentStatusTimestamp, 0)
+                            : 0;
+        }
+
+        @VisibleForTesting
+        Gauge<Long> createTotalTimeMetric(JobStatus jobStatus, Clock clock) {
+            return () ->
+                    currentStatus == jobStatus
+                            ? cumulativeStatusTimes[jobStatus.ordinal()]
+                                    + Math.max(
+                                            clock.absoluteTimeMillis() - 
currentStatusTimestamp, 0)
+                            : cumulativeStatusTimes[jobStatus.ordinal()];
+        }
+
+        @VisibleForTesting
+        static String getStateMetricName(JobStatus jobStatus) {
+            return jobStatus.name().toLowerCase(Locale.ROOT) + "State";
+        }
+
+        @VisibleForTesting
+        static String getCurrentTimeMetricName(JobStatus jobStatus) {
+            return jobStatus.name().toLowerCase(Locale.ROOT) + "Time";
+        }
+
+        @VisibleForTesting
+        static String getTotalTimeMetricName(JobStatus jobStatus) {
+            return jobStatus.name().toLowerCase(Locale.ROOT) + "TimeTotal";
+        }
+
+        @Override
+        public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long 
timestamp) {
+            cumulativeStatusTimes[currentStatus.ordinal()] += timestamp - 
currentStatusTimestamp;
+
+            currentStatus = newJobStatus;
+            currentStatusTimestamp = timestamp;
+        }
     }
 
     protected abstract void startSchedulingInternal();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 06588bc..db6c2a17 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
@@ -115,8 +116,9 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Duration;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -189,6 +191,7 @@ public class AdaptiveScheduler
     private final Duration resourceStabilizationTimeout;
 
     private final ExecutionGraphFactory executionGraphFactory;
+    private final JobStatusStore jobStatusStore;
 
     private State state = new Created(this, LOG);
 
@@ -258,9 +261,7 @@ public class AdaptiveScheduler
 
         this.componentMainThreadExecutor = mainThreadExecutor;
 
-        final JobStatusStore jobStatusStore = new 
JobStatusStore(initializationTimestamp);
-        this.jobStatusListeners =
-                Arrays.asList(Preconditions.checkNotNull(jobStatusListener), 
jobStatusStore);
+        this.jobStatusStore = new JobStatusStore(initializationTimestamp);
 
         this.scaleUpController = new ReactiveScaleUpController(configuration);
 
@@ -270,8 +271,19 @@ public class AdaptiveScheduler
 
         this.executionGraphFactory = executionGraphFactory;
 
+        final Collection<JobStatusListener> tmpJobStatusListeners = new 
ArrayList<>();
+        
tmpJobStatusListeners.add(Preconditions.checkNotNull(jobStatusListener));
+        tmpJobStatusListeners.add(jobStatusStore);
+
         SchedulerBase.registerJobMetrics(
-                jobManagerJobMetricGroup, jobStatusStore, () -> (long) 
numRestarts);
+                jobManagerJobMetricGroup,
+                jobStatusStore,
+                () -> (long) numRestarts,
+                tmpJobStatusListeners::add,
+                initializationTimestamp,
+                
MetricOptions.JobStatusMetricsSettings.fromConfiguration(configuration));
+
+        jobStatusListeners = 
Collections.unmodifiableCollection(tmpJobStatusListeners);
     }
 
     private static void assertPreconditions(JobGraph jobGraph) throws 
RuntimeException {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGaugeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGaugeTest.java
deleted file mode 100644
index a927508..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGaugeTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph.metrics;
-
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.executiongraph.TestingJobStatusProvider;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-
-/** Tests for {@link RestartTimeGauge}. */
-public class RestartTimeGaugeTest extends TestLogger {
-
-    @Test
-    public void testNotRestarted() {
-        final RestartTimeGauge gauge =
-                new RestartTimeGauge(new 
TestingJobStatusProvider(JobStatus.RUNNING, -1));
-        assertThat(gauge.getValue(), is(0L));
-    }
-
-    @Test
-    public void testInRestarting() {
-        final Map<JobStatus, Long> statusTimestampMap = new HashMap<>();
-        statusTimestampMap.put(JobStatus.RESTARTING, 1L);
-
-        final RestartTimeGauge gauge =
-                new RestartTimeGauge(
-                        new TestingJobStatusProvider(
-                                JobStatus.RESTARTING,
-                                status -> 
statusTimestampMap.getOrDefault(status, -1L)));
-        assertThat(gauge.getValue(), greaterThan(0L));
-    }
-
-    @Test
-    public void testRunningAfterRestarting() {
-        final Map<JobStatus, Long> statusTimestampMap = new HashMap<>();
-        statusTimestampMap.put(JobStatus.RESTARTING, 123L);
-        statusTimestampMap.put(JobStatus.RUNNING, 234L);
-
-        final RestartTimeGauge gauge =
-                new RestartTimeGauge(
-                        new TestingJobStatusProvider(
-                                JobStatus.RUNNING,
-                                status -> 
statusTimestampMap.getOrDefault(status, -1L)));
-        assertThat(gauge.getValue(), is(111L));
-    }
-
-    @Test
-    public void testFailedAfterRestarting() {
-        final Map<JobStatus, Long> statusTimestampMap = new HashMap<>();
-        statusTimestampMap.put(JobStatus.RESTARTING, 123L);
-        statusTimestampMap.put(JobStatus.FAILED, 456L);
-
-        final RestartTimeGauge gauge =
-                new RestartTimeGauge(
-                        new TestingJobStatusProvider(
-                                JobStatus.FAILED,
-                                status -> 
statusTimestampMap.getOrDefault(status, -1L)));
-        assertThat(gauge.getValue(), is(333L));
-    }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
index f83e7cd..95a6f22 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTestUtils.java
@@ -61,4 +61,12 @@ public final class SlotPoolTestUtils {
         return slotPool.offerSlots(
                 slotOffers, new LocalTaskManagerLocation(), taskManagerGateway, 0);
     }
+
+    @Nonnull
+    public static Collection<SlotOffer> offerSlots(
+            SlotPool slotPool,
+            Collection<SlotOffer> slotOffers,
+            TaskManagerGateway taskManagerGateway) {
+        return slotPool.offerSlots(new LocalTaskManagerLocation(), taskManagerGateway, slotOffers);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 46742dc..a63a752 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -19,11 +19,14 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.testutils.ScheduledTask;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -33,6 +36,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -60,6 +64,15 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest;
 import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryMatcher;
 import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
@@ -74,6 +87,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -88,11 +102,13 @@ import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.hamcrest.collection.IsIterableWithSize;
 import org.hamcrest.core.Is;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -112,6 +128,8 @@ import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
 import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
 import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing;
 import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
@@ -1378,6 +1396,88 @@ public class DefaultSchedulerTest extends TestLogger {
     }
 
     @Test
+    public void testStatusMetrics() throws Exception {
+        // running time acts as a stand-in for generic status time metrics
+        final CompletableFuture<Gauge<Long>> runningTimeMetricFuture = new CompletableFuture<>();
+        final MetricRegistry metricRegistry =
+                TestingMetricRegistry.builder()
+                        .setRegisterConsumer(
+                                (metric, name, group) -> {
+                                    switch (name) {
+                                        case "runningTimeTotal":
+                                            runningTimeMetricFuture.complete((Gauge<Long>) metric);
+                                            break;
+                                    }
+                                })
+                        .build();
+
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
+
+        final Configuration configuration = new Configuration();
+        configuration.set(
+                MetricOptions.JOB_STATUS_METRICS,
+                Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME));
+
+        final ComponentMainThreadExecutor singleThreadMainThreadExecutor =
+                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        scheduledExecutorService);
+
+        final Time slotTimeout = Time.milliseconds(5L);
+        final SlotPool slotPool =
+                new DeclarativeSlotPoolBridgeBuilder()
+                        .setBatchSlotTimeout(slotTimeout)
+                        .buildAndStart(singleThreadMainThreadExecutor);
+        final PhysicalSlotProvider slotProvider =
+                new PhysicalSlotProviderImpl(
+                        LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
+
+        final DefaultScheduler scheduler =
+                createSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setJobMasterConfiguration(configuration)
+                        .setJobManagerJobMetricGroup(
+                                JobManagerMetricGroup.createJobManagerMetricGroup(
+                                                metricRegistry, "localhost")
+                                        .addJob(new JobID(), "jobName"))
+                        .setExecutionSlotAllocatorFactory(
+                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
+                                        slotProvider, slotTimeout))
+                        .build();
+
+        final AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                new AdaptiveSchedulerTest.SubmissionBufferingTaskManagerGateway(1);
+
+        taskManagerGateway.setCancelConsumer(
+                executionAttemptId -> {
+                    singleThreadMainThreadExecutor.execute(
+                            () ->
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    executionAttemptId, ExecutionState.CANCELED)));
+                });
+
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    scheduler.startScheduling();
+
+                    offerSlots(
+                            slotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
+                            taskManagerGateway);
+                });
+
+        // wait for the first task submission
+        taskManagerGateway.waitForSubmissions(1, Duration.ofSeconds(5));
+
+        // sleep a bit to ensure uptime is > 0
+        Thread.sleep(10L);
+
+        final Gauge<Long> runningTimeGauge = runningTimeMetricFuture.get();
+        Assert.assertThat(runningTimeGauge.getValue(), greaterThan(0L));
+    }
+
+    @Test
     public void testDeploymentWaitForProducedPartitionRegistration() {
         shuffleMaster.setAutoCompleteRegistration(false);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseTest.java
new file mode 100644
index 0000000..1584f56
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerBaseTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class SchedulerBaseTest {
+
+    @Test
+    void testStateMetric() {
+        final SchedulerBase.JobStatusMetrics jobStatusMetrics =
+                new SchedulerBase.JobStatusMetrics(
+                        new UnregisteredMetricsGroup(),
+                        0L,
+                        enable(
+                                MetricOptions.JobStatusMetrics.STATE,
+                                MetricOptions.JobStatusMetrics.CURRENT_TIME,
+                                MetricOptions.JobStatusMetrics.TOTAL_TIME));
+
+        final Gauge<Long> metric = jobStatusMetrics.createStateMetric(JobStatus.RUNNING);
+
+        assertThat(metric.getValue()).isEqualTo(0L);
+        jobStatusMetrics.jobStatusChanges(new JobID(), JobStatus.RUNNING, 1L);
+        assertThat(metric.getValue()).isEqualTo(1L);
+        jobStatusMetrics.jobStatusChanges(new JobID(), JobStatus.RESTARTING, 2L);
+        assertThat(metric.getValue()).isEqualTo(0L);
+    }
+
+    @Test
+    void testCurrentTimeMetric() {
+        final SchedulerBase.JobStatusMetrics jobStatusMetrics =
+                new SchedulerBase.JobStatusMetrics(
+                        new UnregisteredMetricsGroup(),
+                        0L,
+                        enable(
+                                MetricOptions.JobStatusMetrics.STATE,
+                                MetricOptions.JobStatusMetrics.CURRENT_TIME,
+                                MetricOptions.JobStatusMetrics.TOTAL_TIME));
+
+        final ManualClock clock = new ManualClock();
+        final Gauge<Long> metric =
+                jobStatusMetrics.createCurrentTimeMetric(JobStatus.RUNNING, clock);
+
+        assertThat(metric.getValue()).isEqualTo(0L);
+        jobStatusMetrics.jobStatusChanges(new JobID(), JobStatus.RUNNING, 1L);
+        clock.advanceTime(Duration.ofMillis(11));
+        assertThat(metric.getValue()).isEqualTo(10L);
+        jobStatusMetrics.jobStatusChanges(new JobID(), JobStatus.RESTARTING, 15L);
+        assertThat(metric.getValue()).isEqualTo(0L);
+    }
+
+    @Test
+    void testTotalTimeMetric() {
+        final SchedulerBase.JobStatusMetrics jobStatusMetrics =
+                new SchedulerBase.JobStatusMetrics(
+                        new UnregisteredMetricsGroup(),
+                        0L,
+                        enable(
+                                MetricOptions.JobStatusMetrics.STATE,
+                                MetricOptions.JobStatusMetrics.CURRENT_TIME,
+                                MetricOptions.JobStatusMetrics.TOTAL_TIME));
+
+        final ManualClock clock = new ManualClock(0);
+        final Gauge<Long> metric = jobStatusMetrics.createTotalTimeMetric(JobStatus.RUNNING, clock);
+
+        assertThat(metric.getValue()).isEqualTo(0L);
+
+        jobStatusMetrics.jobStatusChanges(
+                new JobID(), JobStatus.RUNNING, clock.absoluteTimeMillis());
+
+        clock.advanceTime(Duration.ofMillis(10));
+        assertThat(metric.getValue()).isEqualTo(10L);
+
+        jobStatusMetrics.jobStatusChanges(
+                new JobID(), JobStatus.RESTARTING, clock.absoluteTimeMillis());
+
+        clock.advanceTime(Duration.ofMillis(4));
+        assertThat(metric.getValue()).isEqualTo(10L);
+
+        jobStatusMetrics.jobStatusChanges(
+                new JobID(), JobStatus.RUNNING, clock.absoluteTimeMillis());
+
+        clock.advanceTime(Duration.ofMillis(1));
+        assertThat(metric.getValue()).isEqualTo(11L);
+    }
+
+    @Test
+    void testStatusSelection() {
+        final InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup();
+
+        final SchedulerBase.JobStatusMetrics jobStatusMetrics =
+                new SchedulerBase.JobStatusMetrics(
+                        metricGroup, 0L, enable(MetricOptions.JobStatusMetrics.STATE));
+        final Map<JobStatus, StatusMetricSet> registeredMetrics = extractMetrics(metricGroup);
+
+        for (JobStatus value : JobStatus.values()) {
+            if (value.isTerminalState() || value == JobStatus.RECONCILING) {
+                assertThat(registeredMetrics).doesNotContainKey(value);
+            } else {
+                assertThat(registeredMetrics).containsKey(value);
+            }
+        }
+    }
+
+    @Test
+    void testEnableStateMetrics() {
+        testMetricSelection(MetricOptions.JobStatusMetrics.STATE);
+    }
+
+    @Test
+    void testEnableCurrentTimeMetrics() {
+        testMetricSelection(MetricOptions.JobStatusMetrics.CURRENT_TIME);
+    }
+
+    @Test
+    void testEnableTotalTimeMetrics() {
+        testMetricSelection(MetricOptions.JobStatusMetrics.TOTAL_TIME);
+    }
+
+    @Test
+    void testEnableMultipleMetrics() {
+        testMetricSelection(
+                MetricOptions.JobStatusMetrics.CURRENT_TIME,
+                MetricOptions.JobStatusMetrics.TOTAL_TIME);
+    }
+
+    private static void testMetricSelection(MetricOptions.JobStatusMetrics... selectedMetrics) {
+        final EnumSet<MetricOptions.JobStatusMetrics> selectedMetricsSet =
+                EnumSet.noneOf(MetricOptions.JobStatusMetrics.class);
+        Arrays.stream(selectedMetrics).forEach(selectedMetricsSet::add);
+
+        final InterceptingOperatorMetricGroup metricGroup = new InterceptingOperatorMetricGroup();
+
+        final SchedulerBase.JobStatusMetrics jobStatusMetrics =
+                new SchedulerBase.JobStatusMetrics(metricGroup, 1L, enable(selectedMetrics));
+        final Map<JobStatus, StatusMetricSet> registeredMetrics = extractMetrics(metricGroup);
+
+        for (StatusMetricSet metrics : registeredMetrics.values()) {
+            assertThat(metrics.getState().isPresent())
+                    .isEqualTo(selectedMetricsSet.contains(MetricOptions.JobStatusMetrics.STATE));
+            assertThat(metrics.getCurrentTime().isPresent())
+                    .isEqualTo(
+                            selectedMetricsSet.contains(
+                                    MetricOptions.JobStatusMetrics.CURRENT_TIME));
+            assertThat(metrics.getTotalTime().isPresent())
+                    .isEqualTo(
+                            selectedMetricsSet.contains(MetricOptions.JobStatusMetrics.TOTAL_TIME));
+        }
+    }
+
+    private static MetricOptions.JobStatusMetricsSettings enable(
+            MetricOptions.JobStatusMetrics... enabledMetrics) {
+        final Configuration configuration = new Configuration();
+
+        configuration.set(MetricOptions.JOB_STATUS_METRICS, Arrays.asList(enabledMetrics));
+
+        return MetricOptions.JobStatusMetricsSettings.fromConfiguration(configuration);
+    }
+
+    private static Map<JobStatus, StatusMetricSet> extractMetrics(
+            InterceptingOperatorMetricGroup metrics) {
+        final Map<JobStatus, StatusMetricSet> extractedMetrics = new EnumMap<>(JobStatus.class);
+
+        for (JobStatus jobStatus : JobStatus.values()) {
+            final StatusMetricSet statusMetricSet =
+                    new StatusMetricSet(
+                            (Gauge<Long>)
+                                    metrics.get(
+                                            SchedulerBase.JobStatusMetrics.getStateMetricName(
+                                                    jobStatus)),
+                            (Gauge<Long>)
+                                    metrics.get(
+                                            SchedulerBase.JobStatusMetrics.getCurrentTimeMetricName(
+                                                    jobStatus)),
+                            (Gauge<Long>)
+                                    metrics.get(
+                                            SchedulerBase.JobStatusMetrics.getTotalTimeMetricName(
+                                                    jobStatus)));
+            if (statusMetricSet.getState().isPresent()
+                    || statusMetricSet.getCurrentTime().isPresent()
+                    || statusMetricSet.getTotalTime().isPresent()) {
+                extractedMetrics.put(jobStatus, statusMetricSet);
+            }
+        }
+
+        return extractedMetrics;
+    }
+
+    private static class StatusMetricSet {
+
+        @Nullable private final Gauge<Long> state;
+        @Nullable private final Gauge<Long> currentTime;
+        @Nullable private final Gauge<Long> totalTime;
+
+        private StatusMetricSet(
+                @Nullable Gauge<Long> state,
+                @Nullable Gauge<Long> currentTime,
+                @Nullable Gauge<Long> totalTime) {
+            this.state = state;
+            this.currentTime = currentTime;
+            this.totalTime = totalTime;
+        }
+
+        @Nullable
+        public Optional<Gauge<Long>> getState() {
+            return Optional.ofNullable(state);
+        }
+
+        @Nullable
+        public Optional<Gauge<Long>> getCurrentTime() {
+            return Optional.ofNullable(currentTime);
+        }
+
+        @Nullable
+        public Optional<Gauge<Long>> getTotalTime() {
+            return Optional.ofNullable(totalTime);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 2926316..3ca32d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
@@ -45,7 +46,6 @@ import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
 import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
-import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -93,6 +93,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -496,8 +497,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
     public void testStatusMetrics() throws Exception {
         final CompletableFuture<UpTimeGauge> upTimeMetricFuture = new CompletableFuture<>();
         final CompletableFuture<DownTimeGauge> downTimeMetricFuture = new CompletableFuture<>();
-        final CompletableFuture<RestartTimeGauge> restartTimeMetricFuture =
-                new CompletableFuture<>();
+        // restartingTime acts as a stand-in for generic status time metrics
+        final CompletableFuture<Gauge<Long>> restartTimeMetricFuture = new CompletableFuture<>();
         final MetricRegistry metricRegistry =
                 TestingMetricRegistry.builder()
                         .setRegisterConsumer(
@@ -509,9 +510,8 @@ public class AdaptiveSchedulerTest extends TestLogger {
                                         case DownTimeGauge.METRIC_NAME:
                                             downTimeMetricFuture.complete((DownTimeGauge) metric);
                                             break;
-                                        case RestartTimeGauge.METRIC_NAME:
-                                            restartTimeMetricFuture.complete(
-                                                    (RestartTimeGauge) metric);
+                                        case "restartingTimeTotal":
+                                            restartTimeMetricFuture.complete((Gauge<Long>) metric);
                                             break;
                                     }
                                 })
@@ -525,6 +525,9 @@ public class AdaptiveSchedulerTest extends TestLogger {
         final Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.MIN_PARALLELISM_INCREASE, 1);
         configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(10L));
+        configuration.set(
+                MetricOptions.JOB_STATUS_METRICS,
+                Arrays.asList(MetricOptions.JobStatusMetrics.TOTAL_TIME));
 
         final AdaptiveScheduler scheduler =
                 new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
@@ -538,7 +541,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
         final UpTimeGauge upTimeGauge = upTimeMetricFuture.get();
         final DownTimeGauge downTimeGauge = downTimeMetricFuture.get();
-        final RestartTimeGauge restartTimeGauge = restartTimeMetricFuture.get();
+        final Gauge<Long> restartTimeGauge = restartTimeMetricFuture.get();
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                 new SubmissionBufferingTaskManagerGateway(1 + PARALLELISM);
@@ -1202,7 +1205,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
      * A {@link SimpleAckingTaskManagerGateway} that buffers all the task submissions into a
      * blocking queue, allowing one to wait for an arbitrary number of submissions.
      */
-    private static class SubmissionBufferingTaskManagerGateway
+    public static class SubmissionBufferingTaskManagerGateway
             extends SimpleAckingTaskManagerGateway {
         final BlockingQueue<TaskDeploymentDescriptor> submittedTasks;
 

Reply via email to