This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 484db68  [SPARK-35106][CORE][SQL] Avoid failing rename caused by destination directory not existing
484db68 is described below

commit 484db68d6901a98d4dd82411def2534f12c14f29
Author: Yuzhou Sun <yuzho...@amazon.com>
AuthorDate: Wed May 19 15:46:27 2021 +0800

    [SPARK-35106][CORE][SQL] Avoid failing rename caused by destination directory not existing
    
    ### What changes were proposed in this pull request?
    
    1. In HadoopMapReduceCommitProtocol, create the parent directory before renaming custom partition path staging files
    2. In InMemoryCatalog and HiveExternalCatalog, create the new partition directory before renaming the old partition path
    3. Check the return value of FileSystem#rename and, if it is false, throw an exception to avoid silent data loss caused by rename failure (see the sketch below)
    4. Change DebugFilesystem#rename to match HDFS's behavior (return false without renaming when the destination's parent directory does not exist)
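
    Distilled, changes 1-3 apply one defensive pattern around FileSystem#rename. A minimal sketch of that pattern (the helper name `renameOrThrow` is illustrative, not from the patch; the real code is in the diff below):

    ```scala
    import java.io.IOException
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Ensure the destination's parent exists (HDFS-like file systems do not
    // create it during rename), then treat a false return from rename as a
    // hard failure instead of a silent no-op.
    def renameOrThrow(fs: FileSystem, src: Path, dst: Path): Unit = {
      fs.mkdirs(dst.getParent)
      if (!fs.rename(src, dst)) {
        throw new IOException(s"Failed to rename $src to $dst")
      }
    }
    ```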
    
    ### Why are the changes needed?
    
    Depending on the FileSystem#rename implementation, when the destination directory does not exist, the file system may
    1. return false without renaming the file or throwing an exception (e.g. HDFS), or
    2. create the destination directory, rename the files, and return true (e.g. LocalFileSystem)
    
    In the first case above, renames in HadoopMapReduceCommitProtocol for custom partition paths fail silently if the destination partition path does not exist. Failed renames can happen when
    1. dynamicPartitionOverwrite == true and the custom partition path directories are deleted by the job before the rename; or
    2. the custom partition path directories do not exist before the job; or
    3. something else goes wrong when the file system handles `rename`
    
    The renames in InMemoryCatalog and HiveExternalCatalog for partition renaming have a similar issue.
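
    The HDFS-style contract can be reproduced on top of a local file system by refusing renames into a missing parent, which is essentially what the modified DebugFilesystem below does. A minimal sketch (the class name is illustrative, not from the patch):

    ```scala
    import org.apache.hadoop.fs.{Path, RawLocalFileSystem}

    // Mimic HDFS: renaming into a missing parent directory returns false
    // instead of implicitly creating the parent (as LocalFileSystem would).
    class HdfsLikeRenameFileSystem extends RawLocalFileSystem {
      override def rename(src: Path, dst: Path): Boolean =
        exists(dst.getParent) && super.rename(src, dst)
    }
    ```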
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Modified DebugFilesystem#rename, and added new unit tests.
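
    The new negative tests force the failing-rename path by registering a misbehaving FileSystem for the local `file` scheme and disabling the FS cache. A fragment adapted from the added InsertSuite tests (full code in the diff below; it assumes table `t` with a custom partition location is already set up as in the diff):

    ```scala
    // Route the "file" scheme through a FileSystem whose rename out of a
    // .spark-staging- directory always returns false, then assert that the
    // failure surfaces as a SparkException instead of passing silently.
    withSQLConf(
      "fs.file.impl" -> classOf[RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem].getName,
      "fs.file.impl.disable.cache" -> "true") {
      val e = intercept[SparkException] {
        sql("insert into t partition(part1=1, part2=1) select 1")
      }.getCause
      assert(e.isInstanceOf[IOException])
    }
    ```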
    
    Without the fix in the source code, five InsertSuite tests and one AlterTableRenamePartitionSuite test failed:

    InsertSuite.SPARK-20236: dynamic partition overwrite with custom partition path (existing test with modified FS)
    ```
    == Results ==
    !== Correct Answer - 1 ==   == Spark Answer - 0 ==
    struct<>                   struct<>
    ![2,1,1]
    ```
    
    InsertSuite.SPARK-35106: insert overwrite with custom partition path
    ```
    == Results ==
    !== Correct Answer - 1 ==   == Spark Answer - 0 ==
    struct<>                   struct<>
    ![2,1,1]
    ```
    
    InsertSuite.SPARK-35106: dynamic partition overwrite with custom partition path
    ```
    == Results ==
    !== Correct Answer - 2 ==   == Spark Answer - 1 ==
    !struct<>                   struct<i:int,part1:int,part2:int>
     [1,1,1]                    [1,1,1]
    ![1,1,2]
    ```
    
    InsertSuite.SPARK-35106: Throw exception when rename custom partition paths returns false
    ```
    Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown
    ```
    
    InsertSuite.SPARK-35106: Throw exception when rename dynamic partition paths returns false
    ```
    Expected exception org.apache.spark.SparkException to be thrown, but no exception was thrown
    ```
    
    AlterTableRenamePartitionSuite.ALTER TABLE .. RENAME PARTITION V1: multi part partition (existing test with modified FS)
    ```
    == Results ==
    !== Correct Answer - 1 ==   == Spark Answer - 0 ==
     struct<>                   struct<>
    ![3,123,3]
    ```
    
    Closes #32530 from YuzhouSun/SPARK-35106.
    
    Authored-by: Yuzhou Sun <yuzho...@amazon.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit a72d05c7e632fbb0d8a6082c3cacdf61f36518b4)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../io/HadoopMapReduceCommitProtocol.scala         |  19 +++-
 .../scala/org/apache/spark/DebugFilesystem.scala   |  14 ++-
 .../sql/catalyst/catalog/InMemoryCatalog.scala     |   6 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 116 ++++++++++++++++++++-
 .../spark/sql/hive/HiveExternalCatalog.scala       |   6 +-
 5 files changed, 151 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 30f9a65..c061d61 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -188,13 +188,18 @@ class HadoopMapReduceCommitProtocol(
 
       val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
       logDebug(s"Committing files staged for absolute locations $filesToMove")
+      val absParentPaths = filesToMove.values.map(new Path(_).getParent).toSet
       if (dynamicPartitionOverwrite) {
-        val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
-        logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
-        absPartitionPaths.foreach(fs.delete(_, true))
+        logDebug(s"Clean up absolute partition directories for overwriting: $absParentPaths")
+        absParentPaths.foreach(fs.delete(_, true))
       }
+      logDebug(s"Create absolute parent directories: $absParentPaths")
+      absParentPaths.foreach(fs.mkdirs)
       for ((src, dst) <- filesToMove) {
-        fs.rename(new Path(src), new Path(dst))
+        if (!fs.rename(new Path(src), new Path(dst))) {
+          throw new IOException(s"Failed to rename $src to $dst when committing files staged for " +
+            s"absolute locations")
+        }
       }
 
       if (dynamicPartitionOverwrite) {
@@ -213,7 +218,11 @@ class HadoopMapReduceCommitProtocol(
            // a parent that exists, otherwise we may get unexpected result on the rename.
             fs.mkdirs(finalPartPath.getParent)
           }
-          fs.rename(new Path(stagingDir, part), finalPartPath)
+          val stagingPartPath = new Path(stagingDir, part)
+          if (!fs.rename(stagingPartPath, finalPartPath)) {
+            throw new IOException(s"Failed to rename $stagingPartPath to $finalPartPath when " +
+              s"committing files staged for overwriting dynamic partitions")
+          }
         }
       }
 
diff --git a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
index 1d3e28b..8f22080 100644
--- a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
+++ b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala
@@ -57,8 +57,14 @@ object DebugFilesystem extends Logging {
 }
 
 /**
- * DebugFilesystem wraps file open calls to track all open connections. This can be used in tests
- * to check that connections are not leaked.
+ * DebugFilesystem wraps
+ *     1) file open calls to track all open connections. This can be used in tests to check that
+ *        connections are not leaked;
+ *     2) rename calls to return false when the destination's parent path does not exist. When
+ *        the destination parent does not exist, LocalFileSystem uses FileUtil#copy to copy the
+ *        file and returns true if it succeeds, while many other Hadoop file systems (e.g. HDFS,
+ *        S3A) return false without renaming any file. This helps to test that Spark can work
+ *        with the latter file systems.
  */
 // TODO(ekl) we should consider always interposing this to expose num open conns as a metric
 class DebugFilesystem extends LocalFileSystem {
@@ -120,4 +126,8 @@ class DebugFilesystem extends LocalFileSystem {
       override def hashCode(): Int = wrapped.hashCode()
     }
   }
+
+  override def rename(src: Path, dst: Path): Boolean = {
+    exists(dst.getParent) && super.rename(src, dst)
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 08b54fc..5809751 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -499,7 +499,11 @@ class InMemoryCatalog(
           newSpec, partitionColumnNames, tablePath)
         try {
           val fs = tablePath.getFileSystem(hadoopConfig)
-          fs.rename(oldPartPath, newPartPath)
+          fs.mkdirs(newPartPath)
+          if (!fs.rename(oldPartPath, newPartPath)) {
+            throw new IOException(s"Renaming partition path from $oldPartPath 
to " +
+              s"$newPartPath returned false")
+          }
         } catch {
           case e: IOException =>
             throw new SparkException(s"Unable to rename partition path 
$oldPartPath", e)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 4513ef6..1d0a57a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.sources
 
-import java.io.File
+import java.io.{File, IOException}
 import java.sql.Date
 
 import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, Path, RawLocalFileSystem}
@@ -950,6 +950,110 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       checkAnswer(spark.table("t2"), Nil)
     }
   }
+
+  test("SPARK-35106: insert overwrite with custom partition path") {
+    withTempPath { path =>
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int, part2 int) using parquet
+            |partitioned by (part1, part2)
+          """.stripMargin)
+
+        sql(s"alter table t add partition(part1=1, part2=1) location 
'${path.getAbsolutePath}'")
+        sql(s"insert into t partition(part1=1, part2=1) select 1")
+        checkAnswer(spark.table("t"), Row(1, 1, 1))
+
+        sql("insert overwrite table t partition(part1=1, part2=1) select 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1))
+
+        sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
+
+        sql("insert overwrite table t partition(part1=1, part2=2) select 3")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+
+        sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
+        checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Nil)
+      }
+    }
+  }
+
+  test("SPARK-35106: dynamic partition overwrite with custom partition path") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+      withTempPath { path =>
+        withTable("t") {
+          sql(
+            """
+              |create table t(i int, part1 int, part2 int) using parquet
+              |partitioned by (part1, part2)
+            """.stripMargin)
+
+          sql(s"insert into t partition(part1=1, part2=1) select 1")
+          checkAnswer(spark.table("t"), Row(1, 1, 1))
+
+          sql(s"alter table t add partition(part1=1, part2=2) location 
'${path.getAbsolutePath}'")
+
+          // dynamic partition overwrite to empty custom partition
+          sql(s"insert overwrite table t partition(part1=1, part2=2) select 1")
+          checkAnswer(spark.table("t"), Row(1, 1, 1) :: Row(1, 1, 2) :: Nil)
+
+          // dynamic partition overwrite to non-empty custom partition
+          sql("insert overwrite table t partition(part1=1, part2=2) select 2")
+          checkAnswer(spark.table("t"), Row(1, 1, 1) :: Row(2, 1, 2) :: Nil)
+        }
+      }
+    }
+  }
+
+  test("SPARK-35106: Throw exception when rename custom partition paths 
returns false") {
+    withSQLConf(
+      "fs.file.impl" -> 
classOf[RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTempPath { path =>
+        withTable("t") {
+          sql(
+            """
+              |create table t(i int, part1 int, part2 int) using parquet
+              |partitioned by (part1, part2)
+            """.stripMargin)
+
+          sql(s"alter table t add partition(part1=1, part2=1) location 
'${path.getAbsolutePath}'")
+
+          val e = intercept[SparkException] {
+            sql(s"insert into t partition(part1=1, part2=1) select 1")
+          }.getCause
+          assert(e.isInstanceOf[IOException])
+          assert(e.getMessage.contains("Failed to rename"))
+          assert(e.getMessage.contains("when committing files staged for 
absolute location"))
+        }
+      }
+    }
+  }
+
+  test("SPARK-35106: Throw exception when rename dynamic partition paths 
returns false") {
+    withSQLConf(
+      "fs.file.impl" -> 
classOf[RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem].getName,
+      "fs.file.impl.disable.cache" -> "true",
+      SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int, part2 int) using parquet
+            |partitioned by (part1, part2)
+          """.stripMargin)
+
+        val e = intercept[SparkException] {
+          sql(s"insert overwrite table t partition(part1, part2) values (1, 1, 
1)")
+        }.getCause
+        assert(e.isInstanceOf[IOException])
+        assert(e.getMessage.contains("Failed to rename"))
+        assert(e.getMessage.contains(
+          "when committing files staged for overwriting dynamic partitions"))
+      }
+    }
+  }
 }
 
 class FileExistingTestFileSystem extends RawLocalFileSystem {
@@ -962,3 +1066,13 @@ class FileExistingTestFileSystem extends RawLocalFileSystem {
     throw new FileAlreadyExistsException(s"${f.toString} already exists")
   }
 }
+
+class RenameFromSparkStagingToFinalDirAlwaysTurnsFalseFilesystem extends RawLocalFileSystem {
+  override def rename(src: Path, dst: Path): Boolean = {
+    (!isSparkStagingDir(src) || isSparkStagingDir(dst)) && super.rename(src, dst)
+  }
+
+  private def isSparkStagingDir(path: Path): Boolean = {
+    path.toString.contains(".spark-staging-")
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 46ebcb7..019718c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1081,7 +1081,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         // scalastyle:on caselocale
         val actualPartitionPath = new Path(currentFullPath, actualPartitionString)
         try {
-          fs.rename(actualPartitionPath, expectedPartitionPath)
+          fs.mkdirs(expectedPartitionPath)
+          if (!fs.rename(actualPartitionPath, expectedPartitionPath)) {
+            throw new IOException(s"Renaming partition path from 
$actualPartitionPath to " +
+              s"$expectedPartitionPath returned false")
+          }
         } catch {
           case e: IOException =>
             throw new SparkException("Unable to rename partition path from " +
