This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8483191dfbb [SPARK-44185][SQL] Fix inconsistent path qualifying 
between catalog and data operations
8483191dfbb is described below

commit 8483191dfbb2842c10d5e8d2409eafed6dfc4abd
Author: Kent Yao <[email protected]>
AuthorDate: Mon Jul 3 09:45:20 2023 +0800

    [SPARK-44185][SQL] Fix inconsistent path qualifying between catalog and 
data operations
    
    ### What changes were proposed in this pull request?
    
    This PR adds a new analysis rule to qualify the table path before execution 
to fix inconsistent path qualifying between catalog and data operations
    
    ### Why are the changes needed?
    
    To fix bugs like
    
    - CREATE TABLE statement with relative LOCATION w/o table schema will infer 
schema from files from the directory relative to the current working directory 
and store the directory relative to the warehouse path.
    - CTAS statement with relative LOCATION cannot assert empty root path as it 
checks the wrong path it will finally use.
    - DataframeWriter does not qualify the path before checking
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes.
     For table statements with a LOCATION clause or 'path' option, when you 
specify a relative path, spark now uses warehouse path to qualify both the 
catalog and data operation. Before this patch, it used warehouse path for 
catalog mostly and CWD for data.
    
    ### How was this patch tested?
    
    new unit tests.
    
    Closes #41733 from yaooqinn/SPARK-44185.
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../sql/catalyst/catalog/SessionCatalog.scala      |  2 +-
 .../spark/sql/execution/datasources/ddl.scala      |  9 +++
 .../spark/sql/execution/datasources/rules.scala    | 22 ++++++
 .../sql/internal/BaseSessionStateBuilder.scala     |  1 +
 .../execution/datasources/TableLocationSuite.scala | 79 ++++++++++++++++++++++
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |  1 +
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 17 ++++-
 .../spark/sql/hive/execution/HiveSerDeSuite.scala  |  2 +-
 8 files changed, 128 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index cd4b4cfaf6b..d18736deda4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -416,7 +416,7 @@ class SessionCatalog(
     }
   }
 
-  private def makeQualifiedTablePath(locationUri: URI, database: String): URI 
= {
+  def makeQualifiedTablePath(locationUri: URI, database: String): URI = {
     if (locationUri.isAbsolute) {
       locationUri
     } else if (new Path(locationUri).isAbsolute) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index dc5894e42e7..42d6769525c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -59,6 +59,15 @@ case class CreateTable(
   override protected def withNewChildrenInternal(
       newChildren: IndexedSeq[LogicalPlan]): LogicalPlan =
     copy(query = if (query.isDefined) Some(newChildren.head) else None)
+
+  /**
+   * Identifies the underlying table's location is qualified or absent.
+   *
+   * @return true if the location is absolute or absent, false otherwise.
+   */
+  def locationQualifiedOrAbsent: Boolean = {
+    tableDesc.storage.locationUri.map(_.isAbsolute).getOrElse(true)
+  }
 }
 
 /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index af98bb13c73..0b4df18eb7c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -553,3 +553,25 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
     }
   }
 }
+
+/**
+ * A rule to qualify relative locations with warehouse path before it breaks 
in catalog
+ * operation and data reading and writing.
+ *
+ * @param catalog the session catalog
+ */
+case class QualifyLocationWithWarehouse(catalog: SessionCatalog) extends 
Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    case c @ CreateTableV1(tableDesc, _, _) if !c.locationQualifiedOrAbsent =>
+      val qualifiedTableIdent = catalog.qualifyIdentifier(tableDesc.identifier)
+      val loc = tableDesc.storage.locationUri.get
+      val db = qualifiedTableIdent.database.get
+      val newLocation = catalog.makeQualifiedTablePath(loc, db)
+      val newTable = tableDesc.copy(
+        identifier = qualifiedTableIdent,
+        storage = tableDesc.storage.copy(locationUri = Some(newLocation))
+      )
+      c.copy(tableDesc = newTable)
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index bc4b308f75d..5543b409d17 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -195,6 +195,7 @@ abstract class BaseSessionStateBuilder(
 
     override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
       DetectAmbiguousSelfJoin +:
+        QualifyLocationWithWarehouse(catalog) +:
         PreprocessTableCreation(catalog) +:
         PreprocessTableInsertion +:
         DataSourceAnalysis +:
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TableLocationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TableLocationSuite.scala
new file mode 100644
index 00000000000..09dc29bb927
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TableLocationSuite.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class TableLocationSuite extends QueryTest with SharedSparkSession {
+
+  test("SPARK-44185: relative LOCATION in CTAS should be qualified with 
warehouse") {
+    withSQLConf(SQLConf.ALLOW_NON_EMPTY_LOCATION_IN_CTAS.key -> "false") {
+      withTable("ctas1", "ctas2") {
+        sql("CREATE TABLE ctas1 USING parquet AS SELECT 1 AS ID")
+        val m = intercept[AnalysisException] {
+          // 'ctas1' should be qualified with warehouse path for the checker 
as same as
+          // table creation in catalog. Otherwise, the data could be polluted 
accidentally.
+          sql("CREATE TABLE ctas2 USING parquet LOCATION 'ctas1' AS SELECT 1 
AS ID")
+        }.getMessage
+        assert(m.contains("CREATE-TABLE-AS-SELECT cannot create table with 
location to a " +
+          "non-empty directory"))
+      }
+    }
+  }
+
+
+  test("SPARK-44185: relative LOCATION with Append SaveMode shall check the 
qualified path") {
+    withTable("ctas1", "ctas2") {
+      sql("CREATE TABLE ctas1 USING parquet AS SELECT 1L AS ID")
+      spark.range(10)
+        .write
+        .mode("append")
+        .option("path", "ctas1")
+        .format("parquet")
+        .saveAsTable("ctas2")
+      checkAnswer(spark.table("ctas1"), spark.table("ctas2"))
+    }
+  }
+
+  test("SPARK-44185: relative LOCATION in CREATE TABLE shall lookup the path 
qualified with" +
+    " warehouse for consistency") {
+    withTable("ct2", "ct1") {
+      try {
+        sql("CREATE TABLE ct1 USING parquet SELECT 1 AS ID")
+        // TODO(SPARK-44185): INSERT OVERWRITE DIRECTORY shall be qualified 
with current working
+        //   directory(AS-IS) or with warehouse path?
+        sql("INSERT OVERWRITE DIRECTORY 'ct1' USING parquet " + "SELECT 1 AS 
ID1, 2 AS ID2")
+
+        // When schema is absent, ct2's infered from data files. Here table 
'ct2' should not
+        // use 'current working directory'/ct1 to infer and `warehouse 
path`/xx..x/ct1 to read
+        // data, which shall be consistent to each other.
+        sql("CREATE TABLE ct2 USING parquet LOCATION 'ct1'")
+        checkAnswer(spark.table("ct1"), spark.table("ct2"))
+      } finally {
+        val path = new Path("ct1")
+        val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+        val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        fs.delete(qualified, true)
+      }
+    }
+  }
+}
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index a3ee3e0fc33..2d0bcdff071 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -98,6 +98,7 @@ class HiveSessionStateBuilder(
       DetectAmbiguousSelfJoin +:
         new DetermineTableStats(session) +:
         RelationConversions(catalog) +:
+        QualifyLocationWithWarehouse(catalog) +:
         PreprocessTableCreation(catalog) +:
         PreprocessTableInsertion +:
         DataSourceAnalysis +:
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 6fd0d971a5e..0f1a32d30b8 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -525,10 +525,21 @@ class MetastoreDataSourcesSuite extends QueryTest
           assert(table("createdJsonTable").schema === df.schema)
           checkAnswer(sql("SELECT * FROM createdJsonTable"), df)
 
-          val e = intercept[AnalysisException] {
-              sparkSession.catalog.createTable("createdJsonTable", 
jsonFilePath.toString)
+          Seq("true", "false").foreach { caseSensitive =>
+            withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
+              val e = intercept[AnalysisException] {
+                sparkSession.catalog.createTable("createdJsonTable", 
tempPath.toString)
+              }
+              val expectedTableName = s"`$SESSION_CATALOG_NAME`.`default`." + {
+                if (caseSensitive.toBoolean) {
+                  "`createdJsonTable`"
+                } else {
+                  "`createdjsontable`"
+                }
+              }
+              checkErrorTableAlreadyExists(e, expectedTableName)
             }
-          checkErrorTableAlreadyExists(e, 
s"`$SESSION_CATALOG_NAME`.`default`.`createdJsonTable`")
+          }
         }
 
         // Data should not be deleted.
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
index 160127a4b61..e92dee09cc1 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
@@ -145,7 +145,7 @@ class HiveSerDeSuite extends HiveComparisonTest with 
PlanTest with BeforeAndAfte
       .add("id", "int")
       .add("name", "string", nullable = true, comment = "blabla"))
     assert(table.provider == Some(DDLUtils.HIVE_PROVIDER))
-    assert(table.storage.locationUri == Some(new URI("/tmp/file")))
+    assert(table.storage.locationUri == Some(new URI("file:///tmp/file")))
     assert(table.storage.properties == Map("my_prop" -> "1"))
     assert(table.comment == Some("BLABLA"))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to