Hyukjin Kwon resolved SPARK-15385.
----------------------------------
Resolution: Incomplete
> Jobs never complete for ClusterManagers that don't implement killTask
> ---------------------------------------------------------------------
>
> Key: SPARK-15385
> URL: https://issues.apache.org/jira/browse/SPARK-15385
> Project: Spark
> Issue Type: Bug
> Components: Scheduler
> Affects Versions: 1.6.1
> Reporter: Imran Rashid
> Priority: Minor
> Labels: bulk-closed
>
> If a {{SchedulerBackend}} doesn't implement {{killTask}}, then when a job
> fails, if there are any tasks still running, the job never completes.
> This would be a major bug, except that all of the existing {{SchedulerBackend}}s
> implement {{killTask}}, so in practice this doesn't affect anyone. It does
> complicate the scheduler code, though, so it would be nice to simplify this
> one way or the other.
> The problem stems from [this code|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1473] in {{DAGScheduler}}:
> {code}
> if (runningStages.contains(stage)) {
>   try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
>     taskScheduler.cancelTasks(stageId, shouldInterruptThread)
>     markStageAsFinished(stage, Some(failureReason))
>   } catch {
>     case e: UnsupportedOperationException =>
>       logInfo(s"Could not cancel tasks for stage $stageId", e)
>       ableToCancelStages = false
>   }
> }
> // ... closing braces of the enclosing loops elided ...
> if (ableToCancelStages) {
>   job.listener.jobFailed(error)
>   cleanupStateForJobAndIndependentStages(job)
>   listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
> }
> {code}
> It *appears* to handle the case where a backend doesn't support {{killTask}},
> but in fact if the backend doesn't support it, {{ableToCancelStages}} stays
> {{false}} and we never run any of the completion actions for the failed job,
> even after all the remaining running tasks come back.
> One fix would be to wait until we get the task-finished events from the
> backend, and only then send all the job-failed events. But (a) this is a bit
> complex and (b) we're not guaranteed to ever get all those task-end events,
> e.g. if there is some issue on the executor where the remaining tasks are running.
> The much simpler option: just run all the job-completion logic whether or not
> those tasks have been cancelled. Even when the backend *does* support killing
> tasks, it's asynchronous, so all the job-failed messages get sent before we
> know the tasks have actually been killed. I don't see how it's any worse in the
> case where the backend doesn't kill tasks at all. If others are OK with that,
> I'd propose that we make {{killTask}} an abstract method in
> {{SchedulerBackend}}, to force backends to think about implementing it, but add
> a comment that implementations are free to make it a no-op.
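> To make that concrete, here is a rough sketch. {{SimplifiedSchedulerBackend}} and {{NoKillBackend}} are hypothetical, illustration-only names, and the trait is a cut-down stand-in for the real {{SchedulerBackend}}, which has more members than shown:
> {code}
> // Simplified stand-in for SchedulerBackend, only to illustrate the proposed shape;
> // the real trait also has start, stop, reviveOffers, defaultParallelism, etc.
> trait SimplifiedSchedulerBackend {
>   // Proposed: abstract, so every backend author must consciously choose an
>   // implementation. Backends that cannot interrupt running tasks are free to
>   // make this a no-op.
>   def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit
> }
>
> // A backend that can't kill tasks opts out with an explicit no-op, instead of
> // throwing UnsupportedOperationException and forcing ableToCancelStages to false.
> class NoKillBackend extends SimplifiedSchedulerBackend {
>   override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
>     // intentionally empty: the still-running tasks will finish (or fail) on their own
>   }
> }
> {code}
> With a no-op like this, {{cancelTasks}} no longer throws, so in the {{DAGScheduler}} snippet above {{ableToCancelStages}} stays true and the job-failed callback, state cleanup, and {{SparkListenerJobEnd}} are still posted, which is the same behaviour we already accept from backends whose {{killTask}} is asynchronous.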
> You can reproduce this with the following test (which leverages an open PR for SPARK-10372):
> {code}
> class BackendWithoutKillTaskIntegrationSuite
>     extends SchedulerIntegrationSuite[NoKillMultiExecutorBackend] {
>
>   testScheduler("job failure after 4 attempts when killTask is unsupported") {
>     def runBackend(): Unit = {
>       val task = backend.beginTask()
>       val failure = new ExceptionFailure(new RuntimeException("test task failure"), Seq())
>       backend.taskFailed(task, TaskState.FAILED, failure)
>     }
>     withBackend(runBackend _) {
>       val nParts = backend.defaultParallelism()
>       val jobFuture = submit(new MockRDD(sc, nParts, Nil), (0 until nParts).toArray)
>       val duration = Duration(10, SECONDS)
>       Await.ready(jobFuture, duration)
>       failure.getMessage.contains("test task failure")
>     }
>     assert(results.isEmpty)
>     assertDataStructuresEmpty(noFailure = false)
>   }
> }
>
> class NoKillMultiExecutorBackend(
>     conf: SparkConf,
>     taskScheduler: TaskSchedulerImpl)
>   extends MultiExecutorMockBackend(conf, taskScheduler) {
>
>   override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
>     throw new UnsupportedOperationException()
>   }
> }
> {code}
> In the logs you'll see something like:
> {noformat}
> 16/05/18 11:30:21 INFO scheduler.TaskSetManager: Lost task 19.3 in stage 0.0 (TID 46) on executor host-1: java.lang.RuntimeException (test task failure) [duplicate 9]
> 16/05/18 11:30:21 INFO scheduler.DAGScheduler: Could not cancel tasks for stage 0
> java.lang.UnsupportedOperationException
>         at org.apache.spark.scheduler.NoKillMultiExecutorBackend.killTask(SchedulerIntegrationSuite.scala:550)
>         at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:219)
>         at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:217)
> ...
> <more task completion msgs from all the other tasks that are running>
> ...
> 16/05/18 11:30:21 INFO scheduler.SchedulerIntegrationSuite$$anon$1: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 16/05/18 11:30:30 INFO scheduler.BackendWithoutKillTaskIntegrationSuite: ===== FINISHED o.a.s.scheduler.BackendWithoutKillTaskIntegrationSuite: 'job failure after 4 attempts when killTask is unsupported' =====
> [info] - job failure after 4 attempts when killTask is unsupported *** FAILED *** (10 seconds, 995 milliseconds)
> [info]   java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
> ...
> {noformat}
> Note that the task set is removed after the remaining tasks complete, but
> despite waiting a whopping 10 seconds, our job never realizes it's done.