Repository: spark Updated Branches: refs/heads/master 4d8ae0d1c -> bfb74394a
[SPARK-24819][CORE] Fail fast when no enough slots to launch the barrier stage on job submitted ## What changes were proposed in this pull request? We shall check whether the barrier stage requires more slots (to be able to launch all tasks in the barrier stage together) than the total number of active slots currently, and fail fast if trying to submit a barrier stage that requires more slots than current total number. This PR proposes to add a new method `getNumSlots()` to try to get the total number of currently active slots in `SchedulerBackend`, support of this new method has been added to all the first-class scheduler backends except `MesosFineGrainedSchedulerBackend`. ## How was this patch tested? Added new test cases in `BarrierStageOnSubmittedSuite`. Closes #22001 from jiangxb1987/SPARK-24819. Lead-authored-by: Xingbo Jiang <[email protected]> Co-authored-by: Xiangrui Meng <[email protected]> Signed-off-by: Xiangrui Meng <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bfb74394 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bfb74394 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bfb74394 Branch: refs/heads/master Commit: bfb74394a5513134ea1da9fcf4a1783b77dd64e4 Parents: 4d8ae0d Author: Xingbo Jiang <[email protected]> Authored: Wed Aug 15 13:31:28 2018 -0700 Committer: Xiangrui Meng <[email protected]> Committed: Wed Aug 15 13:31:28 2018 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 9 ++ .../apache/spark/internal/config/package.scala | 27 ++++++ .../scheduler/BarrierJobAllocationFailed.scala | 62 +++++++++++++ .../apache/spark/scheduler/DAGScheduler.scala | 88 ++++++++++++++----- .../spark/scheduler/SchedulerBackend.scala | 9 ++ .../cluster/CoarseGrainedSchedulerBackend.scala | 6 ++ .../scheduler/local/LocalSchedulerBackend.scala | 2 + .../spark/BarrierStageOnSubmittedSuite.scala | 91 ++++++++++++++++++-- .../spark/ExecutorAllocationManagerSuite.scala | 2 + .../org/apache/spark/SparkContextSuite.scala | 1 + .../CoarseGrainedSchedulerBackendSuite.scala | 89 ++++++++++++++++++- .../spark/scheduler/DAGSchedulerSuite.scala | 2 +- .../scheduler/ExternalClusterManagerSuite.scala | 1 + .../scheduler/SchedulerIntegrationSuite.scala | 2 + .../scheduler/TaskSchedulerImplSuite.scala | 1 + .../MesosFineGrainedSchedulerBackend.scala | 4 + 16 files changed, 364 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a7ffb35..e5b1e0e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1603,6 +1603,15 @@ class SparkContext(config: SparkConf) extends Logging { } /** + * Get the max number of tasks that can be concurrent launched currently. + * Note that please don't cache the value returned by this method, because the number can change + * due to add/remove executors. + * + * @return The max number of tasks that can be concurrent launched currently. + */ + private[spark] def maxNumConcurrentTasks(): Int = schedulerBackend.maxNumConcurrentTasks() + + /** * Update the cluster manager on our scheduling needs. Three bits of information are included * to help it make decisions. * @param numExecutors The total number of executors we'd like to have. The cluster manager http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/internal/config/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index eb08628..a8aa691 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -577,4 +577,31 @@ package object config { .timeConf(TimeUnit.SECONDS) .checkValue(v => v > 0, "The value should be a positive time value.") .createWithDefaultString("365d") + + private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL = + ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval") + .doc("Time in seconds to wait between a max concurrent tasks check failure and the next " + + "check. A max concurrent tasks check ensures the cluster can launch more concurrent " + + "tasks than required by a barrier stage on job submitted. The check can fail in case " + + "a cluster has just started and not enough executors have registered, so we wait for a " + + "little while and try to perform the check again. If the check fails more than a " + + "configured max failure times for a job then fail current job submission. Note this " + + "config only applies to jobs that contain one or more barrier stages, we won't perform " + + "the check on non-barrier jobs.") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("15s") + + private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES = + ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures") + .doc("Number of max concurrent tasks check failures allowed before fail a job submission. " + + "A max concurrent tasks check ensures the cluster can launch more concurrent tasks than " + + "required by a barrier stage on job submitted. The check can fail in case a cluster " + + "has just started and not enough executors have registered, so we wait for a little " + + "while and try to perform the check again. If the check fails more than a configured " + + "max failure times for a job then fail current job submission. Note this config only " + + "applies to jobs that contain one or more barrier stages, we won't perform the check on " + + "non-barrier jobs.") + .intConf + .checkValue(v => v > 0, "The max failures should be a positive value.") + .createWithDefault(40) } http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala new file mode 100644 index 0000000..803a0a1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.SparkException + +/** + * Exception thrown when submit a job with barrier stage(s) failing a required check. + */ +private[spark] class BarrierJobAllocationFailed(message: String) extends SparkException(message) + +private[spark] class BarrierJobUnsupportedRDDChainException + extends BarrierJobAllocationFailed( + BarrierJobAllocationFailed.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + +private[spark] class BarrierJobRunWithDynamicAllocationException + extends BarrierJobAllocationFailed( + BarrierJobAllocationFailed.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION) + +private[spark] class BarrierJobSlotsNumberCheckFailed + extends BarrierJobAllocationFailed( + BarrierJobAllocationFailed.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) + +private[spark] object BarrierJobAllocationFailed { + + // Error message when running a barrier stage that have unsupported RDD chain pattern. + val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN = + "[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " + + "RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " + + "partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/" + + "PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " + + "(scala) or barrierRdd.collect()[0] (python).\n" + + "2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))." + + // Error message when running a barrier stage with dynamic resource allocation enabled. + val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION = + "[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " + + "now. You can disable dynamic resource allocation by setting Spark conf " + + "\"spark.dynamicAllocation.enabled\" to \"false\"." + + // Error message when running a barrier stage that requires more slots than current total number. + val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER = + "[SPARK-24819]: Barrier execution mode does not allow run a barrier stage that requires " + + "more slots than the total number of slots in the cluster currently. Please init a new " + + "cluster with more CPU cores or repartition the input RDD(s) to reduce the number of " + + "slots required to run this barrier stage." +} http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cf1fcbc..2b0ca13 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -19,8 +19,9 @@ package org.apache.spark.scheduler import java.io.NotSerializableException import java.util.Properties -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicInteger +import java.util.function.BiFunction import scala.annotation.tailrec import scala.collection.Map @@ -39,7 +40,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.network.util.JavaUtils import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} -import org.apache.spark.rdd.{PartitionPruningRDD, RDD, RDDCheckpointData} +import org.apache.spark.rdd.{RDD, RDDCheckpointData} import org.apache.spark.rpc.RpcTimeout import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat @@ -111,8 +112,7 @@ import org.apache.spark.util._ * - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to * include the new structure. This will help to catch memory leaks. */ -private[spark] -class DAGScheduler( +private[spark] class DAGScheduler( private[scheduler] val sc: SparkContext, private[scheduler] val taskScheduler: TaskScheduler, listenerBus: LiveListenerBus, @@ -203,6 +203,24 @@ class DAGScheduler( sc.getConf.getInt("spark.stage.maxConsecutiveAttempts", DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS) + /** + * Number of max concurrent tasks check failures for each barrier job. + */ + private[scheduler] val barrierJobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int] + + /** + * Time in seconds to wait between a max concurrent tasks check failure and the next check. + */ + private val timeIntervalNumTasksCheck = sc.getConf + .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL) + + /** + * Max number of max concurrent tasks check failures allowed for a job before fail the job + * submission. + */ + private val maxFailureNumTasksCheck = sc.getConf + .get(config.BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES) + private val messageScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message") @@ -351,8 +369,7 @@ class DAGScheduler( val predicate: RDD[_] => Boolean = (r => r.getNumPartitions == numTasksInStage && r.dependencies.filter(_.rdd.isBarrier()).size <= 1) if (rdd.isBarrier() && !traverseParentRDDsWithinStage(rdd, predicate)) { - throw new SparkException( - DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + throw new BarrierJobUnsupportedRDDChainException } } @@ -365,6 +382,7 @@ class DAGScheduler( def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd checkBarrierStageWithDynamicAllocation(rdd) + checkBarrierStageWithNumSlots(rdd) checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions) val numTasks = rdd.partitions.length val parents = getOrCreateParentStages(rdd, jobId) @@ -398,7 +416,20 @@ class DAGScheduler( */ private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = { if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) { - throw new SparkException(DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION) + throw new BarrierJobRunWithDynamicAllocationException + } + } + + /** + * Check whether the barrier stage requires more slots (to be able to launch all tasks in the + * barrier stage together) than the total number of active slots currently. Fail current check + * if trying to submit a barrier stage that requires more slots than current total number. If + * the check fails consecutively beyond a configured number for a job, then fail current job + * submission. + */ + private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = { + if (rdd.isBarrier() && rdd.getNumPartitions > sc.maxNumConcurrentTasks) { + throw new BarrierJobSlotsNumberCheckFailed } } @@ -412,6 +443,7 @@ class DAGScheduler( jobId: Int, callSite: CallSite): ResultStage = { checkBarrierStageWithDynamicAllocation(rdd) + checkBarrierStageWithNumSlots(rdd) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() @@ -929,11 +961,38 @@ class DAGScheduler( // HadoopRDD whose underlying HDFS files have been deleted. finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { + case e: BarrierJobSlotsNumberCheckFailed => + logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " + + "than the total number of slots in the cluster currently.") + // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically. + val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId, + new BiFunction[Int, Int, Int] { + override def apply(key: Int, value: Int): Int = value + 1 + }) + if (numCheckFailures <= maxFailureNumTasksCheck) { + messageScheduler.schedule( + new Runnable { + override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func, + partitions, callSite, listener, properties)) + }, + timeIntervalNumTasksCheck, + TimeUnit.SECONDS + ) + return + } else { + // Job failed, clear internal data. + barrierJobIdToNumTasksCheckFailures.remove(jobId) + listener.jobFailed(e) + return + } + case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return } + // Job submitted, clear internal data. + barrierJobIdToNumTasksCheckFailures.remove(jobId) val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() @@ -2011,19 +2070,4 @@ private[spark] object DAGScheduler { // Number of consecutive stage attempts allowed before a stage is aborted val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4 - - // Error message when running a barrier stage that have unsupported RDD chain pattern. - val ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN = - "[SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of " + - "RDD chain within a barrier stage:\n1. Ancestor RDDs that have different number of " + - "partitions from the resulting RDD (eg. union()/coalesce()/first()/take()/" + - "PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " + - "(scala) or barrierRdd.collect()[0] (python).\n" + - "2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))." - - // Error message when running a barrier stage with dynamic resource allocation enabled. - val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION = - "[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " + - "now. You can disable dynamic resource allocation by setting Spark conf " + - "\"spark.dynamicAllocation.enabled\" to \"false\"." } http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 22db335..c187ee1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -69,4 +69,13 @@ private[spark] trait SchedulerBackend { */ def getDriverLogUrls: Option[Map[String, String]] = None + /** + * Get the max number of tasks that can be concurrent launched currently. + * Note that please don't cache the value returned by this method, because the number can change + * due to add/remove executors. + * + * @return The max number of tasks that can be concurrent launched currently. + */ + def maxNumConcurrentTasks(): Int + } http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 375aeb0..747e8c7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorDataMap.keySet.toSeq } + override def maxNumConcurrentTasks(): Int = { + executorDataMap.values.map { executor => + executor.totalCores / scheduler.CPUS_PER_TASK + }.sum + } + /** * Request an additional number of executors from the cluster manager. * @return whether the request is acknowledged. http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index cf8b0ff..0de57fb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -156,6 +156,8 @@ private[spark] class LocalSchedulerBackend( override def applicationId(): String = appId + override def maxNumConcurrentTasks(): Int = totalCores / scheduler.CPUS_PER_TASK + private def stop(finalState: SparkAppHandle.State): Unit = { localEndpoint.ask(StopExecutor) try { http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala index 2f21e61..d49ab4a 100644 --- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala +++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala @@ -21,6 +21,7 @@ import scala.concurrent.duration._ import scala.language.postfixOps import org.apache.spark.rdd.{PartitionPruningRDD, RDD} +import org.apache.spark.scheduler.BarrierJobAllocationFailed._ import org.apache.spark.scheduler.DAGScheduler import org.apache.spark.util.ThreadUtils @@ -63,7 +64,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext .barrier() .mapPartitions(iter => iter) testSubmitJob(sc, rdd, - message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) } test("submit a barrier ShuffleMapStage that contains PartitionPruningRDD") { @@ -75,7 +76,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext .repartition(2) .map(x => x + 1) testSubmitJob(sc, rdd, - message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) } test("submit a barrier stage that doesn't contain PartitionPruningRDD") { @@ -96,7 +97,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext .barrier() .mapPartitions(iter => iter) testSubmitJob(sc, rdd, Some(Seq(1, 3)), - message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) } test("submit a barrier stage with union()") { @@ -110,7 +111,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext .map(x => x * 2) // Fail the job on submit because the barrier RDD (rdd1) may be not assigned Task 0. testSubmitJob(sc, rdd3, - message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) } test("submit a barrier stage with coalesce()") { @@ -122,7 +123,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext // Fail the job on submit because the barrier RDD requires to run on 4 tasks, but the stage // only launches 1 task. testSubmitJob(sc, rdd, - message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) } test("submit a barrier stage that contains an RDD that depends on multiple barrier RDDs") { @@ -137,7 +138,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext .zip(rdd2) .map(x => x._1 + x._2) testSubmitJob(sc, rdd3, - message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) + message = ERROR_MESSAGE_RUN_BARRIER_WITH_UNSUPPORTED_RDD_CHAIN_PATTERN) } test("submit a barrier stage with zip()") { @@ -166,7 +167,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext .barrier() .mapPartitions(iter => iter) testSubmitJob(sc, rdd, - message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION) + message = ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION) } test("submit a barrier ShuffleMapStage with dynamic resource allocation enabled") { @@ -183,6 +184,80 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext .repartition(2) .map(x => x + 1) testSubmitJob(sc, rdd, - message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION) + message = ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION) + } + + test("submit a barrier ResultStage that requires more slots than current total under local " + + "mode") { + val conf = new SparkConf() + // Shorten the time interval between two failed checks to make the test fail faster. + .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s") + // Reduce max check failures allowed to make the test fail faster. + .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3") + .setMaster("local[4]") + .setAppName("test") + sc = createSparkContext(Some(conf)) + val rdd = sc.parallelize(1 to 10, 5) + .barrier() + .mapPartitions(iter => iter) + testSubmitJob(sc, rdd, + message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) + } + + test("submit a barrier ShuffleMapStage that requires more slots than current total under " + + "local mode") { + val conf = new SparkConf() + // Shorten the time interval between two failed checks to make the test fail faster. + .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s") + // Reduce max check failures allowed to make the test fail faster. + .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3") + .setMaster("local[4]") + .setAppName("test") + sc = createSparkContext(Some(conf)) + val rdd = sc.parallelize(1 to 10, 5) + .barrier() + .mapPartitions(iter => iter) + .repartition(2) + .map(x => x + 1) + testSubmitJob(sc, rdd, + message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) + } + + test("submit a barrier ResultStage that requires more slots than current total under " + + "local-cluster mode") { + val conf = new SparkConf() + .set("spark.task.cpus", "2") + // Shorten the time interval between two failed checks to make the test fail faster. + .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s") + // Reduce max check failures allowed to make the test fail faster. + .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3") + .setMaster("local-cluster[4, 3, 1024]") + .setAppName("test") + sc = createSparkContext(Some(conf)) + val rdd = sc.parallelize(1 to 10, 5) + .barrier() + .mapPartitions(iter => iter) + testSubmitJob(sc, rdd, + message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) + } + + test("submit a barrier ShuffleMapStage that requires more slots than current total under " + + "local-cluster mode") { + val conf = new SparkConf() + .set("spark.task.cpus", "2") + // Shorten the time interval between two failed checks to make the test fail faster. + .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s") + // Reduce max check failures allowed to make the test fail faster. + .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3") + .setMaster("local-cluster[4, 3, 1024]") + .setAppName("test") + sc = createSparkContext(Some(conf)) + val rdd = sc.parallelize(1 to 10, 5) + .barrier() + .mapPartitions(iter => iter) + .repartition(2) + .map(x => x + 1) + testSubmitJob(sc, rdd, + message = ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER) } } http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 3cfb0a9..659ebb6 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -1376,6 +1376,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend override def defaultParallelism(): Int = sb.defaultParallelism() + override def maxNumConcurrentTasks(): Int = sb.maxNumConcurrentTasks() + override def killExecutorsOnHost(host: String): Boolean = { false } http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index cb44110..e1666a3 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -654,6 +654,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu .setMaster("local-cluster[3, 1, 1024]") .setAppName("test-cluster") sc = new SparkContext(conf) + val rdd = sc.makeRDD(Seq(1, 2, 3, 4), 2) val rdd2 = rdd.barrier().mapPartitions { it => val context = BarrierTaskContext.get() http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 04cccc6..80c9c6f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -17,10 +17,18 @@ package org.apache.spark.scheduler +import java.util.concurrent.atomic.AtomicBoolean + +import scala.concurrent.duration._ + +import org.scalatest.concurrent.Eventually + import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.rdd.RDD import org.apache.spark.util.{RpcUtils, SerializableBuffer} -class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext { +class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext + with Eventually { test("serialized task larger than max RPC message size") { val conf = new SparkConf @@ -38,4 +46,83 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo assert(smaller.size === 4) } + test("compute max number of concurrent tasks can be launched") { + val conf = new SparkConf() + .setMaster("local-cluster[4, 3, 1024]") + .setAppName("test") + sc = new SparkContext(conf) + eventually(timeout(10.seconds)) { + // Ensure all executors have been launched. + assert(sc.getExecutorIds().length == 4) + } + assert(sc.maxNumConcurrentTasks() == 12) + } + + test("compute max number of concurrent tasks can be launched when spark.task.cpus > 1") { + val conf = new SparkConf() + .set("spark.task.cpus", "2") + .setMaster("local-cluster[4, 3, 1024]") + .setAppName("test") + sc = new SparkContext(conf) + eventually(timeout(10.seconds)) { + // Ensure all executors have been launched. + assert(sc.getExecutorIds().length == 4) + } + // Each executor can only launch one task since `spark.task.cpus` is 2. + assert(sc.maxNumConcurrentTasks() == 4) + } + + test("compute max number of concurrent tasks can be launched when some executors are busy") { + val conf = new SparkConf() + .set("spark.task.cpus", "2") + .setMaster("local-cluster[4, 3, 1024]") + .setAppName("test") + sc = new SparkContext(conf) + val rdd = sc.parallelize(1 to 10, 4).mapPartitions { iter => + Thread.sleep(5000) + iter + } + var taskStarted = new AtomicBoolean(false) + var taskEnded = new AtomicBoolean(false) + val listener = new SparkListener() { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + taskStarted.set(true) + } + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + taskEnded.set(true) + } + } + + try { + sc.addSparkListener(listener) + eventually(timeout(10.seconds)) { + // Ensure all executors have been launched. + assert(sc.getExecutorIds().length == 4) + } + + // Submit a job to trigger some tasks on active executors. + testSubmitJob(sc, rdd) + + eventually(timeout(10.seconds)) { + // Ensure some tasks have started and no task finished, so some executors must be busy. + assert(taskStarted.get() == true) + assert(taskEnded.get() == false) + // Assert we count in slots on both busy and free executors. + assert(sc.maxNumConcurrentTasks() == 4) + } + } finally { + sc.removeSparkListener(listener) + } + } + + private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = { + sc.submitJob( + rdd, + (iter: Iterator[Int]) => iter.toArray, + 0 until rdd.partitions.length, + { case (_, _) => return }: (Int, Array[Int]) => Unit, + { return } + ) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 3fbe636..6eeddbb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -215,7 +215,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } private def init(testConf: SparkConf): Unit = { - sc = new SparkContext("local", "DAGSchedulerSuite", testConf) + sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf) sparkListener.submittedStageInfos.clear() sparkListener.successfulStages.clear() sparkListener.failedStages.clear() http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 02b19e0..b470591 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -69,6 +69,7 @@ private class DummySchedulerBackend extends SchedulerBackend { def stop() {} def reviveOffers() {} def defaultParallelism(): Int = 1 + def maxNumConcurrentTasks(): Int = 0 } private class DummyTaskScheduler extends TaskScheduler { http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 75ea409..cea7f17 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -385,6 +385,8 @@ private[spark] abstract class MockBackend( }.toIndexedSeq } + override def maxNumConcurrentTasks(): Int = 0 + /** * This is called by the scheduler whenever it has tasks it would like to schedule, when a tasks * completes (which will be in a result-getter thread), and by the reviveOffers thread for delay http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 38e26a8..ca9bf08 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -36,6 +36,7 @@ class FakeSchedulerBackend extends SchedulerBackend { def stop() {} def reviveOffers() {} def defaultParallelism(): Int = 1 + def maxNumConcurrentTasks(): Int = 0 } class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach http://git-wip-us.apache.org/repos/asf/spark/blob/bfb74394/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index 71a70ff..0bb6fe0 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend( super.applicationId } + override def maxNumConcurrentTasks(): Int = { + // TODO SPARK-25074 support this method for MesosFineGrainedSchedulerBackend + 0 + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
