sharkd tu created SPARK-19576:
---------------------------------
Summary: Task attempt paths exist in output path after
saveAsNewAPIHadoopFile completes with speculation enabled
Key: SPARK-19576
URL: https://issues.apache.org/jira/browse/SPARK-19576
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.1.0
Reporter: sharkd tu
{{writeShard}} in {{saveAsNewAPIHadoopDataset}} always commits its tasks
unconditionally. The problem is that when speculation is enabled, multiple
attempts of the same task can commit their output to the same path, which may
leave task attempt paths behind in the output path after
{{saveAsNewAPIHadoopFile}} completes.
{code}
-rw-r--r-- 3 user group 0 2017-02-11 19:36 hdfs://.../out/_SUCCESS
drwxr-xr-x - user group 0 2017-02-11 19:36
hdfs://.../out/attempt_201702111936_32487_r_000044_0
-rw-r--r-- 3 user group 8952 2017-02-11 19:36 hdfs://.../out/part-r-00000
-rw-r--r-- 3 user group 7878 2017-02-11 19:36 hdfs://.../out/part-r-00001
...
{code}
Assume two attempts of the same task commit at the same time. Both may rename
their task attempt paths to the task committed path concurrently: when one
attempt's {{rename}} completes first, the other attempt's {{rename}} moves its
task attempt path *under* the now-existing task committed path instead of
replacing it.
{{commitTask}} in {{FileOutputCommitter}}:
{code}
public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
    throws IOException {
  TaskAttemptID attemptId = context.getTaskAttemptID();
  if (hasOutputPath()) {
    context.progress();
    if (taskAttemptPath == null) {
      taskAttemptPath = getTaskAttemptPath(context);
    }
    Path committedTaskPath = getCommittedTaskPath(context);
    FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
    if (fs.exists(taskAttemptPath)) {
      if (fs.exists(committedTaskPath)) {
        if (!fs.delete(committedTaskPath, true)) {
          throw new IOException("Could not delete " + committedTaskPath);
        }
      }
      if (!fs.rename(taskAttemptPath, committedTaskPath)) {
        throw new IOException("Could not rename " + taskAttemptPath + " to "
            + committedTaskPath);
      }
      LOG.info("Saved output of task '" + attemptId + "' to " +
          committedTaskPath);
    } else {
      LOG.warn("No Output found for " + attemptId);
    }
  } else {
    LOG.warn("Output Path is null in commitTask()");
  }
}
{code}
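The race can be reproduced without a cluster by modeling the relevant rename
behavior: when the destination already exists as a directory, the source is
moved *into* it rather than replacing it. The sketch below is a minimal
illustration of that mechanism only; the path names ({{attempt_0}},
{{attempt_1}}, {{task_000044}}) are hypothetical, and the one-method "file
system" is an assumption standing in for HDFS.

{code}
import java.util.Set;
import java.util.TreeSet;

public class RenameRace {
  // Minimal model (an assumption for illustration) of the rename behavior
  // relevant here: if dst already exists, src is moved *under* dst;
  // otherwise src simply becomes dst.
  static void rename(Set<String> fs, String src, String dst) {
    String moved = fs.contains(dst)
        ? dst + src.substring(src.lastIndexOf('/'))  // dst exists: nest src under it
        : dst;                                       // dst absent: plain rename
    fs.remove(src);
    fs.add(moved);
  }

  public static void main(String[] args) {
    Set<String> fs = new TreeSet<>();
    fs.add("out/attempt_0");  // two speculative attempts of one task
    fs.add("out/attempt_1");
    // Both attempts have already observed fs.exists(committedTaskPath) == false,
    // so neither deletes anything before renaming.
    rename(fs, "out/attempt_0", "out/task_000044");  // first rename wins
    rename(fs, "out/attempt_1", "out/task_000044");  // lands under the committed path
    System.out.println(fs);
    // prints [out/task_000044, out/task_000044/attempt_1]
  }
}
{code}

The leftover {{out/task_000044/attempt_1}} directory is then carried along
when the committed task path is merged into the final output, matching the
stray {{attempt_*}} directory shown in the listing above.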
In any case, {{writeShard}} in {{saveAsNewAPIHadoopDataset}} should not commit
its tasks unconditionally. A similar issue triggered by calling
{{saveAsHadoopFile}} has already been fixed in SPARK-4879, so we should fix
the analogous case triggered by calling {{saveAsNewAPIHadoopFile}}.
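The SPARK-4879 fix routes commits through a driver-side arbiter so that only
one attempt per partition is allowed to call {{commitTask}}. The sketch below
is a hypothetical, simplified first-commit-wins arbiter illustrating that
approach; the class and method names are assumptions, not Spark's actual
{{OutputCommitCoordinator}} API.

{code}
import java.util.concurrent.ConcurrentHashMap;

public class FirstCommitWins {
  // partition -> attempt number of the first attempt that asked to commit
  private final ConcurrentHashMap<Integer, Integer> winners =
      new ConcurrentHashMap<>();

  /**
   * Atomically grants commit rights to the first attempt that asks for a
   * given partition; every other attempt is told to skip commitTask,
   * so no two attempts race on the rename in FileOutputCommitter.
   */
  public boolean canCommit(int partition, int attemptNumber) {
    Integer winner = winners.putIfAbsent(partition, attemptNumber);
    return winner == null || winner == attemptNumber;
  }

  public static void main(String[] args) {
    FirstCommitWins coordinator = new FirstCommitWins();
    System.out.println(coordinator.canCommit(44, 0)); // true: first asker commits
    System.out.println(coordinator.canCommit(44, 1)); // false: speculative twin skips commit
  }
}
{code}

With such a guard in {{writeShard}}, a losing speculative attempt never
renames its attempt path, so no stray attempt directory can end up in the
output.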
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)