[
https://issues.apache.org/jira/browse/SPARK-55134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18053488#comment-18053488
]
Wenjun Ruan commented on SPARK-55134:
-------------------------------------
cc @[~dongjoon]
> If the executor’s resource request is greater than its limit, the driver is
> expected to exit.
> ---------------------------------------------------------------------------------------------
>
> Key: SPARK-55134
> URL: https://issues.apache.org/jira/browse/SPARK-55134
> Project: Spark
> Issue Type: Improvement
> Components: Kubernetes
> Affects Versions: 3.3.4
> Reporter: Wenjun Ruan
> Priority: Major
>
> When an application is submitted with invalid executor resource requests or
> limits, the driver does not exit. Instead, it keeps retrying to create the
> executor pod.
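> For reference, one way to hit this is a submission where the executor CPU request is
> larger than the CPU limit, e.g. (illustrative values matching the error below):
> ```
> --conf spark.kubernetes.executor.request.cores=4 \
> --conf spark.kubernetes.executor.limit.cores=1
> ```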
> ```
> 26/01/22 11:26:55 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1, known: 0, sharedSlotFromPendingPods: 2147483647.
> 26/01/22 11:26:55 INFO KubernetesClientUtils: Spark configuration files loaded from Some(/opt/spark/conf) : core-site.xml,spark-env.sh,log4j2.properties
> 26/01/22 11:26:55 INFO KubernetesClientUtils: Spark configuration files loaded from Some(/opt/spark/conf) : core-site.xml,spark-env.sh,log4j2.properties
> 26/01/22 11:26:55 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
> 26/01/22 11:26:55 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 7079.
> 26/01/22 11:26:55 INFO NettyBlockTransferService: Server created on spark-1381a29be3bd3691-driver-svc.spark.svc:7079
> 26/01/22 11:26:55 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
> 26/01/22 11:26:55 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, spark-1381a29be3bd3691-driver-svc.spark.svc, 7079, None)
> 26/01/22 11:26:55 INFO BlockManagerMasterEndpoint: Registering block manager spark-1381a29be3bd3691-driver-svc.spark.svc:7079 with 2.2 GiB RAM, BlockManagerId(driver, spark-1381a29be3bd3691-driver-svc.spark.svc, 7079, None)
> 26/01/22 11:26:55 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, spark-1381a29be3bd3691-driver-svc.spark.svc, 7079, None)
> 26/01/22 11:26:55 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, spark-1381a29be3bd3691-driver-svc.spark.svc, 7079, None)
> 26/01/22 11:26:55 WARN ExecutorPodsSnapshotsStoreImpl: Exception when notifying snapshot subscriber.
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://kubernetes.default.svc/api/v1/namespaces/spark/pods. Message: Pod "xx-spark-cfe3beda-3403-4fa5-8d1d-ab670d83083e-exec-1" is invalid: spec.containers[0].resources.requests: Invalid value: "4": must be less than or equal to cpu limit of 1. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=spec.containers[0].resources.requests, message=Invalid value: "4": must be less than or equal to cpu limit of 1, reason=FieldValueInvalid, additionalProperties={})], group=null, kind=Pod, name=xx-spark-cfe3beda-3403-4fa5-8d1d-ab670d83083e-exec-1, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Pod "xx-spark-cfe3beda-3403-4fa5-8d1d-ab670d83083e-exec-1" is invalid: spec.containers[0].resources.requests: Invalid value: "4": must be less than or equal to cpu limit of 1, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
>     at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:682)
>     at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:661)
>     at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:612)
>     at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:555)
>     at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:518)
>     at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:305)
>     at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:644)
>     at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:83)
>     at io.fabric8.kubernetes.client.dsl.base.CreateOnlyResourceOperation.create(CreateOnlyResourceOperation.java:61)
>     at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$requestNewExecutors$1(ExecutorPodsAllocator.scala:407)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>     at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.requestNewExecutors(ExecutorPodsAllocator.scala:389)
>     at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$35(ExecutorPodsAllocator.scala:349)
>     at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$35$adapted(ExecutorPodsAllocator.scala:342)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:342)
>     at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3(ExecutorPodsAllocator.scala:120)
>     at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3$adapted(ExecutorPodsAllocator.scala:120)
>     at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber$$processSnapshotsInternal(ExecutorPodsSnapshotsStoreImpl.scala:138)
>     at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.processSnapshots(ExecutorPodsSnapshotsStoreImpl.scala:126)
>     at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:81)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> ```
>
> I find the driver will exit when it catches an `IllegalArgumentException` (see
> `processSnapshotsInternal` below), so maybe we can throw an
> `IllegalArgumentException` in `BasicExecutorFeatureStep` when the resource
> request is found to be larger than the limit.
> ```
> private def processSnapshotsInternal(): Unit = {
>   if (lock.tryLock()) {
>     // Check whether there are pending notifications before calling the subscriber. This
>     // is needed to avoid calling the subscriber spuriously when the race described in the
>     // comment below happens.
>     if (notificationCount.get() > 0) {
>       try {
>         val snapshots = new ArrayList[ExecutorPodsSnapshot]()
>         snapshotsBuffer.drainTo(snapshots)
>         onNewSnapshots(snapshots.asScala.toSeq)
>       } catch {
>         case e: IllegalArgumentException =>
>           logError("Going to stop due to IllegalArgumentException", e)
>           System.exit(1)
>         case NonFatal(e) =>
>           logWarning("Exception when notifying snapshot subscriber.", e)
>       } finally {
>         lock.unlock()
>       }
>
>       if (notificationCount.decrementAndGet() > 0) {
>         // There was another concurrent request for this subscriber. Schedule a task to
>         // immediately process snapshots again, so that the subscriber can pick up any
>         // changes that may have happened between the time it started looking at snapshots
>         // above, and the time the concurrent request arrived.
>         //
>         // This has to be done outside of the lock, otherwise we might miss a notification
>         // arriving after the above check, but before we've released the lock. Flip side is
>         // that we may schedule a useless task that will just fail to grab the lock.
>         subscribersExecutor.submit(new Runnable() {
>           override def run(): Unit = processSnapshotsInternal()
>         })
>       }
>     } else {
>       lock.unlock()
>     }
>   }
> }
> ```
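>
> A minimal sketch of such a check, assuming a hypothetical helper in
> `BasicExecutorFeatureStep` (names are illustrative, not the actual Spark code) that
> receives the executor CPU request and limit as strings and uses fabric8's `Quantity`
> for parsing:
> ```
> import io.fabric8.kubernetes.api.model.Quantity
>
> // Hypothetical validation sketch: fail fast in the driver with an
> // IllegalArgumentException instead of letting the API server reject
> // every executor pod creation attempt.
> def validateExecutorCpu(requestCores: String, limitCores: Option[String]): Unit = {
>   limitCores.foreach { limit =>
>     val requestAmount = Quantity.getAmountInBytes(new Quantity(requestCores))
>     val limitAmount = Quantity.getAmountInBytes(new Quantity(limit))
>     require(requestAmount.compareTo(limitAmount) <= 0,
>       s"Executor cpu request ($requestCores) must be less than or equal to " +
>         s"the cpu limit ($limit).")
>   }
> }
> ```
> With such a check, the existing `IllegalArgumentException` handling shown above would
> log the error and call `System.exit(1)`, so the driver exits instead of retrying the
> pod creation.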
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]