This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new d50a3ce4a42 [SPARK-43203][SQL][3.4] Move all Drop Table case to
DataSource V2
d50a3ce4a42 is described below
commit d50a3ce4a42ce95d3a7eb4c1a4db4a77a070da13
Author: Jia Fan <[email protected]>
AuthorDate: Fri Sep 8 08:22:17 2023 -0700
[SPARK-43203][SQL][3.4] Move all Drop Table case to DataSource V2
### What changes were proposed in this pull request?
Cherry-pick of #41348 and #42056; this is a bug fix that should be included in 3.4.2
### Why are the changes needed?
Fix DROP table behavior in session catalog
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested by:
- V2 table catalog tests:
`org.apache.spark.sql.execution.command.v2.DropTableSuite`
- V1 table catalog tests:
`org.apache.spark.sql.execution.command.v1.DropTableSuiteBase`
Closes #41765 from Hisoka-X/move_drop_table_v2_to_3.4.2.
Lead-authored-by: Jia Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../catalyst/analysis/ResolveSessionCatalog.scala | 2 +-
.../apache/spark/sql/execution/CacheManager.scala | 6 ++++
.../datasources/v2/V2SessionCatalog.scala | 42 +++++++++++++++++-----
.../execution/command/PlanResolutionSuite.scala | 17 ++++-----
.../hive/execution/command/DropTableSuite.scala | 2 +-
5 files changed, 50 insertions(+), 19 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index b2b35b40492..e5a36394a44 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -210,7 +210,7 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
c
}
- case DropTable(ResolvedV1Identifier(ident), ifExists, purge) =>
+ case DropTable(ResolvedV1Identifier(ident), ifExists, purge) if
conf.useV1Command =>
DropTableCommand(ident, ifExists, isView = false, purge = purge)
// v1 DROP TABLE supports temp view.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index d41611439f0..b1153d7a1e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute,
SubqueryExpression}
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData,
LogicalPlan, ResolvedHint, SubqueryAlias, View}
@@ -190,6 +191,11 @@ class CacheManager extends Logging with
AdaptiveSparkPlanHelper {
isSameName(ident.qualifier :+ ident.name) &&
isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+
v1Ident.table)
+ case SubqueryAlias(ident, HiveTableRelation(catalogTable, _, _, _, _)) =>
+ val v1Ident = catalogTable.identifier
+ isSameName(ident.qualifier :+ ident.name) &&
+ isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+
v1Ident.table)
+
case _ => false
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index b4789c98df9..d48f771b9db 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -70,7 +70,12 @@ class V2SessionCatalog(catalog: SessionCatalog)
}
override def loadTable(ident: Identifier): Table = {
- V1Table(catalog.getTableMetadata(ident.asTableIdentifier))
+ try {
+ V1Table(catalog.getTableMetadata(ident.asTableIdentifier))
+ } catch {
+ case _: NoSuchDatabaseException =>
+ throw QueryCompilationErrors.noSuchTableError(ident)
+ }
}
override def loadTable(ident: Identifier, timestamp: Long): Table = {
@@ -189,16 +194,35 @@ class V2SessionCatalog(catalog: SessionCatalog)
loadTable(ident)
}
+ override def purgeTable(ident: Identifier): Boolean = {
+ dropTableInternal(ident, purge = true)
+ }
+
override def dropTable(ident: Identifier): Boolean = {
+ dropTableInternal(ident)
+ }
+
+ private def dropTableInternal(ident: Identifier, purge: Boolean = false):
Boolean = {
try {
- if (loadTable(ident) != null) {
- catalog.dropTable(
- ident.asTableIdentifier,
- ignoreIfNotExists = true,
- purge = true /* skip HDFS trash */)
- true
- } else {
- false
+ loadTable(ident) match {
+ case V1Table(v1Table) if v1Table.tableType == CatalogTableType.VIEW =>
+ throw QueryCompilationErrors.wrongCommandForObjectTypeError(
+ operation = "DROP TABLE",
+ requiredType = s"${CatalogTableType.EXTERNAL.name} or" +
+ s" ${CatalogTableType.MANAGED.name}",
+ objectName = v1Table.qualifiedName,
+ foundType = v1Table.tableType.name,
+ alternative = "DROP VIEW"
+ )
+ case null =>
+ false
+ case _ =>
+ catalog.invalidateCachedTable(ident.asTableIdentifier)
+ catalog.dropTable(
+ ident.asTableIdentifier,
+ ignoreIfNotExists = true,
+ purge = purge)
+ true
}
} catch {
case _: NoSuchTableException =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 2cf4792b8c1..dc69acc6f91 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -691,22 +691,23 @@ class PlanResolutionSuite extends AnalysisTest {
test("drop table") {
val tableName1 = "db.v1Table"
- val tableIdent1 = TableIdentifier("v1Table", Option("db"),
Some(SESSION_CATALOG_NAME))
+ val tableIdent1 = ResolvedIdentifier(v2SessionCatalog,
Identifier.of(Array("db"), "v1Table"))
val tableName2 = "v1Table"
- val tableIdent2 = TableIdentifier("v1Table", Some("default"),
Some(SESSION_CATALOG_NAME))
+ val tableIdent2 = ResolvedIdentifier(v2SessionCatalog,
Identifier.of(Array("default"),
+ "v1Table"))
parseResolveCompare(s"DROP TABLE $tableName1",
- DropTableCommand(tableIdent1, ifExists = false, isView = false, purge =
false))
+ DropTable(tableIdent1, ifExists = false, purge = false))
parseResolveCompare(s"DROP TABLE IF EXISTS $tableName1",
- DropTableCommand(tableIdent1, ifExists = true, isView = false, purge =
false))
+ DropTable(tableIdent1, ifExists = true, purge = false))
parseResolveCompare(s"DROP TABLE $tableName2",
- DropTableCommand(tableIdent2, ifExists = false, isView = false, purge =
false))
+ DropTable(tableIdent2, ifExists = false, purge = false))
parseResolveCompare(s"DROP TABLE IF EXISTS $tableName2",
- DropTableCommand(tableIdent2, ifExists = true, isView = false, purge =
false))
+ DropTable(tableIdent2, ifExists = true, purge = false))
parseResolveCompare(s"DROP TABLE $tableName2 PURGE",
- DropTableCommand(tableIdent2, ifExists = false, isView = false, purge =
true))
+ DropTable(tableIdent2, ifExists = false, purge = true))
parseResolveCompare(s"DROP TABLE IF EXISTS $tableName2 PURGE",
- DropTableCommand(tableIdent2, ifExists = true, isView = false, purge =
true))
+ DropTable(tableIdent2, ifExists = true, purge = true))
}
test("drop table in v2 catalog") {
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
index 8c6d718f18a..e847e92c4ce 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
@@ -26,7 +26,7 @@ class DropTableSuite extends v1.DropTableSuiteBase with
CommandSuiteBase {
test("hive client calls") {
withNamespaceAndTable("ns", "tbl") { t =>
sql(s"CREATE TABLE $t (id int) $defaultUsing")
- checkHiveClientCalls(expected = 11) {
+ checkHiveClientCalls(expected = 10) {
sql(s"DROP TABLE $t")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]