[ 
https://issues.apache.org/jira/browse/SPARK-26873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-26873:
----------------------------------
    Affects Version/s: 2.2.0

> FileFormatWriter creates inconsistent MR job IDs
> ------------------------------------------------
>
>                 Key: SPARK-26873
>                 URL: https://issues.apache.org/jira/browse/SPARK-26873
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0, 2.2.3, 2.3.2, 2.4.0
>            Reporter: Ryan Blue
>            Assignee: Ryan Blue
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 2.3.4, 2.4.1, 3.0.0
>
>
> FileFormatWriter uses the current time to create a Job ID that is used when 
> calling Hadoop committers. This ID is used to produce task and task attempt 
> IDs used in commits.
> The problem is that Spark [generates this Job 
> ID|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L209]
>  in {{executeTask}} for every task:
> {code:lang=scala}
>   /** Writes data out in a single Spark task. */
>   private def executeTask(
>       description: WriteJobDescription,
>       sparkStageId: Int,
>       sparkPartitionId: Int,
>       sparkAttemptNumber: Int,
>       committer: FileCommitProtocol,
>       iterator: Iterator[InternalRow]): WriteTaskResult = {
>     val jobId = SparkHadoopWriterUtils.createJobID(new Date, sparkStageId)
>     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
>     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
> ...
> {code}
> Because this is called in each task, the Job ID used is not consistent across 
> tasks, which violates the contract expected by Hadoop committers.
> If a committer expects identical task IDs across attempts for correctness, 
> this breaks correctness. For example, a Hadoop committer should be able to 
> rename an output file to a path based on the task ID to ensure that only one 
> copy is committed.
> We hit this issue when preemption caused a task to die just after the commit 
> operation. The commit coordinator authorized a second task commit because the 
> first did not complete due to preemption.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to