This is an automated email from the ASF dual-hosted git repository. ptoth pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new cdcef65 [SPARK-53405] Add metrics recording for latency of Spark app state transition cdcef65 is described below commit cdcef652ad57bc8a66f8f6df495cea5d4fccb28a Author: Zhou JIANG <jiang...@umich.edu> AuthorDate: Thu Sep 4 10:15:17 2025 +0200 [SPARK-53405] Add metrics recording for latency of Spark app state transition ### What changes were proposed in this pull request? This PR adds metrics tracking state transition latency for Spark Applications in the format of ``` sparkapp.latency.from.<fromState>.to.<toState> ``` ### Why are the changes needed? Measuring latency is useful for analyzing performance from a scheduling / orchestration perspective. For example, it helps identify which state causes significant overhead so that it can be optimized at the cluster/app level. ### Does this PR introduce _any_ user-facing change? More metrics become available. ### How was this patch tested? CIs. New unit test added. ### Was this patch authored or co-authored using generative AI tooling? No Closes #299 from jiangzho/state_transition. 
Authored-by: Zhou JIANG <jiang...@umich.edu> Signed-off-by: Peter Toth <peter.t...@gmail.com> --- docs/configuration.md | 14 +++ .../apache/spark/k8s/operator/SparkOperator.java | 6 +- .../k8s/operator/metrics/BaseOperatorSource.java | 91 ++++++++++++++++++ .../metrics/SparkAppStatusRecorderSource.java | 67 +++++++++++++ .../metrics/source/OperatorJosdkMetrics.java | 106 +++++++-------------- .../k8s/operator/utils/SparkAppStatusRecorder.java | 11 ++- .../metrics/SparkAppStatusRecorderSourceTest.java | 78 +++++++++++++++ 7 files changed, 301 insertions(+), 72 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index d13a82c..b957285 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -91,6 +91,20 @@ via [Codahale JVM Metrics](https://javadoc.io/doc/com.codahale.metrics/metrics-j | kubernetes.client.`ResourceName`.`Method` | Meter | Tracking the rates of HTTP request for a combination of one Kubernetes resource and one http method | | kubernetes.client.`NamespaceName`.`ResourceName`.`Method` | Meter | Tracking the rates of HTTP request for a combination of one namespace-scoped Kubernetes resource and one http method | +### Latency for State Transition + +Spark Operator also measures the latency of each state transition for apps, in the format of: + +| Metrics Name | Type | Description | +|------------------------------------------------------|-------|------------------------------------------------------------------| +| sparkapp.latency.from.`<fromState>`.to.`<toState>` | Timer | Tracking latency for app of transition from one state to another | + +The latency metrics can be used to provide insights about time spent in each state. For example, a +long latency between `DriverRequested` and `DriverStarted` indicates overhead in scheduling the driver +pod. Latency between `DriverStarted` and `DriverReady` indicates overhead of pulling the image, +running init containers and starting the SparkSession.
These metrics can be used to analyze the overhead +from multiple dimensions. + ### Forward Metrics to Prometheus In this section, we will show you how to forward Spark Operator metrics diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java index 3d492bd..1daffbc 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java @@ -46,6 +46,7 @@ import org.apache.spark.k8s.operator.config.SparkOperatorConfigMapReconciler; import org.apache.spark.k8s.operator.metrics.MetricsService; import org.apache.spark.k8s.operator.metrics.MetricsSystem; import org.apache.spark.k8s.operator.metrics.MetricsSystemFactory; +import org.apache.spark.k8s.operator.metrics.SparkAppStatusRecorderSource; import org.apache.spark.k8s.operator.metrics.healthcheck.SentinelManager; import org.apache.spark.k8s.operator.metrics.source.KubernetesMetricsInterceptor; import org.apache.spark.k8s.operator.metrics.source.OperatorJosdkMetrics; @@ -83,7 +84,10 @@ public class SparkOperator { KubernetesClientFactory.buildKubernetesClient(getClientInterceptors(metricsSystem)); this.appSubmissionWorker = new SparkAppSubmissionWorker(); this.clusterSubmissionWorker = new SparkClusterSubmissionWorker(); - this.sparkAppStatusRecorder = new SparkAppStatusRecorder(getAppStatusListener()); + SparkAppStatusRecorderSource recorderSource = new SparkAppStatusRecorderSource(); + this.metricsSystem.registerSource(recorderSource); + this.sparkAppStatusRecorder = + new SparkAppStatusRecorder(getAppStatusListener(), recorderSource); this.sparkClusterStatusRecorder = new SparkClusterStatusRecorder(getClusterStatusListener()); this.registeredSparkControllers = new HashSet<>(); this.watchedNamespaces = getWatchedNamespaces(); diff --git 
a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/BaseOperatorSource.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/BaseOperatorSource.java new file mode 100644 index 0000000..3953cf6 --- /dev/null +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/BaseOperatorSource.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator.metrics; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class BaseOperatorSource { + protected final MetricRegistry metricRegistry; + protected final Map<String, Histogram> histograms = new ConcurrentHashMap<>(); + protected final Map<String, Counter> counters = new ConcurrentHashMap<>(); + protected final Map<String, Gauge<?>> gauges = new ConcurrentHashMap<>(); + protected final Map<String, Timer> timers = new ConcurrentHashMap<>(); + + protected Histogram getHistogram(String metricNamePrefix, String... names) { + Histogram histogram; + String metricName = MetricRegistry.name(metricNamePrefix, names).toLowerCase(); + if (histograms.containsKey(metricName)) { + histogram = histograms.get(metricName); + } else { + histogram = metricRegistry.histogram(metricName); + histograms.put(metricName, histogram); + } + return histogram; + } + + protected Counter getCounter(String metricNamePrefix, String... names) { + Counter counter; + String metricName = MetricRegistry.name(metricNamePrefix, names).toLowerCase(); + if (counters.containsKey(metricName)) { + counter = counters.get(metricName); + } else { + counter = metricRegistry.counter(metricName); + counters.put(metricName, counter); + } + return counter; + } + + protected Gauge<?> getGauge(Gauge<?> defaultGauge, String metricNamePrefix, String... 
names) { + Gauge<?> gauge; + String metricName = MetricRegistry.name(metricNamePrefix, names).toLowerCase(); + if (gauges.containsKey(metricName)) { + gauge = gauges.get(metricName); + } else { + gauge = defaultGauge; + gauges.put(metricName, defaultGauge); + } + return gauge; + } + + protected Timer getTimer(String metricNamePrefix, String... names) { + Timer timer; + String metricName = MetricRegistry.name(metricNamePrefix, names).toLowerCase(); + if (timers.containsKey(metricName)) { + timer = timers.get(metricName); + } else { + timer = metricRegistry.timer(metricName); + timers.put(metricName, timer); + } + return timer; + } + + protected String getMetricNamePrefix(Class<?> klass) { + return klass.getSimpleName(); + } +} diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java new file mode 100644 index 0000000..622b425 --- /dev/null +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSource.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator.metrics; + +import java.time.Duration; +import java.time.Instant; + +import com.codahale.metrics.MetricRegistry; + +import org.apache.spark.k8s.operator.status.ApplicationState; +import org.apache.spark.k8s.operator.status.ApplicationStatus; +import org.apache.spark.metrics.source.Source; + +public class SparkAppStatusRecorderSource extends BaseOperatorSource implements Source { + + public static final String RESOURCE_TYPE = "sparkapp"; + public static final String LATENCY_METRIC_FORMAT = "latency.from.%s.to.%s"; + + public SparkAppStatusRecorderSource() { + super(new MetricRegistry()); + } + + @Override + public String sourceName() { + return "SparkAppStatusRecorder"; + } + + @Override + public MetricRegistry metricRegistry() { + return metricRegistry; + } + + public void recordStatusUpdateLatency( + final ApplicationStatus status, final ApplicationState newState) { + ApplicationState currentState = status.getCurrentState(); + if (currentState != null) { + Duration duration = + Duration.between( + Instant.parse(currentState.getLastTransitionTime()), + Instant.parse(newState.getLastTransitionTime())); + getTimer( + RESOURCE_TYPE, + String.format( + LATENCY_METRIC_FORMAT, + currentState.getCurrentStateSummary().name(), + newState.getCurrentStateSummary().name())) + .update(duration); + } + } +} diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java index eee97a4..967a2d6 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/source/OperatorJosdkMetrics.java @@ -23,12 +23,9 @@ import static io.javaoperatorsdk.operator.api.reconciler.Constants.CONTROLLER_NA import java.util.Map; import java.util.Optional; -import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.monitoring.Metrics; @@ -45,20 +42,18 @@ import lombok.extern.slf4j.Slf4j; import org.apache.spark.k8s.operator.BaseResource; import org.apache.spark.k8s.operator.SparkApplication; import org.apache.spark.k8s.operator.SparkCluster; +import org.apache.spark.k8s.operator.metrics.BaseOperatorSource; import org.apache.spark.metrics.source.Source; import org.apache.spark.util.Clock; import org.apache.spark.util.SystemClock; /** Metrics for the Java Operator SDK. */ @Slf4j -public class OperatorJosdkMetrics implements Source, Metrics { +public class OperatorJosdkMetrics extends BaseOperatorSource implements Source, Metrics { public static final String FINISHED = "finished"; public static final String CLEANUP = "cleanup"; public static final String FAILED = "failed"; public static final String RETRIES = "retries"; - private final Map<String, Histogram> histograms = new ConcurrentHashMap<>(); - private final Map<String, Counter> counters = new ConcurrentHashMap<>(); - private final Map<String, Gauge<?>> gauges = new ConcurrentHashMap<>(); private static final String RECONCILIATION = "reconciliation"; private static final String RESOURCE = "resource"; private static final String EVENT = "event"; @@ -72,11 +67,10 @@ public class OperatorJosdkMetrics implements Source, Metrics { private static final String SIZE = "size"; private final Clock clock; - private final MetricRegistry metricRegistry; public OperatorJosdkMetrics() { + super(new MetricRegistry()); this.clock = new SystemClock(); - this.metricRegistry = new MetricRegistry(); } @Override @@ -104,10 +98,12 @@ public class OperatorJosdkMetrics implements Source, Metrics { getResourceClass(metadata); 
final Optional<String> namespaceOptional = event.getRelatedCustomResourceID().getNamespace(); resource.ifPresent( - aClass -> getCounter(aClass, action.name().toLowerCase(), RESOURCE, EVENT).inc()); + aClass -> + getCounter(getMetricNamePrefix(aClass), action.name().toLowerCase(), RESOURCE, EVENT) + .inc()); if (resource.isPresent() && namespaceOptional.isPresent()) { getCounter( - resource.get(), + getMetricNamePrefix(resource.get()), namespaceOptional.get(), action.name().toLowerCase(), RESOURCE, @@ -133,18 +129,13 @@ public class OperatorJosdkMetrics implements Source, Metrics { T result = execution.execute(); final String successType = execution.successTypeName(result); if (resourceClass.isPresent()) { - getHistogram(resourceClass.get(), name, execName, successType).update(toSeconds(startTime)); - getCounter(resourceClass.get(), name, execName, SUCCESS, successType).inc(); + String metricsPrefix = getMetricNamePrefix(resourceClass.get()); + getHistogram(metricsPrefix, name, execName, successType).update(toSeconds(startTime)); + getCounter(metricsPrefix, name, execName, SUCCESS, successType).inc(); if (namespaceOptional.isPresent()) { - getHistogram(resourceClass.get(), namespaceOptional.get(), name, execName, successType) + getHistogram(metricsPrefix, namespaceOptional.get(), name, execName, successType) .update(toSeconds(startTime)); - getCounter( - resourceClass.get(), - namespaceOptional.get(), - name, - execName, - SUCCESS, - successType) + getCounter(metricsPrefix, namespaceOptional.get(), name, execName, SUCCESS, successType) .inc(); } } @@ -154,13 +145,14 @@ public class OperatorJosdkMetrics implements Source, Metrics { "Controller execution failed for resource {}, metadata {}", resourceID, metadata, e); final String exception = e.getClass().getSimpleName(); if (resourceClass.isPresent()) { - getHistogram(resourceClass.get(), name, execName, FAILURE).update(toSeconds(startTime)); - getCounter(resourceClass.get(), name, execName, FAILURE, EXCEPTION, 
exception).inc(); + String metricsPrefix = getMetricNamePrefix(resourceClass.get()); + getHistogram(metricsPrefix, name, execName, FAILURE).update(toSeconds(startTime)); + getCounter(metricsPrefix, name, execName, FAILURE, EXCEPTION, exception).inc(); if (namespaceOptional.isPresent()) { - getHistogram(resourceClass.get(), namespaceOptional.get(), name, execName, FAILURE) + getHistogram(metricsPrefix, namespaceOptional.get(), name, execName, FAILURE) .update(toSeconds(startTime)); getCounter( - resourceClass.get(), + metricsPrefix, namespaceOptional.get(), name, execName, @@ -182,13 +174,13 @@ public class OperatorJosdkMetrics implements Source, Metrics { resource, retryInfo, metadata); + String metricsPrefix = getMetricNamePrefix(resource.getClass()); if (retryInfo != null) { final String namespace = resource.getMetadata().getNamespace(); - getCounter(resource.getClass(), RECONCILIATION, RETRIES).inc(); - getCounter(resource.getClass(), namespace, RECONCILIATION, RETRIES).inc(); + getCounter(metricsPrefix, RECONCILIATION, RETRIES).inc(); + getCounter(metricsPrefix, namespace, RECONCILIATION, RETRIES).inc(); } - getCounter( - resource.getClass(), (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_QUEUE_SIZE) + getCounter(metricsPrefix, (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_QUEUE_SIZE) .inc(); } @@ -197,26 +189,27 @@ public class OperatorJosdkMetrics implements Source, Metrics { HasMetadata resource, Exception exception, Map<String, Object> metadata) { log.error( "Failed reconciliation for resource {} with metadata {}", resource, exception, exception); - getCounter(resource.getClass(), RECONCILIATION, FAILED).inc(); - getCounter(resource.getClass(), resource.getMetadata().getNamespace(), RECONCILIATION, FAILED) - .inc(); + String metricsPrefix = getMetricNamePrefix(resource.getClass()); + getCounter(metricsPrefix, RECONCILIATION, FAILED).inc(); + getCounter(metricsPrefix, resource.getMetadata().getNamespace(), RECONCILIATION, FAILED).inc(); } 
@Override public void finishedReconciliation(HasMetadata resource, Map<String, Object> metadata) { log.debug("Finished reconciliation for resource {} with metadata {}", resource, metadata); - getCounter(resource.getClass(), RECONCILIATION, FINISHED).inc(); - getCounter( - resource.getClass(), resource.getMetadata().getNamespace(), RECONCILIATION, FINISHED); + String metricsPrefix = getMetricNamePrefix(resource.getClass()); + getCounter(metricsPrefix, RECONCILIATION, FINISHED).inc(); + getCounter(metricsPrefix, resource.getMetadata().getNamespace(), RECONCILIATION, FINISHED); } @Override public void cleanupDoneFor(ResourceID resourceID, Map<String, Object> metadata) { log.debug("Cleanup Done for resource {} with metadata {}", resourceID, metadata); - getCounter(resourceID.getClass(), RECONCILIATION, CLEANUP).inc(); + String metricsPrefix = getMetricNamePrefix(resourceID.getClass()); + getCounter(metricsPrefix, RECONCILIATION, CLEANUP).inc(); resourceID .getNamespace() - .ifPresent(ns -> getCounter(resourceID.getClass(), ns, RECONCILIATION, CLEANUP).inc()); + .ifPresent(ns -> getCounter(metricsPrefix, ns, RECONCILIATION, CLEANUP).inc()); } @Override @@ -237,11 +230,11 @@ public class OperatorJosdkMetrics implements Source, Metrics { public void reconciliationExecutionStarted(HasMetadata resource, Map<String, Object> metadata) { log.debug("Reconciliation execution started"); String namespace = resource.getMetadata().getNamespace(); - getCounter( - resource.getClass(), (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_EXECUTIONS) + String metricsPrefix = getMetricNamePrefix(resource.getClass()); + getCounter(metricsPrefix, (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_EXECUTIONS) .inc(); getCounter( - resource.getClass(), + metricsPrefix, namespace, (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_EXECUTIONS) @@ -252,17 +245,16 @@ public class OperatorJosdkMetrics implements Source, Metrics { public void 
reconciliationExecutionFinished(HasMetadata resource, Map<String, Object> metadata) { log.debug("Reconciliation execution finished"); String namespace = resource.getMetadata().getNamespace(); - getCounter( - resource.getClass(), (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_EXECUTIONS) + String metricsPrefix = getMetricNamePrefix(resource.getClass()); + getCounter(metricsPrefix, (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_EXECUTIONS) .dec(); getCounter( - resource.getClass(), + metricsPrefix, namespace, (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_EXECUTIONS) .dec(); - getCounter( - resource.getClass(), (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_QUEUE_SIZE) + getCounter(metricsPrefix, (String) metadata.get(CONTROLLER_NAME), RECONCILIATIONS_QUEUE_SIZE) .dec(); } @@ -270,30 +262,6 @@ public class OperatorJosdkMetrics implements Source, Metrics { return TimeUnit.MILLISECONDS.toSeconds(clock.getTimeMillis() - startTimeInMilliseconds); } - private Histogram getHistogram(Class<?> kclass, String... names) { - String name = MetricRegistry.name(kclass.getSimpleName(), names).toLowerCase(); - Histogram histogram; - if (histograms.containsKey(name)) { - histogram = histograms.get(name); - } else { - histogram = metricRegistry.histogram(name); - histograms.put(name, histogram); - } - return histogram; - } - - private Counter getCounter(Class<?> klass, String... names) { - String name = MetricRegistry.name(klass.getSimpleName(), names).toLowerCase(); - Counter counter; - if (counters.containsKey(name)) { - counter = counters.get(name); - } else { - counter = metricRegistry.counter(name); - counters.put(name, counter); - } - return counter; - } - private Optional<Class<? 
extends BaseResource<?, ?, ?, ?, ?>>> getResourceClass( Map<String, Object> metadata) { GroupVersionKind resourceGvk = (GroupVersionKind) metadata.get(Constants.RESOURCE_GVK_KEY); diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java index b03d6e3..e9f224e 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/utils/SparkAppStatusRecorder.java @@ -24,18 +24,25 @@ import java.util.List; import org.apache.spark.k8s.operator.SparkApplication; import org.apache.spark.k8s.operator.context.SparkAppContext; import org.apache.spark.k8s.operator.listeners.SparkAppStatusListener; +import org.apache.spark.k8s.operator.metrics.SparkAppStatusRecorderSource; import org.apache.spark.k8s.operator.status.ApplicationState; import org.apache.spark.k8s.operator.status.ApplicationStatus; /** Records the status of a Spark application. 
*/ public class SparkAppStatusRecorder extends StatusRecorder<ApplicationStatus, SparkApplication, SparkAppStatusListener> { - public SparkAppStatusRecorder(List<SparkAppStatusListener> statusListeners) { + protected final SparkAppStatusRecorderSource recorderSource; + + public SparkAppStatusRecorder( + List<SparkAppStatusListener> statusListeners, SparkAppStatusRecorderSource recorderSource) { super(statusListeners, ApplicationStatus.class, SparkApplication.class); + this.recorderSource = recorderSource; } public void appendNewStateAndPersist(SparkAppContext context, ApplicationState newState) { - ApplicationStatus updatedStatus = context.getResource().getStatus().appendNewState(newState); + ApplicationStatus appStatus = context.getResource().getStatus(); + recorderSource.recordStatusUpdateLatency(appStatus, newState); + ApplicationStatus updatedStatus = appStatus.appendNewState(newState); persistStatus(context, updatedStatus); } } diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java new file mode 100644 index 0000000..18b74f8 --- /dev/null +++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/SparkAppStatusRecorderSourceTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.k8s.operator.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Map; + +import com.codahale.metrics.Timer; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import org.junit.jupiter.api.Test; + +import org.apache.spark.k8s.operator.SparkApplication; +import org.apache.spark.k8s.operator.status.ApplicationState; +import org.apache.spark.k8s.operator.status.ApplicationStateSummary; +import org.apache.spark.k8s.operator.status.ApplicationStatus; + +class SparkAppStatusRecorderSourceTest { + + @Test + void recordStatusUpdateLatency() { + SparkAppStatusRecorderSource source = new SparkAppStatusRecorderSource(); + SparkApplication app1 = prepareApplication("foo-1", "bar-1"); + SparkApplication app2 = prepareApplication("foo-2", "bar-2"); + + ApplicationState stateUpdate11 = + new ApplicationState(ApplicationStateSummary.DriverRequested, "foo"); + ApplicationState stateUpdate12 = + new ApplicationState(ApplicationStateSummary.DriverRequested, "bar"); + // record short latency + source.recordStatusUpdateLatency(app1.getStatus(), stateUpdate11); + source.recordStatusUpdateLatency(app2.getStatus(), stateUpdate12); + app1.setStatus(app1.getStatus().appendNewState(stateUpdate11)); + + ApplicationState stateUpdate2 = + new ApplicationState(ApplicationStateSummary.DriverStarted, "foo"); + source.recordStatusUpdateLatency(app1.getStatus(), stateUpdate2); + + Map<String, Timer> timers = 
source.metricRegistry().getTimers(); + assertEquals(2, timers.size()); + assertTrue(timers.containsKey("sparkapp.latency.from.submitted.to.driverrequested")); + assertTrue( + timers.get("sparkapp.latency.from.submitted.to.driverrequested").getSnapshot().getMin() + > 0); + assertEquals(2, timers.get("sparkapp.latency.from.submitted.to.driverrequested").getCount()); + assertTrue(timers.containsKey("sparkapp.latency.from.driverrequested.to.driverstarted")); + assertEquals( + 1, timers.get("sparkapp.latency.from.driverrequested.to.driverstarted").getCount()); + assertTrue( + timers.get("sparkapp.latency.from.driverrequested.to.driverstarted").getSnapshot().getMin() + > 0); + } + + protected SparkApplication prepareApplication(String name, String namespace) { + SparkApplication app = new SparkApplication(); + app.setMetadata(new ObjectMetaBuilder().withName(name).withNamespace(namespace).build()); + app.setStatus(new ApplicationStatus()); + return app; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org