[ https://issues.apache.org/jira/browse/SPARK-30442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490869#comment-17490869 ]

Paul ONeill edited comment on SPARK-30442 at 2/11/22, 11:43 AM:
----------------------------------------------------------------

Hey [~hyukjin.kwon],

[~abhimadav]'s comment matches our experience.

A task fails due to an Executor OOM (for example); when that task is retried 
(attempts 2, 3, 4), every retry fails with FileAlreadyExistsException.
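
At the filesystem level the failure is easy to reproduce; here is a minimal 
sketch against the local Hadoop filesystem (the path is illustrative):

{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.getLocal(new Configuration())
val part = new Path("/tmp/faee-demo/part-00000.gz") // illustrative path

// First task attempt dies after creating the file, leaving it behind.
fs.create(part, true).close()

// The retried attempt hits the same path with overwrite = false, exactly
// as CodecStreams does, and throws
// org.apache.hadoop.fs.FileAlreadyExistsException.
fs.create(part, false).close()
{code}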

In our case we are using DirectFileOutputCommitter with commit algorithm v1 
(EMR 5.33 & Spark 2.4.7).
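
For reference, the committer selection boils down to spark-defaults entries 
along these lines (the algorithm key is the standard Hadoop one; the direct 
committer class name depends on the EMR build, so it is left as a placeholder):

{code}
# Commit algorithm v1 (standard Hadoop key)
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version  1
# Direct output committer; exact class name depends on the EMR build
spark.hadoop.mapred.output.committer.class  <DirectFileOutputCommitter class>
{code}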

 

When the Application retries it succeeds if OverwriteMode is used, but this is 
obviously not desired: the first *job* attempt would have succeeded were it 
not for the FAEE problem, and it is expensive to rerun the entire job.
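
For completeness, the application-level retry that does succeed looks roughly 
like this (output path and format are illustrative):

{code:scala}
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("retry-demo").getOrCreate()
val df = spark.range(100).toDF("id")

// Overwrite mode clears the destination first, so it succeeds even when a
// failed attempt left partial files behind, at the cost of a full rewrite.
df.write
  .mode(SaveMode.Overwrite)
  .option("compression", "gzip") // routes the write through CodecStreams
  .csv("s3://bucket/path/out")   // illustrative path
{code}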

 

This looks like the culprit; shouldn't it honour the write mode instead of 
defaulting overwrite to false?
{code:scala}
/**
 * Create a new file and open it for writing.
 * If compression is enabled in the [[JobContext]] the stream will write
 * compressed data to disk.
 * An exception will be thrown if the file already exists.
 */
def createOutputStream(context: JobContext, file: Path): OutputStream = {
  val fs = file.getFileSystem(context.getConfiguration)
  // overwrite is hardcoded to false here
  val outputStream: OutputStream = fs.create(file, false)

  getCompressionCodec(context, Some(file))
    .map(codec => codec.createOutputStream(outputStream))
    .getOrElse(outputStream)
}
{code}
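
A minimal sketch of the kind of change we have in mind, assuming the caller 
can derive the flag from the DataWriter's SaveMode (the overwrite parameter is 
hypothetical, not part of the current CodecStreams API):

{code:scala}
import java.io.OutputStream
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.JobContext

// Hypothetical variant: the overwrite flag is supplied by the caller
// (e.g. mode == SaveMode.Overwrite) instead of being hardcoded to false.
def createOutputStream(
    context: JobContext,
    file: Path,
    overwrite: Boolean): OutputStream = {
  val fs = file.getFileSystem(context.getConfiguration)
  val outputStream: OutputStream = fs.create(file, overwrite)

  getCompressionCodec(context, Some(file))
    .map(codec => codec.createOutputStream(outputStream))
    .getOrElse(outputStream)
}
{code}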



> Write mode ignored when using CodecStreams
> ------------------------------------------
>
>                 Key: SPARK-30442
>                 URL: https://issues.apache.org/jira/browse/SPARK-30442
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 2.4.4
>            Reporter: Jesse Collins
>            Priority: Major
>
> Overwrite is hardcoded to false in the codec stream. This can cause issues, 
> particularly with AWS tools, making retries impossible.
> Ideally, the flag should be read from the write mode set for the DataWriter 
> that is writing through this codec class.
> [https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala#L81]


