Repository: spark Updated Branches: refs/heads/master c32dbd6bd -> 92b48842b
[SPARK-24954][CORE] Fail fast on job submit if run a barrier stage with dynamic resource allocation enabled ## What changes were proposed in this pull request? We don't support running a barrier stage with dynamic resource allocation enabled, because it can lead to confusing behavior (e.g. with dynamic resource allocation enabled, we may acquire some executors (but not enough to launch all the tasks in a barrier stage), later release them because the executor idle timeout expires, and then acquire them again). We perform the check on job submission and fail fast if a barrier stage would run with dynamic resource allocation enabled. ## How was this patch tested? Added new test cases to the existing test suite `BarrierStageOnSubmittedSuite`, which covers the fail-fast cases when submitting a job that contains one or more barrier stages. Author: Xingbo Jiang <xingbo.ji...@databricks.com> Closes #21915 from jiangxb1987/SPARK-24954. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92b48842 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92b48842 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92b48842 Branch: refs/heads/master Commit: 92b48842b944a3e430472294cdc3c481bad6b804 Parents: c32dbd6 Author: Xingbo Jiang <xingbo.ji...@databricks.com> Authored: Fri Aug 3 09:36:56 2018 -0700 Committer: Xiangrui Meng <m...@databricks.com> Committed: Fri Aug 3 09:36:56 2018 -0700 ---------------------------------------------------------------------- .../apache/spark/scheduler/DAGScheduler.scala | 25 +++++++++ .../spark/BarrierStageOnSubmittedSuite.scala | 57 ++++++++++++++++---- 2 files changed, 71 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/92b48842/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 3dd0718..cf1fcbc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -364,6 +364,7 @@ class DAGScheduler( */ def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = { val rdd = shuffleDep.rdd + checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions) val numTasks = rdd.partitions.length val parents = getOrCreateParentStages(rdd, jobId) @@ -385,6 +386,23 @@ class DAGScheduler( } /** + * We don't support run a barrier stage with dynamic resource allocation enabled, it shall lead + * to some confusing behaviors (eg. with dynamic resource allocation enabled, it may happen that + * we acquire some executors (but not enough to launch all the tasks in a barrier stage) and + * later release them due to executor idle time expire, and then acquire again). + * + * We perform the check on job submit and fail fast if running a barrier stage with dynamic + * resource allocation enabled. + * + * TODO SPARK-24942 Improve cluster resource management with jobs containing barrier stage + */ + private def checkBarrierStageWithDynamicAllocation(rdd: RDD[_]): Unit = { + if (rdd.isBarrier() && Utils.isDynamicAllocationEnabled(sc.getConf)) { + throw new SparkException(DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION) + } + } + + /** * Create a ResultStage associated with the provided jobId. 
*/ private def createResultStage( @@ -393,6 +411,7 @@ class DAGScheduler( partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { + checkBarrierStageWithDynamicAllocation(rdd) checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size) val parents = getOrCreateParentStages(rdd, jobId) val id = nextStageId.getAndIncrement() @@ -2001,4 +2020,10 @@ private[spark] object DAGScheduler { "PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head " + "(scala) or barrierRdd.collect()[0] (python).\n" + "2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2))." + + // Error message when running a barrier stage with dynamic resource allocation enabled. + val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION = + "[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " + + "now. You can disable dynamic resource allocation by setting Spark conf " + + "\"spark.dynamicAllocation.enabled\" to \"false\"." 
} http://git-wip-us.apache.org/repos/asf/spark/blob/92b48842/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala index f2b3884..75e13a9 100644 --- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala +++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala @@ -20,8 +20,6 @@ package org.apache.spark import scala.concurrent.duration._ import scala.language.postfixOps -import org.scalatest.BeforeAndAfterEach - import org.apache.spark.rdd.{PartitionPruningRDD, RDD} import org.apache.spark.scheduler.DAGScheduler import org.apache.spark.util.ThreadUtils @@ -30,16 +28,13 @@ import org.apache.spark.util.ThreadUtils * This test suite covers all the cases that shall fail fast on job submitted that contains one * of more barrier stages. 
*/ -class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach - with LocalSparkContext { - - override def beforeEach(): Unit = { - super.beforeEach() +class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext { - val conf = new SparkConf() - .setMaster("local[4]") - .setAppName("test") - sc = new SparkContext(conf) + private def createSparkContext(conf: Option[SparkConf] = None): SparkContext = { + new SparkContext(conf.getOrElse( + new SparkConf() + .setMaster("local[4]") + .setAppName("test"))) } private def testSubmitJob( @@ -62,6 +57,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach } test("submit a barrier ResultStage that contains PartitionPruningRDD") { + sc = createSparkContext() val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1) val rdd = prunedRdd .barrier() @@ -71,6 +67,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach } test("submit a barrier ShuffleMapStage that contains PartitionPruningRDD") { + sc = createSparkContext() val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1) val rdd = prunedRdd .barrier() @@ -82,6 +79,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach } test("submit a barrier stage that doesn't contain PartitionPruningRDD") { + sc = createSparkContext() val prunedRdd = new PartitionPruningRDD(sc.parallelize(1 to 10, 4), index => index > 1) val rdd = prunedRdd .repartition(2) @@ -93,6 +91,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach } test("submit a barrier stage with partial partitions") { + sc = createSparkContext() val rdd = sc.parallelize(1 to 10, 4) .barrier() .mapPartitions((iter, context) => iter) @@ -101,6 +100,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach } test("submit a barrier stage with union()") { + sc = createSparkContext() 
val rdd1 = sc.parallelize(1 to 10, 2) .barrier() .mapPartitions((iter, context) => iter) @@ -114,6 +114,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach } test("submit a barrier stage with coalesce()") { + sc = createSparkContext() val rdd = sc.parallelize(1 to 10, 4) .barrier() .mapPartitions((iter, context) => iter) @@ -125,6 +126,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach } test("submit a barrier stage that contains an RDD that depends on multiple barrier RDDs") { + sc = createSparkContext() val rdd1 = sc.parallelize(1 to 10, 4) .barrier() .mapPartitions((iter, context) => iter) @@ -139,6 +141,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach } test("submit a barrier stage with zip()") { + sc = createSparkContext() val rdd1 = sc.parallelize(1 to 10, 4) .barrier() .mapPartitions((iter, context) => iter) @@ -150,4 +153,36 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with BeforeAndAfterEach val result = rdd3.collect().sorted assert(result === Seq(12, 14, 16, 18, 20, 22, 24, 26, 28, 30)) } + + test("submit a barrier ResultStage with dynamic resource allocation enabled") { + val conf = new SparkConf() + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .setMaster("local[4]") + .setAppName("test") + sc = createSparkContext(Some(conf)) + + val rdd = sc.parallelize(1 to 10, 4) + .barrier() + .mapPartitions((iter, context) => iter) + testSubmitJob(sc, rdd, + message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION) + } + + test("submit a barrier ShuffleMapStage with dynamic resource allocation enabled") { + val conf = new SparkConf() + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .setMaster("local[4]") + .setAppName("test") + sc = createSparkContext(Some(conf)) + + val rdd = sc.parallelize(1 to 10, 4) + .barrier() + 
.mapPartitions((iter, context) => iter) + .repartition(2) + .map(x => x + 1) + testSubmitJob(sc, rdd, + message = DAGScheduler.ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org