This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new eb09be9  [SPARK-36052][K8S] Introducing a limit for pending PODs
eb09be9 is described below

commit eb09be9e68737bf2f29ca5391874f4c5fa0de3e2
Author: attilapiros <[email protected]>
AuthorDate: Tue Aug 10 20:16:21 2021 -0700

    [SPARK-36052][K8S] Introducing a limit for pending PODs
    
    Introducing a limit for pending PODs (newly created/requested executors 
included).
    This limit is global across all the resource profiles. So first we have to 
count all the newly created and pending PODs (minus the ones which were 
requested to be deleted), then we can share the remaining pending POD slots 
among the resource profiles.
    
    Without this PR, dynamic allocation could request too many PODs, so the K8S 
scheduler could be overloaded and the scheduling of PODs would be affected by 
the load.
    
    No.
    
    With new unit tests.
    
    Closes #33492 from attilapiros/SPARK-36052.
    
    Authored-by: attilapiros <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 1dced492fb286a7ada73d886fe264f5df0e2b3da)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 12 +++
 .../cluster/k8s/ExecutorPodsAllocator.scala        | 85 +++++++++++++++-------
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 77 ++++++++++++++++++++
 3 files changed, 148 insertions(+), 26 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 9fceca7..4d352f7 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -543,6 +543,18 @@ private[spark] object Config extends Logging {
       .checkValue(delay => delay > 0, "delay must be a positive time value")
       .createWithDefaultString("30s")
 
+  val KUBERNETES_MAX_PENDING_PODS =
+    ConfigBuilder("spark.kubernetes.allocation.maxPendingPods")
+      .doc("Maximum number of pending PODs allowed during executor allocation 
for this " +
+        "application. Those newly requested executors which are unknown by 
Kubernetes yet are " +
+        "also counted into this limit as they will change into pending PODs by 
time. " +
+        "This limit is independent from the resource profiles as it limits the 
sum of all " +
+        "allocation for all the used resource profiles.")
+      .version("3.2.0")
+      .intConf
+      .checkValue(value => value > 0, "Maximum number of pending pods should 
be a positive integer")
+      .createWithDefault(Int.MaxValue)
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = 
"spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = 
"spark.kubernetes.driver.service.annotation."
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index d6dc13e..cee5360 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -57,6 +57,8 @@ private[spark] class ExecutorPodsAllocator(
 
   private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
 
+  private val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)
+
   private val podCreationTimeout = math.max(
     podAllocationDelay * 5,
     conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
@@ -217,9 +219,15 @@ private[spark] class ExecutorPodsAllocator(
       }
     }
 
+    // sum of all the pending pods unknown by the scheduler (total for all the 
resources)
     var totalPendingCount = 0
-    // The order we request executors for each ResourceProfile is not 
guaranteed.
-    totalExpectedExecutorsPerResourceProfileId.asScala.foreach { case (rpId, 
targetNum) =>
+    // total not running pods (including scheduler known & unknown, pending & 
newly requested ones)
+    var totalNotRunningPodCount = 0
+    val podsToAllocateWithRpId = totalExpectedExecutorsPerResourceProfileId
+      .asScala
+      .toSeq
+      .sortBy(_._1)
+      .flatMap { case (rpId, targetNum) =>
       val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, 
mutable.HashMap.empty)
 
       val currentRunningCount = podsForRpId.values.count {
@@ -235,7 +243,7 @@ private[spark] class ExecutorPodsAllocator(
       }
       // This variable is used later to print some debug logs. It's updated 
when cleaning up
       // excess pod requests, since currentPendingExecutorsForRpId is 
immutable.
-      var knownPendingCount = currentPendingExecutorsForRpId.size
+      var pendingCountForRpId = currentPendingExecutorsForRpId.size
 
       val newlyCreatedExecutorsForRpId =
         newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) =>
@@ -248,12 +256,12 @@ private[spark] class ExecutorPodsAllocator(
         }
 
       if (podsForRpId.nonEmpty) {
-        logDebug(s"ResourceProfile Id: $rpId " +
+        logDebug(s"ResourceProfile Id: $rpId (" +
           s"pod allocation status: $currentRunningCount running, " +
           s"${currentPendingExecutorsForRpId.size} unknown pending, " +
           s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known 
pending, " +
           s"${newlyCreatedExecutorsForRpId.size} unknown newly created, " +
-          s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend 
known newly created.")
+          s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend 
known newly created)")
       }
 
       // It's possible that we have outstanding pods that are outdated when 
dynamic allocation
@@ -264,21 +272,22 @@ private[spark] class ExecutorPodsAllocator(
       //
       // TODO: with dynamic allocation off, handle edge cases if we end up 
with more running
       // executors than expected.
-      val knownPodCount = currentRunningCount +
+      var notRunningPodCountForRpId =
         currentPendingExecutorsForRpId.size + 
schedulerKnownPendingExecsForRpId.size +
         newlyCreatedExecutorsForRpId.size + 
schedulerKnownNewlyCreatedExecsForRpId.size
+      val podCountForRpId = currentRunningCount + notRunningPodCountForRpId
 
-      if (knownPodCount > targetNum) {
-        val excess = knownPodCount - targetNum
+      if (podCountForRpId > targetNum) {
+        val excess = podCountForRpId - targetNum
         val newlyCreatedToDelete = newlyCreatedExecutorsForRpId
           .filter { case (_, (_, createTime)) =>
             currentTime - createTime > executorIdleTimeout
           }.keys.take(excess).toList
-        val knownPendingToDelete = currentPendingExecutorsForRpId
+        val pendingToDelete = currentPendingExecutorsForRpId
           .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
           .take(excess - newlyCreatedToDelete.size)
           .map { case (id, _) => id }
-        val toDelete = newlyCreatedToDelete ++ knownPendingToDelete
+        val toDelete = newlyCreatedToDelete ++ pendingToDelete
 
         if (toDelete.nonEmpty) {
           logInfo(s"Deleting ${toDelete.size} excess pod requests 
(${toDelete.mkString(",")}).")
@@ -293,32 +302,49 @@ private[spark] class ExecutorPodsAllocator(
               .withLabelIn(SPARK_EXECUTOR_ID_LABEL, 
toDelete.sorted.map(_.toString): _*)
               .delete()
             newlyCreatedExecutors --= newlyCreatedToDelete
-            knownPendingCount -= knownPendingToDelete.size
+            pendingCountForRpId -= pendingToDelete.size
+            notRunningPodCountForRpId -= toDelete.size
           }
         }
       }
-
-      if (newlyCreatedExecutorsForRpId.isEmpty
-        && knownPodCount < targetNum) {
-        requestNewExecutors(targetNum, knownPodCount, applicationId, rpId, 
k8sKnownPVCNames)
-      }
-      totalPendingCount += knownPendingCount
+      totalPendingCount += pendingCountForRpId
+      totalNotRunningPodCount += notRunningPodCountForRpId
 
       // The code below just prints debug messages, which are only useful when 
there's a change
       // in the snapshot state. Since the messages are a little spammy, avoid 
them when we know
       // there are no useful updates.
       if (log.isDebugEnabled && snapshots.nonEmpty) {
-        val outstanding = knownPendingCount + newlyCreatedExecutorsForRpId.size
+        val outstanding = pendingCountForRpId + 
newlyCreatedExecutorsForRpId.size
         if (currentRunningCount >= targetNum && !dynamicAllocationEnabled) {
           logDebug(s"Current number of running executors for ResourceProfile 
Id $rpId is " +
             "equal to the number of requested executors. Not scaling up 
further.")
         } else {
-          if (outstanding > 0) {
-            logDebug(s"Still waiting for $outstanding executors for 
ResourceProfile " +
-              s"Id $rpId before requesting more.")
+          if (newlyCreatedExecutorsForRpId.nonEmpty) {
+            logDebug(s"Still waiting for ${newlyCreatedExecutorsForRpId.size} 
executors for " +
+              s"ResourceProfile Id $rpId before requesting more.")
           }
         }
       }
+      if (newlyCreatedExecutorsForRpId.isEmpty && podCountForRpId < targetNum) 
{
+        Some(rpId, podCountForRpId, targetNum)
+      } else {
+        // for this resource profile we do not request more PODs
+        None
+      }
+    }
+
+    val remainingSlotFromPendingPods = maxPendingPods - totalNotRunningPodCount
+    if (remainingSlotFromPendingPods > 0 && podsToAllocateWithRpId.size > 0) {
+      ExecutorPodsAllocator.splitSlots(podsToAllocateWithRpId, 
remainingSlotFromPendingPods)
+        .foreach { case ((rpId, podCountForRpId, targetNum), 
sharedSlotFromPendingPods) =>
+        val numMissingPodsForRpId = targetNum - podCountForRpId
+        val numExecutorsToAllocate =
+          math.min(math.min(numMissingPodsForRpId, podAllocationSize), 
sharedSlotFromPendingPods)
+        logInfo(s"Going to request $numExecutorsToAllocate executors from 
Kubernetes for " +
+          s"ResourceProfile Id: $rpId, target: $targetNum, known: 
$podCountForRpId, " +
+          s"sharedSlotFromPendingPods: $sharedSlotFromPendingPods.")
+        requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, 
k8sKnownPVCNames)
+      }
     }
     deletedExecutorIds = _deletedExecutorIds
 
@@ -347,14 +373,10 @@ private[spark] class ExecutorPodsAllocator(
   }
 
   private def requestNewExecutors(
-      expected: Int,
-      running: Int,
+      numExecutorsToAllocate: Int,
       applicationId: String,
       resourceProfileId: Int,
       pvcsInUse: Seq[String]): Unit = {
-    val numExecutorsToAllocate = math.min(expected - running, 
podAllocationSize)
-    logInfo(s"Going to request $numExecutorsToAllocate executors from 
Kubernetes for " +
-      s"ResourceProfile Id: $resourceProfileId, target: $expected running: 
$running.")
     // Check reusable PVCs for this executor allocation batch
     val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
     for ( _ <- 0 until numExecutorsToAllocate) {
@@ -440,3 +462,14 @@ private[spark] class ExecutorPodsAllocator(
     }
   }
 }
+
+private[spark] object ExecutorPodsAllocator {
+
+  // A utility function to split the available slots among the specified 
consumers
+  def splitSlots[T](consumers: Seq[T], slots: Int): Seq[(T, Int)] = {
+    val d = slots / consumers.size
+    val r = slots % consumers.size
+    consumers.take(r).map((_, d + 1)) ++ consumers.takeRight(consumers.size - 
r).map((_, d))
+  }
+
+}
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index a54f1a1..5b33da6 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -117,6 +117,83 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite 
with BeforeAndAfter {
     podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
   }
 
+  test("SPARK-36052: test splitSlots") {
+    val seq1 = Seq("a")
+    assert(ExecutorPodsAllocator.splitSlots(seq1, 0) === Seq(("a", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq1, 1) === Seq(("a", 1)))
+    assert(ExecutorPodsAllocator.splitSlots(seq1, 2) === Seq(("a", 2)))
+
+    val seq2 = Seq("a", "b", "c")
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 0) === Seq(("a", 0), ("b", 
0), ("c", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 1) === Seq(("a", 1), ("b", 
0), ("c", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 2) === Seq(("a", 1), ("b", 
1), ("c", 0)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 3) === Seq(("a", 1), ("b", 
1), ("c", 1)))
+    assert(ExecutorPodsAllocator.splitSlots(seq2, 4) === Seq(("a", 2), ("b", 
1), ("c", 1)))
+  }
+
+  test("SPARK-36052: pending pod limit with multiple resource profiles") {
+    when(podOperations
+      .withField("status.phase", "Pending"))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
+      .thenReturn(podOperations)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    val rpb = new ResourceProfileBuilder()
+    val ereq = new ExecutorResourceRequests()
+    val treq = new TaskResourceRequests()
+    ereq.cores(4).memory("2g")
+    treq.cpus(2)
+    rpb.require(ereq).require(treq)
+    val rp = rpb.build()
+
+    val confWithLowMaxPendingPods = 
conf.clone.set(KUBERNETES_MAX_PENDING_PODS.key, "3")
+    podsAllocatorUnderTest = new 
ExecutorPodsAllocator(confWithLowMaxPendingPods, secMgr,
+      executorBuilder, kubernetesClient, snapshotsStore, 
waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, 
rp -> 3))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations).create(podWithAttachedContainerForId(1, 
defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(2, 
defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(3, rp.id))
+
+    // Mark executor 2 and 3 as pending, leave 1 as newly created but this 
does not free up
+    // any pending pod slot so no new pod is requested
+    snapshotsStore.updatePod(pendingExecutor(2, defaultProfile.id))
+    snapshotsStore.updatePod(pendingExecutor(3, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations, times(3)).create(any())
+    verify(podOperations, never()).delete()
+
+    // Downscaling for defaultProfile resource ID with 1 executor to make one 
free slot
+    // for pendings pods
+    waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, 
rp -> 3))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations).create(podWithAttachedContainerForId(4, rp.id))
+    verify(podOperations, times(1)).delete()
+
+    // Make one pod running this way we have one more free slot for pending 
pods
+    snapshotsStore.updatePod(runningExecutor(3, rp.id))
+    snapshotsStore.updatePod(pendingExecutor(4, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations).create(podWithAttachedContainerForId(5, rp.id))
+    verify(podOperations, times(1)).delete()
+  }
+
   test("Initially request executors in batches. Do not request another batch 
if the" +
     " first has not finished.") {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 
(podAllocationSize + 1)))

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to