This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 38115cb [SPARK-37501][SQL] CREATE/REPLACE TABLE should qualify location for v2 command 38115cb is described below commit 38115cb907ec93151382260cda327330e78ca340 Author: PengLei <peng.8...@gmail.com> AuthorDate: Wed Dec 1 22:04:35 2021 +0800 [SPARK-37501][SQL] CREATE/REPLACE TABLE should qualify location for v2 command ### What changes were proposed in this pull request? 1. Rename the method `makeQualifiedNamespacePath` -> `makeQualifiedDBObjectPath` in `CatalogUtils`, so that it applies not only to databases/namespaces but also to tables. 2. Overload the method `makeQualifiedDBObjectPath` to accept more parameter types. 3. For the `CreateTable` command, add handling to qualify the `location` property before it reaches `CreateTableExec`. 4. Add the same handling for the `REPLACE TABLE` command. ### Why are the changes needed? To keep v1 and v2 command behavior consistent; see the discussion at [#comments](https://github.com/apache/spark/pull/34719#discussion_r758156938) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing test cases. Closes #34758 from Peng-Lei/qualify-location. 
Authored-by: PengLei <peng.8...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/catalog/ExternalCatalogUtils.scala | 10 +++++++++- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 2 +- .../datasources/v2/DataSourceV2Strategy.scala | 20 +++++++++++--------- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 13 +++++++------ 4 files changed, 28 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala index 4b0e676..67c57ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala @@ -259,7 +259,7 @@ object CatalogUtils { new Path(str).toUri } - def makeQualifiedNamespacePath( + def makeQualifiedDBObjectPath( locationUri: URI, warehousePath: String, hadoopConf: Configuration): URI = { @@ -271,6 +271,14 @@ object CatalogUtils { } } + def makeQualifiedDBObjectPath( + warehouse: String, + location: String, + hadoopConf: Configuration): String = { + val nsPath = makeQualifiedDBObjectPath(stringToURI(location), warehouse, hadoopConf) + URIToString(nsPath) + } + def makeQualifiedPath(path: URI, hadoopConf: Configuration): URI = { val hadoopPath = new Path(path) val fs = hadoopPath.getFileSystem(hadoopConf) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 610a683..60f68fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -252,7 +252,7 @@ class SessionCatalog( } private def makeQualifiedDBPath(locationUri: URI): URI = { - 
CatalogUtils.makeQualifiedNamespacePath(locationUri, conf.warehousePath, hadoopConf) + CatalogUtils.makeQualifiedDBObjectPath(locationUri, conf.warehousePath, hadoopConf) } def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index f64c1ee..fc44f70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -94,11 +94,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true) } - private def makeQualifiedNamespacePath(location: String): String = { - val warehousePath = session.sharedState.conf.get(WAREHOUSE_PATH) - val nsPath = CatalogUtils.makeQualifiedNamespacePath( - CatalogUtils.stringToURI(location), warehousePath, session.sharedState.hadoopConf) - CatalogUtils.URIToString(nsPath) + private def makeQualifiedDBObjectPath(location: String): String = { + CatalogUtils.makeQualifiedDBObjectPath(session.sharedState.conf.get(WAREHOUSE_PATH), + location, session.sharedState.hadoopConf) } override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { @@ -167,8 +165,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning, tableSpec, ifNotExists) => + val qualifiedLocation = tableSpec.location.map(makeQualifiedDBObjectPath(_)) CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, - partitioning, tableSpec, ifNotExists) :: Nil + partitioning, tableSpec.copy(location = qualifiedLocation), ifNotExists) :: Nil case CreateTableAsSelect(catalog, ident, 
parts, query, props, options, ifNotExists) => val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) @@ -186,7 +185,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil case ReplaceTable(catalog, ident, schema, parts, props, orCreate) => - val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) + val newProps = props.get(TableCatalog.PROP_LOCATION).map { loc => + props + (TableCatalog.PROP_LOCATION -> makeQualifiedDBObjectPath(loc)) + }.getOrElse(props) + val propsWithOwner = CatalogV2Util.withDefaultOwnership(newProps) catalog match { case staging: StagingTableCatalog => AtomicReplaceTableExec( @@ -324,7 +326,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat AlterNamespaceSetPropertiesExec( catalog.asNamespaceCatalog, ns, - Map(SupportsNamespaces.PROP_LOCATION -> makeQualifiedNamespacePath(location))) :: Nil + Map(SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(location))) :: Nil case CommentOnNamespace(ResolvedNamespace(catalog, ns), comment) => AlterNamespaceSetPropertiesExec( @@ -334,7 +336,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case CreateNamespace(ResolvedDBObjectName(catalog, name), ifNotExists, properties) => val finalProperties = properties.get(SupportsNamespaces.PROP_LOCATION).map { loc => - properties + (SupportsNamespaces.PROP_LOCATION -> makeQualifiedNamespacePath(loc)) + properties + (SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(loc)) }.getOrElse(properties) CreateNamespaceExec(catalog.asNamespaceCatalog, name, ifNotExists, finalProperties) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 4769106..949abfe 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -126,7 +126,7 @@ class DataSourceV2SQLSuite " PARTITIONED BY (id)" + " TBLPROPERTIES ('bar'='baz')" + " COMMENT 'this is a test table'" + - " LOCATION '/tmp/testcat/table_name'") + " LOCATION 'file:/tmp/testcat/table_name'") val descriptionDf = spark.sql("DESCRIBE TABLE EXTENDED testcat.table_name") assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq( @@ -149,7 +149,7 @@ class DataSourceV2SQLSuite Array("# Detailed Table Information", "", ""), Array("Name", "testcat.table_name", ""), Array("Comment", "this is a test table", ""), - Array("Location", "/tmp/testcat/table_name", ""), + Array("Location", "file:/tmp/testcat/table_name", ""), Array("Provider", "foo", ""), Array(TableCatalog.PROP_OWNER.capitalize, defaultUser, ""), Array("Table Properties", "[bar=baz]", ""))) @@ -1179,8 +1179,9 @@ class DataSourceV2SQLSuite s" ('path'='bar', 'Path'='noop')") val tableCatalog = catalog("testcat").asTableCatalog val identifier = Identifier.of(Array(), "reservedTest") - assert(tableCatalog.loadTable(identifier).properties() - .get(TableCatalog.PROP_LOCATION) == "foo", + val location = tableCatalog.loadTable(identifier).properties() + .get(TableCatalog.PROP_LOCATION) + assert(location.startsWith("file:") && location.endsWith("foo"), "path as a table property should not have side effects") assert(tableCatalog.loadTable(identifier).properties().get("path") == "bar", "path as a table property should not have side effects") @@ -2012,7 +2013,7 @@ class DataSourceV2SQLSuite |COMMENT 'This is a comment' |TBLPROPERTIES ('prop1' = '1', 'prop2' = '2', 'prop3' = 3, 'prop4' = 4) |PARTITIONED BY (a) - |LOCATION '/tmp' + |LOCATION 'file:/tmp' """.stripMargin) val showDDL = getShowCreateDDL(s"SHOW CREATE TABLE $t") assert(showDDL === Array( @@ -2029,7 +2030,7 @@ class DataSourceV2SQLSuite 
"'via' = '2')", "PARTITIONED BY (a)", "COMMENT 'This is a comment'", - "LOCATION '/tmp'", + "LOCATION 'file:/tmp'", "TBLPROPERTIES(", "'prop1' = '1',", "'prop2' = '2',", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org