This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 0f3a251 [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception
0f3a251 is described below
commit 0f3a251af0795bfa4af75ce1efa6a845a31362fa
Author: Kent Yao <[email protected]>
AuthorDate: Thu Jun 10 13:39:39 2021 -0700
[SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception
### What changes were proposed in this pull request?
A follow-up for SPARK-32975 to avoid unexpected the `None.get` exception
Run SparkPi with docker desktop, as podName is an option, we will got
```logtalk
21/06/09 01:09:12 ERROR Utils: Uncaught exception in thread main
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:529)
at scala.None$.get(Option.scala:527)
at
org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:110)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1417)
at
org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.start(ExecutorPodsAllocator.scala:111)
at
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.start(KubernetesClusterSchedulerBackend.scala:99)
at
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:581)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2686)
at
org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:948)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:942)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:30)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
at
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
### Why are the changes needed?
fix a regression
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Manual.
Closes #32830 from yaooqinn/SPARK-32975.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit b4b78ce26567ce7ab83d47ce3b6af87c866bcacb)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 358058e..5429e36 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -102,13 +102,15 @@ private[spark] class ExecutorPodsAllocator(
@volatile private var deletedExecutorIds = Set.empty[Long]
def start(applicationId: String, schedulerBackend:
KubernetesClusterSchedulerBackend): Unit = {
- // Wait until the driver pod is ready before starting executors, as the
headless service won't
- // be resolvable by DNS until the driver pod is ready.
- Utils.tryLogNonFatalError {
- kubernetesClient
- .pods()
- .withName(kubernetesDriverPodName.get)
- .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
+ driverPod.foreach { pod =>
+ // Wait until the driver pod is ready before starting executors, as the
headless service won't
+ // be resolvable by DNS until the driver pod is ready.
+ Utils.tryLogNonFatalError {
+ kubernetesClient
+ .pods()
+ .withName(pod.getMetadata.getName)
+ .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
+ }
}
snapshotsStore.addSubscriber(podAllocationDelay) {
onNewSnapshots(applicationId, schedulerBackend, _)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]