[
https://issues.apache.org/jira/browse/SPARK-19576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15863534#comment-15863534
]
Apache Spark commented on SPARK-19576:
--------------------------------------
User 'sharkdtu' has created a pull request for this issue:
https://github.com/apache/spark/pull/16911
> Task attempt paths exist in output path after saveAsNewAPIHadoopFile completes with speculation enabled
> -------------------------------------------------------------------------------------------------------
>
> Key: SPARK-19576
> URL: https://issues.apache.org/jira/browse/SPARK-19576
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.0
> Reporter: sharkd tu
>
> {{writeShard}} in {{saveAsNewAPIHadoopDataset}} always commits its tasks
> unconditionally. When speculation is enabled, this can result in multiple
> task attempts committing their output to the same path, which may leave
> temporary task attempt paths in the output path after
> {{saveAsNewAPIHadoopFile}} completes.
> {code}
> -rw-r--r-- 3 user group 0 2017-02-11 19:36
> hdfs://.../output/_SUCCESS
> drwxr-xr-x - user group 0 2017-02-11 19:36
> hdfs://.../output/attempt_201702111936_32487_r_000044_0
> -rw-r--r-- 3 user group 8952 2017-02-11 19:36
> hdfs://.../output/part-r-00000
> -rw-r--r-- 3 user group 7878 2017-02-11 19:36
> hdfs://.../output/part-r-00001
> ...
> {code}
> Suppose two attempts of the same task commit concurrently: both try to
> rename their task attempt paths to the task committed path at the same
> time. Once the first attempt's {{rename}} completes, the committed path
> exists as a directory, so the second attempt's {{rename}} moves its task
> attempt path *underneath* the committed path instead of replacing it.
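The nesting described above follows from rename semantics where an existing destination directory receives the source as a child. A minimal sketch on the local filesystem, assuming HDFS-style rename behavior (the helper below imitates it and is not the real `org.apache.hadoop.fs.FileSystem` implementation):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class RenameRace {
    // Assumption of this sketch: like POSIX mv, a rename whose destination
    // already exists as a directory moves the source *under* it.
    public static Path hdfsStyleRename(Path src, Path dst) throws IOException {
        if (Files.isDirectory(dst)) {
            // Destination exists: source is nested beneath it.
            return Files.move(src, dst.resolve(src.getFileName()));
        }
        return Files.move(src, dst);
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("output");
        Path attempt0 = Files.createDirectory(out.resolve("attempt_r_000044_0"));
        Path attempt1 = Files.createDirectory(out.resolve("attempt_r_000044_1"));
        Path committed = out.resolve("task_r_000044");

        hdfsStyleRename(attempt0, committed);               // first attempt wins
        Path nested = hdfsStyleRename(attempt1, committed); // second attempt nests
        // nested is .../task_r_000044/attempt_r_000044_1 -- the leftover
        // attempt directory that later ends up visible in the output path
        System.out.println(nested);
    }
}
```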
> {{commitTask}} in {{FileOutputCommitter}}:
> {code}
>   public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
>       throws IOException {
>     TaskAttemptID attemptId = context.getTaskAttemptID();
>     if (hasOutputPath()) {
>       context.progress();
>       if (taskAttemptPath == null) {
>         taskAttemptPath = getTaskAttemptPath(context);
>       }
>       Path committedTaskPath = getCommittedTaskPath(context);
>       FileSystem fs =
>           taskAttemptPath.getFileSystem(context.getConfiguration());
>       if (fs.exists(taskAttemptPath)) {
>         if (fs.exists(committedTaskPath)) {
>           if (!fs.delete(committedTaskPath, true)) {
>             throw new IOException("Could not delete " + committedTaskPath);
>           }
>         }
>         if (!fs.rename(taskAttemptPath, committedTaskPath)) {
>           throw new IOException("Could not rename " + taskAttemptPath
>               + " to " + committedTaskPath);
>         }
>         LOG.info("Saved output of task '" + attemptId + "' to "
>             + committedTaskPath);
>       } else {
>         LOG.warn("No Output found for " + attemptId);
>       }
>     } else {
>       LOG.warn("Output Path is null in commitTask()");
>     }
>   }
> {code}
> In short, {{writeShard}} in {{saveAsNewAPIHadoopDataset}} should not
> commit its tasks unconditionally. A similar issue triggered by calling
> {{saveAsHadoopFile}} was already fixed in SPARK-4879, so the analogous
> case triggered by calling {{saveAsNewAPIHadoopFile}} should be fixed as
> well.
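The SPARK-4879 fix works by asking a driver-side coordinator for permission before committing, so only one attempt per task ever reaches {{commitTask}}. A toy sketch of that idea (an illustration only, not Spark's actual OutputCommitCoordinator API):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical coordinator: the first attempt of a task to ask is granted
// the right to commit; every other attempt is refused and must abort its
// output instead of racing on the rename.
public class CommitCoordinator {
    // Attempt id that won the commit right for this task; null = none yet.
    private final AtomicReference<Integer> winner = new AtomicReference<>(null);

    // Atomically grant the commit right to the first caller; repeated calls
    // by the winner stay true, all other attempts get false.
    public boolean canCommit(int attemptId) {
        return winner.compareAndSet(null, attemptId)
                || Integer.valueOf(attemptId).equals(winner.get());
    }
}
```

Under this scheme, {{writeShard}} would call such a check before {{commitTask}} and discard the losing attempt's output, so no second rename can ever nest an attempt path under the committed path.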
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]