This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 38115cb  [SPARK-37501][SQL] CREATE/REPLACE TABLE should qualify location for v2 command
38115cb is described below

commit 38115cb907ec93151382260cda327330e78ca340
Author: PengLei <peng.8...@gmail.com>
AuthorDate: Wed Dec 1 22:04:35 2021 +0800

    [SPARK-37501][SQL] CREATE/REPLACE TABLE should qualify location for v2 command
    
    ### What changes were proposed in this pull request?
    1. Rename the method `makeQualifiedNamespacePath` -> `makeQualifiedDBObjectPath` in `CatalogUtils`, so that it covers not only databases/namespaces but tables as well.
    2. Overload `makeQualifiedDBObjectPath` to also accept `String` parameters (see the sketch below).
    3. Qualify the `location` in the table spec when planning `CreateTableExec` for the `CREATE TABLE` command.
    4. Qualify the location property of the `REPLACE TABLE` command in the same way.
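
    For illustration, a minimal sketch of what the renamed helper and its new `String` overload are expected to do (not part of the patch; it assumes a default local Hadoop `Configuration`, so a location without a scheme resolves against the local file system):

    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.catalyst.catalog.CatalogUtils

    val hadoopConf = new Configuration()
    // A location without a scheme is qualified against the default FileSystem,
    // so "/tmp/testcat/table_name" becomes "file:/tmp/testcat/table_name" locally.
    val qualified = CatalogUtils.makeQualifiedDBObjectPath(
      "/user/hive/warehouse",     // warehouse path (used when the location is relative)
      "/tmp/testcat/table_name",  // location supplied by the user
      hadoopConf)
    ```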
    
    ### Why are the changes needed?
    Keep v1 and v2 behavior consistent, as discussed at [#comments](https://github.com/apache/spark/pull/34719#discussion_r758156938).
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing test cases.
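
    For example (a sketch of the updated expectation, assuming the suite's in-memory `testcat` catalog and a local file system):

    ```scala
    spark.sql("CREATE TABLE testcat.t (id bigint) USING foo LOCATION '/tmp/testcat/t'")
    // DESCRIBE TABLE EXTENDED now reports the qualified location:
    //   Location  file:/tmp/testcat/t
    spark.sql("DESCRIBE TABLE EXTENDED testcat.t").show(truncate = false)
    ```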
    
    Closes #34758 from Peng-Lei/qualify-location.
    
    Authored-by: PengLei <peng.8...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/catalog/ExternalCatalogUtils.scala  | 10 +++++++++-
 .../spark/sql/catalyst/catalog/SessionCatalog.scala  |  2 +-
 .../datasources/v2/DataSourceV2Strategy.scala        | 20 +++++++++++---------
 .../spark/sql/connector/DataSourceV2SQLSuite.scala   | 13 +++++++------
 4 files changed, 28 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 4b0e676..67c57ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -259,7 +259,7 @@ object CatalogUtils {
     new Path(str).toUri
   }
 
-  def makeQualifiedNamespacePath(
+  def makeQualifiedDBObjectPath(
       locationUri: URI,
       warehousePath: String,
       hadoopConf: Configuration): URI = {
@@ -271,6 +271,14 @@ object CatalogUtils {
     }
   }
 
+  def makeQualifiedDBObjectPath(
+      warehouse: String,
+      location: String,
+      hadoopConf: Configuration): String = {
+    val nsPath = makeQualifiedDBObjectPath(stringToURI(location), warehouse, hadoopConf)
+    URIToString(nsPath)
+  }
+
   def makeQualifiedPath(path: URI, hadoopConf: Configuration): URI = {
     val hadoopPath = new Path(path)
     val fs = hadoopPath.getFileSystem(hadoopConf)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 610a683..60f68fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -252,7 +252,7 @@ class SessionCatalog(
   }
 
   private def makeQualifiedDBPath(locationUri: URI): URI = {
-    CatalogUtils.makeQualifiedNamespacePath(locationUri, conf.warehousePath, hadoopConf)
+    CatalogUtils.makeQualifiedDBObjectPath(locationUri, conf.warehousePath, hadoopConf)
   }
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index f64c1ee..fc44f70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -94,11 +94,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
   }
 
-  private def makeQualifiedNamespacePath(location: String): String = {
-    val warehousePath = session.sharedState.conf.get(WAREHOUSE_PATH)
-    val nsPath = CatalogUtils.makeQualifiedNamespacePath(
-      CatalogUtils.stringToURI(location), warehousePath, session.sharedState.hadoopConf)
-    CatalogUtils.URIToString(nsPath)
+  private def makeQualifiedDBObjectPath(location: String): String = {
+    CatalogUtils.makeQualifiedDBObjectPath(session.sharedState.conf.get(WAREHOUSE_PATH),
+      location, session.sharedState.hadoopConf)
   }
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -167,8 +165,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
 
     case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning,
         tableSpec, ifNotExists) =>
+      val qualifiedLocation = tableSpec.location.map(makeQualifiedDBObjectPath(_))
       CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema,
-        partitioning, tableSpec, ifNotExists) :: Nil
+        partitioning, tableSpec.copy(location = qualifiedLocation), ifNotExists) :: Nil
 
     case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
       val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
@@ -186,7 +185,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
 
     case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
-      val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
+      val newProps = props.get(TableCatalog.PROP_LOCATION).map { loc =>
+        props + (TableCatalog.PROP_LOCATION -> makeQualifiedDBObjectPath(loc))
+      }.getOrElse(props)
+      val propsWithOwner = CatalogV2Util.withDefaultOwnership(newProps)
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableExec(
@@ -324,7 +326,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       AlterNamespaceSetPropertiesExec(
         catalog.asNamespaceCatalog,
         ns,
-        Map(SupportsNamespaces.PROP_LOCATION -> makeQualifiedNamespacePath(location))) :: Nil
+        Map(SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(location))) :: Nil
 
     case CommentOnNamespace(ResolvedNamespace(catalog, ns), comment) =>
       AlterNamespaceSetPropertiesExec(
@@ -334,7 +336,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
 
     case CreateNamespace(ResolvedDBObjectName(catalog, name), ifNotExists, properties) =>
       val finalProperties = properties.get(SupportsNamespaces.PROP_LOCATION).map { loc =>
-        properties + (SupportsNamespaces.PROP_LOCATION -> makeQualifiedNamespacePath(loc))
+        properties + (SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(loc))
       }.getOrElse(properties)
       CreateNamespaceExec(catalog.asNamespaceCatalog, name, ifNotExists, finalProperties) :: Nil
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 4769106..949abfe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -126,7 +126,7 @@ class DataSourceV2SQLSuite
       " PARTITIONED BY (id)" +
       " TBLPROPERTIES ('bar'='baz')" +
       " COMMENT 'this is a test table'" +
-      " LOCATION '/tmp/testcat/table_name'")
+      " LOCATION 'file:/tmp/testcat/table_name'")
     val descriptionDf = spark.sql("DESCRIBE TABLE EXTENDED testcat.table_name")
     assert(descriptionDf.schema.map(field => (field.name, field.dataType))
       === Seq(
@@ -149,7 +149,7 @@ class DataSourceV2SQLSuite
       Array("# Detailed Table Information", "", ""),
       Array("Name", "testcat.table_name", ""),
       Array("Comment", "this is a test table", ""),
-      Array("Location", "/tmp/testcat/table_name", ""),
+      Array("Location", "file:/tmp/testcat/table_name", ""),
       Array("Provider", "foo", ""),
       Array(TableCatalog.PROP_OWNER.capitalize, defaultUser, ""),
       Array("Table Properties", "[bar=baz]", "")))
@@ -1179,8 +1179,9 @@ class DataSourceV2SQLSuite
               s" ('path'='bar', 'Path'='noop')")
             val tableCatalog = catalog("testcat").asTableCatalog
             val identifier = Identifier.of(Array(), "reservedTest")
-            assert(tableCatalog.loadTable(identifier).properties()
-              .get(TableCatalog.PROP_LOCATION) == "foo",
+            val location = tableCatalog.loadTable(identifier).properties()
+              .get(TableCatalog.PROP_LOCATION)
+            assert(location.startsWith("file:") && location.endsWith("foo"),
               "path as a table property should not have side effects")
             assert(tableCatalog.loadTable(identifier).properties().get("path") 
== "bar",
               "path as a table property should not have side effects")
@@ -2012,7 +2013,7 @@ class DataSourceV2SQLSuite
            |COMMENT 'This is a comment'
           |TBLPROPERTIES ('prop1' = '1', 'prop2' = '2', 'prop3' = 3, 'prop4' = 4)
            |PARTITIONED BY (a)
-           |LOCATION '/tmp'
+           |LOCATION 'file:/tmp'
         """.stripMargin)
       val showDDL = getShowCreateDDL(s"SHOW CREATE TABLE $t")
       assert(showDDL === Array(
@@ -2029,7 +2030,7 @@ class DataSourceV2SQLSuite
         "'via' = '2')",
         "PARTITIONED BY (a)",
         "COMMENT 'This is a comment'",
-        "LOCATION '/tmp'",
+        "LOCATION 'file:/tmp'",
         "TBLPROPERTIES(",
         "'prop1' = '1',",
         "'prop2' = '2',",
