[ 
https://issues.apache.org/jira/browse/HADOOP-17258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195429#comment-17195429
 ] 

Steve Loughran commented on HADOOP-17258:
-----------------------------------------


That failure implies that attempt 1 succeeded & created the file, so TA 2 
couldn't. Maybe the thing to do here is on that second write, say "the file is 
there, so let's gracefully abort our commit, relying on attempt #1 to be 
correct. Then it should abort its uploads and return success

Makes sense?

What happens to the whole job? Does it fail?

(FWIW, failures during task commit is something the s3a committers MUST be 
resilient to. Spark expects commits to be atomic and exactly one attempt to 
succeed, but it doesn't care which (more specifically: your code mustn't care)

> MagicS3GuardCommitter fails with `pendingset` already exists
> ------------------------------------------------------------
>
>                 Key: HADOOP-17258
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17258
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs/s3
>    Affects Versions: 3.2.0
>            Reporter: Dongjoon Hyun
>            Priority: Major
>
> In `trunk/branch-3.3/branch-3.2`, `MagicS3GuardCommitter.innerCommitTask` has 
> `false` at `pendingSet.save`.
> {code}
>     try {
>       pendingSet.save(getDestFS(), taskOutcomePath, false);
>     } catch (IOException e) {
>       LOG.warn("Failed to save task commit data to {} ",
>           taskOutcomePath, e);
>       abortPendingUploads(context, pendingSet.getCommits(), true);
>       throw e;
>     }
> {code}
> And, it can cause a job failure like the following.
> {code}
> WARN TaskSetManager: Lost task 1562.1 in stage 1.0 (TID 1788, 100.92.11.63, 
> executor 26): org.apache.spark.SparkException: Task failed while writing rows.
>     at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
>     at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
>     at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
>     at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> s3a://xxx/__magic/app-attempt-0000/task_20200911063607_0001_m_001562.pendingset
>  already exists
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:761)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
>     at 
> org.apache.hadoop.util.JsonSerialization.save(JsonSerialization.java:269)
>     at 
> org.apache.hadoop.fs.s3a.commit.files.PendingSet.save(PendingSet.java:170)
>     at 
> org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter.innerCommitTask(MagicS3GuardCommitter.java:220)
>     at 
> org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter.commitTask(MagicS3GuardCommitter.java:165)
>     at 
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
>     at 
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
>     at 
> org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:244)
>     at 
> org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
>     at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
>     at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
> {code}
> {code}
> 20/09/11 07:44:38 ERROR TaskSetManager: Task 957.1 in stage 1.0 (TID 1412) 
> can not write to output file: 
> org.apache.hadoop.fs.FileAlreadyExistsException: 
> s3a://xxx/t/__magic/app-attempt-0000/task_20200911073922_0001_m_000957.pendingset
>  already exists; not retrying
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to