This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 252dfd9 [SPARK-32975][K8S] Add config for driver readiness timeout
before executors start
252dfd9 is described below
commit 252dfd961189923e52304413036e0051346ee8e1
Author: Chris Wu <[email protected]>
AuthorDate: Fri Jun 4 06:59:49 2021 -0700
[SPARK-32975][K8S] Add config for driver readiness timeout before executors
start
### What changes were proposed in this pull request?
Add a new config that controls the timeout of waiting for the driver pod's
readiness before allocating executor pods. This wait only happens once on
application start.
### Why are the changes needed?
The driver's headless service can be resolved by DNS only after the driver
pod is ready. If an executor tries to connect to the headless service before
the driver pod is ready, it will hit an UnknownHostException and enter an error
state, but it will not be restarted. **This usually happens when the driver pod
has sidecar containers that have not finished starting by the time the executors start.**
In other words, there is a race condition. This issue can be mitigated by tuning
this config.
### Does this PR introduce _any_ user-facing change?
A new config `spark.kubernetes.allocation.driver.readinessTimeout` is added.
### How was this patch tested?
Existing tests.
Closes #32752 from cchriswu/SPARK-32975-fix.
Lead-authored-by: Chris Wu <[email protected]>
Co-authored-by: Chris Wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 497c80a1ad7fdd605b75c8a6601fce35c7449578)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 11 +++++++++++
.../spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 11 +++++++++++
.../scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala | 1 +
3 files changed, 23 insertions(+)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index bec29a9..a725e68 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -235,6 +235,17 @@ private[spark] object Config extends Logging {
.checkValue(value => value > 0, "Allocation batch delay must be a
positive time value.")
.createWithDefaultString("1s")
+ val KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT = // one-time wait for driver pod readiness before executors are allocated
+ ConfigBuilder("spark.kubernetes.allocation.driver.readinessTimeout")
+ .doc("Time to wait for driver pod to get ready before creating executor
pods. This wait " +
+ "only happens on application start. If timeout happens, executor pods
will still be " +
+ "created.") // best-effort: a timeout does not block executor creation
+ .version("3.1.3")
+ .timeConf(TimeUnit.SECONDS) // consumed in seconds by ExecutorPodsAllocator.start (waitUntilReady(..., TimeUnit.SECONDS))
+ .checkValue(value => value > 0, "Allocation driver readiness timeout
must be a positive "
+ + "time value.")
+ .createWithDefaultString("1s") // 1 second unless overridden
+
val KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT =
ConfigBuilder("spark.kubernetes.allocation.executor.timeout")
.doc("Time to wait before a newly created executor POD request, which
does not reached " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 5ebd172..358058e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import scala.collection.JavaConverters._
@@ -61,6 +62,8 @@ private[spark] class ExecutorPodsAllocator(
podAllocationDelay * 5,
conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
+ private val driverPodReadinessTimeout =
conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT) // seconds; passed to waitUntilReady with TimeUnit.SECONDS in start()
+
private val executorIdleTimeout =
conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000
private val namespace = conf.get(KUBERNETES_NAMESPACE)
@@ -99,6 +102,14 @@ private[spark] class ExecutorPodsAllocator(
@volatile private var deletedExecutorIds = Set.empty[Long]
def start(applicationId: String, schedulerBackend:
KubernetesClusterSchedulerBackend): Unit = {
+ // Wait until the driver pod is ready before starting executors, as the
headless service won't
+ // be resolvable by DNS until the driver pod is ready.
+ Utils.tryLogNonFatalError {
+ kubernetesClient
+ .pods()
+ .withName(kubernetesDriverPodName.get)
+ .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
+ }
snapshotsStore.addSubscriber(podAllocationDelay) {
onNewSnapshots(applicationId, schedulerBackend, _)
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 55be80a..ed6ca2a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -93,6 +93,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
when(driverPodOperations.get).thenReturn(driverPod)
+ when(driverPodOperations.waitUntilReady(any(),
any())).thenReturn(driverPod)
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]),
meq(secMgr),
meq(kubernetesClient),
any(classOf[ResourceProfile]))).thenAnswer(executorPodAnswer())
snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]