This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e8966c  [SPARK-55292] Add discover latency metric to track operator 
processing delay
3e8966c is described below

commit 3e8966c074c1d2b125019010d34356cd43868fcc
Author: Zhou JIANG <[email protected]>
AuthorDate: Mon Feb 2 20:28:14 2026 -0800

    [SPARK-55292] Add discover latency metric to track operator processing delay
    
    ### What changes were proposed in this pull request?
    
    This PR adds a new metric `sparkapp.latency.discover` to track the time 
between when a SparkApplication resource is created in Kubernetes and when the 
operator first processes it (transitions to the Submitted state).
    
    ### Why are the changes needed?
    
    When troubleshooting slow application startup, it's important to understand 
how much time is spent in different phases. Currently, we can track latency 
between state transitions, but we lack visibility into the initial delay 
between resource creation and when the operator begins processing it.
    
    ### Does this PR introduce _any_ user-facing change?
    
    A new metric `sparkapp.latency.discover` is now available for monitoring 
and can be exported to Prometheus or other metrics backends configured for the 
operator.
    
    ### How was this patch tested?
    
    1. Unit tests added in `SparkAppStatusRecorderSourceTest.java` to verify 
the discover latency is recorded correctly
    2. Manual testing with Prometheus to verify the metric appears and shows 
expected values
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #478 from jiangzho/discoverLatency.
    
    Authored-by: Zhou JIANG <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 docs/configuration.md                              | 12 +++++---
 .../metrics/SparkAppStatusRecorderSource.java      | 17 +++++++---
 .../k8s/operator/utils/SparkAppStatusRecorder.java |  3 +-
 .../metrics/SparkAppStatusRecorderSourceTest.java  | 36 ++++++++++++++++++----
 4 files changed, 52 insertions(+), 16 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 04f818b..ab22d41 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -97,13 +97,15 @@ Spark Operator also measures the latency between each state 
transition for apps,
 
 | Metrics Name                                         | Type  | Description   
                                                   |
 
|------------------------------------------------------|-------|------------------------------------------------------------------|
+| sparkapp.latency.discover                            | Timer | Tracking 
latency from resource creation to first Submitted state |
 | sparkapp.latency.from.`<fromState>`.to.`<toState>`   | Timer | Tracking 
latency for app of transition from one state to another |
 
-The latency metrics can be used to provide insights about time spent in each 
state. For example, a
-long latency between `DriverRequested` and `DriverStarted` indicates overhead 
for driver pod to be
-scheduled. Latency between `DriverStarted` and `DriverReady` indicates 
overhead to pull image, to
-run init containers and to start SparkSession. These metrics can be used to 
analyze the overhead
-from multiple dimensions.
+The latency metrics can be used to provide insights about time spent in each 
state. For example,
+the `latency.discover` metric measures how long it takes for the operator to 
discover and begin
+processing a newly created SparkApplication resource. A long latency between 
`DriverRequested` and
+`DriverStarted` indicates overhead for driver pod to be scheduled. Latency 
between `DriverStarted`
+and `DriverReady` indicates overhead to pull image, to run init containers and 
to start SparkSession.
+These metrics can be used to analyze the overhead from multiple dimensions.
 
 ### Forward Metrics to Prometheus
 
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java
index 644ffaf..4e88ce1 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java
@@ -23,8 +23,10 @@ import java.time.Duration;
 import java.time.Instant;
 
 import com.codahale.metrics.MetricRegistry;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
 
 import org.apache.spark.k8s.operator.status.ApplicationState;
+import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
 import org.apache.spark.k8s.operator.status.ApplicationStatus;
 import org.apache.spark.metrics.source.Source;
 
@@ -33,6 +35,7 @@ public class SparkAppStatusRecorderSource extends 
BaseOperatorSource implements
 
   public static final String RESOURCE_TYPE = "sparkapp";
   public static final String LATENCY_METRIC_FORMAT = "latency.from.%s.to.%s";
+  public static final String DISCOVER_LATENCY_NAME = "latency.discover";
 
   /** Constructs a new SparkAppStatusRecorderSource. */
   public SparkAppStatusRecorderSource() {
@@ -65,10 +68,11 @@ public class SparkAppStatusRecorderSource extends 
BaseOperatorSource implements
    * @param status The current application status.
    * @param newState The new application state.
    */
-  public void recordStatusUpdateLatency(
-      final ApplicationStatus status, final ApplicationState newState) {
-    ApplicationState currentState = status.getCurrentState();
-    if (currentState != null) {
+  public void recordStatusUpdateLatency(final ObjectMeta metadata,
+                                        final ApplicationStatus status,
+                                        final ApplicationState newState) {
+    if (status != null && status.getCurrentState() != null) {
+      ApplicationState currentState = status.getCurrentState();
       Duration duration =
           Duration.between(
               Instant.parse(currentState.getLastTransitionTime()),
@@ -80,6 +84,11 @@ public class SparkAppStatusRecorderSource extends 
BaseOperatorSource implements
                   currentState.getCurrentStateSummary().name(),
                   newState.getCurrentStateSummary().name()))
           .update(duration);
+    } else if (newState.getCurrentStateSummary() == 
ApplicationStateSummary.Submitted) {
+      Duration discoverTime = Duration.between(
+          Instant.parse(metadata.getCreationTimestamp()),
+          Instant.parse(newState.getLastTransitionTime()));
+      getTimer(RESOURCE_TYPE, DISCOVER_LATENCY_NAME).update(discoverTime);
     }
   }
 }
diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
index 16ddfe1..c406e39 100644
--- 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
+++ 
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
@@ -58,7 +58,8 @@ public class SparkAppStatusRecorder
     ApplicationStatus updatedStatus = appStatus.appendNewState(newState);
     boolean statusPersisted = persistStatus(context, updatedStatus);
     if (statusPersisted) {
-      recorderSource.recordStatusUpdateLatency(appStatus, newState);
+      recorderSource.recordStatusUpdateLatency(
+          context.getResource().getMetadata(), appStatus, newState);
     }
     return statusPersisted;
   }
diff --git 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java
 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java
index 18b74f8..5f15328 100644
--- 
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java
+++ 
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java
@@ -22,6 +22,7 @@ package org.apache.spark.k8s.operator.metrics;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.time.Instant;
 import java.util.Map;
 
 import com.codahale.metrics.Timer;
@@ -31,7 +32,6 @@ import org.junit.jupiter.api.Test;
 import org.apache.spark.k8s.operator.SparkApplication;
 import org.apache.spark.k8s.operator.status.ApplicationState;
 import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
-import org.apache.spark.k8s.operator.status.ApplicationStatus;
 
 class SparkAppStatusRecorderSourceTest {
 
@@ -46,13 +46,13 @@ class SparkAppStatusRecorderSourceTest {
     ApplicationState stateUpdate12 =
         new ApplicationState(ApplicationStateSummary.DriverRequested, "bar");
     // record short latency
-    source.recordStatusUpdateLatency(app1.getStatus(), stateUpdate11);
-    source.recordStatusUpdateLatency(app2.getStatus(), stateUpdate12);
+    source.recordStatusUpdateLatency(app1.getMetadata(), app1.getStatus(), 
stateUpdate11);
+    source.recordStatusUpdateLatency(app2.getMetadata(), app2.getStatus(), 
stateUpdate12);
     app1.setStatus(app1.getStatus().appendNewState(stateUpdate11));
 
     ApplicationState stateUpdate2 =
         new ApplicationState(ApplicationStateSummary.DriverStarted, "foo");
-    source.recordStatusUpdateLatency(app1.getStatus(), stateUpdate2);
+    source.recordStatusUpdateLatency(app1.getMetadata(), app1.getStatus(), 
stateUpdate2);
 
     Map<String, Timer> timers = source.metricRegistry().getTimers();
     assertEquals(2, timers.size());
@@ -69,10 +69,34 @@ class SparkAppStatusRecorderSourceTest {
             > 0);
   }
 
+  @Test
+  void recordDiscoverLatency() {
+    SparkAppStatusRecorderSource source = new SparkAppStatusRecorderSource();
+    SparkApplication app = prepareApplication("foo", "bar", false);
+    source.recordStatusUpdateLatency(app.getMetadata(), app.getStatus(), new 
ApplicationState());
+    Map<String, Timer> timers = source.metricRegistry().getTimers();
+    assertEquals(1, timers.size());
+    assertTrue(timers.containsKey("sparkapp.latency.discover"));
+    assertTrue(timers.get("sparkapp.latency.discover").getSnapshot().getMin() 
> 0);
+  }
+
   protected SparkApplication prepareApplication(String name, String namespace) 
{
+    return prepareApplication(name, namespace, true);
+  }
+
+  protected SparkApplication prepareApplication(String name,
+                                                String namespace,
+                                                boolean addInitStatus) {
     SparkApplication app = new SparkApplication();
-    app.setMetadata(new 
ObjectMetaBuilder().withName(name).withNamespace(namespace).build());
-    app.setStatus(new ApplicationStatus());
+    app.setMetadata(
+        new ObjectMetaBuilder()
+            .withName(name)
+            .withNamespace(namespace)
+            .withCreationTimestamp(Instant.now().toString())
+            .build());
+    if (!addInitStatus) {
+      app.setStatus(null);
+    }
     return app;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to