This is an automated email from the ASF dual-hosted git repository.

ptoth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new cdcef65  [SPARK-53405] Add metrics recording for latency of Spark app state transition
cdcef65 is described below

commit cdcef652ad57bc8a66f8f6df495cea5d4fccb28a
Author: Zhou JIANG <jiang...@umich.edu>
AuthorDate: Thu Sep 4 10:15:17 2025 +0200

    [SPARK-53405] Add metrics recording for latency of Spark app state transition
    
    ### What changes were proposed in this pull request?
    
    This PR adds metrics tracking state transition latency for Spark applications, in the format of
    
    ```
    sparkapp.latency.from.<fromState>.to.<toState>
    ```
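    For example, based on the names asserted in the new unit test, a transition from
    `DriverRequested` to `DriverStarted` is recorded under the lower-cased timer name:
    
    ```
    sparkapp.latency.from.driverrequested.to.driverstarted
    ```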
    
    ### Why are the changes needed?
    
    Latency measurement is useful for analyzing performance from the scheduling / orchestration
    perspective, for example to identify which states cause significant overhead and then optimize
    at the cluster or app level.
    
    ### Does this PR introduce _any_ user-facing change?
    
    More metrics become available.
    
    ### How was this patch tested?
    
    CIs. New unit test added.
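    
    The new test can also be run locally. Assuming the standard Gradle wrapper and a module name
    matching the `spark-operator` directory, something like the following should work:
    
    ```
    ./gradlew :spark-operator:test --tests "org.apache.spark.k8s.operator.metrics.SparkAppStatusRecorderSourceTest"
    ```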
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #299 from jiangzho/state_transition.
    
    Authored-by: Zhou JIANG <jiang...@umich.edu>
    Signed-off-by: Peter Toth <peter.t...@gmail.com>
---
 docs/configuration.md                              |  14 +++
 .../apache/spark/k8s/operator/SparkOperator.java   |   6 +-
 .../k8s/operator/metrics/BaseOperatorSource.java   |  91 ++++++++++++++++++
 .../metrics/SparkAppStatusRecorderSource.java      |  67 +++++++++++++
 .../metrics/source/OperatorJosdkMetrics.java       | 106 +++++++--------------
 .../k8s/operator/utils/SparkAppStatusRecorder.java |  11 ++-
 .../metrics/SparkAppStatusRecorderSourceTest.java  |  78 +++++++++++++++
 7 files changed, 301 insertions(+), 72 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index d13a82c..b957285 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -91,6 +91,20 @@ via [Codahale JVM Metrics](https://javadoc.io/doc/com.codahale.metrics/metrics-j
 | kubernetes.client.`ResourceName`.`Method`                 | Meter      | Tracking the rates of HTTP request for a combination of one Kubernetes resource and one http method                      |
 | kubernetes.client.`NamespaceName`.`ResourceName`.`Method` | Meter      | Tracking the rates of HTTP request for a combination of one namespace-scoped Kubernetes resource and one http method     |
 
+### Latency for State Transition
+
+Spark Operator also measures the latency between state transitions for apps, in the following format:
+
+| Metrics Name                                         | Type  | Description                                                            |
+|------------------------------------------------------|-------|------------------------------------------------------------------------|
+| sparkapp.latency.from.`<fromState>`.to.`<toState>`   | Timer | Tracking the latency of an app's transition from one state to another  |
+
+The latency metrics provide insight into the time spent in each state. For example, a long latency
+between `DriverRequested` and `DriverStarted` indicates overhead in scheduling the driver pod, while
+latency between `DriverStarted` and `DriverReady` indicates the overhead of pulling the image,
+running init containers, and starting the SparkSession. These metrics can be used to analyze the
+overhead from multiple dimensions.
+
 ### Forward Metrics to Prometheus
 
 In this section, we will show you how to forward Spark Operator metrics
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
index 3d492bd..1daffbc 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java
@@ -46,6 +46,7 @@ import org.apache.spark.k8s.operator.config.SparkOperatorConfigMapReconciler;
 import org.apache.spark.k8s.operator.metrics.MetricsService;
 import org.apache.spark.k8s.operator.metrics.MetricsSystem;
 import org.apache.spark.k8s.operator.metrics.MetricsSystemFactory;
+import org.apache.spark.k8s.operator.metrics.SparkAppStatusRecorderSource;
 import org.apache.spark.k8s.operator.metrics.healthcheck.SentinelManager;
 import org.apache.spark.k8s.operator.metrics.source.KubernetesMetricsInterceptor;
 import org.apache.spark.k8s.operator.metrics.source.OperatorJosdkMetrics;
@@ -83,7 +84,10 @@ public class SparkOperator {
         KubernetesClientFactory.buildKubernetesClient(getClientInterceptors(metricsSystem));
     this.appSubmissionWorker = new SparkAppSubmissionWorker();
     this.clusterSubmissionWorker = new SparkClusterSubmissionWorker();
-    this.sparkAppStatusRecorder = new SparkAppStatusRecorder(getAppStatusListener());
+    SparkAppStatusRecorderSource recorderSource = new SparkAppStatusRecorderSource();
+    this.metricsSystem.registerSource(recorderSource);
+    this.sparkAppStatusRecorder =
+        new SparkAppStatusRecorder(getAppStatusListener(), recorderSource);
     this.sparkClusterStatusRecorder = new SparkClusterStatusRecorder(getClusterStatusListener());
     this.registeredSparkControllers = new HashSet<>();
     this.watchedNamespaces = getWatchedNamespaces();
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/BaseOperatorSource.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/BaseOperatorSource.java
new file mode 100644
index 0000000..3953cf6
--- /dev/null
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/BaseOperatorSource.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class BaseOperatorSource {
+  protected final MetricRegistry metricRegistry;
+  protected final Map<String, Histogram> histograms = new ConcurrentHashMap<>();
+  protected final Map<String, Counter> counters = new ConcurrentHashMap<>();
+  protected final Map<String, Gauge<?>> gauges = new ConcurrentHashMap<>();
+  protected final Map<String, Timer> timers = new ConcurrentHashMap<>();
+
+  protected Histogram getHistogram(String metricNamePrefix, String... names) {
+    Histogram histogram;
+    String metricName = MetricRegistry.name(metricNamePrefix, names).toLowerCase();
+    if (histograms.containsKey(metricName)) {
+      histogram = histograms.get(metricName);
+    } else {
+      histogram = metricRegistry.histogram(metricName);
+      histograms.put(metricName, histogram);
+    }
+    return histogram;
+  }
+
+  protected Counter getCounter(String metricNamePrefix, String... names) {
+    Counter counter;
+    String metricName = MetricRegistry.name(metricNamePrefix, names).toLowerCase();
+    if (counters.containsKey(metricName)) {
+      counter = counters.get(metricName);
+    } else {
+      counter = metricRegistry.counter(metricName);
+      counters.put(metricName, counter);
+    }
+    return counter;
+  }
+
+  protected Gauge<?> getGauge(Gauge<?> defaultGauge, String metricNamePrefix, String... names) {
+    Gauge<?> gauge;
+    String metricName = MetricRegistry.name(metricNamePrefix, names).toLowerCase();
+    if (gauges.containsKey(metricName)) {
+      gauge = gauges.get(metricName);
+    } else {
+      gauge = defaultGauge;
+      gauges.put(metricName, defaultGauge);
+    }
+    return gauge;
+  }
+
+  protected Timer getTimer(String metricNamePrefix, String... names) {
+    Timer timer;
+    String metricName = MetricRegistry.name(metricNamePrefix, names).toLowerCase();
+    if (timers.containsKey(metricName)) {
+      timer = timers.get(metricName);
+    } else {
+      timer = metricRegistry.timer(metricName);
+      timers.put(metricName, timer);
+    }
+    return timer;
+  }
+
+  protected String getMetricNamePrefix(Class<?> klass) {
+    return klass.getSimpleName();
+  }
+}
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java
new file mode 100644
index 0000000..622b425
--- /dev/null
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import com.codahale.metrics.MetricRegistry;
+
+import org.apache.spark.k8s.operator.status.ApplicationState;
+import org.apache.spark.k8s.operator.status.ApplicationStatus;
+import org.apache.spark.metrics.source.Source;
+
+public class SparkAppStatusRecorderSource extends BaseOperatorSource implements Source {
+
+  public static final String RESOURCE_TYPE = "sparkapp";
+  public static final String LATENCY_METRIC_FORMAT = "latency.from.%s.to.%s";
+
+  public SparkAppStatusRecorderSource() {
+    super(new MetricRegistry());
+  }
+
+  @Override
+  public String sourceName() {
+    return "SparkAppStatusRecorder";
+  }
+
+  @Override
+  public MetricRegistry metricRegistry() {
+    return metricRegistry;
+  }
+
+  public void recordStatusUpdateLatency(
+      final ApplicationStatus status, final ApplicationState newState) {
+    ApplicationState currentState = status.getCurrentState();
+    if (currentState != null) {
+      Duration duration =
+          Duration.between(
+              Instant.parse(currentState.getLastTransitionTime()),
+              Instant.parse(newState.getLastTransitionTime()));
+      getTimer(
+              RESOURCE_TYPE,
+              String.format(
+                  LATENCY_METRIC_FORMAT,
+                  currentState.getCurrentStateSummary().name(),
+                  newState.getCurrentStateSummary().name()))
+          .update(duration);
+    }
+  }
+}
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java
index eee97a4..967a2d6 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java
@@ -23,12 +23,9 @@ import static io.javaoperatorsdk.operator.api.reconciler.Constants.CONTROLLER_NA
 
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
-import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.javaoperatorsdk.operator.api.monitoring.Metrics;
@@ -45,20 +42,18 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.k8s.operator.BaseResource;
 import org.apache.spark.k8s.operator.SparkApplication;
 import org.apache.spark.k8s.operator.SparkCluster;
+import org.apache.spark.k8s.operator.metrics.BaseOperatorSource;
 import org.apache.spark.metrics.source.Source;
 import org.apache.spark.util.Clock;
 import org.apache.spark.util.SystemClock;
 
 /** Metrics for the Java Operator SDK. */
 @Slf4j
-public class OperatorJosdkMetrics implements Source, Metrics {
+public class OperatorJosdkMetrics extends BaseOperatorSource implements Source, Metrics {
   public static final String FINISHED = "finished";
   public static final String CLEANUP = "cleanup";
   public static final String FAILED = "failed";
   public static final String RETRIES = "retries";
-  private final Map<String, Histogram> histograms = new ConcurrentHashMap<>();
-  private final Map<String, Counter> counters = new ConcurrentHashMap<>();
-  private final Map<String, Gauge<?>> gauges = new ConcurrentHashMap<>();
   private static final String RECONCILIATION = "reconciliation";
   private static final String RESOURCE = "resource";
   private static final String EVENT = "event";
@@ -72,11 +67,10 @@ public class OperatorJosdkMetrics implements Source, Metrics {
   private static final String SIZE = "size";
 
   private final Clock clock;
-  private final MetricRegistry metricRegistry;
 
   public OperatorJosdkMetrics() {
+    super(new MetricRegistry());
     this.clock = new SystemClock();
-    this.metricRegistry = new MetricRegistry();
   }
 
   @Override
@@ -104,10 +98,12 @@ public class OperatorJosdkMetrics implements Source, Metrics {
           getResourceClass(metadata);
       final Optional<String> namespaceOptional = event.getRelatedCustomResourceID().getNamespace();
       resource.ifPresent(
-          aClass -> getCounter(aClass, action.name().toLowerCase(), RESOURCE, EVENT).inc());
+          aClass ->
+              getCounter(getMetricNamePrefix(aClass), action.name().toLowerCase(), RESOURCE, EVENT)
+                  .inc());
       if (resource.isPresent() && namespaceOptional.isPresent()) {
         getCounter(
-                resource.get(),
+                getMetricNamePrefix(resource.get()),
                 namespaceOptional.get(),
                 action.name().toLowerCase(),
                 RESOURCE,
@@ -133,18 +129,13 @@ public class OperatorJosdkMetrics implements Source, Metrics {
       T result = execution.execute();
       final String successType = execution.successTypeName(result);
       if (resourceClass.isPresent()) {
-        getHistogram(resourceClass.get(), name, execName, successType).update(toSeconds(startTime));
-        getCounter(resourceClass.get(), name, execName, SUCCESS, successType).inc();
+        String metricsPrefix = getMetricNamePrefix(resourceClass.get());
+        getHistogram(metricsPrefix, name, execName, successType).update(toSeconds(startTime));
+        getCounter(metricsPrefix, name, execName, SUCCESS, successType).inc();
         if (namespaceOptional.isPresent()) {
-          getHistogram(resourceClass.get(), namespaceOptional.get(), name, execName, successType)
+          getHistogram(metricsPrefix, namespaceOptional.get(), name, execName, successType)
               .update(toSeconds(startTime));
-          getCounter(
-                  resourceClass.get(),
-                  namespaceOptional.get(),
-                  name,
-                  execName,
-                  SUCCESS,
-                  successType)
+          getCounter(metricsPrefix, namespaceOptional.get(), name, execName, SUCCESS, successType)
               .inc();
         }
       }
@@ -154,13 +145,14 @@ public class OperatorJosdkMetrics implements Source, Metrics {
           "Controller execution failed for resource {}, metadata {}", resourceID, metadata, e);
       final String exception = e.getClass().getSimpleName();
       if (resourceClass.isPresent()) {
-        getHistogram(resourceClass.get(), name, execName, FAILURE).update(toSeconds(startTime));
-        getCounter(resourceClass.get(), name, execName, FAILURE, EXCEPTION, exception).inc();
+        String metricsPrefix = getMetricNamePrefix(resourceClass.get());
+        getHistogram(metricsPrefix, name, execName, FAILURE).update(toSeconds(startTime));
+        getCounter(metricsPrefix, name, execName, FAILURE, EXCEPTION, exception).inc();
         if (namespaceOptional.isPresent()) {
-          getHistogram(resourceClass.get(), namespaceOptional.get(), name, execName, FAILURE)
+          getHistogram(metricsPrefix, namespaceOptional.get(), name, execName, FAILURE)
               .update(toSeconds(startTime));
           getCounter(
-                  resourceClass.get(),
+                  metricsPrefix,
                   namespaceOptional.get(),
                   name,
                   execName,
@@ -182,13 +174,13 @@ public class OperatorJosdkMetrics implements Source, Metrics {
         resource,
         retryInfo,
         metadata);
+    String metricsPrefix = getMetricNamePrefix(resource.getClass());
     if (retryInfo != null) {
       final String namespace = resource.getMetadata().getNamespace();
-      getCounter(resource.getClass(), RECONCILIATION, RETRIES).inc();
-      getCounter(resource.getClass(), namespace, RECONCILIATION, RETRIES).inc();
+      getCounter(metricsPrefix, RECONCILIATION, RETRIES).inc();
+      getCounter(metricsPrefix, namespace, RECONCILIATION, RETRIES).inc();
     }
-    getCounter(
-            resource.getClass(), (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_QUEUE_SIZE)
+    getCounter(metricsPrefix, (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_QUEUE_SIZE)
         .inc();
   }
 
@@ -197,26 +189,27 @@ public class OperatorJosdkMetrics implements Source, Metrics {
       HasMetadata resource, Exception exception, Map<String, Object> metadata) {
     log.error(
         "Failed reconciliation for resource {} with metadata {}", resource, 
exception, exception);
-    getCounter(resource.getClass(), RECONCILIATION, FAILED).inc();
-    getCounter(resource.getClass(), resource.getMetadata().getNamespace(), RECONCILIATION, FAILED)
-        .inc();
+    String metricsPrefix = getMetricNamePrefix(resource.getClass());
+    getCounter(metricsPrefix, RECONCILIATION, FAILED).inc();
+    getCounter(metricsPrefix, resource.getMetadata().getNamespace(), RECONCILIATION, FAILED).inc();
   }
 
   @Override
   public void finishedReconciliation(HasMetadata resource, Map<String, Object> metadata) {
     log.debug("Finished reconciliation for resource {} with metadata {}", resource, metadata);
-    getCounter(resource.getClass(), RECONCILIATION, FINISHED).inc();
-    getCounter(
-        resource.getClass(), resource.getMetadata().getNamespace(), RECONCILIATION, FINISHED);
+    String metricsPrefix = getMetricNamePrefix(resource.getClass());
+    getCounter(metricsPrefix, RECONCILIATION, FINISHED).inc();
+    getCounter(metricsPrefix, resource.getMetadata().getNamespace(), RECONCILIATION, FINISHED);
   }
 
   @Override
   public void cleanupDoneFor(ResourceID resourceID, Map<String, Object> metadata) {
     log.debug("Cleanup Done for resource {} with metadata {}", resourceID, metadata);
-    getCounter(resourceID.getClass(), RECONCILIATION, CLEANUP).inc();
+    String metricsPrefix = getMetricNamePrefix(resourceID.getClass());
+    getCounter(metricsPrefix, RECONCILIATION, CLEANUP).inc();
     resourceID
         .getNamespace()
-        .ifPresent(ns -> getCounter(resourceID.getClass(), ns, RECONCILIATION, CLEANUP).inc());
+        .ifPresent(ns -> getCounter(metricsPrefix, ns, RECONCILIATION, CLEANUP).inc());
   }
 
   @Override
@@ -237,11 +230,11 @@ public class OperatorJosdkMetrics implements Source, Metrics {
   public void reconciliationExecutionStarted(HasMetadata resource, Map<String, Object> metadata) {
     log.debug("Reconciliation execution started");
     String namespace = resource.getMetadata().getNamespace();
-    getCounter(
-            resource.getClass(), (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_EXECUTIONS)
+    String metricsPrefix = getMetricNamePrefix(resource.getClass());
+    getCounter(metricsPrefix, (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_EXECUTIONS)
         .inc();
     getCounter(
-            resource.getClass(),
+            metricsPrefix,
             namespace,
             (String) metadata.get(CONTROLLER_NAME),
             RECONCILIATIONS_EXECUTIONS)
@@ -252,17 +245,16 @@ public class OperatorJosdkMetrics implements Source, Metrics {
   public void reconciliationExecutionFinished(HasMetadata resource, Map<String, Object> metadata) {
     log.debug("Reconciliation execution finished");
     String namespace = resource.getMetadata().getNamespace();
-    getCounter(
-            resource.getClass(), (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_EXECUTIONS)
+    String metricsPrefix = getMetricNamePrefix(resource.getClass());
+    getCounter(metricsPrefix, (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_EXECUTIONS)
         .dec();
     getCounter(
-            resource.getClass(),
+            metricsPrefix,
             namespace,
             (String) metadata.get(CONTROLLER_NAME),
             RECONCILIATIONS_EXECUTIONS)
         .dec();
-    getCounter(
-            resource.getClass(), (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_QUEUE_SIZE)
+    getCounter(metricsPrefix, (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_QUEUE_SIZE)
         .dec();
   }
 
@@ -270,30 +262,6 @@ public class OperatorJosdkMetrics implements Source, Metrics {
     return TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis() - startTimeInMilliseconds);
   }
 
-  private Histogram getHistogram(Class<?> kclass, String... names) {
-    String name = MetricRegistry.name(kclass.getSimpleName(), names).toLowerCase();
-    Histogram histogram;
-    if (histograms.containsKey(name)) {
-      histogram = histograms.get(name);
-    } else {
-      histogram = metricRegistry.histogram(name);
-      histograms.put(name, histogram);
-    }
-    return histogram;
-  }
-
-  private Counter getCounter(Class<?> klass, String... names) {
-    String name = MetricRegistry.name(klass.getSimpleName(), names).toLowerCase();
-    Counter counter;
-    if (counters.containsKey(name)) {
-      counter = counters.get(name);
-    } else {
-      counter = metricRegistry.counter(name);
-      counters.put(name, counter);
-    }
-    return counter;
-  }
-
   private Optional<Class<? extends BaseResource<?, ?, ?, ?, ?>>> getResourceClass(
       Map<String, Object> metadata) {
     GroupVersionKind resourceGvk = (GroupVersionKind) metadata.get(Constants.RESOURCE_GVK_KEY);
diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
index b03d6e3..e9f224e 100644
--- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
+++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
@@ -24,18 +24,25 @@ import java.util.List;
 import org.apache.spark.k8s.operator.SparkApplication;
 import org.apache.spark.k8s.operator.context.SparkAppContext;
 import org.apache.spark.k8s.operator.listeners.SparkAppStatusListener;
+import org.apache.spark.k8s.operator.metrics.SparkAppStatusRecorderSource;
 import org.apache.spark.k8s.operator.status.ApplicationState;
 import org.apache.spark.k8s.operator.status.ApplicationStatus;
 
 /** Records the status of a Spark application. */
 public class SparkAppStatusRecorder
     extends StatusRecorder<ApplicationStatus, SparkApplication, SparkAppStatusListener> {
-  public SparkAppStatusRecorder(List<SparkAppStatusListener> statusListeners) {
+  protected final SparkAppStatusRecorderSource recorderSource;
+
+  public SparkAppStatusRecorder(
+      List<SparkAppStatusListener> statusListeners, SparkAppStatusRecorderSource recorderSource) {
     super(statusListeners, ApplicationStatus.class, SparkApplication.class);
+    this.recorderSource = recorderSource;
   }
 
   public void appendNewStateAndPersist(SparkAppContext context, ApplicationState newState) {
-    ApplicationStatus updatedStatus = context.getResource().getStatus().appendNewState(newState);
+    ApplicationStatus appStatus = context.getResource().getStatus();
+    recorderSource.recordStatusUpdateLatency(appStatus, newState);
+    ApplicationStatus updatedStatus = appStatus.appendNewState(newState);
     persistStatus(context, updatedStatus);
   }
 }
diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java
new file mode 100644
index 0000000..18b74f8
--- /dev/null
+++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator.metrics;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+
+import com.codahale.metrics.Timer;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.k8s.operator.SparkApplication;
+import org.apache.spark.k8s.operator.status.ApplicationState;
+import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
+import org.apache.spark.k8s.operator.status.ApplicationStatus;
+
+class SparkAppStatusRecorderSourceTest {
+
+  @Test
+  void recordStatusUpdateLatency() {
+    SparkAppStatusRecorderSource source = new SparkAppStatusRecorderSource();
+    SparkApplication app1 = prepareApplication("foo-1", "bar-1");
+    SparkApplication app2 = prepareApplication("foo-2", "bar-2");
+
+    ApplicationState stateUpdate11 =
+        new ApplicationState(ApplicationStateSummary.DriverRequested, "foo");
+    ApplicationState stateUpdate12 =
+        new ApplicationState(ApplicationStateSummary.DriverRequested, "bar");
+    // record short latency
+    source.recordStatusUpdateLatency(app1.getStatus(), stateUpdate11);
+    source.recordStatusUpdateLatency(app2.getStatus(), stateUpdate12);
+    app1.setStatus(app1.getStatus().appendNewState(stateUpdate11));
+
+    ApplicationState stateUpdate2 =
+        new ApplicationState(ApplicationStateSummary.DriverStarted, "foo");
+    source.recordStatusUpdateLatency(app1.getStatus(), stateUpdate2);
+
+    Map<String, Timer> timers = source.metricRegistry().getTimers();
+    assertEquals(2, timers.size());
+    assertTrue(timers.containsKey("sparkapp.latency.from.submitted.to.driverrequested"));
+    assertTrue(
+        timers.get("sparkapp.latency.from.submitted.to.driverrequested").getSnapshot().getMin()
+            > 0);
+    assertEquals(2, timers.get("sparkapp.latency.from.submitted.to.driverrequested").getCount());
+    assertTrue(timers.containsKey("sparkapp.latency.from.driverrequested.to.driverstarted"));
+    assertEquals(
+        1, timers.get("sparkapp.latency.from.driverrequested.to.driverstarted").getCount());
+    assertTrue(
+        timers.get("sparkapp.latency.from.driverrequested.to.driverstarted").getSnapshot().getMin()
+            > 0);
+  }
+
+  protected SparkApplication prepareApplication(String name, String namespace) {
+    SparkApplication app = new SparkApplication();
+    app.setMetadata(new ObjectMetaBuilder().withName(name).withNamespace(namespace).build());
+    app.setStatus(new ApplicationStatus());
+    return app;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
