This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d9f0fccd967b [SPARK-46332][SQL] Migrate `CatalogNotFoundException` to the error class `CATALOG_NOT_FOUND`
d9f0fccd967b is described below
commit d9f0fccd967b5c8686353d524d2b31e27b7a473b
Author: Max Gekk <[email protected]>
AuthorDate: Fri Dec 8 12:54:20 2023 -0800
[SPARK-46332][SQL] Migrate `CatalogNotFoundException` to the error class `CATALOG_NOT_FOUND`
### What changes were proposed in this pull request?
In the PR, I propose to migrate the `CatalogNotFoundException` exception to
the new error class `CATALOG_NOT_FOUND`, improve the format of the exception
message, and prohibit creating the exception without an error class.
### Why are the changes needed?
This is part of the migration onto error classes and the new error
framework. The changes improve the user experience with Spark SQL and make
`CatalogNotFoundException` consistent with other Spark exceptions.
### Does this PR introduce _any_ user-facing change?
Yes, if the user's code depends on the error message format of
`CatalogNotFoundException`.
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "core/testOnly *SparkThrowableSuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44259 from MaxGekk/catalog-plugin-not-found.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
R/pkg/tests/fulltests/test_sparkSQL.R | 5 +----
common/utils/src/main/resources/error/error-classes.json | 6 ++++++
.../jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala | 6 +++---
docs/sql-error-conditions.md | 6 ++++++
.../spark/sql/connector/catalog/CatalogNotFoundException.scala | 10 +++++++---
.../org/apache/spark/sql/connector/catalog/Catalogs.scala | 2 +-
.../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 7 +++++--
.../spark/sql/connector/catalog/CatalogLoadingSuite.java | 7 ++-----
.../spark/sql/catalyst/analysis/TableLookupCacheSuite.scala | 6 +++---
.../spark/sql/connector/catalog/LookupCatalogSuite.scala | 5 +++--
.../org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 6 ++++--
.../sql/execution/command/AlignAssignmentsSuiteBase.scala | 5 +++--
.../spark/sql/execution/command/PlanResolutionSuite.scala | 9 ++++-----
13 files changed, 48 insertions(+), 32 deletions(-)
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index f2bef7a00446..0d96f708a544 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4103,10 +4103,7 @@ test_that("catalog APIs, listCatalogs, setCurrentCatalog, currentCatalog", {
expect_equal(currentCatalog(), "spark_catalog")
expect_error(setCurrentCatalog("spark_catalog"), NA)
expect_error(setCurrentCatalog("zxwtyswklpf"),
- paste0("Error in setCurrentCatalog : ",
-
"org.apache.spark.sql.connector.catalog.CatalogNotFoundException: ",
- "Catalog 'zxwtyswklpf' plugin class not found: ",
- "spark.sql.catalog.zxwtyswklpf is not defined"))
+ "[CATALOG_NOT_FOUND]*`zxwtyswklpf`*")
catalogs <- collect(listCatalogs())
})
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 7a672fa5e557..62d10c0d34cb 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -407,6 +407,12 @@
],
"sqlState" : "22003"
},
+ "CATALOG_NOT_FOUND" : {
+ "message" : [
+ "The catalog <catalogName> not found. Consider to set the SQL config
<config> to a catalog plugin."
+ ],
+ "sqlState" : "42P08"
+ },
"CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND" : {
"message" : [
"Checkpoint block <rddBlockId> not found!",
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
index cefa63ecd353..d646fad00c07 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala
@@ -66,10 +66,10 @@ class CatalogSuite extends RemoteSparkSession with SQLHelper {
val catalogs = spark.catalog.listCatalogs().collect()
assert(catalogs.length == 1)
assert(catalogs.map(_.name) sameElements Array("spark_catalog"))
- val message = intercept[SparkException] {
+ val exception = intercept[SparkException] {
spark.catalog.setCurrentCatalog("notExists")
- }.getMessage
- assert(message.contains("plugin class not found"))
+ }
+ assert(exception.getErrorClass == "CATALOG_NOT_FOUND")
spark.catalog.setCurrentCatalog("testcat")
assert(spark.catalog.currentCatalog().equals("testcat"))
val catalogsAfterChange = spark.catalog.listCatalogs().collect()
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index d97e2ceef4c2..82befaae81df 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -335,6 +335,12 @@ The value `<value>` of the type `<sourceType>` cannot be cast to `<targetType>`
 Fail to assign a value of `<sourceType>` type to the `<targetType>` type column or variable `<columnName>` due to an overflow. Use `try_cast` on the input value to tolerate overflow and return NULL instead.
+### CATALOG_NOT_FOUND
+
+[SQLSTATE: 42P08](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+The catalog `<catalogName>` was not found. Consider setting the SQL config `<config>` to a catalog plugin.
+
### CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND
SQLSTATE: 56000
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogNotFoundException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogNotFoundException.scala
index d376b98afa41..4a8910fde4c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogNotFoundException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogNotFoundException.scala
@@ -21,8 +21,12 @@ import org.apache.spark.SparkException
import org.apache.spark.annotation.Experimental
@Experimental
-class CatalogNotFoundException(message: String, cause: Throwable)
-  extends SparkException(message, cause) {
+class CatalogNotFoundException(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    cause: Throwable)
+  extends SparkException(errorClass, messageParameters, cause) {

-  def this(message: String) = this(message, null)
+  def this(errorClass: String, messageParameters: Map[String, String]) =
+    this(errorClass, messageParameters, null)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala
index 5a49883be408..419191f8f9c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala
@@ -53,7 +53,7 @@ private[sql] object Catalogs {
_pluginClassName
} catch {
case _: NoSuchElementException =>
- throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)
+ throw QueryExecutionErrors.catalogNotFoundError(name)
}
val loader = Utils.getContextOrSparkClassLoader
try {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 24332479f193..113f995968a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1811,9 +1811,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
cause = null)
}
- def catalogPluginClassNotFoundError(name: String): Throwable = {
+ def catalogNotFoundError(name: String): Throwable = {
new CatalogNotFoundException(
- s"Catalog '$name' plugin class not found: spark.sql.catalog.$name is not
defined")
+ errorClass = "CATALOG_NOT_FOUND",
+ messageParameters = Map(
+ "catalogName" -> toSQLId(name),
+ "config" -> toSQLConf(s"spark.sql.catalog.$name")))
}
   def catalogPluginClassNotImplementedError(name: String, pluginClassName: String): Throwable = {
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/connector/catalog/CatalogLoadingSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/connector/catalog/CatalogLoadingSuite.java
index e6c6a18623b3..238b8ac04e7e 100644
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/connector/catalog/CatalogLoadingSuite.java
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/connector/catalog/CatalogLoadingSuite.java
@@ -80,11 +80,8 @@ public class CatalogLoadingSuite {
SparkException exc =
Assertions.assertThrows(CatalogNotFoundException.class,
() -> Catalogs.load("missing", conf));
-    Assertions.assertTrue(
-        exc.getMessage().contains("plugin class not found: spark.sql.catalog.missing is not defined"),
-        "Should complain that implementation is not configured");
-    Assertions.assertTrue(exc.getMessage().contains("missing"),
-        "Should identify the catalog by name");
+    Assertions.assertEquals(exc.getErrorClass(), "CATALOG_NOT_FOUND");
+    Assertions.assertEquals(exc.getMessageParameters().get("catalogName"), "`missing`");
}
@Test
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
index 2c4215e70287..189509e31736 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala
@@ -29,7 +29,8 @@ import org.scalatest.matchers.must.Matchers
import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, InMemoryTable, InMemoryTableCatalog, Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryTable, InMemoryTableCatalog, Table}
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
class TableLookupCacheSuite extends AnalysisTest with Matchers {
@@ -60,8 +61,7 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers {
     when(catalogManager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => {
invocation.getArgument[String](0) match {
case CatalogManager.SESSION_CATALOG_NAME => v2Catalog
- case name =>
- throw new CatalogNotFoundException(s"No such catalog: $name")
+ case name => throw QueryExecutionErrors.catalogNotFoundError(name)
}
})
when(catalogManager.v1SessionCatalog).thenReturn(v1Catalog)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala
index 0db758d5147f..49e119b56bc8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.FakeV2SessionCatalog
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -46,7 +47,7 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
val manager = mock(classOf[CatalogManager])
when(manager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => {
val name = invocation.getArgument[String](0)
-      catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
+      catalogs.getOrElse(name, throw QueryExecutionErrors.catalogNotFoundError(name))
})
when(manager.currentCatalog).thenReturn(sessionCatalog)
when(manager.v2SessionCatalog).thenReturn(sessionCatalog)
@@ -114,7 +115,7 @@ class LookupCatalogWithDefaultSuite extends SparkFunSuite with LookupCatalog wit
val manager = mock(classOf[CatalogManager])
when(manager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => {
val name = invocation.getArgument[String](0)
-      catalogs.getOrElse(name, throw new CatalogNotFoundException(s"$name not found"))
+      catalogs.getOrElse(name, throw QueryExecutionErrors.catalogNotFoundError(name))
})
when(manager.currentCatalog).thenReturn(catalogs("prod"))
when(manager.currentNamespace).thenReturn(Array.empty[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 2b93e8bd3200..302a8e5d41db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2763,8 +2763,10 @@ class DataSourceV2SQLSuiteV1Filter
exception = intercept[CatalogNotFoundException] {
sql("SET CATALOG not_exist_catalog")
},
- errorClass = null,
- parameters = Map.empty)
+ errorClass = "CATALOG_NOT_FOUND",
+ parameters = Map(
+ "catalogName" -> "`not_exist_catalog`",
+ "config" -> "\"spark.sql.catalog.not_exist_catalog\""))
}
test("SPARK-35973: ShowCatalogs") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala
index 2bc747c0abee..ebb719a35a8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuiteBase.scala
@@ -30,8 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, CatalogV2Util, Column, ColumnDefaultValue, Identifier, SupportsRowLevelOperations, TableCapability, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Column, ColumnDefaultValue, Identifier, SupportsRowLevelOperations, TableCapability, TableCatalog}
import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform}
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, IntegerType, StructType}
@@ -177,7 +178,7 @@ abstract class AlignAssignmentsSuiteBase extends AnalysisTest {
invocation.getArguments()(0).asInstanceOf[String] match {
case "testcat" => v2Catalog
case CatalogManager.SESSION_CATALOG_NAME => v2SessionCatalog
-          case name => throw new CatalogNotFoundException(s"No such catalog: $name")
+ case name => throw QueryExecutionErrors.catalogNotFoundError(name)
}
})
when(manager.currentCatalog).thenReturn(v2Catalog)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 69b3285fc7f1..db6c7175c526 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -36,9 +36,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCom
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
import org.apache.spark.sql.connector.FakeV2Provider
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Column, ColumnDefaultValue, Identifier, SupportsDelete, Table, TableCapability, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, ColumnDefaultValue, Identifier, SupportsDelete, Table, TableCapability, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.expressions.{LiteralValue, Transform}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -194,8 +195,7 @@ class PlanResolutionSuite extends AnalysisTest {
testCat
case CatalogManager.SESSION_CATALOG_NAME =>
v2SessionCatalog
- case name =>
- throw new CatalogNotFoundException(s"No such catalog: $name")
+ case name => throw QueryExecutionErrors.catalogNotFoundError(name)
}
})
when(manager.currentCatalog).thenReturn(testCat)
@@ -211,8 +211,7 @@ class PlanResolutionSuite extends AnalysisTest {
invocation.getArguments()(0).asInstanceOf[String] match {
case "testcat" =>
testCat
- case name =>
- throw new CatalogNotFoundException(s"No such catalog: $name")
+ case name => throw QueryExecutionErrors.catalogNotFoundError(name)
}
})
when(manager.currentCatalog).thenReturn(v2SessionCatalog)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]