Chowdary Davuluri created SPARK-36812:
-----------------------------------------

             Summary: Spark on K8s without External Shuffle Service: Improve 
job failure details on missing shuffle data when blacklisting is enabled
                 Key: SPARK-36812
                 URL: https://issues.apache.org/jira/browse/SPARK-36812
             Project: Spark
          Issue Type: Improvement
          Components: Kubernetes, Spark Core
    Affects Versions: 2.4.7
            Reporter: Chowdary Davuluri


This behavior is observed with Spark on K8s (all versions) when executor 
blacklisting is enabled and no External Shuffle Service is in use.

Currently, when a stage is aborted, the failure reason shown in the driver 
logs (and surfaced in the event logs) includes only the failure details of the 
most recently failed task. When executor blacklisting is enabled, that most 
recent failure is very likely to be an instance of 
org.apache.spark.shuffle.MetadataFetchFailedException, because without an 
External Shuffle Service the shuffle data held by a killed blacklisted executor 
is lost. Users would benefit if this error also included the failure details of 
the most recent failed task that caused an executor to be blacklisted and 
thereby led to the subsequent shuffle fetch failure.
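
The requested improvement could look roughly like the sketch below. It is a 
plain-Scala illustration only, not Spark code: TaskFailure, AbortReasonBuilder, 
recordBlacklisting and abortReason are hypothetical names that do not exist in 
Spark, and the message format is an assumption.

```scala
// Hypothetical sketch: none of these types or methods exist in Spark.
final case class TaskFailure(executorId: String, reason: String)

final class AbortReasonBuilder {
  // Most recent task failure that caused an executor to be blacklisted.
  private var lastBlacklistingFailure: Option[TaskFailure] = None

  // Call whenever a task failure leads to an executor being blacklisted.
  def recordBlacklisting(failure: TaskFailure): Unit =
    lastBlacklistingFailure = Some(failure)

  // Append the remembered failure, if any, to the most recent failure text.
  def abortReason(mostRecentFailure: String): String =
    lastBlacklistingFailure match {
      case Some(f) =>
        s"$mostRecentFailure\n" +
          s"Executor ${f.executorId} was blacklisted after: ${f.reason}"
      case None => mostRecentFailure
    }
}

object AbortReasonDemo {
  def main(args: Array[String]): Unit = {
    val builder = new AbortReasonBuilder
    builder.recordBlacklisting(
      TaskFailure("1", "java.lang.RuntimeException: Something bad has happened"))
    // The fetch-failure text here is a placeholder, not a real Spark log line.
    println(builder.abortReason("MetadataFetchFailedException: <details>"))
  }
}
```

With something along these lines, the abort message would carry both the 
shuffle fetch failure and the original task error that triggered the 
blacklisting.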

 

Code to reproduce the issue:
{code:java}
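// Assumed import: log4j 1.x, as bundled with Spark 2.4.x.
import org.apache.log4j.LogManager
import org.apache.spark.Partitioner
import org.apache.spark.sql.SparkSession

object TestApp {
  def main(args: Array[String]): Unit = {
    val jobName = "TestApp"
    val logger = LogManager.getLogger("TestApp")
    try {
      logger.info("Starting execution..")
      val maxCount = 100
      // Pairs of (index, number); the zero-based index is the partition key.
      val numbers = (1 to maxCount).zipWithIndex.map(_.swap)
      val context =
        SparkSession.builder().appName(jobName).getOrCreate().sparkContext
      context
        .parallelize(numbers, maxCount)
        // Forces a shuffle that places each key in its own partition.
        .partitionBy(new Partitioner {
          override def numPartitions: Int          = maxCount
          override def getPartition(key: Any): Int = key.asInstanceOf[Int]
        })
        .map {
          case (_, number) =>
            // Always fail, so every task reading the shuffle output errors out.
            if (true) sys.error("Something bad has happened")
            else number
        }
        .count()
    } catch {
      case e: Exception =>
        logger.error(s"$jobName error in main", e)
    }
  }
}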
{code}
Config used:
{code:java}
"spark.executor.memory": "2G",
"spark.blacklist.killBlacklistedExecutors": "true",
"spark.blacklist.enabled": "true",
"spark.blacklist.application.maxFailedTasksPerExecutor": "1",
"spark.blacklist.timeout": "10800s",
"spark.blacklist.task.maxTaskAttemptsPerNode": "3",
"spark.blacklist.task.maxTaskAttemptsPerExecutor": "1",
"spark.blacklist.stage.maxFailedTasksPerExecutor": "1",
"spark.blacklist.stage.maxFailedExecutorsPerNode": "3",
"spark.blacklist.decommissioning.timeout": "1h",
"spark.blacklist.decommissioning.enabled": "true",
"spark.executor.instances": "3",
"spark.blacklist.application.maxFailedExecutorsPerNode": "3"
{code}



