[ 
https://issues.apache.org/jira/browse/SPARK-19576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15863534#comment-15863534
 ] 

Apache Spark commented on SPARK-19576:
--------------------------------------

User 'sharkdtu' has created a pull request for this issue:
https://github.com/apache/spark/pull/16911

> Task attempt paths exist in output path after saveAsNewAPIHadoopFile 
> completes with speculation enabled
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19576
>                 URL: https://issues.apache.org/jira/browse/SPARK-19576
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: sharkd tu
>
> {{writeShard}} in {{saveAsNewAPIHadoopDataset}} always commits its tasks 
> unconditionally. The problem is that when speculation is enabled, multiple 
> attempts of the same task can commit their output to the same path, which 
> can leave task attempt paths inside the output path after 
> {{saveAsNewAPIHadoopFile}} completes:
> {code}
> -rw-r--r--    3   user group       0   2017-02-11 19:36 hdfs://.../output/_SUCCESS
> drwxr-xr-x    -   user group       0   2017-02-11 19:36 hdfs://.../output/attempt_201702111936_32487_r_000044_0
> -rw-r--r--    3   user group    8952   2017-02-11 19:36 hdfs://.../output/part-r-00000
> -rw-r--r--    3   user group    7878   2017-02-11 19:36 hdfs://.../output/part-r-00001
> ...
> {code}
> Suppose two attempts of the same task commit at the same time: both may 
> rename their task attempt paths to the same committed task path 
> concurrently. Once the first attempt's {{rename}} succeeds, the committed 
> task path exists as a directory, so the second attempt's {{rename}} moves 
> its task attempt path under the committed task path instead of replacing 
> it (HDFS rename semantics).
> {{commitTask}} in {{FileOutputCommitter}}: 
> {code}
> public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
>   throws IOException {
>     TaskAttemptID attemptId = context.getTaskAttemptID();
>     if (hasOutputPath()) {
>       context.progress();
>       if(taskAttemptPath == null) {
>         taskAttemptPath = getTaskAttemptPath(context);
>       }
>       Path committedTaskPath = getCommittedTaskPath(context);
>       FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
>       if (fs.exists(taskAttemptPath)) {
>         if(fs.exists(committedTaskPath)) {
>           if(!fs.delete(committedTaskPath, true)) {
>             throw new IOException("Could not delete " + committedTaskPath);
>           }
>         }
>         if(!fs.rename(taskAttemptPath, committedTaskPath)) {
>           throw new IOException("Could not rename " + taskAttemptPath + " to "
>               + committedTaskPath);
>         }
>         LOG.info("Saved output of task '" + attemptId + "' to " + 
>             committedTaskPath);
>       } else {
>         LOG.warn("No Output found for " + attemptId);
>       }
>     } else {
>       LOG.warn("Output Path is null in commitTask()");
>     }
>   }
> {code}
> In any case, {{writeShard}} in {{saveAsNewAPIHadoopDataset}} should not 
> commit its tasks unconditionally. A similar problem triggered by calling 
> {{saveAsHadoopFile}} was already fixed in SPARK-4879, so the analogous 
> case triggered by calling {{saveAsNewAPIHadoopFile}} should be fixed as 
> well.
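To make the failure mode above concrete, here is a small self-contained sketch (all names hypothetical; this is neither Spark nor Hadoop code) that models the one HDFS rename property {{FileOutputCommitter.commitTask}} depends on: renaming a path onto an existing directory nests the source under the destination rather than replacing it, so the losing attempt's directory survives under the committed task path.

```java
import java.util.Set;
import java.util.TreeSet;

// Toy in-memory model of the commit race; all names are illustrative.
public class CommitRaceDemo {

    // HDFS-style rename: if dst already exists (modeled as a directory here),
    // src is moved *under* dst instead of replacing it.
    static boolean rename(Set<String> fs, String src, String dst) {
        if (!fs.contains(src)) {
            return false;
        }
        fs.remove(src);
        if (fs.contains(dst)) {
            String name = src.substring(src.lastIndexOf('/') + 1);
            fs.add(dst + "/" + name);
        } else {
            fs.add(dst);
        }
        return true;
    }

    public static void main(String[] args) {
        Set<String> fs = new TreeSet<>();
        String committed = "output/task_201702111936_32487_r_000044";
        String attempt0 = "output/attempt_201702111936_32487_r_000044_0";
        String attempt1 = "output/attempt_201702111936_32487_r_000044_1";
        fs.add(attempt0);
        fs.add(attempt1);

        // Both speculative attempts pass the !fs.exists(committedTaskPath)
        // check before either rename has happened ...
        boolean existsSeenByBoth = fs.contains(committed);  // false

        if (!existsSeenByBoth) {
            rename(fs, attempt0, committed);  // first rename wins
            rename(fs, attempt1, committed);  // second nests under committed
        }

        // The losing attempt's directory now survives under the committed
        // task path, and later ends up inside the job's output directory.
        System.out.println(
            fs.contains(committed + "/attempt_201702111936_32487_r_000044_1"));
    }
}
```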


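The direction of the SPARK-4879 fix is a driver-side "first commit wins" decision: each attempt asks the driver for permission before committing, and only one attempt per task is ever authorized. A minimal sketch of that decision logic, with hypothetical class and method names (Spark's actual mechanism is its driver-side OutputCommitCoordinator, which this only approximates):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of first-commit-wins authorization; not Spark's API.
public class CommitCoordinatorSketch {

    // taskId -> attemptId that was authorized to commit
    private final Map<Integer, Integer> authorized = new ConcurrentHashMap<>();

    // Returns true only for the single winning attempt of each task.
    // Repeated calls from the winner stay true; every other attempt
    // gets false and should abort instead of committing.
    public boolean canCommit(int taskId, int attemptId) {
        Integer winner = authorized.putIfAbsent(taskId, attemptId);
        return winner == null || winner == attemptId;
    }
}
```

Under this scheme the task body would do something like {{if (coordinator.canCommit(taskId, attemptId)) committer.commitTask(context); else committer.abortTask(context);}}, so a losing speculative attempt never reaches the racy {{rename}} on the committed task path.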

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
