Repository: spark
Updated Branches:
  refs/heads/master 4d8ae0d1c -> bfb74394a


[SPARK-24819][CORE] Fail fast when no enough slots to launch the barrier stage 
on job submitted

## What changes were proposed in this pull request?

We shall check whether the barrier stage requires more slots (to be able to 
launch all tasks in the barrier stage together) than the total number of active 
slots currently, and fail fast if trying to submit a barrier stage that 
requires more slots than current total number.

This PR proposes to add a new method `getNumSlots()` to try to get the total 
number of currently active slots in `SchedulerBackend`, support of this new 
method has been added to all the first-class scheduler backends except 
`MesosFineGrainedSchedulerBackend`.

## How was this patch tested?

Added new test cases in `BarrierStageOnSubmittedSuite`.

Closes #22001 from jiangxb1987/SPARK-24819.

Lead-authored-by: Xingbo Jiang <[email protected]>
Co-authored-by: Xiangrui Meng <[email protected]>
Signed-off-by: Xiangrui Meng <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bfb74394
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bfb74394
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bfb74394

Branch: refs/heads/master
Commit: bfb74394a5513134ea1da9fcf4a1783b77dd64e4
Parents: 4d8ae0d
Author: Xingbo Jiang <[email protected]>
Authored: Wed Aug 15 13:31:28 2018 -0700
Committer: Xiangrui Meng <[email protected]>
Committed: Wed Aug 15 13:31:28 2018 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  9 ++
 .../apache/spark/internal/config/package.scala  | 27 ++++++
 .../scheduler/BarrierJobAllocationFailed.scala  | 62 +++++++++++++
 .../apache/spark/scheduler/DAGScheduler.scala   | 88 ++++++++++++++-----
 .../spark/scheduler/SchedulerBackend.scala      |  9 ++
 .../cluster/CoarseGrainedSchedulerBackend.scala |  6 ++
 .../scheduler/local/LocalSchedulerBackend.scala |  2 +
 .../spark/BarrierStageOnSubmittedSuite.scala    | 91 ++++++++++++++++++--
 .../spark/ExecutorAllocationManagerSuite.scala  |  2 +
 .../org/apache/spark/SparkContextSuite.scala    |  1 +
 .../CoarseGrainedSchedulerBackendSuite.scala    | 89 ++++++++++++++++++-
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 +-
 .../scheduler/ExternalClusterManagerSuite.scala |  1 +
 .../scheduler/SchedulerIntegrationSuite.scala   |  2 +
 .../scheduler/TaskSchedulerImplSuite.scala      |  1 +
 .../MesosFineGrainedSchedulerBackend.scala      |  4 +
 16 files changed, 364 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a7ffb35..e5b1e0e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1603,6 +1603,15 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
+   * Get the max number of tasks that can be concurrent launched currently.
+   * Note that please don't cache the value returned by this method, because 
the number can change
+   * due to add/remove executors.
+   *
+   * @return The max number of tasks that can be concurrent launched currently.
+   */
+  private[spark] def maxNumConcurrentTasks(): Int = 
schedulerBackend.maxNumConcurrentTasks()
+
+  /**
    * Update the cluster manager on our scheduling needs. Three bits of 
information are included
    * to help it make decisions.
    * @param numExecutors The total number of executors we'd like to have. The 
cluster manager

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index eb08628..a8aa691 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -577,4 +577,31 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .checkValue(v => v > 0, "The value should be a positive time value.")
       .createWithDefaultString("365d")
+
+  private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
+    ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
+      .doc("Time in seconds to wait between a max concurrent tasks check 
failure and the next " +
+        "check. A max concurrent tasks check ensures the cluster can launch 
more concurrent " +
+        "tasks than required by a barrier stage on job submitted. The check 
can fail in case " +
+        "a cluster has just started and not enough executors have registered, 
so we wait for a " +
+        "little while and try to perform the check again. If the check fails 
more than a " +
+        "configured max failure times for a job then fail current job 
submission. Note this " +
+        "config only applies to jobs that contain one or more barrier stages, 
we won't perform " +
+        "the check on non-barrier jobs.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("15s")
+
+  private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES =
+    
ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures")
+      .doc("Number of max concurrent tasks check failures allowed before fail 
a job submission. " +
+        "A max concurrent tasks check ensures the cluster can launch more 
concurrent tasks than " +
+        "required by a barrier stage on job submitted. The check can fail in 
case a cluster " +
+        "has just started and not enough executors have registered, so we wait 
for a little " +
+        "while and try to perform the check again. If the check fails more 
than a configured " +
+        "max failure times for a job then fail current job submission. Note 
this config only " +
+        "applies to jobs that contain one or more barrier stages, we won't 
perform the check on " +
+        "non-barrier jobs.")
+      .intConf
+      .checkValue(v => v > 0, "The max failures should be a positive value.")
+      .createWithDefault(40)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
new file mode 100644
index 0000000..803a0a1
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkException
+
+/**
+ * Exception thrown when submit a job with barrier stage(s) failing a required 
check.
+ */
+private[spark] class BarrierJobAllocationFailed(message: String) extends 
SparkException(message)
+
+private[spark] class BarrierJobUnsupportedRDDChainException
+  extends BarrierJobAllocationFailed(
+    
BarrierJobAllocationFailed.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+
+private[spark] class BarrierJobRunWithDynamicAllocationException
+  extends BarrierJobAllocationFailed(
+    BarrierJobAllocationFailed.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+
+private[spark] class BarrierJobSlotsNumberCheckFailed
+  extends BarrierJobAllocationFailed(
+    
BarrierJobAllocationFailed.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
+
+private[spark] object BarrierJobAllocationFailed {
+
+  // Error message when running a barrier stage that have unsupported RDD 
chain pattern.
+  val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
+    "[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the 
following pattern of " +
+      "RDD chain within a barrier stage:\n1. Ancestor RDDs that have different 
number of " +
+      "partitions from the resulting RDD (eg. 
union()/coalesce()/first()/take()/" +
+      "PartitionPruningRDD). A workaround for first()/take() can be 
barrierRdd.collect().head " +
+      "(scala) or barrierRdd.collect()[0] (python).\n" +
+      "2. An RDD that depends on multiple barrier RDDs (eg. 
barrierRdd1.zip(barrierRdd2))."
+
+  // Error message when running a barrier stage with dynamic resource 
allocation enabled.
+  val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
+    "[SPARK-24942]: Barrier execution mode does not support dynamic resource 
allocation for " +
+      "now. You can disable dynamic resource allocation by setting Spark conf 
" +
+      "\"spark.dynamicAllocation.enabled\" to \"false\"."
+
+  // Error message when running a barrier stage that requires more slots than 
current total number.
+  val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
+    "[SPARK-24819]: Barrier execution mode does not allow run a barrier stage 
that requires " +
+      "more slots than the total number of slots in the cluster currently. 
Please init a new " +
+      "cluster with more CPU cores or repartition the input RDD(s) to reduce 
the number of " +
+      "slots required to run this barrier stage."
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index cf1fcbc..2b0ca13 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.util.Properties
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.function.BiFunction
 
 import scala.annotation.tailrec
 import scala.collection.Map
@@ -39,7 +40,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, 
ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.{PartitionPruningRDD, RDD, RDDCheckpointData}
+import org.apache.spark.rdd.{RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -111,8 +112,7 @@ import org.apache.spark.util._
  *  - When adding a new data structure, update 
`DAGSchedulerSuite.assertDataStructuresEmpty` to
  *    include the new structure. This will help to catch memory leaks.
  */
-private[spark]
-class DAGScheduler(
+private[spark] class DAGScheduler(
     private[scheduler] val sc: SparkContext,
     private[scheduler] val taskScheduler: TaskScheduler,
     listenerBus: LiveListenerBus,
@@ -203,6 +203,24 @@ class DAGScheduler(
     sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
       DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
 
+  /**
+   * Number of max concurrent tasks check failures for each barrier job.
+   */
+  private[scheduler] val barrierJobIdToNumTasksCheckFailures = new 
ConcurrentHashMap[Int, Int]
+
+  /**
+   * Time in seconds to wait between a max concurrent tasks check failure and 
the next check.
+   */
+  private val timeIntervalNumTasksCheck = sc.getConf
+    .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL)
+
+  /**
+   * Max number of max concurrent tasks check failures allowed for a job 
before fail the job
+   * submission.
+   */
+  private val maxFailureNumTasksCheck = sc.getConf
+    .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES)
+
   private val messageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
 
@@ -351,8 +369,7 @@ class DAGScheduler(
     val predicate: RDD[_] => Boolean = (r =>
       r.getNumPartitions == numTasksInStage && 
r.dependencies.filter(_.rdd.isBarrier()).size <= 1)
     if (rdd.isBarrier() && !traverseParentRDDsWithinStage(rdd, predicate)) {
-      throw new SparkException(
-        
DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      throw new BarrierJobUnsupportedRDDChainException
     }
   }
 
@@ -365,6 +382,7 @@ class DAGScheduler(
   def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: 
Int): ShuffleMapStage = {
     val rdd = shuffleDep.rdd
     checkBarrierStageWithDynamicAllocation(rdd)
+    checkBarrierStageWithNumSlots(rdd)
     checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
     val numTasks = rdd.partitions.length
     val parents = getOrCreateParentStages(rdd, jobId)
@@ -398,7 +416,20 @@ class DAGScheduler(
    */
   private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = {
     if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) {
-      throw new 
SparkException(DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+      throw new BarrierJobRunWithDynamicAllocationException
+    }
+  }
+
+  /**
+   * Check whether the barrier stage requires more slots (to be able to launch 
all tasks in the
+   * barrier stage together) than the total number of active slots currently. 
Fail current check
+   * if trying to submit a barrier stage that requires more slots than current 
total number. If
+   * the check fails consecutively beyond a configured number for a job, then 
fail current job
+   * submission.
+   */
+  private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
+    if (rdd.isBarrier() && rdd.getNumPartitions > sc.maxNumConcurrentTasks) {
+      throw new BarrierJobSlotsNumberCheckFailed
     }
   }
 
@@ -412,6 +443,7 @@ class DAGScheduler(
       jobId: Int,
       callSite: CallSite): ResultStage = {
     checkBarrierStageWithDynamicAllocation(rdd)
+    checkBarrierStageWithNumSlots(rdd)
     checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
     val parents = getOrCreateParentStages(rdd, jobId)
     val id = nextStageId.getAndIncrement()
@@ -929,11 +961,38 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, 
callSite)
     } catch {
+      case e: BarrierJobSlotsNumberCheckFailed =>
+        logWarning(s"The job $jobId requires to run a barrier stage that 
requires more slots " +
+          "than the total number of slots in the cluster currently.")
+        // If jobId doesn't exist in the map, Scala coverts its value null to 
0: Int automatically.
+        val numCheckFailures = 
barrierJobIdToNumTasksCheckFailures.compute(jobId,
+          new BiFunction[Int, Int, Int] {
+            override def apply(key: Int, value: Int): Int = value + 1
+          })
+        if (numCheckFailures <= maxFailureNumTasksCheck) {
+          messageScheduler.schedule(
+            new Runnable {
+              override def run(): Unit = 
eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
+                partitions, callSite, listener, properties))
+            },
+            timeIntervalNumTasksCheck,
+            TimeUnit.SECONDS
+          )
+          return
+        } else {
+          // Job failed, clear internal data.
+          barrierJobIdToNumTasksCheckFailures.remove(jobId)
+          listener.jobFailed(e)
+          return
+        }
+
       case e: Exception =>
         logWarning("Creating new stage failed due to exception - job: " + 
jobId, e)
         listener.jobFailed(e)
         return
     }
+    // Job submitted, clear internal data.
+    barrierJobIdToNumTasksCheckFailures.remove(jobId)
 
     val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
     clearCacheLocs()
@@ -2011,19 +2070,4 @@ private[spark] object DAGScheduler {
 
   // Number of consecutive stage attempts allowed before a stage is aborted
   val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
-
-  // Error message when running a barrier stage that have unsupported RDD 
chain pattern.
-  val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN =
-    "[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the 
following pattern of " +
-      "RDD chain within a barrier stage:\n1. Ancestor RDDs that have different 
number of " +
-      "partitions from the resulting RDD (eg. 
union()/coalesce()/first()/take()/" +
-      "PartitionPruningRDD). A workaround for first()/take() can be 
barrierRdd.collect().head " +
-      "(scala) or barrierRdd.collect()[0] (python).\n" +
-      "2. An RDD that depends on multiple barrier RDDs (eg. 
barrierRdd1.zip(barrierRdd2))."
-
-  // Error message when running a barrier stage with dynamic resource 
allocation enabled.
-  val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
-    "[SPARK-24942]: Barrier execution mode does not support dynamic resource 
allocation for " +
-      "now. You can disable dynamic resource allocation by setting Spark conf 
" +
-      "\"spark.dynamicAllocation.enabled\" to \"false\"."
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 22db335..c187ee1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -69,4 +69,13 @@ private[spark] trait SchedulerBackend {
    */
   def getDriverLogUrls: Option[Map[String, String]] = None
 
+  /**
+   * Get the max number of tasks that can be concurrent launched currently.
+   * Note that please don't cache the value returned by this method, because 
the number can change
+   * due to add/remove executors.
+   *
+   * @return The max number of tasks that can be concurrent launched currently.
+   */
+  def maxNumConcurrentTasks(): Int
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 375aeb0..747e8c7 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.keySet.toSeq
   }
 
+  override def maxNumConcurrentTasks(): Int = {
+    executorDataMap.values.map { executor =>
+      executor.totalCores / scheduler.CPUS_PER_TASK
+    }.sum
+  }
+
   /**
    * Request an additional number of executors from the cluster manager.
    * @return whether the request is acknowledged.

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index cf8b0ff..0de57fb 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -156,6 +156,8 @@ private[spark] class LocalSchedulerBackend(
 
   override def applicationId(): String = appId
 
+  override def maxNumConcurrentTasks(): Int = totalCores / 
scheduler.CPUS_PER_TASK
+
   private def stop(finalState: SparkAppHandle.State): Unit = {
     localEndpoint.ask(StopExecutor)
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala 
b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
index 2f21e61..d49ab4a 100644
--- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -21,6 +21,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.scheduler.BarrierJobAllocationFailed._
 import org.apache.spark.scheduler.DAGScheduler
 import org.apache.spark.util.ThreadUtils
 
@@ -63,7 +64,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with 
LocalSparkContext
       .barrier()
       .mapPartitions(iter => iter)
     testSubmitJob(sc, rdd,
-      message = 
DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
   }
 
   test("submit a barrier ShuffleMapStage that contains PartitionPruningRDD") {
@@ -75,7 +76,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with 
LocalSparkContext
       .repartition(2)
       .map(x => x + 1)
     testSubmitJob(sc, rdd,
-      message = 
DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
   }
 
   test("submit a barrier stage that doesn't contain PartitionPruningRDD") {
@@ -96,7 +97,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with 
LocalSparkContext
       .barrier()
       .mapPartitions(iter => iter)
     testSubmitJob(sc, rdd, Some(Seq(1, 3)),
-      message = 
DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
   }
 
   test("submit a barrier stage with union()") {
@@ -110,7 +111,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with LocalSparkContext
       .map(x => x * 2)
     // Fail the job on submit because the barrier RDD (rdd1) may be not 
assigned Task 0.
     testSubmitJob(sc, rdd3,
-      message = 
DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
   }
 
   test("submit a barrier stage with coalesce()") {
@@ -122,7 +123,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with LocalSparkContext
     // Fail the job on submit because the barrier RDD requires to run on 4 
tasks, but the stage
     // only launches 1 task.
     testSubmitJob(sc, rdd,
-      message = 
DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
   }
 
   test("submit a barrier stage that contains an RDD that depends on multiple 
barrier RDDs") {
@@ -137,7 +138,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with LocalSparkContext
       .zip(rdd2)
       .map(x => x._1 + x._2)
     testSubmitJob(sc, rdd3,
-      message = 
DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN)
   }
 
   test("submit a barrier stage with zip()") {
@@ -166,7 +167,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with LocalSparkContext
       .barrier()
       .mapPartitions(iter => iter)
     testSubmitJob(sc, rdd,
-      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
   }
 
   test("submit a barrier ShuffleMapStage with dynamic resource allocation 
enabled") {
@@ -183,6 +184,80 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with LocalSparkContext
       .repartition(2)
       .map(x => x + 1)
     testSubmitJob(sc, rdd,
-      message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+      message = ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION)
+  }
+
+  test("submit a barrier ResultStage that requires more slots than current 
total under local " +
+      "mode") {
+    val conf = new SparkConf()
+      // Shorten the time interval between two failed checks to make the test 
fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
+      // Reduce max check failures allowed to make the test fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
+      .setMaster("local[4]")
+      .setAppName("test")
+    sc = createSparkContext(Some(conf))
+    val rdd = sc.parallelize(1 to 10, 5)
+      .barrier()
+      .mapPartitions(iter => iter)
+    testSubmitJob(sc, rdd,
+      message = 
ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
+  }
+
+  test("submit a barrier ShuffleMapStage that requires more slots than current 
total under " +
+    "local mode") {
+    val conf = new SparkConf()
+      // Shorten the time interval between two failed checks to make the test 
fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
+      // Reduce max check failures allowed to make the test fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
+      .setMaster("local[4]")
+      .setAppName("test")
+    sc = createSparkContext(Some(conf))
+    val rdd = sc.parallelize(1 to 10, 5)
+      .barrier()
+      .mapPartitions(iter => iter)
+      .repartition(2)
+      .map(x => x + 1)
+    testSubmitJob(sc, rdd,
+      message = 
ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
+  }
+
+  test("submit a barrier ResultStage that requires more slots than current 
total under " +
+    "local-cluster mode") {
+    val conf = new SparkConf()
+      .set("spark.task.cpus", "2")
+      // Shorten the time interval between two failed checks to make the test 
fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
+      // Reduce max check failures allowed to make the test fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
+      .setMaster("local-cluster[4, 3, 1024]")
+      .setAppName("test")
+    sc = createSparkContext(Some(conf))
+    val rdd = sc.parallelize(1 to 10, 5)
+      .barrier()
+      .mapPartitions(iter => iter)
+    testSubmitJob(sc, rdd,
+      message = 
ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
+  }
+
+  test("submit a barrier ShuffleMapStage that requires more slots than current 
total under " +
+    "local-cluster mode") {
+    val conf = new SparkConf()
+      .set("spark.task.cpus", "2")
+      // Shorten the time interval between two failed checks to make the test 
fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
+      // Reduce max check failures allowed to make the test fail faster.
+      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
+      .setMaster("local-cluster[4, 3, 1024]")
+      .setAppName("test")
+    sc = createSparkContext(Some(conf))
+    val rdd = sc.parallelize(1 to 10, 5)
+      .barrier()
+      .mapPartitions(iter => iter)
+      .repartition(2)
+      .map(x => x + 1)
+    testSubmitJob(sc, rdd,
+      message = 
ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 3cfb0a9..659ebb6 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -1376,6 +1376,8 @@ private class DummyLocalSchedulerBackend (sc: 
SparkContext, sb: SchedulerBackend
 
   override def defaultParallelism(): Int = sb.defaultParallelism()
 
+  override def maxNumConcurrentTasks(): Int = sb.maxNumConcurrentTasks()
+
   override def killExecutorsOnHost(host: String): Boolean = {
     false
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index cb44110..e1666a3 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -654,6 +654,7 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
       .setMaster("local-cluster[3, 1, 1024]")
       .setAppName("test-cluster")
     sc = new SparkContext(conf)
+
     val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 04cccc6..80c9c6f 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -17,10 +17,18 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.Eventually
+
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkException, SparkFunSuite}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.util.{RpcUtils, SerializableBuffer}
 
-class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext {
+class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with 
LocalSparkContext
+    with Eventually {
 
   test("serialized task larger than max RPC message size") {
     val conf = new SparkConf
@@ -38,4 +46,83 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
     assert(smaller.size === 4)
   }
 
+  test("compute max number of concurrent tasks can be launched") {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[4, 3, 1024]")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    eventually(timeout(10.seconds)) {
+      // Ensure all executors have been launched.
+      assert(sc.getExecutorIds().length == 4)
+    }
+    assert(sc.maxNumConcurrentTasks() == 12)
+  }
+
+  test("compute max number of concurrent tasks can be launched when 
spark.task.cpus > 1") {
+    val conf = new SparkConf()
+      .set("spark.task.cpus", "2")
+      .setMaster("local-cluster[4, 3, 1024]")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    eventually(timeout(10.seconds)) {
+      // Ensure all executors have been launched.
+      assert(sc.getExecutorIds().length == 4)
+    }
+    // Each executor can only launch one task since `spark.task.cpus` is 2.
+    assert(sc.maxNumConcurrentTasks() == 4)
+  }
+
+  test("compute max number of concurrent tasks can be launched when some 
executors are busy") {
+    val conf = new SparkConf()
+      .set("spark.task.cpus", "2")
+      .setMaster("local-cluster[4, 3, 1024]")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    val rdd = sc.parallelize(1 to 10, 4).mapPartitions { iter =>
+      Thread.sleep(5000)
+      iter
+    }
+    var taskStarted = new AtomicBoolean(false)
+    var taskEnded = new AtomicBoolean(false)
+    val listener = new SparkListener() {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        taskStarted.set(true)
+      }
+
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+        taskEnded.set(true)
+      }
+    }
+
+    try {
+      sc.addSparkListener(listener)
+      eventually(timeout(10.seconds)) {
+        // Ensure all executors have been launched.
+        assert(sc.getExecutorIds().length == 4)
+      }
+
+      // Submit a job to trigger some tasks on active executors.
+      testSubmitJob(sc, rdd)
+
+      eventually(timeout(10.seconds)) {
+        // Ensure some tasks have started and no task finished, so some 
executors must be busy.
+        assert(taskStarted.get() == true)
+        assert(taskEnded.get() == false)
+        // Assert we count in slots on both busy and free executors.
+        assert(sc.maxNumConcurrentTasks() == 4)
+      }
+    } finally {
+      sc.removeSparkListener(listener)
+    }
+  }
+
+  private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = {
+    sc.submitJob(
+      rdd,
+      (iter: Iterator[Int]) => iter.toArray,
+      0 until rdd.partitions.length,
+      { case (_, _) => return }: (Int, Array[Int]) => Unit,
+      { return }
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3fbe636..6eeddbb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -215,7 +215,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
   }
 
   private def init(testConf: SparkConf): Unit = {
-    sc = new SparkContext("local", "DAGSchedulerSuite", testConf)
+    sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf)
     sparkListener.submittedStageInfos.clear()
     sparkListener.successfulStages.clear()
     sparkListener.failedStages.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 02b19e0..b470591 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -69,6 +69,7 @@ private class DummySchedulerBackend extends SchedulerBackend {
   def stop() {}
   def reviveOffers() {}
   def defaultParallelism(): Int = 1
+  def maxNumConcurrentTasks(): Int = 0
 }
 
 private class DummyTaskScheduler extends TaskScheduler {

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 75ea409..cea7f17 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -385,6 +385,8 @@ private[spark] abstract class MockBackend(
     }.toIndexedSeq
   }
 
+  override def maxNumConcurrentTasks(): Int = 0
+
   /**
    * This is called by the scheduler whenever it has tasks it would like to 
schedule, when a tasks
    * completes (which will be in a result-getter thread), and by the 
reviveOffers thread for delay

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 38e26a8..ca9bf08 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -36,6 +36,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
   def stop() {}
   def reviveOffers() {}
   def defaultParallelism(): Int = 1
+  def maxNumConcurrentTasks(): Int = 0
 }
 
 class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with 
BeforeAndAfterEach

http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 71a70ff..0bb6fe0 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       super.applicationId
     }
 
+  override def maxNumConcurrentTasks(): Int = {
+    // TODO SPARK-25074 support this method for 
MesosFineGrainedSchedulerBackend
+    0
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to