[
https://issues.apache.org/jira/browse/SPARK-36812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chowdary Davuluri updated SPARK-36812:
--------------------------------------
Description:
This behavior is noticed with Spark (all versions) on K8s when executor
blacklisting is enabled and no External Shuffle Service is used.
Currently, when a stage is aborted, the failure reason displayed in the driver
logs (and surfaced in the event logs) only includes the failure details of the
most recently failed task. When executor blacklisting is enabled, the most
recent task failure is very likely to be an instance of
org.apache.spark.shuffle.MetadataFetchFailedException, which hides the original
task error. Enhancing this error to also include the failure details of the
most recent failed task that caused the executor to be blacklisted, and thereby
led to the subsequent shuffle fetch failure, will benefit users.
Code to reproduce the issue:
{code:scala}
object TestApp {
def main (arg: Array[String]): Unit = {
val jobName = "TestApp"
val logger = LogManager.getLogger("TestApp")
try {
logger.info(s"Starting execution..")
val maxCount = 100
val numbers = (1 to maxCount).zipWithIndex.map(_.swap)
val context =
SparkSession.builder().appName(jobName).getOrCreate().sparkContext;
context
.parallelize(numbers, maxCount)
.partitionBy(new Partitioner {
override def numPartitions: Int = maxCount
override def getPartition(key: Any): Int = key.asInstanceOf[Int]
})
.map {
case (_, number) =>
if (true) sys.error("Something bad has happened")
else number
}
.count() } catch {
case e: Exception => {
logger.error(s"$jobName error in main", e)
}
}
}
}
{code}
Config used:
{code:java}
"spark.executor.memory": "2G",
"spark.blacklist.killBlacklistedExecutors": "true",
"spark.blacklist.enabled": "true",
"spark.blacklist.application.maxFailedTasksPerExecutor": "1",
"spark.blacklist.timeout": "10800s",
"spark.blacklist.task.maxTaskAttemptsPerNode": "3",
"spark.blacklist.task.maxTaskAttemptsPerExecutor": "1",
"spark.blacklist.stage.maxFailedTasksPerExecutor": "1",
"spark.blacklist.stage.maxFailedExecutorsPerNode": "3",
"spark.blacklist.decommissioning.timeout": "1h",
"spark.blacklist.decommissioning.enabled": "true",
"spark.executor.instances": "3",
"spark.blacklist.application.maxFailedExecutorsPerNode": "3"
{code}
Error message in the driver log:
{noformat}
21/09/09 18:52:28 INFO DAGScheduler: Job 0 failed: count at TestApp.scala:30,
took 35.360700 s
21/09/09 18:52:28 ERROR TestApp: TestApp error in main
org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage
1 (count at TestApp.scala:30) has failed the maximum allowable number of times:
4. Most recent failure reason:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0 at
org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$4.apply(MapOutputTracker.scala:972)
at
org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$4.apply(MapOutputTracker.scala:968)
at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:968)
at
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:759)
at
org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:118)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:104) at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at
org.apache.spark.scheduler.Task.run(Task.scala:123) at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2080)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2068)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2067)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2067)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1568)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2298)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2250)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2239)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:799)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD.count(RDD.scala:1213)
at org.example.TestApp$.main(TestApp.scala:30)
at org.example.TestApp.main(TestApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
at
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{noformat}
was:
This behavior is noticed with Spark (all versions) on K8s when executor
blacklisting is enabled and no External Shuffle Service is used.
Currently, when a stage is aborted, the failure reason displayed in the driver
logs (and surfaced in the event logs) only includes the failure details of the
most recently failed task. When executor blacklisting is enabled, the most
recent task failure is always likely to be an instance of
org.apache.spark.shuffle.MetadataFetchFailedException. Enhancing this error to
include the failure details of the most recent failed task which resulted in an
executor getting blacklisted, and resulted in the subsequent shuffle fetch
failure, will benefit the users.
Code to reproduce the issue:
{code:java}
object TestApp {
def main (arg: Array[String]): Unit = {
val jobName = "TestApp"
val logger = LogManager.getLogger("TestApp")
try {
logger.info(s"Starting execution..")
val maxCount = 100
val numbers = (1 to maxCount).zipWithIndex.map(_.swap)
val context =
SparkSession.builder().appName(jobName).getOrCreate().sparkContext;
context
.parallelize(numbers, maxCount)
.partitionBy(new Partitioner {
override def numPartitions: Int = maxCount
override def getPartition(key: Any): Int = key.asInstanceOf[Int]
})
.map {
case (_, number) =>
if (true) sys.error("Something bad has happened")
else number
}
.count() } catch {
case e: Exception => {
logger.error(s"$jobName error in main", e)
}
}
}
}
{code}
Config used:
{code:java}
"spark.executor.memory": "2G",
"spark.blacklist.killBlacklistedExecutors": "true",
"spark.blacklist.enabled": "true",
"spark.blacklist.application.maxFailedTasksPerExecutor": "1",
"spark.blacklist.timeout": "10800s",
"spark.blacklist.task.maxTaskAttemptsPerNode": "3",
"spark.blacklist.task.maxTaskAttemptsPerExecutor": "1",
"spark.blacklist.stage.maxFailedTasksPerExecutor": "1",
"spark.blacklist.stage.maxFailedExecutorsPerNode": "3",
"spark.blacklist.decommissioning.timeout": "1h",
"spark.blacklist.decommissioning.enabled": "true",
"spark.executor.instances": "3",
"spark.blacklist.application.maxFailedExecutorsPerNode": "3"
{code}
Error message in the driver log:
{noformat}
21/09/09 18:52:28 INFO DAGScheduler: Job 0 failed: count at TestApp.scala:30,
took 35.360700 s
21/09/09 18:52:28 ERROR TestApp: TestApp error in main
org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage
1 (count at TestApp.scala:30) has failed the maximum allowable number of times:
4. Most recent failure reason:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0 at
org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$4.apply(MapOutputTracker.scala:972)
at
org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$4.apply(MapOutputTracker.scala:968)
at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:968)
at
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:759)
at
org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:118)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:104) at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at
org.apache.spark.scheduler.Task.run(Task.scala:123) at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2080)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2068)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2067)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2067)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1568)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2298)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2250)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2239)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:799)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD.count(RDD.scala:1213)
at org.example.TestApp$.main(TestApp.scala:30)
at org.example.TestApp.main(TestApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
at
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{noformat}
> Spark on K8s without External Shuffle Service: Improve job failure details on
> missing shuffle data when blacklisting is enabled
> -------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-36812
> URL: https://issues.apache.org/jira/browse/SPARK-36812
> Project: Spark
> Issue Type: Improvement
> Components: Kubernetes, Spark Core
> Affects Versions: 2.4.7
> Reporter: Chowdary Davuluri
> Priority: Major
>
> This behavior is noticed with Spark (all versions) on K8s when executor
> blacklisting is enabled and no External Shuffle Service is used.
> Currently, when a stage is aborted, the failure reason displayed in the
> driver logs (and surfaced in the event logs) only includes the failure
> details of the most recently failed task. When executor blacklisting is
> enabled, the most recent task failure is very likely to be an instance of
> org.apache.spark.shuffle.MetadataFetchFailedException, which hides the
> original task error. Enhancing this error to also include the failure
> details of the most recent failed task that caused the executor to be
> blacklisted, and thereby led to the subsequent shuffle fetch failure, will
> benefit users.
>
> Code to reproduce the issue:
> {code:scala}
> object TestApp {
> def main (arg: Array[String]): Unit = {
> val jobName = "TestApp"
> val logger = LogManager.getLogger("TestApp")
> try {
> logger.info(s"Starting execution..")
> val maxCount = 100
> val numbers = (1 to maxCount).zipWithIndex.map(_.swap)
> val context =
> SparkSession.builder().appName(jobName).getOrCreate().sparkContext;
> context
> .parallelize(numbers, maxCount)
> .partitionBy(new Partitioner {
> override def numPartitions: Int = maxCount
> override def getPartition(key: Any): Int = key.asInstanceOf[Int]
> })
> .map {
> case (_, number) =>
> if (true) sys.error("Something bad has happened")
> else number
> }
> .count() } catch {
> case e: Exception => {
> logger.error(s"$jobName error in main", e)
> }
> }
> }
> }
> {code}
> Config used:
> {code:java}
> "spark.executor.memory": "2G",
> "spark.blacklist.killBlacklistedExecutors": "true",
> "spark.blacklist.enabled": "true",
> "spark.blacklist.application.maxFailedTasksPerExecutor": "1",
> "spark.blacklist.timeout": "10800s",
> "spark.blacklist.task.maxTaskAttemptsPerNode": "3",
> "spark.blacklist.task.maxTaskAttemptsPerExecutor": "1",
> "spark.blacklist.stage.maxFailedTasksPerExecutor": "1",
> "spark.blacklist.stage.maxFailedExecutorsPerNode": "3",
> "spark.blacklist.decommissioning.timeout": "1h",
> "spark.blacklist.decommissioning.enabled": "true",
> "spark.executor.instances": "3",
> "spark.blacklist.application.maxFailedExecutorsPerNode": "3"
> {code}
> Error message in the driver log:
> {noformat}
> 21/09/09 18:52:28 INFO DAGScheduler: Job 0 failed: count at TestApp.scala:30,
> took 35.360700 s
> 21/09/09 18:52:28 ERROR TestApp: TestApp error in main
> org.apache.spark.SparkException: Job aborted due to stage failure:
> ResultStage 1 (count at TestApp.scala:30) has failed the maximum allowable
> number of times: 4. Most recent failure reason:
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 0 at
> org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$4.apply(MapOutputTracker.scala:972)
> at
> org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$4.apply(MapOutputTracker.scala:968)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
> org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:968)
> at
> org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:759)
> at
> org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:118)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:104)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:310) at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at
> org.apache.spark.scheduler.Task.run(Task.scala:123) at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2080)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2068)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2067)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2067)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1568)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2298)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2250)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2239)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:799)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
> at org.apache.spark.rdd.RDD.count(RDD.scala:1213)
> at org.example.TestApp$.main(TestApp.scala:30)
> at org.example.TestApp.main(TestApp.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
> at
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]