This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 178bcecb9c2f [SPARK-55075][K8S] Track executor pod creation errors
with ExecutorFailureTracker
178bcecb9c2f is described below
commit 178bcecb9c2f3b980bf6ab27bb2fbb16b1b87fdc
Author: Parth Chandra <[email protected]>
AuthorDate: Mon Feb 23 21:07:55 2026 -0800
[SPARK-55075][K8S] Track executor pod creation errors with
ExecutorFailureTracker
### What changes were proposed in this pull request?
Adds tracking of executor pod creation with the ExecutorFailureTracker.
### Why are the changes needed?
If there are unrecoverable pod creation errors then Spark continues to try
and create pods instead of failing. An example is where a notebook server is
constrained to have a maximum number of pods and the user tries to start a
notebook with twice the number of executors as the limit. In this case the user
gets an 'Unauthorized' message in the logs but Spark will keep on trying to
spin up new pods. By tracking pod creation failures we can stop trying after
reaching max executor failures.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New unit tests added
### Was this patch authored or co-authored using generative AI tooling?
Unit tests generated using Claude Code.
Closes #53840 from parthchandra/k8s-failures.
Authored-by: Parth Chandra <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../cluster/k8s/AbstractPodsAllocator.scala | 14 ++++
.../cluster/k8s/ExecutorPodsAllocator.scala | 84 +++++++++++++++-------
.../cluster/k8s/ExecutorPodsLifecycleManager.scala | 8 +++
.../cluster/k8s/KubernetesClusterManager.scala | 21 ++++--
.../cluster/k8s/ExecutorPodsAllocatorSuite.scala | 28 ++++++++
5 files changed, 126 insertions(+), 29 deletions(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala
index cc081202cf89..8803d3d031e4 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala
@@ -35,6 +35,20 @@ import org.apache.spark.resource.ResourceProfile
*/
@DeveloperApi
abstract class AbstractPodsAllocator {
+ /*
+ * Optional lifecycle manager for tracking executor pod lifecycle events.
+ * Set via setExecutorPodsLifecycleManager for backward compatibility.
+ */
+ protected var executorPodsLifecycleManager:
Option[ExecutorPodsLifecycleManager] = None
+
+ /*
+ * Set the lifecycle manager for tracking executor pod lifecycle events.
+ * This method is optional and may not exist in custom implementations based
on older versions.
+ */
+ def setExecutorPodsLifecycleManager(manager: ExecutorPodsLifecycleManager):
Unit = {
+ executorPodsLifecycleManager = Some(manager)
+ }
+
/*
* Set the total expected executors for an application
*/
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 126d07c0926f..340cc9c76e46 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -118,6 +118,9 @@ class ExecutorPodsAllocator(
protected val numOutstandingPods = new AtomicInteger()
+ // Track total failed pod creation attempts across the application lifecycle
+ protected val totalFailedPodCreations = new AtomicInteger(0)
+
protected var lastSnapshot = ExecutorPodsSnapshot()
protected var appId: String = _
@@ -459,32 +462,55 @@ class ExecutorPodsAllocator(
.build()
val resources = replacePVCsIfNeeded(
podWithAttachedContainer,
resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
- val createdExecutorPod =
-
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
- try {
- addOwnerReference(createdExecutorPod, resources)
- resources
- .filter(_.getKind == "PersistentVolumeClaim")
- .foreach { resource =>
- if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
- addOwnerReference(driverPod.get, Seq(resource))
- }
- val pvc = resource.asInstanceOf[PersistentVolumeClaim]
- logInfo(log"Trying to create PersistentVolumeClaim " +
- log"${MDC(LogKeys.PVC_METADATA_NAME, pvc.getMetadata.getName)}
with " +
- log"StorageClass ${MDC(LogKeys.CLASS_NAME,
pvc.getSpec.getStorageClassName)}")
-
kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
- PVC_COUNTER.incrementAndGet()
- }
- newlyCreatedExecutors(newExecutorId) = (resourceProfileId,
clock.getTimeMillis())
- logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+ val optCreatedExecutorPod = try {
+ Some(kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .resource(podWithAttachedContainer)
+ .create())
} catch {
case NonFatal(e) =>
- kubernetesClient.pods()
- .inNamespace(namespace)
- .resource(createdExecutorPod)
- .delete()
- throw e
+ // Register failure with global tracker if lifecycle manager is
available
+ val failureCount = registerPodCreationFailure()
+ logError(log"Failed to create executor pod
${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " +
+ log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
+ None
+ }
+ optCreatedExecutorPod.foreach { createdExecutorPod =>
+ try {
+ addOwnerReference(createdExecutorPod, resources)
+ resources
+ .filter(_.getKind == "PersistentVolumeClaim")
+ .foreach { resource =>
+ if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
+ addOwnerReference(driverPod.get, Seq(resource))
+ }
+ val pvc = resource.asInstanceOf[PersistentVolumeClaim]
+ logInfo(log"Trying to create PersistentVolumeClaim " +
+ log"${MDC(LogKeys.PVC_METADATA_NAME, pvc.getMetadata.getName)}
with " +
+ log"StorageClass ${MDC(LogKeys.CLASS_NAME,
pvc.getSpec.getStorageClassName)}")
+ kubernetesClient
+ .persistentVolumeClaims()
+ .inNamespace(namespace)
+ .resource(pvc)
+ .create()
+ PVC_COUNTER.incrementAndGet()
+ }
+ newlyCreatedExecutors(newExecutorId) = (resourceProfileId,
clock.getTimeMillis())
+ logDebug(s"Requested executor with id $newExecutorId from
Kubernetes.")
+ } catch {
+ case NonFatal(e) =>
+ // Register failure with global tracker if lifecycle manager is
available
+ val failureCount = registerPodCreationFailure()
+ logError(log"Failed to add owner reference or create PVC for
executor pod " +
+ log"${MDC(LogKeys.EXECUTOR_ID, newExecutorId)}. " +
+ log"Total failures: ${MDC(LogKeys.TOTAL, failureCount)}", e)
+ kubernetesClient.pods()
+ .inNamespace(namespace)
+ .resource(createdExecutorPod)
+ .delete()
+ throw e
+ }
}
}
}
@@ -521,6 +547,16 @@ class ExecutorPodsAllocator(
resources.filterNot(replacedResources.contains)
}
+ /**
+ * Registers a pod creation failure with the lifecycle manager and
increments the local counter.
+ * Returns the total failure count for logging purposes.
+ */
+ protected def registerPodCreationFailure(): Int = {
+ val failureCount = totalFailedPodCreations.incrementAndGet()
+ executorPodsLifecycleManager.foreach(_.registerExecutorFailure())
+ failureCount
+ }
+
protected def isExecutorIdleTimedOut(state: ExecutorPodState, currentTime:
Long): Boolean = {
try {
val creationTime =
Instant.parse(state.pod.getMetadata.getCreationTimestamp).toEpochMilli()
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index c57a014dcfa6..84cfd0d72b3b 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -77,6 +77,14 @@ private[spark] class ExecutorPodsLifecycleManager(
protected[spark] def getNumExecutorsFailed: Int =
failureTracker.numFailedExecutors
+ /**
+ * Register an executor failure. This increments the global executor failure
count
+ * which is checked against spark.executor.maxNumFailures.
+ */
+ protected[spark] def registerExecutorFailure(): Unit = {
+ failureTracker.registerExecutorFailure()
+ }
+
def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
val eventProcessingInterval =
conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
snapshotsStore.addSubscriber(eventProcessingInterval) {
executorPodsSnapshot =>
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index d2c6789f2bb5..782fac670fa8 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -134,7 +134,8 @@ private[spark] class KubernetesClusterManager extends
ExternalClusterManager wit
kubernetesClient,
snapshotsStore)
- val executorPodsAllocator = makeExecutorPodsAllocator(sc,
kubernetesClient, snapshotsStore)
+ val executorPodsAllocator = makeExecutorPodsAllocator(
+ sc, kubernetesClient, snapshotsStore, Some(executorPodsLifecycleManager))
val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
snapshotsStore,
@@ -158,8 +159,11 @@ private[spark] class KubernetesClusterManager extends
ExternalClusterManager wit
podsPollingEventSource)
}
- private[k8s] def makeExecutorPodsAllocator(sc: SparkContext,
kubernetesClient: KubernetesClient,
- snapshotsStore: ExecutorPodsSnapshotsStore) = {
+ private[k8s] def makeExecutorPodsAllocator(
+ sc: SparkContext,
+ kubernetesClient: KubernetesClient,
+ snapshotsStore: ExecutorPodsSnapshotsStore,
+ lifecycleManager: Option[ExecutorPodsLifecycleManager] = None) = {
val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
if (allocator == "deployment" && Utils.isDynamicAllocationEnabled(sc.conf)
&&
sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) {
@@ -184,13 +188,20 @@ private[spark] class KubernetesClusterManager extends
ExternalClusterManager wit
classOf[SparkConf], classOf[org.apache.spark.SecurityManager],
classOf[KubernetesExecutorBuilder], classOf[KubernetesClient],
classOf[ExecutorPodsSnapshotsStore], classOf[Clock])
- cstr.newInstance(
+ val allocatorInstance = cstr.newInstance(
sc.conf,
sc.env.securityManager,
new KubernetesExecutorBuilder(),
kubernetesClient,
snapshotsStore,
- new SystemClock())
+ new SystemClock()).asInstanceOf[AbstractPodsAllocator]
+
+ // Set the lifecycle manager if provided
+ lifecycleManager.foreach { manager =>
+ allocatorInstance.setExecutorPodsLifecycleManager(manager)
+ }
+
+ allocatorInstance
}
override def initialize(scheduler: TaskScheduler, backend:
SchedulerBackend): Unit = {
diff --git
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index e994ccbed9a6..fcdf248b2a22 100644
---
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -112,6 +112,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with
BeforeAndAfter {
@Mock
private var schedulerBackend: KubernetesClusterSchedulerBackend = _
+ @Mock
+ private var lifecycleManager: ExecutorPodsLifecycleManager = _
+
private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
private var podsAllocatorUnderTest: ExecutorPodsAllocator = _
@@ -142,6 +145,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with
BeforeAndAfter {
waitForExecutorPodsClock = new ManualClock(0L)
podsAllocatorUnderTest = new ExecutorPodsAllocator(
conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore,
waitForExecutorPodsClock)
+ podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
@@ -202,6 +206,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with
BeforeAndAfter {
val confWithLowMaxPendingPods =
conf.clone.set(KUBERNETES_MAX_PENDING_PODS.key, "3")
podsAllocatorUnderTest = new
ExecutorPodsAllocator(confWithLowMaxPendingPods, secMgr,
executorBuilder, kubernetesClient, snapshotsStore,
waitForExecutorPodsClock)
+ podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2,
rp -> 3))
@@ -268,6 +273,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with
BeforeAndAfter {
.set(KUBERNETES_MAX_PENDING_PODS_PER_RPID.key, "2")
podsAllocatorUnderTest = new
ExecutorPodsAllocator(confWithLowMaxPendingPodsPerRpId, secMgr,
executorBuilder, kubernetesClient, snapshotsStore,
waitForExecutorPodsClock)
+ podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
// Request more than the max per rp for one rp
@@ -321,6 +327,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with
BeforeAndAfter {
val confWithAllocationMaximum =
conf.clone.set(KUBERNETES_ALLOCATION_MAXIMUM.key, "1")
podsAllocatorUnderTest = new
ExecutorPodsAllocator(confWithAllocationMaximum, secMgr,
executorBuilder, kubernetesClient, snapshotsStore,
waitForExecutorPodsClock)
+ podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))()
@@ -838,6 +845,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with
BeforeAndAfter {
podsAllocatorUnderTest = new ExecutorPodsAllocator(
confWithPVC, secMgr, executorBuilder,
kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+ podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
when(podsWithNamespace
@@ -936,6 +944,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with
BeforeAndAfter {
podsAllocatorUnderTest = new ExecutorPodsAllocator(
confWithPVC, secMgr, executorBuilder,
kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+ podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
when(podsWithNamespace
@@ -1005,6 +1014,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite
with BeforeAndAfter {
podsAllocatorUnderTest = new ExecutorPodsAllocator(
confWithPVC, secMgr, executorBuilder,
kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+ podsAllocatorUnderTest.setExecutorPodsLifecycleManager(lifecycleManager)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
val startTime = Instant.now.toEpochMilli
@@ -1052,4 +1062,22 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite
with BeforeAndAfter {
KubernetesExecutorSpec(executorPodWithId(k8sConf.executorId.toInt,
k8sConf.resourceProfileId.toInt), Seq.empty)
}
+
+ test("SPARK-55075: Pod creation failures are tracked by
ExecutorFailureTracker") {
+ // Make all pod creation attempts fail
+ when(podResource.create()).thenThrow(new
KubernetesClientException("Simulated pod" +
+ " creation failure"))
+
+ // Request 3 executors
+ podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3))
+
+ // Verify that pod creation was attempted 3 times (once per executor, no
retries)
+ verify(podResource, times(3)).create()
+
+ // Verify that registerPodCreationFailure was called 3 times (once per
failed executor)
+ verify(lifecycleManager, times(3)).registerExecutorFailure()
+
+ // Verify no pods were created since all attempts failed
+ assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 0)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]