[
https://issues.apache.org/jira/browse/SPARK-27112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Apache Spark reassigned SPARK-27112:
------------------------------------
Assignee: (was: Apache Spark)
> Spark Scheduler encounters two independent Deadlocks when trying to kill
> executors either due to dynamic allocation or blacklisting
> ------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-27112
> URL: https://issues.apache.org/jira/browse/SPARK-27112
> Project: Spark
> Issue Type: Bug
> Components: Scheduler, Spark Core
> Affects Versions: 2.4.0, 3.0.0
> Reporter: Parth Gandhi
> Priority: Major
> Attachments: Screen Shot 2019-02-26 at 4.10.26 PM.png, Screen Shot
> 2019-02-26 at 4.10.48 PM.png, Screen Shot 2019-02-26 at 4.11.11 PM.png,
> Screen Shot 2019-02-26 at 4.11.26 PM.png
>
>
> Recently, a few Spark users in the organization have reported that their jobs
> were getting stuck. Further analysis showed that there are two independent
> deadlocks, each of which occurs under different circumstances. Screenshots of
> both deadlocks are attached. We were able to reproduce the deadlocks with the
> following code:
>
> {code:java}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark._
> import org.apache.spark.TaskContext
>
> // Simple example of Word Count in Scala
> object ScalaWordCount {
>   def main(args: Array[String]): Unit = {
>     if (args.length < 2) {
>       System.err.println("Usage: ScalaWordCount <inputFilesURI> <outputFilesUri>")
>       System.exit(1)
>     }
>     val conf = new SparkConf().setAppName("Scala Word Count")
>     val sc = new SparkContext(conf)
>     // get the input file uri
>     val inputFilesUri = args(0)
>     // get the output file uri
>     val outputFilesUri = args(1)
>     // Re-run the same job in an endless loop
>     while (true) {
>       val textFile = sc.textFile(inputFilesUri)
>       val counts = textFile.flatMap(line => line.split(" "))
>         .map(word => {
>           // Fail the first attempt of partition 5 to trigger blacklisting
>           if (TaskContext.get.partitionId == 5 && TaskContext.get.attemptNumber == 0)
>             throw new Exception("Fail for blacklisting")
>           else (word, 1)
>         })
>         .reduceByKey(_ + _)
>       counts.saveAsTextFile(outputFilesUri)
>       // Delete the output so the next iteration can write to the same path
>       val hadoopConf: Configuration = new Configuration()
>       val path: Path = new Path(outputFilesUri)
>       val hdfs: FileSystem = FileSystem.get(hadoopConf)
>       hdfs.delete(path, true)
>     }
>     sc.stop()
>   }
> }
> {code}
>
> Additionally, to make the deadlock surface quickly, I also added a small delay
> in the Spark code here:
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala#L256]
>
> {code:java}
> executorIdToFailureList.remove(exec)
> updateNextExpiryTime()
> Thread.sleep(2000)
> killBlacklistedExecutor(exec)
> {code}
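
The quoted report does not list the Spark settings used for the reproduction. Since the deadlocks are tied to killing executors through dynamic allocation or blacklisting, a configuration along the following lines is presumably needed. This is a minimal sketch under that assumption: the property names are standard Spark 2.4 settings, but the choice of properties and their values is not taken from the report.

```scala
import org.apache.spark.SparkConf

// Assumed settings; the report does not state the values actually used.
val conf = new SparkConf()
  .setAppName("Scala Word Count")
  // Let Spark release idle executors (one of the two executor-kill paths).
  .set("spark.dynamicAllocation.enabled", "true")
  // Dynamic allocation in Spark 2.4 relies on the external shuffle service.
  .set("spark.shuffle.service.enabled", "true")
  // Track task failures per executor and blacklist repeat offenders.
  .set("spark.blacklist.enabled", "true")
  // Kill blacklisted executors (the other executor-kill path).
  .set("spark.blacklist.killBlacklistedExecutors", "true")
```

The same properties can instead be passed as `--conf key=value` arguments to `spark-submit`, which avoids changing the application code.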