Repository: spark
Updated Branches:
  refs/heads/branch-2.3 2995b79d6 -> dfdf1bb9b


[SPARK-23815][CORE] Spark writer dynamic partition overwrite mode may fail to 
write output on multi level partition

## What changes were proposed in this pull request?

SPARK-20236 introduced a new writer mode that overwrites only the partitions 
touched by a write. While using this feature in our production cluster, we 
found a bug when writing multi-level partitions on HDFS.

A simple test case to reproduce the issue:

```scala
val df = Seq(("1", "2", "3")).toDF("col1", "col2", "col3")
df.write.partitionBy("col1", "col2").mode("overwrite").save("/my/hdfs/location")
```

If the HDFS location "/my/hdfs/location" does not exist, no output is written at all.
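With two partition columns, the writer produces a two-level directory layout 
under the target location, so the commit step may need to materialize two 
missing directory levels at once. A sketch of the expected layout (the part 
file name is illustrative):

```
/my/hdfs/location/
└── col1=1/
    └── col2=2/
        └── part-00000-....parquet
```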

This appears to be caused by the job-commit change that SPARK-20236 made in 
HadoopMapReduceCommitProtocol.

During job commit, the output has already been written to the staging dir 
/my/hdfs/location/.spark-staging.xxx/col1=1/col2=2; the code then calls 
fs.rename to move /my/hdfs/location/.spark-staging.xxx/col1=1/col2=2 to 
/my/hdfs/location/col1=1/col2=2. In our case this rename fails on HDFS 
because the parent /my/hdfs/location/col1=1 does not exist: HDFS rename does 
not create missing parent directories, so a multi-level partition path whose 
parent is absent cannot be renamed into place.
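The failure can be demonstrated directly against the Hadoop FileSystem API. 
The following is a minimal standalone sketch, not part of the patch; the 
paths are the hypothetical ones from the example above, and it assumes 
fs.defaultFS points at an HDFS cluster:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object RenameParentDemo {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())

    val staged = new Path("/my/hdfs/location/.spark-staging.xxx/col1=1/col2=2")
    val dest = new Path("/my/hdfs/location/col1=1/col2=2")

    // Per the Hadoop FileSystem spec, rename requires that dest's parent
    // exists. If /my/hdfs/location/col1=1 is missing, HDFS rename returns
    // false and the staged partition directory is never moved into place.
    val moved = fs.rename(staged, dest)
    println(s"rename succeeded: $moved")
  }
}
```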

This is not caught by the unit test added with SPARK-20236, which runs 
against the local file system rather than HDFS.

We propose the following fix. When cleaning the current partition dir 
/my/hdfs/location/col1=1/col2=2 before the rename, if the delete fails 
(because /my/hdfs/location/col1=1/col2=2 may not exist) and the parent dir 
/my/hdfs/location/col1=1 is also missing, call mkdirs to create the parent so 
the subsequent rename can succeed.

Reference: the official Hadoop FileSystem specification 
(https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/filesystem/filesystem.html) 
lists as a precondition of rename that "dest must be root, or have a parent 
that exists".

## How was this patch tested?

We have tested this patch on our production cluster, and it fixed the problem.

Author: Fangshi Li <f...@linkedin.com>

Closes #20931 from fangshil/master.

(cherry picked from commit 4b07036799b01894826b47c73142fe282c607a57)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dfdf1bb9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dfdf1bb9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dfdf1bb9

Branch: refs/heads/branch-2.3
Commit: dfdf1bb9be19bd31e398f97310391b391fabfcfd
Parents: 2995b79
Author: Fangshi Li <f...@linkedin.com>
Authored: Fri Apr 13 13:46:34 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri Apr 13 13:47:31 2018 +0800

----------------------------------------------------------------------
 .../internal/io/HadoopMapReduceCommitProtocol.scala     | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dfdf1bb9/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 6d20ef1..3e60c50 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -186,7 +186,17 @@ class HadoopMapReduceCommitProtocol(
         logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
         for (part <- partitionPaths) {
           val finalPartPath = new Path(path, part)
-          fs.delete(finalPartPath, true)
+          if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
+            // According to the official hadoop FileSystem API spec, delete op should assume
+            // the destination is no longer present regardless of return value, thus we do not
+            // need to double check if finalPartPath exists before rename.
+            // Also in our case, based on the spec, delete returns false only when finalPartPath
+            // does not exist. When this happens, we need to take action if parent of finalPartPath
+            // also does not exist(e.g. the scenario described on SPARK-23815), because
+            // FileSystem API spec on rename op says the rename dest(finalPartPath) must have
+            // a parent that exists, otherwise we may get unexpected result on the rename.
+            fs.mkdirs(finalPartPath.getParent)
+          }
           fs.rename(new Path(stagingDir, part), finalPartPath)
         }
       }

