[ 
https://issues.apache.org/jira/browse/SPARK-20945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16032392#comment-16032392
 ] 

liupengcheng edited comment on SPARK-20945 at 6/1/17 3:47 AM:
--------------------------------------------------------------

I finally found out that this bug may be caused by an inconsistency in 
TaskSchedulerImpl. The related code is as follows:
{code}
  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = 
synchronized {
    logInfo("Cancelling stage " + stageId)
    taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
      attempts.foreach { case (_, tsm) =>
        // There are two possible cases here:
        // 1. The task set manager has been created and some tasks have been 
scheduled.
        //    In this case, send a kill signal to the executors to kill the 
task and then abort
        //    the stage.
        // 2. The task set manager has been created but no tasks has been 
scheduled. In this case,
        //    simply abort the stage.
        
          tsm.runningTasksSet.foreach{ tid =>
          val execId = taskIdToExecutorId(tid)
          backend.killTask(tid, execId, interruptThread)
         }
        tsm.abort("Stage %s cancelled".format(stageId))
        logInfo("Stage %d was cancelled".format(stageId))
      }
    }
  }
{code}

When starting a new task, the tid is inserted into tsm.runningTasksSet and 
taskIdToExecutorId, 
but when an ExecutorLost event happens, the tid on that executor is removed from 
taskIdToExecutorId, 
while the tid in tsm.runningTasksSet is not cleaned up until a PendingLossReason is received.
In this case, if another task fails 4 times and causes the scheduler to cancel the 
stage, 
the above function cancelTasks may be triggered, and the exception is thrown.


was (Author: liupengcheng):
I finally found out that this bug may be caused by an inconsistency in 
TaskSchedulerImpl. The related code is as follows:
{quote}
  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = 
synchronized {
    logInfo("Cancelling stage " + stageId)
    taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
      attempts.foreach { case (_, tsm) =>
        // There are two possible cases here:
        // 1. The task set manager has been created and some tasks have been 
scheduled.
        //    In this case, send a kill signal to the executors to kill the 
task and then abort
        //    the stage.
        // 2. The task set manager has been created but no tasks has been 
scheduled. In this case,
        //    simply abort the stage.
        
          tsm.runningTasksSet.foreach{ tid =>
          val execId = taskIdToExecutorId(tid)
          backend.killTask(tid, execId, interruptThread)
         }
        tsm.abort("Stage %s cancelled".format(stageId))
        logInfo("Stage %d was cancelled".format(stageId))
      }
    }
  }
{quote}

When starting a new task, the tid is inserted into tsm.runningTasksSet and 
taskIdToExecutorId, 
but when an ExecutorLost event happens, the tid on that executor is removed from 
taskIdToExecutorId, 
while the tid in tsm.runningTasksSet is not cleaned up until a PendingLossReason is received.
In this case, if another task fails 4 times and causes the scheduler to cancel the 
stage, 
the above function cancelTasks may be triggered, and the exception is thrown.

> NoSuchElementException key not found in TaskSchedulerImpl
> ---------------------------------------------------------
>
>                 Key: SPARK-20945
>                 URL: https://issues.apache.org/jira/browse/SPARK-20945
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0, 2.1.1
>            Reporter: liupengcheng
>              Labels: scheduler
>
> We started the Spark ThriftServer; however, when a job failed, it caused the 
> thriftserver to shut down as well.
> {quote}
> 2017-05-27,10:34:27,272 INFO 
> org.apache.spark.scheduler.cluster.YarnScheduler: Cancelling stage 21
> 2017-05-27,10:34:27,274 ERROR 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop: 
> DAGSchedulerEventProcessLoop failed; shutting down SparkContext
> java.util.NoSuchElementException: key not found: 14100
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:226)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:225)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:225)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3.apply(TaskSchedulerImpl.scala:225)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2$$anonfun$apply$3.apply(TaskSchedulerImpl.scala:218)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2.apply(TaskSchedulerImpl.scala:218)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$2.apply(TaskSchedulerImpl.scala:217)
> at scala.Option.foreach(Option.scala:257)
> at 
> org.apache.spark.scheduler.TaskSchedulerImpl.cancelTasks(TaskSchedulerImpl.scala:217)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply$mcVI$sp(DAGScheduler.scala:1461)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1447)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1447)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1447)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> at scala.Option.foreach(Option.scala:257)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to