[ https://issues.apache.org/jira/browse/SPARK-27194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reza Safi updated SPARK-27194:
------------------------------
    Description: 
When a container fails for some reason (for example, when it is killed by YARN for 
exceeding memory limits), the subsequent task attempts for the tasks that were 
running on that container all fail with a FileAlreadyExistsException. The 
original task attempt does not seem to successfully call abortTask (or at least 
its "best effort" delete is unsuccessful) and clean up the parquet file it was 
writing to, so when later task attempts try to write to the same spark-staging 
directory using the same file name, the job fails.
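For context, a minimal sketch of the write pattern involved, assuming a partitioned table laid out like the staging paths in the logs below (the source table and names here are hypothetical, not from this job):

{code:scala}
// Hypothetical sketch; source table and names are illustrative only.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dynamic-partition-overwrite-example")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .enableHiveSupport()
  .getOrCreate()

// With dynamic partition overwrite, each task writes its part file under a
// hidden <table path>/.spark-staging-<jobId>/dt=.../part-....snappy.parquet
// path until job commit, which is where the collision reported below occurs.
spark.table("some_source_table")        // hypothetical source
  .write
  .mode("overwrite")
  .insertInto("tmp_supply_feb1")        // partitioned target (dt), per the log paths
{code}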

Here is what transpires in the logs:

The container where task 200.0 is running is killed and the task is lost:

19/02/20 09:33:25 ERROR cluster.YarnClusterScheduler: Lost executor y on 
t.y.z.com: Container killed by YARN for exceeding memory limits. 8.1 GB of 8 GB 
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
 19/02/20 09:33:25 WARN scheduler.TaskSetManager: Lost task 200.0 in stage 0.0 
(TID xxx, t.y.z.com, executor 93): ExecutorLostFailure (executor 93 exited 
caused by one of the running tasks) Reason: Container killed by YARN for 
exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider boosting 
spark.yarn.executor.memoryOverhead.

The task is re-attempted on a different executor and fails because the 
part-00200-blah-blah.c000.snappy.parquet file from the first task attempt 
already exists:

19/02/20 09:35:01 WARN scheduler.TaskSetManager: Lost task 200.1 in stage 0.0 
(TID 594, tn.y.z.com, executor 70): org.apache.spark.SparkException: Task 
failed while writing rows.
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:109)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
/user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet
 for client 17.161.235.91 already exists

The job fails once the configured number of task attempts (spark.task.maxFailures) 
have all failed with the same error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 200 in 
stage 0.0 failed 20 times, most recent failure: Lost task 284.19 in stage 0.0 
(TID yyy, tm.y.z.com, executor 16): org.apache.spark.SparkException: Task 
failed while writing rows.
 at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
 ...
 Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
/user/hive/warehouse/tmp_supply_feb1/.spark-staging-blah-blah-blah/dt=2019-02-17/part-00200-blah-blah.c000.snappy.parquet
 for client i.p.a.d already exists
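For reference, the two settings mentioned above (spark.yarn.executor.memoryOverhead and spark.task.maxFailures) are submit-time configurations; a sketch of setting them when building the session, with example values only:

{code:scala}
// Illustrative values only; not recommendations from this ticket.
// Both settings must be in place before the SparkContext is created.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("memory-overhead-example")
  // Extra off-heap headroom per executor (MiB), per the "Consider boosting
  // spark.yarn.executor.memoryOverhead" hint in the YARN kill message above.
  .config("spark.yarn.executor.memoryOverhead", "2048")
  // Attempts allowed per task before the job is aborted; the job above was
  // aborted after task 200 had failed 20 times.
  .config("spark.task.maxFailures", "20")
  .getOrCreate()
{code}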

 

SPARK-26682 wasn't the root cause here, since there wasn't any stage reattempt.

This seems to happen when spark.sql.sources.partitionOverwriteMode=dynamic is set.
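One purely illustrative mitigation (a sketch, not the current Spark writer code and not a proposed patch): a re-attempted task could do its own best-effort delete of a leftover staging file before writing, so a part file abandoned by a killed attempt cannot trigger the FileAlreadyExistsException:

{code:scala}
// Illustrative sketch only, not Spark's actual code path.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def cleanStaleAttemptFile(stagingFile: String, hadoopConf: Configuration): Unit = {
  val path = new Path(stagingFile)
  val fs = path.getFileSystem(hadoopConf)
  if (fs.exists(path)) {
    // recursive = false: only the single leftover part file is removed
    fs.delete(path, false)
  }
}
{code}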

 

> Job failures when task attempts do not clean up spark-staging parquet files
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-27194
>                 URL: https://issues.apache.org/jira/browse/SPARK-27194
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.3.1, 2.3.2
>            Reporter: Reza Safi
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
