Repository: spark Updated Branches: refs/heads/master 3a43ae7c0 -> d785217b7
[SPARK-19549] Allow providing reason for stage/job cancelling ## What changes were proposed in this pull request? This change adds an optional argument to `SparkContext.cancelStage()` and `SparkContext.cancelJob()` functions, which allows the caller to provide an exact reason for the cancellation. ## How was this patch tested? Adds a unit test. Author: Ala Luszczak <a...@databricks.com> Closes #16887 from ala/cancel. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d785217b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d785217b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d785217b Branch: refs/heads/master Commit: d785217b791882e075ad537852d49d78fc1ca31b Parents: 3a43ae7 Author: Ala Luszczak <a...@databricks.com> Authored: Fri Feb 10 21:10:02 2017 +0100 Committer: Reynold Xin <r...@databricks.com> Committed: Fri Feb 10 21:10:02 2017 +0100 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 30 +++++++-- .../apache/spark/scheduler/DAGScheduler.scala | 35 ++++++---- .../spark/scheduler/DAGSchedulerEvent.scala | 10 ++- .../org/apache/spark/scheduler/JobWaiter.scala | 2 +- .../org/apache/spark/SparkContextSuite.scala | 69 +++++++++++++++++++- .../spark/scheduler/DAGSchedulerSuite.scala | 2 +- .../apache/spark/sql/DataFrameRangeSuite.scala | 19 ++++-- 7 files changed, 138 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index eb13686..cbab7b8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ 
b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2207,10 +2207,32 @@ class SparkContext(config: SparkConf) extends Logging { * Cancel a given job if it's scheduled or running. * * @param jobId the job ID to cancel + * @param reason optional reason for cancellation * @note Throws `InterruptedException` if the cancel message cannot be sent */ - def cancelJob(jobId: Int) { - dagScheduler.cancelJob(jobId) + def cancelJob(jobId: Int, reason: String): Unit = { + dagScheduler.cancelJob(jobId, Option(reason)) + } + + /** + * Cancel a given job if it's scheduled or running. + * + * @param jobId the job ID to cancel + * @note Throws `InterruptedException` if the cancel message cannot be sent + */ + def cancelJob(jobId: Int): Unit = { + dagScheduler.cancelJob(jobId, None) + } + + /** + * Cancel a given stage and all jobs associated with it. + * + * @param stageId the stage ID to cancel + * @param reason reason for cancellation + * @note Throws `InterruptedException` if the cancel message cannot be sent + */ + def cancelStage(stageId: Int, reason: String): Unit = { + dagScheduler.cancelStage(stageId, Option(reason)) } /** @@ -2219,8 +2241,8 @@ class SparkContext(config: SparkConf) extends Logging { * @param stageId the stage ID to cancel * @note Throws `InterruptedException` if the cancel message cannot be sent */ - def cancelStage(stageId: Int) { - dagScheduler.cancelStage(stageId) + def cancelStage(stageId: Int): Unit = { + dagScheduler.cancelStage(stageId, None) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6177baf..b9d7e13 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -696,9 +696,9 @@ class DAGScheduler( /** * Cancel a job that is running or waiting in the queue. */ - def cancelJob(jobId: Int): Unit = { + def cancelJob(jobId: Int, reason: Option[String]): Unit = { logInfo("Asked to cancel job " + jobId) - eventProcessLoop.post(JobCancelled(jobId)) + eventProcessLoop.post(JobCancelled(jobId, reason)) } /** @@ -719,7 +719,7 @@ class DAGScheduler( private[scheduler] def doCancelAllJobs() { // Cancel all running jobs. runningStages.map(_.firstJobId).foreach(handleJobCancellation(_, - reason = "as part of cancellation of all jobs")) + Option("as part of cancellation of all jobs"))) activeJobs.clear() // These should already be empty by this point, jobIdToActiveJob.clear() // but just in case we lost track of some jobs... } @@ -727,8 +727,8 @@ class DAGScheduler( /** * Cancel all jobs associated with a running or scheduled stage. */ - def cancelStage(stageId: Int) { - eventProcessLoop.post(StageCancelled(stageId)) + def cancelStage(stageId: Int, reason: Option[String]) { + eventProcessLoop.post(StageCancelled(stageId, reason)) } /** @@ -785,7 +785,8 @@ class DAGScheduler( } } val jobIds = activeInGroup.map(_.jobId) - jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId))) + jobIds.foreach(handleJobCancellation(_, + Option("part of cancelled job group %s".format(groupId)))) } private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { @@ -1377,24 +1378,30 @@ class DAGScheduler( } } - private[scheduler] def handleStageCancellation(stageId: Int) { + private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]) { stageIdToStage.get(stageId) match { case Some(stage) => val jobsThatUseStage: Array[Int] = stage.jobIds.toArray jobsThatUseStage.foreach { jobId => - handleJobCancellation(jobId, s"because Stage $stageId was cancelled") + val reasonStr = reason match { + case Some(originalReason) => + 
s"because $originalReason" + case None => + s"because Stage $stageId was cancelled" + } + handleJobCancellation(jobId, Option(reasonStr)) } case None => logInfo("No active jobs to kill for Stage " + stageId) } } - private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") { + private[scheduler] def handleJobCancellation(jobId: Int, reason: Option[String]) { if (!jobIdToStageIds.contains(jobId)) { logDebug("Trying to cancel unregistered job " + jobId) } else { failJobAndIndependentStages( - jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason)) + jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason.getOrElse(""))) } } @@ -1636,11 +1643,11 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case MapStageSubmitted(jobId, dependency, callSite, listener, properties) => dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties) - case StageCancelled(stageId) => - dagScheduler.handleStageCancellation(stageId) + case StageCancelled(stageId, reason) => + dagScheduler.handleStageCancellation(stageId, reason) - case JobCancelled(jobId) => - dagScheduler.handleJobCancellation(jobId) + case JobCancelled(jobId, reason) => + dagScheduler.handleJobCancellation(jobId, reason) case JobGroupCancelled(groupId) => dagScheduler.handleJobGroupCancelled(groupId) http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 03781a2..cda0585 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -53,9 +53,15 @@ private[scheduler] case class MapStageSubmitted( properties: 
Properties = null) extends DAGSchedulerEvent -private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent +private[scheduler] case class StageCancelled( + stageId: Int, + reason: Option[String]) + extends DAGSchedulerEvent -private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent +private[scheduler] case class JobCancelled( + jobId: Int, + reason: Option[String]) + extends DAGSchedulerEvent private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index 9012289..65d7184 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -50,7 +50,7 @@ private[spark] class JobWaiter[T]( * will fail this job with a SparkException. 
*/ def cancel() { - dagScheduler.cancelJob(jobId) + dagScheduler.cancelJob(jobId, None) } override def taskSucceeded(index: Int, result: Any): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 8ae5d2f..5a41e1c 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -22,19 +22,21 @@ import java.net.MalformedURLException import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit +import scala.concurrent.duration._ import scala.concurrent.Await -import scala.concurrent.duration.Duration import com.google.common.io.Files import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat} +import org.scalatest.concurrent.Eventually import org.scalatest.Matchers._ -import org.apache.spark.scheduler.SparkListener +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart} import org.apache.spark.util.Utils -class SparkContextSuite extends SparkFunSuite with LocalSparkContext { + +class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually { test("Only one SparkContext may be active at a time") { // Regression test for SPARK-4180 @@ -465,4 +467,65 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { assert(!sc.listenerBus.listeners.contains(sparkListener1)) assert(sc.listenerBus.listeners.contains(sparkListener2)) } + + test("Cancelling stages/jobs with custom reasons.") { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + val REASON = "You 
shall not pass" + + val listener = new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + if (SparkContextSuite.cancelStage) { + eventually(timeout(10.seconds)) { + assert(SparkContextSuite.isTaskStarted) + } + sc.cancelStage(taskStart.stageId, REASON) + SparkContextSuite.cancelStage = false + } + } + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + if (SparkContextSuite.cancelJob) { + eventually(timeout(10.seconds)) { + assert(SparkContextSuite.isTaskStarted) + } + sc.cancelJob(jobStart.jobId, REASON) + SparkContextSuite.cancelJob = false + } + } + } + sc.addSparkListener(listener) + + for (cancelWhat <- Seq("stage", "job")) { + SparkContextSuite.isTaskStarted = false + SparkContextSuite.cancelStage = (cancelWhat == "stage") + SparkContextSuite.cancelJob = (cancelWhat == "job") + + val ex = intercept[SparkException] { + sc.range(0, 10000L).mapPartitions { x => + org.apache.spark.SparkContextSuite.isTaskStarted = true + x + }.cartesian(sc.range(0, 10L))count() + } + + ex.getCause() match { + case null => + assert(ex.getMessage().contains(REASON)) + case cause: SparkException => + assert(cause.getMessage().contains(REASON)) + case cause: Throwable => + fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.") + } + + eventually(timeout(20.seconds)) { + assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0) + } + } + } + +} + +object SparkContextSuite { + @volatile var cancelJob = false + @volatile var cancelStage = false + @volatile var isTaskStarted = false } http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index f3d3f70..4e5f267 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -329,7 +329,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou /** Sends JobCancelled to the DAG scheduler. */ private def cancel(jobId: Int) { - runEvent(JobCancelled(jobId)) + runEvent(JobCancelled(jobId, None)) } test("[SPARK-3353] parent stage should have lower stage id") { http://git-wip-us.apache.org/repos/asf/spark/blob/d785217b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala index 3ebfd9a..03bf2d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.test.SharedSQLContext class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually { + import testImplicits._ test("SPARK-7150 range api") { // numSlice is greater than length @@ -137,7 +138,9 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall test("Cancelling stage in a query with Range.") { val listener = new SparkListener { override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { - Thread.sleep(100) + eventually(timeout(10.seconds)) { + assert(DataFrameRangeSuite.isTaskStarted) + } sparkContext.cancelStage(taskStart.stageId) } } @@ -145,9 +148,12 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall sparkContext.addSparkListener(listener) for (codegen <- Seq(true, false)) { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) { + DataFrameRangeSuite.isTaskStarted = false val ex = intercept[SparkException] { - 
spark.range(100000L).crossJoin(spark.range(100000L)) - .toDF("a", "b").agg(sum("a"), sum("b")).collect() + spark.range(100000L).mapPartitions { x => + DataFrameRangeSuite.isTaskStarted = true + x + }.crossJoin(spark.range(100L)).toDF("a", "b").agg(sum("a"), sum("b")).collect() } ex.getCause() match { case null => @@ -155,7 +161,7 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall case cause: SparkException => assert(cause.getMessage().contains("cancelled")) case cause: Throwable => - fail("Expected the casue to be SparkException, got " + cause.toString() + " instead.") + fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.") } } eventually(timeout(20.seconds)) { @@ -164,3 +170,8 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall } } } + +object DataFrameRangeSuite { + @volatile var isTaskStarted = false +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org