[ 
https://issues.apache.org/jira/browse/SPARK-15385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Imran Rashid updated SPARK-15385:
---------------------------------
    Description: 
If a {{SchedulerBackend}} doesn't implement {{killTask}}, then when a job 
fails, if there are any tasks still running, the job never completes.

This would be a major bug, except that all of the existing {{SchedulerBackend}}s 
implement {{killTask}}, so in practice this doesn't affect anyone.  It does 
complicate the scheduler code, though, so it would be nice to simplify this one 
way or the other.

The problem stems from [this code|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1473] in {{DAGScheduler}}:

{code}
          if (runningStages.contains(stage)) {
            try {
              // cancelTasks will fail if a SchedulerBackend does not implement killTask
              taskScheduler.cancelTasks(stageId, shouldInterruptThread)
              markStageAsFinished(stage, Some(failureReason))
            } catch {
              case e: UnsupportedOperationException =>
                logInfo(s"Could not cancel tasks for stage $stageId", e)
                ableToCancelStages = false
            }
          }
        }
      }
    }

    if (ableToCancelStages) {
      job.listener.jobFailed(error)
      cleanupStateForJobAndIndependentStages(job)
      listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
    }
{code}

It *appears* to handle the case where a backend doesn't support {{killTask}}, 
but in fact if the backend doesn't support it, then we never end up doing any 
of the actions for the failed job, even after all the remaining running tasks 
come back.
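
To make the control flow concrete, here is a minimal standalone sketch of the same pattern. It is a toy model and not Spark code: {{Backend}}, {{NoKillBackend}}, {{KillingBackend}} and {{failJob}} are hypothetical names, and the boolean return value stands in for the {{jobFailed}} / cleanup / {{SparkListenerJobEnd}} actions.

```scala
// Toy model of the control flow above; every name here is hypothetical, not Spark's API.
object StuckJobSketch {
  trait Backend {
    def killTask(taskId: Long): Unit
  }

  // Backend that does not support killing tasks.
  object NoKillBackend extends Backend {
    override def killTask(taskId: Long): Unit =
      throw new UnsupportedOperationException()
  }

  // Backend whose killTask succeeds (here it simply does nothing).
  object KillingBackend extends Backend {
    override def killTask(taskId: Long): Unit = ()
  }

  /** Returns true if the job-failed actions were run. */
  def failJob(backend: Backend, runningTasks: Seq[Long]): Boolean = {
    var ableToCancelStages = true
    try {
      runningTasks.foreach(backend.killTask)
    } catch {
      case _: UnsupportedOperationException =>
        ableToCancelStages = false
    }
    // Nothing ever revisits this flag, so a false value skips the
    // job-failed actions for good, even after the tasks finish.
    ableToCancelStages
  }

  def main(args: Array[String]): Unit = {
    println(failJob(KillingBackend, Seq(1L, 2L)))
    println(failJob(NoKillBackend, Seq(1L, 2L)))
  }
}
```

With no running tasks {{killTask}} is never called, so the problem only shows up when something is still running at the time the job fails.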

One fix would be to wait until we get the task finished events from the backend, 
and then send all the job failed events.  But (a) this is a bit complex and (b) 
you can never be sure you'll get all those task end events, e.g. if there is some 
issue on the executor where those remaining tasks are running.

The much simpler option: just run all the job completion logic whether or not 
those tasks have been cancelled.  Even when the backend *does* support killing 
tasks, it's async, and so all the job failed msgs get sent before we know the 
tasks have been killed.  I don't see how it's any worse in the case where the 
backend doesn't kill tasks at all.  If others are OK with that, I'd propose 
that we make {{killTask}} an abstract method in {{SchedulerBackend}}, to force 
backends to think about implementing it, but add a comment that implementations 
are free to make it a no-op.
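
A rough sketch of what that proposal could look like, again as a standalone toy model rather than a patch: {{Backend}}, {{NoOpKillBackend}}, {{ThrowingBackend}} and {{failJob}} are hypothetical names, and the event strings just stand in for the real job completion actions.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of the proposal; every name here is hypothetical, not Spark's API.
object AlwaysCompleteSketch {
  // killTask is abstract, so every backend has to make an explicit choice.
  trait Backend {
    def killTask(taskId: Long): Unit
  }

  // A backend that cannot kill tasks is free to make killTask a no-op.
  object NoOpKillBackend extends Backend {
    override def killTask(taskId: Long): Unit = ()
  }

  // A backend that still throws, to show the job completes anyway.
  object ThrowingBackend extends Backend {
    override def killTask(taskId: Long): Unit =
      throw new UnsupportedOperationException()
  }

  /** Returns the actions taken for the failed job, in order. */
  def failJob(backend: Backend, runningTasks: Seq[Long]): List[String] = {
    val events = ArrayBuffer.empty[String]
    try {
      // Best effort only: even a supported kill is asynchronous.
      runningTasks.foreach(backend.killTask)
    } catch {
      case _: UnsupportedOperationException =>
        events += "could not cancel tasks"
    }
    // Job completion logic runs whether or not the tasks were cancelled.
    events += "jobFailed"
    events += "cleanup"
    events += "jobEnd"
    events.toList
  }
}
```

Either way the caller sees the job fail right away; the only difference is whether the leftover tasks keep running in the background.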

You can reproduce with this test (which leverages an open PR for SPARK-10372):

{code}
class BackendWithoutKillTaskIntegrationSuite
    extends SchedulerIntegrationSuite[NoKillMultiExecutorBackend] {
  testScheduler("job failure after 4 attempts when killTask is unsupported") {
    def runBackend(): Unit = {
      val task = backend.beginTask()
      val failure = new ExceptionFailure(new RuntimeException("test task failure"), Seq())
      backend.taskFailed(task, TaskState.FAILED, failure)
    }
    withBackend(runBackend _) {
      val nParts = backend.defaultParallelism()
      val jobFuture = submit(new MockRDD(sc, nParts, Nil), (0 until nParts).toArray)
      val duration = Duration(10, SECONDS)
      Await.ready(jobFuture, duration)
      failure.getMessage.contains("test task failure")
    }
    assert(results.isEmpty)
    assertDataStructuresEmpty(noFailure = false)
  }
}

class NoKillMultiExecutorBackend(
    conf: SparkConf,
    taskScheduler: TaskSchedulerImpl) extends MultiExecutorMockBackend(conf, taskScheduler) {

  override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
    throw new UnsupportedOperationException()
  }
}
{code}

In the logs you'll see something like:

{noformat}
16/05/18 11:30:21 INFO scheduler.TaskSetManager: Lost task 19.3 in stage 0.0 (TID 46) on executor host-1: java.lang.RuntimeException (test task failure) [duplicate 9]
16/05/18 11:30:21 INFO scheduler.DAGScheduler: Could not cancel tasks for stage 0
java.lang.UnsupportedOperationException
        at org.apache.spark.scheduler.NoKillMultiExecutorBackend.killTask(SchedulerIntegrationSuite.scala:550)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:219)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:217)
...
<more task completion msgs from all the other tasks that are running>
...
16/05/18 11:30:21 INFO scheduler.SchedulerIntegrationSuite$$anon$1: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/05/18 11:30:30 INFO scheduler.BackendWithoutKillTaskIntegrationSuite:

===== FINISHED o.a.s.scheduler.BackendWithoutKillTaskIntegrationSuite: 'job failure after 4 attempts when killTask is unsupported' =====

[info] - job failure after 4 attempts when killTask is unsupported *** FAILED *** (10 seconds, 995 milliseconds)
[info]   java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
...
{noformat}

Note that the taskset is removed after those remaining tasks complete, but 
despite waiting a whopping 10 seconds our job never realizes it's done.
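
The {{TimeoutException}} is what you'd expect if nothing ever completes the job's future. A small sketch of that mechanism using only {{scala.concurrent}}, under the assumption that the job future is completed solely by the job-failed callback; {{timesOut}} and {{notifyListener}} are hypothetical names, and the timeout is shortened to keep the example quick:

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Toy model only; the names are hypothetical and nothing here is Spark code.
object NeverCompletedFutureSketch {
  /** Returns true if waiting on the job future timed out. */
  def timesOut(notifyListener: Boolean): Boolean = {
    val jobPromise = Promise[Unit]()
    // Stand-in for the job-failed callback: the only thing that completes the future.
    if (notifyListener) {
      jobPromise.failure(new RuntimeException("test task failure"))
    }
    try {
      // Await.ready returns once the future completes, successfully or not.
      Await.ready(jobPromise.future, 200.millis)
      false
    } catch {
      case _: TimeoutException => true
    }
  }
}
```

When the callback is skipped, the wait can only end by timing out, no matter how long the remaining tasks take to finish.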


> Jobs never complete for ClusterManagers that don't implement killTask
> ---------------------------------------------------------------------
>
>                 Key: SPARK-15385
>                 URL: https://issues.apache.org/jira/browse/SPARK-15385
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 1.6.1
>            Reporter: Imran Rashid
>            Priority: Minor
