This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 14a933bbe2eb [SPARK-46410][SQL] Assign error classes/subclasses to
JdbcUtils.classifyException
14a933bbe2eb is described below
commit 14a933bbe2eb1c71988f475036735f07b2e1fa6a
Author: Max Gekk <[email protected]>
AuthorDate: Sun Dec 17 10:41:07 2023 +0300
[SPARK-46410][SQL] Assign error classes/subclasses to
JdbcUtils.classifyException
### What changes were proposed in this pull request?
In the PR, I propose to raise exceptions with only error classes from
`JdbcUtils.classifyException`, and introduce new error class `FAILED_JDBC` with
sub-classes linked to a particular JDBC operation.
### Why are the changes needed?
To improve user experience with Spark SQL by migrating on new error
framework when all Spark exceptions from the JDBC datasource have an error
class.
### Does this PR introduce _any_ user-facing change?
Yes, if user's code depends on exceptions from the JDBC datasource.
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *JDBCV2Suite"
$ build/sbt "test:testOnly *JDBCTableCatalogSuite"
$ build/sbt "test:testOnly *JDBCSuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44358 from MaxGekk/error-class-classifyException.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../src/main/resources/error/error-classes.json | 78 ++++++++-
.../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 10 +-
...sql-error-conditions-failed-jdbc-error-class.md | 80 +++++++++
docs/sql-error-conditions.md | 8 +
project/MimaExcludes.scala | 3 +
.../sql/catalyst/analysis/NonEmptyException.scala | 3 -
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 7 +-
.../execution/datasources/v2/jdbc/JDBCTable.scala | 25 ++-
.../datasources/v2/jdbc/JDBCTableCatalog.scala | 86 ++++++++--
.../org/apache/spark/sql/jdbc/DB2Dialect.scala | 15 +-
.../org/apache/spark/sql/jdbc/H2Dialect.scala | 33 ++--
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 13 +-
.../apache/spark/sql/jdbc/MsSqlServerDialect.scala | 15 +-
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 28 ++-
.../apache/spark/sql/jdbc/PostgresDialect.scala | 57 ++++---
.../v2/jdbc/JDBCTableCatalogSuite.scala | 188 +++++++++++++++------
.../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 6 +-
17 files changed, 501 insertions(+), 154 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json
b/common/utils/src/main/resources/error/error-classes.json
index b4a3031c06c9..62e3427fdffd 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1096,6 +1096,79 @@
],
"sqlState" : "38000"
},
+ "FAILED_JDBC" : {
+ "message" : [
+ "Failed JDBC <url> on the operation:"
+ ],
+ "subClass" : {
+ "ALTER_TABLE" : {
+ "message" : [
+ "Alter the table <tableName>."
+ ]
+ },
+ "CREATE_INDEX" : {
+ "message" : [
+ "Create the index <indexName> in the <tableName> table."
+ ]
+ },
+ "CREATE_NAMESPACE" : {
+ "message" : [
+ "Create the namespace <namespace>."
+ ]
+ },
+ "CREATE_NAMESPACE_COMMENT" : {
+ "message" : [
+ "Create a comment on the namespace: <namespace>."
+ ]
+ },
+ "CREATE_TABLE" : {
+ "message" : [
+ "Create the table <tableName>."
+ ]
+ },
+ "DROP_INDEX" : {
+ "message" : [
+ "Drop the index <indexName> in the <tableName> table."
+ ]
+ },
+ "DROP_NAMESPACE" : {
+ "message" : [
+ "Drop the namespace <namespace>."
+ ]
+ },
+ "GET_TABLES" : {
+ "message" : [
+ "Get tables from the namespace: <namespace>."
+ ]
+ },
+ "LIST_NAMESPACES" : {
+ "message" : [
+ "List namespaces."
+ ]
+ },
+ "NAMESPACE_EXISTS" : {
+ "message" : [
+ "Check that the namespace <namespace> exists."
+ ]
+ },
+ "REMOVE_NAMESPACE_COMMENT" : {
+ "message" : [
+ "Remove a comment on the namespace: <namespace>."
+ ]
+ },
+ "RENAME_TABLE" : {
+ "message" : [
+ "Rename the table <oldName> to <newName>."
+ ]
+ },
+ "TABLE_EXISTS" : {
+ "message" : [
+ "Check that the table <tableName> exists."
+ ]
+ }
+ },
+ "sqlState" : "HV000"
+ },
"FAILED_PARSE_STRUCT_TYPE" : {
"message" : [
"Failed parsing struct: <raw>."
@@ -6778,11 +6851,6 @@
"pivot is not supported on a streaming DataFrames/Datasets"
]
},
- "_LEGACY_ERROR_TEMP_3064" : {
- "message" : [
- "<msg>"
- ]
- },
"_LEGACY_ERROR_TEMP_3065" : {
"message" : [
"<clazz>: <msg>"
diff --git
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index b5f5b0e5f20b..76277dbc96b6 100644
---
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -221,10 +221,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession
with DockerIntegrationFu
test("CREATE TABLE with table property") {
withTable(s"$catalogName.new_table") {
- val m = intercept[AnalysisException] {
+ val e = intercept[AnalysisException] {
sql(s"CREATE TABLE $catalogName.new_table (i INT)
TBLPROPERTIES('a'='1')")
- }.message
- assert(m.contains("Failed table creation"))
+ }
+ assert(e.getErrorClass == "FAILED_JDBC.CREATE_TABLE")
testCreateTableWithProperty(s"$catalogName.new_table")
}
}
@@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession
with DockerIntegrationFu
sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
},
errorClass = "INDEX_ALREADY_EXISTS",
- parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
+ parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
)
sql(s"DROP index i1 ON $catalogName.new_table")
@@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession
with DockerIntegrationFu
sql(s"DROP index i1 ON $catalogName.new_table")
},
errorClass = "INDEX_NOT_FOUND",
- parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
+ parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
)
}
}
diff --git a/docs/sql-error-conditions-failed-jdbc-error-class.md
b/docs/sql-error-conditions-failed-jdbc-error-class.md
new file mode 100644
index 000000000000..575441e3f347
--- /dev/null
+++ b/docs/sql-error-conditions-failed-jdbc-error-class.md
@@ -0,0 +1,80 @@
+---
+layout: global
+title: FAILED_JDBC error class
+displayTitle: FAILED_JDBC error class
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+SQLSTATE: HV000
+
+Failed JDBC `<url>` on the operation:
+
+This error class has the following derived error classes:
+
+## ALTER_TABLE
+
+Alter the table `<tableName>`.
+
+## CREATE_INDEX
+
+Create the index `<indexName>` in the `<tableName>` table.
+
+## CREATE_NAMESPACE
+
+Create the namespace `<namespace>`.
+
+## CREATE_NAMESPACE_COMMENT
+
+Create a comment on the namespace: `<namespace>`.
+
+## CREATE_TABLE
+
+Create the table `<tableName>`.
+
+## DROP_INDEX
+
+Drop the index `<indexName>` in the `<tableName>` table.
+
+## DROP_NAMESPACE
+
+Drop the namespace `<namespace>`.
+
+## GET_TABLES
+
+Get tables from the namespace: `<namespace>`.
+
+## LIST_NAMESPACES
+
+List namespaces.
+
+## NAMESPACE_EXISTS
+
+Check that the namespace `<namespace>` exists.
+
+## REMOVE_NAMESPACE_COMMENT
+
+Remove a comment on the namespace: `<namespace>`.
+
+## RENAME_TABLE
+
+Rename the table `<oldName>` to `<newName>`.
+
+## TABLE_EXISTS
+
+Check that the table `<tableName>` exists.
+
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 82befaae81df..5657877971c5 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -665,6 +665,14 @@ User defined function (`<functionName>`: (`<signature>`)
=> `<result>`) failed d
Failed preparing of the function `<funcName>` for call. Please, double check
function's arguments.
+### [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.html)
+
+SQLSTATE: HV000
+
+Failed JDBC `<url>` on the operation:
+
+For more details see
[FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.html)
+
### FAILED_PARSE_STRUCT_TYPE
[SQLSTATE: 22018](sql-error-conditions-sqlstates.html#class-22-data-exception)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 463212290877..cfda74509720 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -100,6 +100,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.storage.CacheId$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.CacheId.apply"),
+ // SPARK-46410: Assign error classes/subclasses to
JdbcUtils.classifyException
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.JdbcDialect.classifyException"),
+
(problem: Problem) => problem match {
case MissingClassProblem(cls) =>
!cls.fullName.startsWith("org.sparkproject.jpmml") &&
!cls.fullName.startsWith("org.sparkproject.dmg.pmml")
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
index 6475ac3093fe..9955f1b7bd30 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
@@ -36,7 +36,4 @@ case class NonEmptyNamespaceException(
"details" -> details)) {
def this(namespace: Array[String]) = this(namespace, "", None)
-
- def this(details: String, cause: Option[Throwable]) =
- this(Array.empty, details, cause)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 9be764e8b07d..e7835514a384 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -1180,12 +1180,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
}
}
- def classifyException[T](message: String, dialect: JdbcDialect)(f: => T): T
= {
+ def classifyException[T](
+ errorClass: String,
+ messageParameters: Map[String, String],
+ dialect: JdbcDialect)(f: => T): T = {
try {
f
} catch {
case e: SparkThrowable with Throwable => throw e
- case e: Throwable => throw dialect.classifyException(message, e)
+ case e: Throwable => throw dialect.classifyException(e, errorClass,
messageParameters)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index 1065d6347476..c251010881f3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -26,13 +26,18 @@ import
org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.errors.DataTypeErrorsBase
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions,
JdbcOptionsInWrite, JdbcUtils}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions:
JDBCOptions)
- extends Table with SupportsRead with SupportsWrite with SupportsIndex {
+ extends Table
+ with SupportsRead
+ with SupportsWrite
+ with SupportsIndex
+ with DataTypeErrorsBase {
override def name(): String = ident.toString
@@ -58,8 +63,13 @@ case class JDBCTable(ident: Identifier, schema: StructType,
jdbcOptions: JDBCOpt
columnsProperties: util.Map[NamedReference, util.Map[String, String]],
properties: util.Map[String, String]): Unit = {
JdbcUtils.withConnection(jdbcOptions) { conn =>
- JdbcUtils.classifyException(s"Failed to create index $indexName in
${name()}",
- JdbcDialects.get(jdbcOptions.url)) {
+ JdbcUtils.classifyException(
+ errorClass = "FAILED_JDBC.CREATE_INDEX",
+ messageParameters = Map(
+ "url" -> jdbcOptions.url,
+ "indexName" -> toSQLId(indexName),
+ "tableName" -> toSQLId(name)),
+ dialect = JdbcDialects.get(jdbcOptions.url)) {
JdbcUtils.createIndex(
conn, indexName, ident, columns, columnsProperties, properties,
jdbcOptions)
}
@@ -74,8 +84,13 @@ case class JDBCTable(ident: Identifier, schema: StructType,
jdbcOptions: JDBCOpt
override def dropIndex(indexName: String): Unit = {
JdbcUtils.withConnection(jdbcOptions) { conn =>
- JdbcUtils.classifyException(s"Failed to drop index $indexName in
${name()}",
- JdbcDialects.get(jdbcOptions.url)) {
+ JdbcUtils.classifyException(
+ errorClass = "FAILED_JDBC.DROP_INDEX",
+ messageParameters = Map(
+ "url" -> jdbcOptions.url,
+ "indexName" -> toSQLId(indexName),
+ "tableName" -> toSQLId(name)),
+ dialect = JdbcDialects.get(jdbcOptions.url)) {
JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index 6c773d4fb1b0..976cd3f6e9aa 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -27,7 +27,7 @@ import
org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException
import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier,
NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
+import org.apache.spark.sql.errors.{DataTypeErrorsBase,
QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions,
JdbcOptionsInWrite, JDBCRDD, JdbcUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
@@ -35,7 +35,10 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
class JDBCTableCatalog extends TableCatalog
- with SupportsNamespaces with FunctionCatalog with Logging {
+ with SupportsNamespaces
+ with FunctionCatalog
+ with DataTypeErrorsBase
+ with Logging {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
private var catalogName: String = null
@@ -66,7 +69,11 @@ class JDBCTableCatalog extends TableCatalog
JdbcUtils.withConnection(options) { conn =>
val schemaPattern = if (namespace.length == 1) namespace.head else null
val rs = JdbcUtils.classifyException(
- s"Failed get tables from: ${namespace.mkString(".")}", dialect) {
+ errorClass = "FAILED_JDBC.GET_TABLES",
+ messageParameters = Map(
+ "url" -> options.url,
+ "namespace" -> toSQLId(namespace.toSeq)),
+ dialect) {
conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE"))
}
new Iterator[Identifier] {
@@ -80,7 +87,12 @@ class JDBCTableCatalog extends TableCatalog
checkNamespace(ident.namespace())
val writeOptions = new JdbcOptionsInWrite(
options.parameters + (JDBCOptions.JDBC_TABLE_NAME ->
getTableName(ident)))
- JdbcUtils.classifyException(s"Failed table existence check: $ident",
dialect) {
+ JdbcUtils.classifyException(
+ errorClass = "FAILED_JDBC.TABLE_EXISTS",
+ messageParameters = Map(
+ "url" -> options.url,
+ "tableName" -> toSQLId(ident)),
+ dialect) {
JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions))
}
}
@@ -100,7 +112,13 @@ class JDBCTableCatalog extends TableCatalog
override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
{
checkNamespace(oldIdent.namespace())
JdbcUtils.withConnection(options) { conn =>
- JdbcUtils.classifyException(s"Failed table renaming from $oldIdent to
$newIdent", dialect) {
+ JdbcUtils.classifyException(
+ errorClass = "FAILED_JDBC.RENAME_TABLE",
+ messageParameters = Map(
+ "url" -> options.url,
+ "oldName" -> toSQLId(oldIdent),
+ "newName" -> toSQLId(newIdent)),
+ dialect) {
JdbcUtils.renameTable(conn, oldIdent, newIdent, options)
}
}
@@ -160,7 +178,12 @@ class JDBCTableCatalog extends TableCatalog
val writeOptions = new JdbcOptionsInWrite(tableOptions)
val caseSensitive = SQLConf.get.caseSensitiveAnalysis
JdbcUtils.withConnection(options) { conn =>
- JdbcUtils.classifyException(s"Failed table creation: $ident", dialect) {
+ JdbcUtils.classifyException(
+ errorClass = "FAILED_JDBC.CREATE_TABLE",
+ messageParameters = Map(
+ "url" -> options.url,
+ "tableName" -> toSQLId(ident)),
+ dialect) {
JdbcUtils.createTable(conn, getTableName(ident), schema,
caseSensitive, writeOptions)
}
}
@@ -171,7 +194,12 @@ class JDBCTableCatalog extends TableCatalog
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
checkNamespace(ident.namespace())
JdbcUtils.withConnection(options) { conn =>
- JdbcUtils.classifyException(s"Failed table altering: $ident", dialect) {
+ JdbcUtils.classifyException(
+ errorClass = "FAILED_JDBC.ALTER_TABLE",
+ messageParameters = Map(
+ "url" -> options.url,
+ "tableName" -> toSQLId(ident)),
+ dialect) {
JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
}
loadTable(ident)
@@ -181,7 +209,12 @@ class JDBCTableCatalog extends TableCatalog
override def namespaceExists(namespace: Array[String]): Boolean = namespace
match {
case Array(db) =>
JdbcUtils.withConnection(options) { conn =>
- JdbcUtils.classifyException(s"Failed namespace exists:
${namespace.mkString}", dialect) {
+ JdbcUtils.classifyException(
+ errorClass = "FAILED_JDBC.NAMESPACE_EXISTS",
+ messageParameters = Map(
+ "url" -> options.url,
+ "namespace" -> toSQLId(namespace.toSeq)),
+ dialect) {
JdbcUtils.schemaExists(conn, options, db)
}
}
@@ -190,7 +223,10 @@ class JDBCTableCatalog extends TableCatalog
override def listNamespaces(): Array[Array[String]] = {
JdbcUtils.withConnection(options) { conn =>
- JdbcUtils.classifyException(s"Failed list namespaces", dialect) {
+ JdbcUtils.classifyException(
+ errorClass = "FAILED_JDBC.LIST_NAMESPACES",
+ messageParameters = Map("url" -> options.url),
+ dialect) {
JdbcUtils.listSchemas(conn, options)
}
}
@@ -238,7 +274,12 @@ class JDBCTableCatalog extends TableCatalog
}
}
JdbcUtils.withConnection(options) { conn =>
- JdbcUtils.classifyException(s"Failed create name space: $db", dialect)
{
+ JdbcUtils.classifyException(
+ errorClass = "FAILED_JDBC.CREATE_NAMESPACE",
+ messageParameters = Map(
+ "url" -> options.url,
+ "namespace" -> toSQLId(db)),
+ dialect) {
JdbcUtils.createSchema(conn, options, db, comment)
}
}
@@ -257,7 +298,12 @@ class JDBCTableCatalog extends TableCatalog
case set: NamespaceChange.SetProperty =>
if (set.property() == SupportsNamespaces.PROP_COMMENT) {
JdbcUtils.withConnection(options) { conn =>
- JdbcUtils.classifyException(s"Failed create comment on name
space: $db", dialect) {
+ JdbcUtils.classifyException(
+ errorClass = "FAILED_JDBC.CREATE_NAMESPACE_COMMENT",
+ messageParameters = Map(
+ "url" -> options.url,
+ "namespace" -> toSQLId(db)),
+ dialect) {
JdbcUtils.alterSchemaComment(conn, options, db, set.value)
}
}
@@ -268,7 +314,12 @@ class JDBCTableCatalog extends TableCatalog
case unset: NamespaceChange.RemoveProperty =>
if (unset.property() == SupportsNamespaces.PROP_COMMENT) {
JdbcUtils.withConnection(options) { conn =>
- JdbcUtils.classifyException(s"Failed remove comment on name
space: $db", dialect) {
+ JdbcUtils.classifyException(
+ errorClass = "FAILED_JDBC.REMOVE_NAMESPACE_COMMENT",
+ messageParameters = Map(
+ "url" -> options.url,
+ "namespace" -> toSQLId(db)),
+ dialect) {
JdbcUtils.removeSchemaComment(conn, options, db)
}
}
@@ -290,7 +341,12 @@ class JDBCTableCatalog extends TableCatalog
cascade: Boolean): Boolean = namespace match {
case Array(db) if namespaceExists(namespace) =>
JdbcUtils.withConnection(options) { conn =>
- JdbcUtils.classifyException(s"Failed drop name space: $db", dialect) {
+ JdbcUtils.classifyException(
+ errorClass = "FAILED_JDBC.DROP_NAMESPACE",
+ messageParameters = Map(
+ "url" -> options.url,
+ "namespace" -> toSQLId(db)),
+ dialect) {
JdbcUtils.dropSchema(conn, options, db, cascade)
true
}
@@ -330,4 +386,8 @@ class JDBCTableCatalog extends TableCatalog
throw new NoSuchFunctionException(ident)
}
}
+
+ private def toSQLId(ident: Identifier): String = {
+ toSQLId(ident.namespace.toSeq :+ ident.name)
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 8975a015ee8e..4f81ee031d22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -144,15 +144,22 @@ private object DB2Dialect extends JdbcDialect {
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''"
}
- override def classifyException(message: String, e: Throwable):
AnalysisException = {
+ override def classifyException(
+ e: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String]): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getSQLState match {
// https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate
- case "42893" => throw new NonEmptyNamespaceException(message, cause
= Some(e))
- case _ => super.classifyException(message, e)
+ case "42893" =>
+ throw NonEmptyNamespaceException(
+ namespace = messageParameters.get("namespace").toArray,
+ details = sqlException.getMessage,
+ cause = Some(e))
+ case _ => super.classifyException(e, errorClass, messageParameters)
}
- case _ => super.classifyException(message, e)
+ case _ => super.classifyException(e, errorClass, messageParameters)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index d275f9c9cb1b..3c9bc0ed691b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -28,8 +28,7 @@ import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException,
NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException,
TableAlreadyExistsException, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.util.quoteNameParts
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException,
NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException,
TableAlreadyExistsException}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.catalog.index.TableIndex
@@ -181,7 +180,10 @@ private[sql] object H2Dialect extends JdbcDialect {
(ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".")
}
- override def classifyException(message: String, e: Throwable):
AnalysisException = {
+ override def classifyException(
+ e: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String]): AnalysisException = {
e match {
case exception: SQLException =>
// Error codes are from
https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
@@ -192,15 +194,16 @@ private[sql] object H2Dialect extends JdbcDialect {
val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r
val name = regex.findFirstMatchIn(e.getMessage).get.group(1)
val quotedName =
org.apache.spark.sql.catalyst.util.quoteIdentifier(name)
- throw new TableAlreadyExistsException(errorClass =
"TABLE_OR_VIEW_ALREADY_EXISTS",
+ throw new TableAlreadyExistsException(
+ errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
messageParameters = Map("relationName" -> quotedName),
cause = Some(e))
// TABLE_OR_VIEW_NOT_FOUND_1
case 42102 =>
- val quotedName =
quoteNameParts(UnresolvedAttribute.parseAttributeName(message))
+ val relationName = messageParameters.getOrElse("tableName", "")
throw new NoSuchTableException(
errorClass = "TABLE_OR_VIEW_NOT_FOUND",
- messageParameters = Map("relationName" -> quotedName),
+ messageParameters = Map("relationName" -> relationName),
cause = Some(e))
// SCHEMA_NOT_FOUND_1
case 90079 =>
@@ -210,25 +213,21 @@ private[sql] object H2Dialect extends JdbcDialect {
throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND",
messageParameters = Map("schemaName" -> quotedName))
// INDEX_ALREADY_EXISTS_1
- case 42111 =>
- // The message is: Failed to create index indexName in tableName
- val regex = "(?s)Failed to create index (.*) in (.*)".r
- val indexName = regex.findFirstMatchIn(message).get.group(1)
- val tableName = regex.findFirstMatchIn(message).get.group(2)
+ case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
+ val indexName = messageParameters("indexName")
+ val tableName = messageParameters("tableName")
throw new IndexAlreadyExistsException(
indexName = indexName, tableName = tableName, cause = Some(e))
// INDEX_NOT_FOUND_1
- case 42112 =>
- // The message is: Failed to drop index indexName in tableName
- val regex = "(?s)Failed to drop index (.*) in (.*)".r
- val indexName = regex.findFirstMatchIn(message).get.group(1)
- val tableName = regex.findFirstMatchIn(message).get.group(2)
+ case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" =>
+ val indexName = messageParameters("indexName")
+ val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause =
Some(e))
case _ => // do nothing
}
case _ => // do nothing
}
- super.classifyException(message, e)
+ super.classifyException(e, errorClass, messageParameters)
}
override def compileExpression(expr: Expression): Option[String] = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 5ba4e39e8ec1..4825568d88eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -630,15 +630,16 @@ abstract class JdbcDialect extends Serializable with
Logging {
/**
* Gets a dialect exception, classifies it and wraps it by
`AnalysisException`.
- * @param message The error message to be placed to the returned exception.
* @param e The dialect specific exception.
+ * @param errorClass The error class assigned in the case of an unclassified
`e`
+ * @param messageParameters The message parameters of `errorClass`
* @return `AnalysisException` or its sub-class.
*/
- def classifyException(message: String, e: Throwable): AnalysisException = {
- new AnalysisException(
- errorClass = "_LEGACY_ERROR_TEMP_3064",
- messageParameters = Map("msg" -> message),
- cause = Some(e))
+ def classifyException(
+ e: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String]): AnalysisException = {
+ new AnalysisException(errorClass, messageParameters, cause = Some(e))
}
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index ee649122ca80..f63a1abdce65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -190,14 +190,21 @@ private object MsSqlServerDialect extends JdbcDialect {
if (limit > 0) s"TOP ($limit)" else ""
}
- override def classifyException(message: String, e: Throwable):
AnalysisException = {
+ override def classifyException(
+ e: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String]): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
- case 3729 => throw new NonEmptyNamespaceException(message, cause =
Some(e))
- case _ => super.classifyException(message, e)
+ case 3729 =>
+ throw NonEmptyNamespaceException(
+ namespace = messageParameters.get("namespace").toArray,
+ details = sqlException.getMessage,
+ cause = Some(e))
+ case _ => super.classifyException(e, errorClass, messageParameters)
}
- case _ => super.classifyException(message, e)
+ case _ => super.classifyException(e, errorClass, messageParameters)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index dd74c93bc2e1..af50a8e3e359 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -270,28 +270,26 @@ private case object MySQLDialect extends JdbcDialect with
SQLConfHelper {
indexMap.values.toArray
}
- override def classifyException(message: String, e: Throwable):
AnalysisException = {
+ override def classifyException(
+ e: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String]): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
// ER_DUP_KEYNAME
- case 1061 =>
- // The message is: Failed to create index indexName in tableName
- val regex = "(?s)Failed to create index (.*) in (.*)".r
- val indexName = regex.findFirstMatchIn(message).get.group(1)
- val tableName = regex.findFirstMatchIn(message).get.group(2)
- throw new IndexAlreadyExistsException(
- indexName = indexName, tableName = tableName, cause = Some(e))
- case 1091 =>
- // The message is: Failed to drop index indexName in tableName
- val regex = "(?s)Failed to drop index (.*) in (.*)".r
- val indexName = regex.findFirstMatchIn(message).get.group(1)
- val tableName = regex.findFirstMatchIn(message).get.group(2)
+ case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
+ val indexName = messageParameters("indexName")
+ val tableName = messageParameters("tableName")
+ throw new IndexAlreadyExistsException(indexName, tableName, cause
= Some(e))
+ case 1091 if errorClass == "FAILED_JDBC.DROP_INDEX" =>
+ val indexName = messageParameters("indexName")
+ val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause =
Some(e))
- case _ => super.classifyException(message, e)
+ case _ => super.classifyException(e, errorClass, messageParameters)
}
case unsupported: UnsupportedOperationException => throw unsupported
- case _ => super.classifyException(message, e)
+ case _ => super.classifyException(e, errorClass, messageParameters)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index cff7bb5e06f0..4637a96039b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -225,40 +225,47 @@ private object PostgresDialect extends JdbcDialect with
SQLConfHelper {
s"DROP INDEX ${quoteIdentifier(indexName)}"
}
- override def classifyException(message: String, e: Throwable):
AnalysisException = {
+ // Message pattern defined by postgres specification
+ private final val pgAlreadyExistsRegex = """(?:.*)relation "(.*)" already
exists""".r
+
+ override def classifyException(
+ e: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String]): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getSQLState match {
// https://www.postgresql.org/docs/14/errcodes-appendix.html
case "42P07" =>
- // Message patterns defined at caller sides of spark
- val indexRegex = "(?s)Failed to create index (.*) in (.*)".r
- val renameRegex = "(?s)Failed table renaming from (.*) to (.*)".r
- // Message pattern defined by postgres specification
- val pgRegex = """(?:.*)relation "(.*)" already exists""".r
-
- message match {
- case indexRegex(index, table) =>
- throw new IndexAlreadyExistsException(
- indexName = index, tableName = table, cause = Some(e))
- case renameRegex(_, newTable) =>
- throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
- case _ if
pgRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty =>
- val tableName =
pgRegex.findFirstMatchIn(sqlException.getMessage).get.group(1)
- throw QueryCompilationErrors.tableAlreadyExistsError(tableName)
- case _ => super.classifyException(message, e)
+ if (errorClass == "FAILED_JDBC.CREATE_INDEX") {
+ throw new IndexAlreadyExistsException(
+ indexName = messageParameters("indexName"),
+ tableName = messageParameters("tableName"),
+ cause = Some(e))
+ } else if (errorClass == "FAILED_JDBC.RENAME_TABLE") {
+ val newTable = messageParameters("newName")
+ throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
+ } else {
+ val tblRegexp =
pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage)
+ if (tblRegexp.nonEmpty) {
+ throw
QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1))
+ } else {
+ super.classifyException(e, errorClass, messageParameters)
+ }
}
- case "42704" =>
- // The message is: Failed to drop index indexName in tableName
- val regex = "(?s)Failed to drop index (.*) in (.*)".r
- val indexName = regex.findFirstMatchIn(message).get.group(1)
- val tableName = regex.findFirstMatchIn(message).get.group(2)
+ case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" =>
+ val indexName = messageParameters("indexName")
+ val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause =
Some(e))
- case "2BP01" => throw new NonEmptyNamespaceException(message, cause
= Some(e))
- case _ => super.classifyException(message, e)
+ case "2BP01" =>
+ throw NonEmptyNamespaceException(
+ namespace = messageParameters.get("namespace").toArray,
+ details = sqlException.getMessage,
+ cause = Some(e))
+ case _ => super.classifyException(e, errorClass, messageParameters)
}
case unsupported: UnsupportedOperationException => throw unsupported
- case _ => super.classifyException(message, e)
+ case _ => super.classifyException(e, errorClass, messageParameters)
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index eed64b873c45..1cd4077b4ec1 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -267,10 +267,20 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
val expectedSchema = new StructType().add("C2", IntegerType, true,
defaultMetadata)
assert(t.schema === expectedSchema)
// Drop not existing column
- val msg = intercept[AnalysisException] {
- sql(s"ALTER TABLE $tableName DROP COLUMN bad_column")
- }.getMessage
- assert(msg.contains("Missing field bad_column in table
h2.test.alt_table"))
+ val sqlText = s"ALTER TABLE $tableName DROP COLUMN bad_column"
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(sqlText)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_1331",
+ parameters = Map(
+ "fieldName" -> "bad_column",
+ "table" -> "h2.test.alt_table",
+ "schema" ->
+ """root
+ | |-- C2: integer (nullable = true)
+ |""".stripMargin),
+ context = ExpectedContext(sqlText, 0, 51))
}
// Drop a column to not existing table and namespace
Seq(
@@ -297,10 +307,21 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
.add("deptno", DoubleType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Update not existing column
- val msg1 = intercept[AnalysisException] {
- sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE DOUBLE")
- }.getMessage
- assert(msg1.contains("Missing field bad_column in table
h2.test.alt_table"))
+ val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE
DOUBLE"
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(sqlText)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_1331",
+ parameters = Map(
+ "fieldName" -> "bad_column",
+ "table" -> "h2.test.alt_table",
+ "schema" ->
+ """root
+ | |-- ID: double (nullable = true)
+ | |-- deptno: double (nullable = true)
+ |""".stripMargin),
+ context = ExpectedContext(sqlText, 0, 64))
// Update column to wrong type
checkError(
exception = intercept[ParseException] {
@@ -335,10 +356,21 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
.add("deptno", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
// Update nullability of not existing column
- val msg = intercept[AnalysisException] {
- sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT NULL")
- }.getMessage
- assert(msg.contains("Missing field bad_column in table
h2.test.alt_table"))
+ val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT
NULL"
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(sqlText)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_1331",
+ parameters = Map(
+ "fieldName" -> "bad_column",
+ "table" -> "h2.test.alt_table",
+ "schema" ->
+ """root
+ | |-- ID: integer (nullable = true)
+ | |-- deptno: integer (nullable = true)
+ |""".stripMargin),
+ context = ExpectedContext(sqlText, 0, 66))
}
// Update column nullability in not existing table and namespace
Seq(
@@ -357,17 +389,29 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
val tableName = "h2.test.alt_table"
withTable(tableName) {
sql(s"CREATE TABLE $tableName (ID INTEGER)")
- val exp = intercept[AnalysisException] {
- sql(s"ALTER TABLE $tableName ALTER COLUMN ID COMMENT 'test'")
- }
- assert(exp.getErrorClass === "_LEGACY_ERROR_TEMP_1305")
- assert("Unsupported TableChange (.*) in JDBC
catalog\\.".r.pattern.matcher(exp.getMessage)
- .matches())
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(s"ALTER TABLE $tableName ALTER COLUMN ID COMMENT 'test'")
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_1305",
+ parameters = Map("change" ->
+
"org.apache.spark.sql.connector.catalog.TableChange\\$UpdateColumnComment.*"),
+ matchPVals = true)
// Update comment for not existing column
- val msg = intercept[AnalysisException] {
- sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column COMMENT 'test'")
- }.getMessage
- assert(msg.contains("Missing field bad_column in table
h2.test.alt_table"))
+ val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column COMMENT
'test'"
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(sqlText)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_1331",
+ parameters = Map(
+ "fieldName" -> "bad_column",
+ "table" -> "h2.test.alt_table",
+ "schema" ->
+ """root
+ | |-- ID: integer (nullable = true)
+ |""".stripMargin),
+ context = ExpectedContext(sqlText, 0, 67))
}
// Update column comments in not existing table and namespace
Seq(
@@ -393,10 +437,21 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
assert(t.schema === expectedSchema)
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
- val msg = intercept[AnalysisException] {
- sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
- }.getMessage
- assert(msg.contains("Missing field C2 in table h2.test.alt_table"))
+ val sqlText = s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3"
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(sqlText)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_1331",
+ parameters = Map(
+ "fieldName" -> "C2",
+ "table" -> "h2.test.alt_table",
+ "schema" ->
+ """root
+ | |-- c1: integer (nullable = true)
+ | |-- c2: integer (nullable = true)
+ |""".stripMargin),
+ context = ExpectedContext(sqlText, 0, 51))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
@@ -409,10 +464,21 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
- val msg = intercept[AnalysisException] {
- sql(s"ALTER TABLE $tableName DROP COLUMN C3")
- }.getMessage
- assert(msg.contains("Missing field C3 in table h2.test.alt_table"))
+ val sqlText = s"ALTER TABLE $tableName DROP COLUMN C3"
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(sqlText)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_1331",
+ parameters = Map(
+ "fieldName" -> "C3",
+ "table" -> "h2.test.alt_table",
+ "schema" ->
+ """root
+ | |-- c1: integer (nullable = true)
+ | |-- c3: integer (nullable = true)
+ |""".stripMargin),
+ context = ExpectedContext(sqlText, 0, sqlText.length - 1))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
@@ -423,10 +489,20 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
- val msg = intercept[AnalysisException] {
- sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE")
- }.getMessage
- assert(msg.contains("Missing field C1 in table h2.test.alt_table"))
+ val sqlText = s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE"
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(sqlText)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_1331",
+ parameters = Map(
+ "fieldName" -> "C1",
+ "table" -> "h2.test.alt_table",
+ "schema" ->
+ """root
+ | |-- c1: integer (nullable = true)
+ |""".stripMargin),
+ context = ExpectedContext(sqlText, 0, sqlText.length - 1))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
@@ -437,10 +513,20 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
- val msg = intercept[AnalysisException] {
- sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL")
- }.getMessage
- assert(msg.contains("Missing field C1 in table h2.test.alt_table"))
+ val sqlText = s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL"
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql(sqlText)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_1331",
+ parameters = Map(
+ "fieldName" -> "C1",
+ "table" -> "h2.test.alt_table",
+ "schema" ->
+ """root
+ | |-- c1: double (nullable = true)
+ |""".stripMargin),
+ context = ExpectedContext(sqlText, 0, sqlText.length - 1))
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
@@ -468,11 +554,15 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
test("CREATE TABLE with table property") {
withTable("h2.test.new_table") {
- val m = intercept[AnalysisException] {
- sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" +
- " TBLPROPERTIES('ENGINE'='tableEngineName')")
- }.cause.get.getMessage
- assert(m.contains("\"TABLEENGINENAME\" not found"))
+ checkError(
+ exception = intercept[AnalysisException] {
+ sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" +
+ " TBLPROPERTIES('ENGINE'='tableEngineName')")
+ },
+ errorClass = "FAILED_JDBC.CREATE_TABLE",
+ parameters = Map(
+ "url" -> url,
+ "tableName" -> "`test`.`new_table`"))
}
}
@@ -484,10 +574,14 @@ class JDBCTableCatalogSuite extends QueryTest with
SharedSparkSession {
}
test("SPARK-42904: CREATE TABLE with char/varchar with invalid char length")
{
- val e = intercept[AnalysisException]{
- sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))")
- }
- assert(e.getCause.getMessage.contains("1000000001"))
+ checkError(
+ exception = intercept[AnalysisException]{
+ sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))")
+ },
+ errorClass = "FAILED_JDBC.CREATE_TABLE",
+ parameters = Map(
+ "url" -> url,
+ "tableName" -> "`test`.`new_table`"))
}
test("SPARK-42955: Skip classifyException and wrap AnalysisException for
SparkThrowable") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 0a66680edd63..5e04fca92f4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -2924,8 +2924,8 @@ class JDBCV2Suite extends QueryTest with
SharedSparkSession with ExplainSuiteHel
},
errorClass = "INDEX_ALREADY_EXISTS",
parameters = Map(
- "indexName" -> "people_index",
- "tableName" -> "test.people"
+ "indexName" -> "`people_index`",
+ "tableName" -> "`test`.`people`"
)
)
assert(jdbcTable.indexExists("people_index"))
@@ -2941,7 +2941,7 @@ class JDBCV2Suite extends QueryTest with
SharedSparkSession with ExplainSuiteHel
sql(s"DROP INDEX people_index ON TABLE h2.test.people")
},
errorClass = "INDEX_NOT_FOUND",
- parameters = Map("indexName" -> "people_index", "tableName" ->
"test.people")
+ parameters = Map("indexName" -> "`people_index`", "tableName" ->
"`test`.`people`")
)
assert(jdbcTable.indexExists("people_index") == false)
val indexes3 = jdbcTable.listIndexes()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]