This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8483191dfbb [SPARK-44185][SQL] Fix inconsistent path qualifying
between catalog and data operations
8483191dfbb is described below
commit 8483191dfbb2842c10d5e8d2409eafed6dfc4abd
Author: Kent Yao <[email protected]>
AuthorDate: Mon Jul 3 09:45:20 2023 +0800
[SPARK-44185][SQL] Fix inconsistent path qualifying between catalog and
data operations
### What changes were proposed in this pull request?
This PR adds a new analysis rule to qualify the table path before execution
to fix inconsistent path qualifying between catalog and data operations
### Why are the changes needed?
To fix bugs like
- A CREATE TABLE statement with a relative LOCATION and no table schema will infer
the schema from files in the directory resolved against the current working
directory, while storing the directory resolved against the warehouse path.
- A CTAS statement with a relative LOCATION cannot assert an empty root path,
because it checks the wrong path rather than the one it will finally use.
- DataFrameWriter does not qualify the path before checking it
### Does this PR introduce _any_ user-facing change?
yes.
For table statements with a LOCATION clause or a 'path' option, when you
specify a relative path, Spark now uses the warehouse path to qualify it for
both the catalog and data operations. Before this patch, it mostly used the
warehouse path for the catalog but the current working directory for data.
### How was this patch tested?
new unit tests.
Closes #41733 from yaooqinn/SPARK-44185.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../sql/catalyst/catalog/SessionCatalog.scala | 2 +-
.../spark/sql/execution/datasources/ddl.scala | 9 +++
.../spark/sql/execution/datasources/rules.scala | 22 ++++++
.../sql/internal/BaseSessionStateBuilder.scala | 1 +
.../execution/datasources/TableLocationSuite.scala | 79 ++++++++++++++++++++++
.../spark/sql/hive/HiveSessionStateBuilder.scala | 1 +
.../spark/sql/hive/MetastoreDataSourcesSuite.scala | 17 ++++-
.../spark/sql/hive/execution/HiveSerDeSuite.scala | 2 +-
8 files changed, 128 insertions(+), 5 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index cd4b4cfaf6b..d18736deda4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -416,7 +416,7 @@ class SessionCatalog(
}
}
- private def makeQualifiedTablePath(locationUri: URI, database: String): URI
= {
+ def makeQualifiedTablePath(locationUri: URI, database: String): URI = {
if (locationUri.isAbsolute) {
locationUri
} else if (new Path(locationUri).isAbsolute) {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index dc5894e42e7..42d6769525c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -59,6 +59,15 @@ case class CreateTable(
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): LogicalPlan =
copy(query = if (query.isDefined) Some(newChildren.head) else None)
+
+ /**
+ * Identifies the underlying table's location is qualified or absent.
+ *
+ * @return true if the location is absolute or absent, false otherwise.
+ */
+ def locationQualifiedOrAbsent: Boolean = {
+ tableDesc.storage.locationUri.map(_.isAbsolute).getOrElse(true)
+ }
}
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index af98bb13c73..0b4df18eb7c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -553,3 +553,25 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
}
}
}
+
+/**
+ * A rule to qualify relative locations with warehouse path before it breaks
in catalog
+ * operation and data reading and writing.
+ *
+ * @param catalog the session catalog
+ */
+case class QualifyLocationWithWarehouse(catalog: SessionCatalog) extends
Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+ case c @ CreateTableV1(tableDesc, _, _) if !c.locationQualifiedOrAbsent =>
+ val qualifiedTableIdent = catalog.qualifyIdentifier(tableDesc.identifier)
+ val loc = tableDesc.storage.locationUri.get
+ val db = qualifiedTableIdent.database.get
+ val newLocation = catalog.makeQualifiedTablePath(loc, db)
+ val newTable = tableDesc.copy(
+ identifier = qualifiedTableIdent,
+ storage = tableDesc.storage.copy(locationUri = Some(newLocation))
+ )
+ c.copy(tableDesc = newTable)
+ }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index bc4b308f75d..5543b409d17 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -195,6 +195,7 @@ abstract class BaseSessionStateBuilder(
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
DetectAmbiguousSelfJoin +:
+ QualifyLocationWithWarehouse(catalog) +:
PreprocessTableCreation(catalog) +:
PreprocessTableInsertion +:
DataSourceAnalysis +:
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TableLocationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TableLocationSuite.scala
new file mode 100644
index 00000000000..09dc29bb927
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/TableLocationSuite.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class TableLocationSuite extends QueryTest with SharedSparkSession {
+
+ test("SPARK-44185: relative LOCATION in CTAS should be qualified with
warehouse") {
+ withSQLConf(SQLConf.ALLOW_NON_EMPTY_LOCATION_IN_CTAS.key -> "false") {
+ withTable("ctas1", "ctas2") {
+ sql("CREATE TABLE ctas1 USING parquet AS SELECT 1 AS ID")
+ val m = intercept[AnalysisException] {
+ // 'ctas1' should be qualified with warehouse path for the checker
as same as
+ // table creation in catalog. Otherwise, the data could be polluted
accidentally.
+ sql("CREATE TABLE ctas2 USING parquet LOCATION 'ctas1' AS SELECT 1
AS ID")
+ }.getMessage
+ assert(m.contains("CREATE-TABLE-AS-SELECT cannot create table with
location to a " +
+ "non-empty directory"))
+ }
+ }
+ }
+
+
+ test("SPARK-44185: relative LOCATION with Append SaveMode shall check the
qualified path") {
+ withTable("ctas1", "ctas2") {
+ sql("CREATE TABLE ctas1 USING parquet AS SELECT 1L AS ID")
+ spark.range(10)
+ .write
+ .mode("append")
+ .option("path", "ctas1")
+ .format("parquet")
+ .saveAsTable("ctas2")
+ checkAnswer(spark.table("ctas1"), spark.table("ctas2"))
+ }
+ }
+
+ test("SPARK-44185: relative LOCATION in CREATE TABLE shall lookup the path
qualified with" +
+ " warehouse for consistency") {
+ withTable("ct2", "ct1") {
+ try {
+ sql("CREATE TABLE ct1 USING parquet SELECT 1 AS ID")
+ // TODO(SPARK-44185): INSERT OVERWRITE DIRECTORY shall be qualified
with current working
+ // directory(AS-IS) or with warehouse path?
+ sql("INSERT OVERWRITE DIRECTORY 'ct1' USING parquet " + "SELECT 1 AS
ID1, 2 AS ID2")
+
+ // When schema is absent, ct2's infered from data files. Here table
'ct2' should not
+ // use 'current working directory'/ct1 to infer and `warehouse
path`/xx..x/ct1 to read
+ // data, which shall be consistent to each other.
+ sql("CREATE TABLE ct2 USING parquet LOCATION 'ct1'")
+ checkAnswer(spark.table("ct1"), spark.table("ct2"))
+ } finally {
+ val path = new Path("ct1")
+ val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+ val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ fs.delete(qualified, true)
+ }
+ }
+ }
+}
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index a3ee3e0fc33..2d0bcdff071 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -98,6 +98,7 @@ class HiveSessionStateBuilder(
DetectAmbiguousSelfJoin +:
new DetermineTableStats(session) +:
RelationConversions(catalog) +:
+ QualifyLocationWithWarehouse(catalog) +:
PreprocessTableCreation(catalog) +:
PreprocessTableInsertion +:
DataSourceAnalysis +:
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 6fd0d971a5e..0f1a32d30b8 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -525,10 +525,21 @@ class MetastoreDataSourcesSuite extends QueryTest
assert(table("createdJsonTable").schema === df.schema)
checkAnswer(sql("SELECT * FROM createdJsonTable"), df)
- val e = intercept[AnalysisException] {
- sparkSession.catalog.createTable("createdJsonTable",
jsonFilePath.toString)
+ Seq("true", "false").foreach { caseSensitive =>
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
+ val e = intercept[AnalysisException] {
+ sparkSession.catalog.createTable("createdJsonTable",
tempPath.toString)
+ }
+ val expectedTableName = s"`$SESSION_CATALOG_NAME`.`default`." + {
+ if (caseSensitive.toBoolean) {
+ "`createdJsonTable`"
+ } else {
+ "`createdjsontable`"
+ }
+ }
+ checkErrorTableAlreadyExists(e, expectedTableName)
}
- checkErrorTableAlreadyExists(e,
s"`$SESSION_CATALOG_NAME`.`default`.`createdJsonTable`")
+ }
}
// Data should not be deleted.
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
index 160127a4b61..e92dee09cc1 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
@@ -145,7 +145,7 @@ class HiveSerDeSuite extends HiveComparisonTest with
PlanTest with BeforeAndAfte
.add("id", "int")
.add("name", "string", nullable = true, comment = "blabla"))
assert(table.provider == Some(DDLUtils.HIVE_PROVIDER))
- assert(table.storage.locationUri == Some(new URI("/tmp/file")))
+ assert(table.storage.locationUri == Some(new URI("file:///tmp/file")))
assert(table.storage.properties == Map("my_prop" -> "1"))
assert(table.comment == Some("BLABLA"))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]