This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 3e8966c [SPARK-55292] Add discover latency metric to track operator
processing delay
3e8966c is described below
commit 3e8966c074c1d2b125019010d34356cd43868fcc
Author: Zhou JIANG <[email protected]>
AuthorDate: Mon Feb 2 20:28:14 2026 -0800
[SPARK-55292] Add discover latency metric to track operator processing delay
### What changes were proposed in this pull request?
This PR adds a new metric `sparkapp.latency.discover` to track the time
between when a SparkApplication resource is created in Kubernetes and when the
operator first processes it (transitions to Submitted state).
### Why are the changes needed?
When troubleshooting slow application startup, it's important to understand
how much time is spent in different phases. Currently, we can track latency
between state transitions, but we lack visibility into the initial delay
between resource creation and when the operator begins processing it.
### Does this PR introduce _any_ user-facing change?
A new metric `sparkapp.latency.discover` is now available for monitoring
and can be exported to Prometheus or other metrics backends configured for the
operator.
### How was this patch tested?
1. Unit tests added in `SparkAppStatusRecorderSourceTest.java` to verify
the discover latency is recorded correctly
2. Manual testing with Prometheus to verify the metric appears and shows
expected values
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #478 from jiangzho/discoverLatency.
Authored-by: Zhou JIANG <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
docs/configuration.md | 12 +++++---
.../metrics/SparkAppStatusRecorderSource.java | 17 +++++++---
.../k8s/operator/utils/SparkAppStatusRecorder.java | 3 +-
.../metrics/SparkAppStatusRecorderSourceTest.java | 36 ++++++++++++++++++----
4 files changed, 52 insertions(+), 16 deletions(-)
diff --git a/docs/configuration.md b/docs/configuration.md
index 04f818b..ab22d41 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -97,13 +97,15 @@ Spark Operator also measures the latency between each state
transition for apps,
| Metrics Name | Type | Description
|
|------------------------------------------------------|-------|------------------------------------------------------------------|
+| sparkapp.latency.discover | Timer | Tracking
latency from resource creation to first Submitted state |
| sparkapp.latency.from.`<fromState>`.to.`<toState>` | Timer | Tracking
latency for app of transition from one state to another |
-The latency metrics can be used to provide insights about time spent in each
state. For example, a
-long latency between `DriverRequested` and `DriverStarted` indicates overhead
for driver pod to be
-scheduled. Latency between `DriverStarted` and `DriverReady` indicates
overhead to pull image, to
-run init containers and to start SparkSession. These metrics can be used to
analyze the overhead
-from multiple dimensions.
+The latency metrics can be used to provide insights about time spent in each
state. For example,
+the `latency.discover` metric measures how long it takes for the operator to
discover and begin
+processing a newly created SparkApplication resource. A long latency between
`DriverRequested` and
+`DriverStarted` indicates overhead for driver pod to be scheduled. Latency
between `DriverStarted`
+and `DriverReady` indicates overhead to pull image, to run init containers and
to start SparkSession.
+These metrics can be used to analyze the overhead from multiple dimensions.
### Forward Metrics to Prometheus
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java
index 644ffaf..4e88ce1 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java
@@ -23,8 +23,10 @@ import java.time.Duration;
import java.time.Instant;
import com.codahale.metrics.MetricRegistry;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
import org.apache.spark.k8s.operator.status.ApplicationState;
+import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
import org.apache.spark.k8s.operator.status.ApplicationStatus;
import org.apache.spark.metrics.source.Source;
@@ -33,6 +35,7 @@ public class SparkAppStatusRecorderSource extends
BaseOperatorSource implements
public static final String RESOURCE_TYPE = "sparkapp";
public static final String LATENCY_METRIC_FORMAT = "latency.from.%s.to.%s";
+ public static final String DISCOVER_LATENCY_NAME = "latency.discover";
/** Constructs a new SparkAppStatusRecorderSource. */
public SparkAppStatusRecorderSource() {
@@ -65,10 +68,11 @@ public class SparkAppStatusRecorderSource extends
BaseOperatorSource implements
* @param status The current application status.
* @param newState The new application state.
*/
- public void recordStatusUpdateLatency(
- final ApplicationStatus status, final ApplicationState newState) {
- ApplicationState currentState = status.getCurrentState();
- if (currentState != null) {
+ public void recordStatusUpdateLatency(final ObjectMeta metadata,
+ final ApplicationStatus status,
+ final ApplicationState newState) {
+ if (status != null && status.getCurrentState() != null) {
+ ApplicationState currentState = status.getCurrentState();
Duration duration =
Duration.between(
Instant.parse(currentState.getLastTransitionTime()),
@@ -80,6 +84,11 @@ public class SparkAppStatusRecorderSource extends
BaseOperatorSource implements
currentState.getCurrentStateSummary().name(),
newState.getCurrentStateSummary().name()))
.update(duration);
+ } else if (newState.getCurrentStateSummary() ==
ApplicationStateSummary.Submitted) {
+ Duration discoverTime = Duration.between(
+ Instant.parse(metadata.getCreationTimestamp()),
+ Instant.parse(newState.getLastTransitionTime()));
+ getTimer(RESOURCE_TYPE, DISCOVER_LATENCY_NAME).update(discoverTime);
}
}
}
diff --git
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
index 16ddfe1..c406e39 100644
---
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
+++
b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java
@@ -58,7 +58,8 @@ public class SparkAppStatusRecorder
ApplicationStatus updatedStatus = appStatus.appendNewState(newState);
boolean statusPersisted = persistStatus(context, updatedStatus);
if (statusPersisted) {
- recorderSource.recordStatusUpdateLatency(appStatus, newState);
+ recorderSource.recordStatusUpdateLatency(
+ context.getResource().getMetadata(), appStatus, newState);
}
return statusPersisted;
}
diff --git
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java
index 18b74f8..5f15328 100644
---
a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java
+++
b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java
@@ -22,6 +22,7 @@ package org.apache.spark.k8s.operator.metrics;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.time.Instant;
import java.util.Map;
import com.codahale.metrics.Timer;
@@ -31,7 +32,6 @@ import org.junit.jupiter.api.Test;
import org.apache.spark.k8s.operator.SparkApplication;
import org.apache.spark.k8s.operator.status.ApplicationState;
import org.apache.spark.k8s.operator.status.ApplicationStateSummary;
-import org.apache.spark.k8s.operator.status.ApplicationStatus;
class SparkAppStatusRecorderSourceTest {
@@ -46,13 +46,13 @@ class SparkAppStatusRecorderSourceTest {
ApplicationState stateUpdate12 =
new ApplicationState(ApplicationStateSummary.DriverRequested, "bar");
// record short latency
- source.recordStatusUpdateLatency(app1.getStatus(), stateUpdate11);
- source.recordStatusUpdateLatency(app2.getStatus(), stateUpdate12);
+ source.recordStatusUpdateLatency(app1.getMetadata(), app1.getStatus(),
stateUpdate11);
+ source.recordStatusUpdateLatency(app2.getMetadata(), app2.getStatus(),
stateUpdate12);
app1.setStatus(app1.getStatus().appendNewState(stateUpdate11));
ApplicationState stateUpdate2 =
new ApplicationState(ApplicationStateSummary.DriverStarted, "foo");
- source.recordStatusUpdateLatency(app1.getStatus(), stateUpdate2);
+ source.recordStatusUpdateLatency(app1.getMetadata(), app1.getStatus(),
stateUpdate2);
Map<String, Timer> timers = source.metricRegistry().getTimers();
assertEquals(2, timers.size());
@@ -69,10 +69,34 @@ class SparkAppStatusRecorderSourceTest {
> 0);
}
+ @Test
+ void recordDiscoverLatency() {
+ SparkAppStatusRecorderSource source = new SparkAppStatusRecorderSource();
+ SparkApplication app = prepareApplication("foo", "bar", false);
+ source.recordStatusUpdateLatency(app.getMetadata(), app.getStatus(), new
ApplicationState());
+ Map<String, Timer> timers = source.metricRegistry().getTimers();
+ assertEquals(1, timers.size());
+ assertTrue(timers.containsKey("sparkapp.latency.discover"));
+ assertTrue(timers.get("sparkapp.latency.discover").getSnapshot().getMin()
> 0);
+ }
+
protected SparkApplication prepareApplication(String name, String namespace)
{
+ return prepareApplication(name, namespace, true);
+ }
+
+ protected SparkApplication prepareApplication(String name,
+ String namespace,
+ boolean addInitStatus) {
SparkApplication app = new SparkApplication();
- app.setMetadata(new
ObjectMetaBuilder().withName(name).withNamespace(namespace).build());
- app.setStatus(new ApplicationStatus());
+ app.setMetadata(
+ new ObjectMetaBuilder()
+ .withName(name)
+ .withNamespace(namespace)
+ .withCreationTimestamp(Instant.now().toString())
+ .build());
+ if (!addInitStatus) {
+ app.setStatus(null);
+ }
return app;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]