Imran Rashid created SPARK-23365:
------------------------------------

             Summary: DynamicAllocation with failure in straggler task can lead to a hung spark job
                 Key: SPARK-23365
                 URL: https://issues.apache.org/jira/browse/SPARK-23365
             Project: Spark
          Issue Type: Bug
          Components: Scheduler
    Affects Versions: 2.2.1, 2.1.2, 2.3.0
            Reporter: Imran Rashid


Dynamic Allocation can lead to a Spark app getting stuck with 0 executors 
requested when the executors running the last tasks of a taskset fail (e.g. 
with an OOM).

This happens when the {{ExecutorAllocationManager}}'s internal target number 
of executors gets out of sync with the {{CoarseGrainedSchedulerBackend}}'s 
target number.  The {{EAM}} updates the {{CGSB}} in two ways: (1) it tracks 
how many tasks are active or pending in submitted stages and computes how 
many executors would be needed for them; as tasks finish, it actively 
decreases that count, informing the {{CGSB}} along the way.  (2) When it 
decides executors have been idle for long enough, it requests that the 
{{CGSB}} kill them -- this also tells the {{CGSB}} to update its target 
number of executors: 
https://github.com/apache/spark/blob/4df84c3f818aa536515729b442601e08c253ed35/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L622
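
To make that interaction concrete, here is a minimal toy sketch of the two 
sync paths.  {{ToyBackend}} and {{ToyAllocationManager}} are hypothetical 
stand-ins used only for illustration, not Spark's actual classes:

{code}
// Hypothetical stand-ins for the EAM / CGSB pair, only to illustrate the two
// paths by which the allocation manager keeps the backend's target in sync.
class ToyBackend {
  var targetNumExecutors = 0

  // Path (1): the allocation manager pushes a fresh total down to the backend.
  def requestTotalExecutors(n: Int): Unit = {
    targetNumExecutors = n
  }

  // Path (2): killing idle executors also lowers the backend's target.
  def killExecutors(ids: Seq[String]): Unit = {
    targetNumExecutors = math.max(targetNumExecutors - ids.size, 0)
  }
}

class ToyAllocationManager(backend: ToyBackend) {
  var numExecutorsTarget = 0

  // Recompute demand from running + pending tasks and sync the backend.
  def syncToTaskDemand(tasksOutstanding: Int): Unit = {
    numExecutorsTarget = tasksOutstanding   // assume 1 task per executor slot
    backend.requestTotalExecutors(numExecutorsTarget)
  }

  // Expire idle executors.  Note that only the backend's target drops here;
  // the manager's own numExecutorsTarget is left untouched.
  def expireIdleExecutors(idleIds: Seq[String]): Unit = {
    backend.killExecutors(idleIds)
  }
}
{code}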

So when there is just one task left, you could have the following sequence of 
events:
(1) the {{EAM}} sets the desired number of executors to 1, and updates the 
{{CGSB}} too
(2) while that final task is still running, the other executors cross the idle 
timeout, and the {{EAM}} requests that the {{CGSB}} kill them
(3) now the {{EAM}} has a target of 1 executor, and the {{CGSB}} has a target 
of 0 executors

If the final task completed normally now, everything would be OK; the next 
taskset would get submitted, the {{EAM}} would increase the target number of 
executors and it would update the {{CGSB}}.
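
Sketched with the hypothetical toy classes above, that happy path is roughly:

{code}
// Replaying the sequence above with the toy classes from the earlier sketch
// (hypothetical, for illustration only):
val backend = new ToyBackend
val eam = new ToyAllocationManager(backend)

eam.syncToTaskDemand(tasksOutstanding = 1)        // (1) both targets are 1
eam.expireIdleExecutors(Seq("2", "3", "4", "5"))  // (2) backend target drops to 0
// (3) eam.numExecutorsTarget == 1, backend.targetNumExecutors == 0

// Happy path: the final task completes and the next taskset (say 10 tasks)
// is submitted, so the EAM recomputes demand and re-syncs the backend.
eam.syncToTaskDemand(tasksOutstanding = 10)       // both targets back in sync at 10
{code}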

But if the executor running that final task failed (e.g. with an OOM), then the {{EAM}} 
thinks it [doesn't need to update 
anything|https://github.com/apache/spark/blob/4df84c3f818aa536515729b442601e08c253ed35/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L384-L386],
 because its target is already 1, which is all it needs for that final task; 
and the {{CGSB}} doesn't update anything either since its target is 0.
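
With the same hypothetical toy classes, the failure path is roughly as follows 
(a hedged illustration of the book-keeping, not Spark's actual update logic):

{code}
// Failure path, restarting from the divergent state reached after step (2):
val backend2 = new ToyBackend
val eam2 = new ToyAllocationManager(backend2)
eam2.syncToTaskDemand(tasksOutstanding = 1)         // (1) both targets are 1
eam2.expireIdleExecutors(Seq("2", "3", "4", "5"))   // (2) backend target drops to 0

// The executor running the last task dies (e.g. OOM); the task is pending
// again, so the demand the EAM computes is still just 1 executor.
val stillNeeded = 1
if (stillNeeded > eam2.numExecutorsTarget) {        // 1 > 1 is false...
  eam2.syncToTaskDemand(stillNeeded)                // ...so this sync never happens
}

// backend2.targetNumExecutors is still 0: the app keeps asking the cluster
// manager for zero executors while one task sits unscheduled, so the job hangs.
assert(eam2.numExecutorsTarget == 1 && backend2.targetNumExecutors == 0)
{code}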

I think you can determine if this is the cause of a stuck app by looking for
{noformat}
yarn.YarnAllocator: Driver requested a total number of 0 executor(s).
{noformat}
in the logs of the ApplicationMaster (at least on YARN).

You can reproduce this with the test app below, run with {{--conf 
"spark.dynamicAllocation.minExecutors=1" --conf 
"spark.dynamicAllocation.maxExecutors=5" --conf 
"spark.dynamicAllocation.executorIdleTimeout=5s"}}

{code}
import org.apache.spark.SparkEnv

sc.setLogLevel("INFO")

sc.parallelize(1 to 10000, 10000).count()

val execs = sc.parallelize(1 to 1000, 1000).map { _ =>
  SparkEnv.get.executorId
}.collect().toSet
val badExec = execs.head
println("will kill exec " + badExec)

new Thread() {
  override def run(): Unit = {
    Thread.sleep(10000)
    println("about to kill exec " + badExec)
    sc.killExecutor(badExec)
  }
}.start()

sc.parallelize(1 to 5, 5).mapPartitions { itr =>
  val exec = SparkEnv.get.executorId
  if (exec == badExec) {
    Thread.sleep(20000) // long enough that all the other tasks finish, and the
                        // executors cross the idle timeout
    // meanwhile, something else should kill this executor
    itr
  } else {
    itr
  }
}.collect()
{code}


