[ 
https://issues.apache.org/jira/browse/SPARK-15385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-15385.
----------------------------------
    Resolution: Incomplete

> Jobs never complete for ClusterManagers that don't implement killTask
> ---------------------------------------------------------------------
>
>                 Key: SPARK-15385
>                 URL: https://issues.apache.org/jira/browse/SPARK-15385
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 1.6.1
>            Reporter: Imran Rashid
>            Priority: Minor
>              Labels: bulk-closed
>
> If a {{SchedulerBackend}} doesn't implement {{killTask}}, then when a job 
> fails, if there are any tasks still running, the job never completes.
> This would be a major bug, except that all of the existing SchedulerBackends 
> implement {{killTask}}, so in practice this doesn't affect anyone.  It does 
> complicate the scheduler code, though, so it would be nice to simplify this 
> one way or the other.
> The problem stems from [this code|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1473] in {{DAGScheduler}}:
> {code}
>           if (runningStages.contains(stage)) {
>             try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
>               taskScheduler.cancelTasks(stageId, shouldInterruptThread)
>               markStageAsFinished(stage, Some(failureReason))
>             } catch {
>               case e: UnsupportedOperationException =>
>                 logInfo(s"Could not cancel tasks for stage $stageId", e)
>                 ableToCancelStages = false
>             }
>           }
>         }
>       }
>     }
>     if (ableToCancelStages) {
>       job.listener.jobFailed(error)
>       cleanupStateForJobAndIndependentStages(job)
>       listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
>     }
> {code}
> It *appears* to handle the case where a backend doesn't support {{killTask}}, 
> but in fact if the backend doesn't support it, then we never end up doing any 
> of the actions for the failed job, even after all the remaining running tasks 
> come back.
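> The control-flow gap can be sketched in miniature: a flag flipped to false in 
> the catch branch means the guarded completion block never runs, and nothing 
> ever revisits it.  A hypothetical, stripped-down model (names like {{failJob}} 
> and {{cancelSupported}} are illustrative, not Spark APIs):

```scala
// Hypothetical, stripped-down model of the DAGScheduler flow above.
// failJob and cancelSupported are illustrative names, not Spark APIs.
object CancelFlagSketch {
  // Returns true iff the job-completion actions would have run.
  def failJob(cancelSupported: Boolean): Boolean = {
    var ableToCancelStages = true
    try {
      // stands in for taskScheduler.cancelTasks(stageId, shouldInterruptThread)
      if (!cancelSupported) throw new UnsupportedOperationException
    } catch {
      case _: UnsupportedOperationException =>
        ableToCancelStages = false
    }
    // stands in for the block guarded by ableToCancelStages:
    // job.listener.jobFailed(error); cleanup; SparkListenerJobEnd
    ableToCancelStages
  }

  def main(args: Array[String]): Unit = {
    assert(failJob(cancelSupported = true))   // killTask supported: job completes
    assert(!failJob(cancelSupported = false)) // unsupported: completion never runs
  }
}
```

> Nothing downstream ever resets the flag or retries the completion block, which 
> is why the job hangs even after every running task reports back.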
> One fix would be to wait until we get the task-finished events from the 
> backend, and then send all the job-failed events.  But (a) this is a bit 
> complex and (b) you can never be sure you'll get all those task-end events, 
> e.g. if there is some issue on the executor where those remaining tasks are 
> running.
> The much simpler option: just run all the job-completion logic whether or not 
> those tasks have been cancelled.  Even when the backend *does* support 
> killing tasks, it's asynchronous, so all the job-failed messages get sent 
> before we know the tasks have been killed.  I don't see how it's any worse in 
> the case where the backend doesn't kill tasks at all.  If others are OK with 
> that, I'd propose that we make {{killTask}} an abstract method in 
> {{SchedulerBackend}}, to force backends to think about implementing it, but 
> add a comment that implementations are free to make it a no-op.
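> A minimal sketch of that proposal, assuming a simplified trait (this is not 
> the actual Spark {{SchedulerBackend}} trait; the names below are illustrative):

```scala
// Hypothetical sketch of the proposal: killTask is abstract, so every backend
// must consciously provide an implementation, but a no-op is explicitly allowed.
// SchedulerBackendSketch and NoOpKillBackend are illustrative names only.
trait SchedulerBackendSketch {
  // Abstract: backends must think about task-killing, even if only to decline.
  def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit
}

class NoOpKillBackend extends SchedulerBackendSketch {
  // Implementations are free to make this a no-op; the scheduler would then
  // run job-completion logic regardless of whether tasks were actually killed.
  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = ()
}

object KillTaskSketch {
  def main(args: Array[String]): Unit = {
    val backend = new NoOpKillBackend
    backend.killTask(1L, "exec-1", interruptThread = false) // no exception, no effect
  }
}
```

> Compared with today's code, the no-op never throws, so there is no 
> {{UnsupportedOperationException}} path that can strand the failed job.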
> You can reproduce with this test (which leverages an open pr for SPARK-10372):
> {code}
> class BackendWithoutKillTaskIntegrationSuite
>     extends SchedulerIntegrationSuite[NoKillMultiExecutorBackend] {
>   testScheduler("job failure after 4 attempts when killTask is unsupported") {
>     def runBackend(): Unit = {
>       val task = backend.beginTask()
>       val failure = new ExceptionFailure(new RuntimeException("test task failure"), Seq())
>       backend.taskFailed(task, TaskState.FAILED, failure)
>     }
>     withBackend(runBackend _) {
>       val nParts = backend.defaultParallelism()
>       val jobFuture = submit(new MockRDD(sc, nParts, Nil), (0 until nParts).toArray)
>       val duration = Duration(10, SECONDS)
>       Await.ready(jobFuture, duration)
>       assert(failure.getMessage.contains("test task failure"))
>     }
>     assert(results.isEmpty)
>     assertDataStructuresEmpty(noFailure = false)
>   }
> }
> class NoKillMultiExecutorBackend(
>     conf: SparkConf,
>     taskScheduler: TaskSchedulerImpl) extends MultiExecutorMockBackend(conf, taskScheduler) {
>   override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
>     throw new UnsupportedOperationException()
>   }
> }
> {code}
> In the logs you'll see something like:
> {noformat}
> 16/05/18 11:30:21 INFO scheduler.TaskSetManager: Lost task 19.3 in stage 0.0 (TID 46) on executor host-1: java.lang.RuntimeException (test task failure) [duplicate 9]
> 16/05/18 11:30:21 INFO scheduler.DAGScheduler: Could not cancel tasks for stage 0
> java.lang.UnsupportedOperationException
>         at org.apache.spark.scheduler.NoKillMultiExecutorBackend.killTask(SchedulerIntegrationSuite.scala:550)
>         at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:219)
>         at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:217)
> ...
> <more task completion msgs from all the other tasks that are running>
> ...
> 16/05/18 11:30:21 INFO scheduler.SchedulerIntegrationSuite$$anon$1: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 16/05/18 11:30:30 INFO scheduler.BackendWithoutKillTaskIntegrationSuite:
> ===== FINISHED o.a.s.scheduler.BackendWithoutKillTaskIntegrationSuite: 'job failure after 4 attempts when killTask is unsupported' =====
> [info] - job failure after 4 attempts when killTask is unsupported *** FAILED *** (10 seconds, 995 milliseconds)
> [info]   java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
> ...
> {noformat}
> Note that the taskset is removed after those remaining tasks complete, but 
> despite waiting a whopping 10 seconds, our job never realizes it's done.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
