Wenjun Ruan created SPARK-55134:
-----------------------------------

             Summary: If the executor’s resource request is greater than its limit, the driver is expected to exit.
                 Key: SPARK-55134
                 URL: https://issues.apache.org/jira/browse/SPARK-55134
             Project: Spark
          Issue Type: Improvement
          Components: Kubernetes
    Affects Versions: 3.3.4
            Reporter: Wenjun Ruan


When an application is submitted with invalid executor resource requests or 
limits, the driver does not exit. Instead, it keeps retrying to create the 
executor pod.
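
For example, a configuration along these lines triggers it (a minimal sketch; the values are illustrative, taken from the error below):

```
import org.apache.spark.SparkConf

// Illustrative only: a CPU request of 4 against a CPU limit of 1 produces an
// invalid executor pod spec, which the Kubernetes API server rejects with 422.
val conf = new SparkConf()
  .set("spark.kubernetes.executor.request.cores", "4")
  .set("spark.kubernetes.executor.limit.cores", "1")
```

The driver then loops, repeatedly logging the failed pod creation: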

```
26/01/22 11:26:55 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1, known: 0, sharedSlotFromPendingPods: 2147483647.
26/01/22 11:26:55 INFO KubernetesClientUtils: Spark configuration files loaded from Some(/opt/spark/conf) : core-site.xml,spark-env.sh,log4j2.properties
26/01/22 11:26:55 INFO KubernetesClientUtils: Spark configuration files loaded from Some(/opt/spark/conf) : core-site.xml,spark-env.sh,log4j2.properties
26/01/22 11:26:55 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
26/01/22 11:26:55 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 7079.
26/01/22 11:26:55 INFO NettyBlockTransferService: Server created on spark-1381a29be3bd3691-driver-svc.spark.svc:7079
26/01/22 11:26:55 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
26/01/22 11:26:55 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, spark-1381a29be3bd3691-driver-svc.spark.svc, 7079, None)
26/01/22 11:26:55 INFO BlockManagerMasterEndpoint: Registering block manager spark-1381a29be3bd3691-driver-svc.spark.svc:7079 with 2.2 GiB RAM, BlockManagerId(driver, spark-1381a29be3bd3691-driver-svc.spark.svc, 7079, None)
26/01/22 11:26:55 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, spark-1381a29be3bd3691-driver-svc.spark.svc, 7079, None)
26/01/22 11:26:55 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, spark-1381a29be3bd3691-driver-svc.spark.svc, 7079, None)
26/01/22 11:26:55 WARN ExecutorPodsSnapshotsStoreImpl: Exception when notifying snapshot subscriber.
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: [https://kubernetes.default.svc/api/v1/namespaces/spark/pods]. Message: Pod "xx-spark-cfe3beda-3403-4fa5-8d1d-ab670d83083e-exec-1" is invalid: spec.containers[0].resources.requests: Invalid value: "4": must be less than or equal to cpu limit of 1. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=spec.containers[0].resources.requests, message=Invalid value: "4": must be less than or equal to cpu limit of 1, reason=FieldValueInvalid, additionalProperties={})], group=null, kind=Pod, name=xx-spark-cfe3beda-3403-4fa5-8d1d-ab670d83083e-exec-1, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Pod "xx-spark-cfe3beda-3403-4fa5-8d1d-ab670d83083e-exec-1" is invalid: spec.containers[0].resources.requests: Invalid value: "4": must be less than or equal to cpu limit of 1, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:682)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:661)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:612)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:555)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:518)
	at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:305)
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:644)
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:83)
	at io.fabric8.kubernetes.client.dsl.base.CreateOnlyResourceOperation.create(CreateOnlyResourceOperation.java:61)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$requestNewExecutors$1(ExecutorPodsAllocator.scala:407)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.requestNewExecutors(ExecutorPodsAllocator.scala:389)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$35(ExecutorPodsAllocator.scala:349)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$35$adapted(ExecutorPodsAllocator.scala:342)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:342)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3(ExecutorPodsAllocator.scala:120)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3$adapted(ExecutorPodsAllocator.scala:120)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber$$processSnapshotsInternal(ExecutorPodsSnapshotsStoreImpl.scala:138)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.processSnapshots(ExecutorPodsSnapshotsStoreImpl.scala:126)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:81)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
```

 

I find that the driver will exit when it catches an `IllegalArgumentException`, so maybe we can throw an `IllegalArgumentException` in `BasicExecutorFeatureStep` when we find that the resource request is larger than the limit.
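
A minimal sketch of such a check (the helper name and the direct config lookups are assumptions for illustration, not the actual patch; `require` throws `IllegalArgumentException`):

```
import org.apache.spark.SparkConf

// Hypothetical validation (sketch only): the object and method names are made up
// for illustration, not the real BasicExecutorFeatureStep change.
object ExecutorResourceCheck {
  // Fails fast when the executor CPU request exceeds the CPU limit, so the driver
  // exits through the existing IllegalArgumentException handling shown below.
  def verifyCpuRequestWithinLimit(conf: SparkConf): Unit = {
    val request = conf.getOption("spark.kubernetes.executor.request.cores")
    val limit = conf.getOption("spark.kubernetes.executor.limit.cores")
    for (req <- request; lim <- limit) {
      // Simplified: handles plain numeric core counts only; Kubernetes quantities
      // such as "500m" would need a proper Quantity parser in real code.
      require(req.toDouble <= lim.toDouble,
        s"Executor CPU request ($req) must be less than or equal to the CPU limit ($lim)")
    }
  }
}
```

For reference, this is the existing handling in `ExecutorPodsSnapshotsStoreImpl.processSnapshotsInternal` that exits on `IllegalArgumentException`: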

```
private def processSnapshotsInternal(): Unit = {
  if (lock.tryLock()) {
    // Check whether there are pending notifications before calling the subscriber. This
    // is needed to avoid calling the subscriber spuriously when the race described in the
    // comment below happens.
    if (notificationCount.get() > 0) {
      try {
        val snapshots = new ArrayList[ExecutorPodsSnapshot]()
        snapshotsBuffer.drainTo(snapshots)
        onNewSnapshots(snapshots.asScala.toSeq)
      } catch {
        case e: IllegalArgumentException =>
          logError("Going to stop due to IllegalArgumentException", e)
          System.exit(1)
        case NonFatal(e) => logWarning("Exception when notifying snapshot subscriber.", e)
      } finally {
        lock.unlock()
      }

      if (notificationCount.decrementAndGet() > 0) {
        // There was another concurrent request for this subscriber. Schedule a task to
        // immediately process snapshots again, so that the subscriber can pick up any
        // changes that may have happened between the time it started looking at snapshots
        // above, and the time the concurrent request arrived.
        //
        // This has to be done outside of the lock, otherwise we might miss a notification
        // arriving after the above check, but before we've released the lock. Flip side is
        // that we may schedule a useless task that will just fail to grab the lock.
        subscribersExecutor.submit(new Runnable() {
          override def run(): Unit = processSnapshotsInternal()
        })
      }
    } else {
      lock.unlock()
    }
  }
}
```
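
Because `BasicExecutorFeatureStep` is invoked while building executor pods inside `onNewSnapshots` (see the `requestNewExecutors` frames in the stack trace above), an `IllegalArgumentException` thrown there should be caught by the `case e: IllegalArgumentException` branch and stop the driver via `System.exit(1)`, instead of being logged and swallowed by the `NonFatal` branch as the `KubernetesClientException` is today.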


