This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 14a933bbe2eb [SPARK-46410][SQL] Assign error classes/subclasses to JdbcUtils.classifyException 14a933bbe2eb is described below commit 14a933bbe2eb1c71988f475036735f07b2e1fa6a Author: Max Gekk <max.g...@gmail.com> AuthorDate: Sun Dec 17 10:41:07 2023 +0300 [SPARK-46410][SQL] Assign error classes/subclasses to JdbcUtils.classifyException ### What changes were proposed in this pull request? In the PR, I propose to raise exceptions with only error classes from `JdbcUtils.classifyException`, and introduce new error class `FAILED_JDBC` with sub-classes linked to a particular JDBC operation. ### Why are the changes needed? To improve user experience with Spark SQL by migrating on new error framework when all Spark exceptions from the JDBC datasource have an error class. ### Does this PR introduce _any_ user-facing change? Yes, if user's code depends on exceptions from the JDBC datasource. ### How was this patch tested? By running the affected test suites: ``` $ build/sbt "test:testOnly *JDBCV2Suite" $ build/sbt "test:testOnly *JDBCTableCatalogSuite" $ build/sbt "test:testOnly *JDBCSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44358 from MaxGekk/error-class-classifyException. Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../src/main/resources/error/error-classes.json | 78 ++++++++- .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 10 +- ...sql-error-conditions-failed-jdbc-error-class.md | 80 +++++++++ docs/sql-error-conditions.md | 8 + project/MimaExcludes.scala | 3 + .../sql/catalyst/analysis/NonEmptyException.scala | 3 - .../sql/execution/datasources/jdbc/JdbcUtils.scala | 7 +- .../execution/datasources/v2/jdbc/JDBCTable.scala | 25 ++- .../datasources/v2/jdbc/JDBCTableCatalog.scala | 86 ++++++++-- .../org/apache/spark/sql/jdbc/DB2Dialect.scala | 15 +- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 33 ++-- .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 13 +- .../apache/spark/sql/jdbc/MsSqlServerDialect.scala | 15 +- .../org/apache/spark/sql/jdbc/MySQLDialect.scala | 28 ++- .../apache/spark/sql/jdbc/PostgresDialect.scala | 57 ++++--- .../v2/jdbc/JDBCTableCatalogSuite.scala | 188 +++++++++++++++------ .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 6 +- 17 files changed, 501 insertions(+), 154 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index b4a3031c06c9..62e3427fdffd 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1096,6 +1096,79 @@ ], "sqlState" : "38000" }, + "FAILED_JDBC" : { + "message" : [ + "Failed JDBC <url> on the operation:" + ], + "subClass" : { + "ALTER_TABLE" : { + "message" : [ + "Alter the table <tableName>." + ] + }, + "CREATE_INDEX" : { + "message" : [ + "Create the index <indexName> in the <tableName> table." + ] + }, + "CREATE_NAMESPACE" : { + "message" : [ + "Create the namespace <namespace>." + ] + }, + "CREATE_NAMESPACE_COMMENT" : { + "message" : [ + "Create a comment on the namespace: <namespace>." + ] + }, + "CREATE_TABLE" : { + "message" : [ + "Create the table <tableName>." + ] + }, + "DROP_INDEX" : { + "message" : [ + "Drop the index <indexName> in the <tableName> table." + ] + }, + "DROP_NAMESPACE" : { + "message" : [ + "Drop the namespace <namespace>." + ] + }, + "GET_TABLES" : { + "message" : [ + "Get tables from the namespace: <namespace>." + ] + }, + "LIST_NAMESPACES" : { + "message" : [ + "List namespaces." + ] + }, + "NAMESPACE_EXISTS" : { + "message" : [ + "Check that the namespace <namespace> exists." + ] + }, + "REMOVE_NAMESPACE_COMMENT" : { + "message" : [ + "Remove a comment on the namespace: <namespace>." + ] + }, + "RENAME_TABLE" : { + "message" : [ + "Rename the table <oldName> to <newName>." + ] + }, + "TABLE_EXISTS" : { + "message" : [ + "Check that the table <tableName> exists." + ] + } + }, + "sqlState" : "HV000" + }, "FAILED_PARSE_STRUCT_TYPE" : { "message" : [ "Failed parsing struct: <raw>." @@ -6778,11 +6851,6 @@ "pivot is not supported on a streaming DataFrames/Datasets" ] }, - "_LEGACY_ERROR_TEMP_3064" : { - "message" : [ - "<msg>" - ] - }, "_LEGACY_ERROR_TEMP_3065" : { "message" : [ "<clazz>: <msg>" diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index b5f5b0e5f20b..76277dbc96b6 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -221,10 +221,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu test("CREATE TABLE with table property") { withTable(s"$catalogName.new_table") { - val m = intercept[AnalysisException] { + val e = intercept[AnalysisException] { sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')") - }.message - assert(m.contains("Failed table creation")) + } + assert(e.getErrorClass == "FAILED_JDBC.CREATE_TABLE") testCreateTableWithProperty(s"$catalogName.new_table") } } @@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"CREATE index i1 ON $catalogName.new_table (col1)") }, errorClass = "INDEX_ALREADY_EXISTS", - parameters = Map("indexName" -> "i1", "tableName" -> "new_table") + parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`") ) sql(s"DROP index i1 ON $catalogName.new_table") @@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"DROP index i1 ON $catalogName.new_table") }, errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "i1", "tableName" -> "new_table") + parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`") ) } } diff --git a/docs/sql-error-conditions-failed-jdbc-error-class.md b/docs/sql-error-conditions-failed-jdbc-error-class.md new file mode 100644 index 000000000000..575441e3f347 --- /dev/null +++ b/docs/sql-error-conditions-failed-jdbc-error-class.md @@ -0,0 +1,80 @@ +--- +layout: global +title: FAILED_JDBC error class +displayTitle: FAILED_JDBC error class +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +SQLSTATE: HV000 + +Failed JDBC `<url>` on the operation: + +This error class has the following derived error classes: + +## ALTER_TABLE + +Alter the table `<tableName>`. + +## CREATE_INDEX + +Create the index `<indexName>` in the `<tableName>` table. + +## CREATE_NAMESPACE + +Create the namespace `<namespace>`. + +## CREATE_NAMESPACE_COMMENT + +Create a comment on the namespace: `<namespace>`. + +## CREATE_TABLE + +Create the table `<tableName>`. + +## DROP_INDEX + +Drop the index `<indexName>` in the `<tableName>` table. + +## DROP_NAMESPACE + +Drop the namespace `<namespace>`. + +## GET_TABLES + +Get tables from the namespace: `<namespace>`. + +## LIST_NAMESPACES + +List namespaces. + +## NAMESPACE_EXISTS + +Check that the namespace `<namespace>` exists. + +## REMOVE_NAMESPACE_COMMENT + +Remove a comment on the namespace: `<namespace>`. + +## RENAME_TABLE + +Rename the table `<oldName>` to `<newName>`. + +## TABLE_EXISTS + +Check that the table `<tableName>` exists. + + diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 82befaae81df..5657877971c5 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -665,6 +665,14 @@ User defined function (`<functionName>`: (`<signature>`) => `<result>`) failed d Failed preparing of the function `<funcName>` for call. Please, double check function's arguments. +### [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.html) + +SQLSTATE: HV000 + +Failed JDBC `<url>` on the operation: + +For more details see [FAILED_JDBC](sql-error-conditions-failed-jdbc-error-class.html) + ### FAILED_PARSE_STRUCT_TYPE [SQLSTATE: 22018](sql-error-conditions-sqlstates.html#class-22-data-exception) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 463212290877..cfda74509720 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -100,6 +100,9 @@ object MimaExcludes { ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.storage.CacheId$"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.CacheId.apply"), + // SPARK-46410: Assign error classes/subclasses to JdbcUtils.classifyException + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.jdbc.JdbcDialect.classifyException"), + (problem: Problem) => problem match { case MissingClassProblem(cls) => !cls.fullName.startsWith("org.sparkproject.jpmml") && !cls.fullName.startsWith("org.sparkproject.dmg.pmml") diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala index 6475ac3093fe..9955f1b7bd30 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala @@ -36,7 +36,4 @@ case class NonEmptyNamespaceException( "details" -> details)) { def this(namespace: Array[String]) = this(namespace, "", None) - - def this(details: String, cause: Option[Throwable]) = - this(Array.empty, details, cause) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 9be764e8b07d..e7835514a384 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -1180,12 +1180,15 @@ object JdbcUtils extends Logging with SQLConfHelper { } } - def classifyException[T](message: String, dialect: JdbcDialect)(f: => T): T = { + def classifyException[T]( + errorClass: String, + messageParameters: Map[String, String], + dialect: JdbcDialect)(f: => T): T = { try { f } catch { case e: SparkThrowable with Throwable => throw e - case e: Throwable => throw dialect.classifyException(message, e) + case e: Throwable => throw dialect.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala index 1065d6347476..c251010881f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala @@ -26,13 +26,18 @@ import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex} import org.apache.spark.sql.connector.expressions.NamedReference import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.errors.DataTypeErrorsBase import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JdbcUtils} import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions) - extends Table with SupportsRead with SupportsWrite with SupportsIndex { + extends Table + with SupportsRead + with SupportsWrite + with SupportsIndex + with DataTypeErrorsBase { override def name(): String = ident.toString @@ -58,8 +63,13 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt columnsProperties: util.Map[NamedReference, util.Map[String, String]], properties: util.Map[String, String]): Unit = { JdbcUtils.withConnection(jdbcOptions) { conn => - JdbcUtils.classifyException(s"Failed to create index $indexName in ${name()}", - JdbcDialects.get(jdbcOptions.url)) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.CREATE_INDEX", + messageParameters = Map( + "url" -> jdbcOptions.url, + "indexName" -> toSQLId(indexName), + "tableName" -> toSQLId(name)), + dialect = JdbcDialects.get(jdbcOptions.url)) { JdbcUtils.createIndex( conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions) } @@ -74,8 +84,13 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt override def dropIndex(indexName: String): Unit = { JdbcUtils.withConnection(jdbcOptions) { conn => - JdbcUtils.classifyException(s"Failed to drop index $indexName in ${name()}", - JdbcDialects.get(jdbcOptions.url)) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.DROP_INDEX", + messageParameters = Map( + "url" -> jdbcOptions.url, + "indexName" -> toSQLId(indexName), + "tableName" -> toSQLId(name)), + dialect = JdbcDialects.get(jdbcOptions.url)) { JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index 6c773d4fb1b0..976cd3f6e9aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange} import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JDBCRDD, JdbcUtils} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects} @@ -35,7 +35,10 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap class JDBCTableCatalog extends TableCatalog - with SupportsNamespaces with FunctionCatalog with Logging { + with SupportsNamespaces + with FunctionCatalog + with DataTypeErrorsBase + with Logging { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ private var catalogName: String = null @@ -66,7 +69,11 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => val schemaPattern = if (namespace.length == 1) namespace.head else null val rs = JdbcUtils.classifyException( - s"Failed get tables from: ${namespace.mkString(".")}", dialect) { + errorClass = "FAILED_JDBC.GET_TABLES", + messageParameters = Map( + "url" -> options.url, + "namespace" -> toSQLId(namespace.toSeq)), + dialect) { conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE")) } new Iterator[Identifier] { @@ -80,7 +87,12 @@ class JDBCTableCatalog extends TableCatalog checkNamespace(ident.namespace()) val writeOptions = new JdbcOptionsInWrite( options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident))) - JdbcUtils.classifyException(s"Failed table existence check: $ident", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.TABLE_EXISTS", + messageParameters = Map( + "url" -> options.url, + "tableName" -> toSQLId(ident)), + dialect) { JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions)) } } @@ -100,7 +112,13 @@ class JDBCTableCatalog extends TableCatalog override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = { checkNamespace(oldIdent.namespace()) JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed table renaming from $oldIdent to $newIdent", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.RENAME_TABLE", + messageParameters = Map( + "url" -> options.url, + "oldName" -> toSQLId(oldIdent), + "newName" -> toSQLId(newIdent)), + dialect) { JdbcUtils.renameTable(conn, oldIdent, newIdent, options) } } @@ -160,7 +178,12 @@ class JDBCTableCatalog extends TableCatalog val writeOptions = new JdbcOptionsInWrite(tableOptions) val caseSensitive = SQLConf.get.caseSensitiveAnalysis JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed table creation: $ident", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.CREATE_TABLE", + messageParameters = Map( + "url" -> options.url, + "tableName" -> toSQLId(ident)), + dialect) { JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions) } } @@ -171,7 +194,12 @@ class JDBCTableCatalog extends TableCatalog override def alterTable(ident: Identifier, changes: TableChange*): Table = { checkNamespace(ident.namespace()) JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed table altering: $ident", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.ALTER_TABLE", + messageParameters = Map( + "url" -> options.url, + "tableName" -> toSQLId(ident)), + dialect) { JdbcUtils.alterTable(conn, getTableName(ident), changes, options) } loadTable(ident) @@ -181,7 +209,12 @@ class JDBCTableCatalog extends TableCatalog override def namespaceExists(namespace: Array[String]): Boolean = namespace match { case Array(db) => JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed namespace exists: ${namespace.mkString}", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.NAMESPACE_EXISTS", + messageParameters = Map( + "url" -> options.url, + "namespace" -> toSQLId(namespace.toSeq)), + dialect) { JdbcUtils.schemaExists(conn, options, db) } } @@ -190,7 +223,10 @@ class JDBCTableCatalog extends TableCatalog override def listNamespaces(): Array[Array[String]] = { JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed list namespaces", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.LIST_NAMESPACES", + messageParameters = Map("url" -> options.url), + dialect) { JdbcUtils.listSchemas(conn, options) } } @@ -238,7 +274,12 @@ class JDBCTableCatalog extends TableCatalog } } JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed create name space: $db", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.CREATE_NAMESPACE", + messageParameters = Map( + "url" -> options.url, + "namespace" -> toSQLId(db)), + dialect) { JdbcUtils.createSchema(conn, options, db, comment) } } @@ -257,7 +298,12 @@ class JDBCTableCatalog extends TableCatalog case set: NamespaceChange.SetProperty => if (set.property() == SupportsNamespaces.PROP_COMMENT) { JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed create comment on name space: $db", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.CREATE_NAMESPACE_COMMENT", + messageParameters = Map( + "url" -> options.url, + "namespace" -> toSQLId(db)), + dialect) { JdbcUtils.alterSchemaComment(conn, options, db, set.value) } } @@ -268,7 +314,12 @@ class JDBCTableCatalog extends TableCatalog case unset: NamespaceChange.RemoveProperty => if (unset.property() == SupportsNamespaces.PROP_COMMENT) { JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed remove comment on name space: $db", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.REMOVE_NAMESPACE_COMMENT", + messageParameters = Map( + "url" -> options.url, + "namespace" -> toSQLId(db)), + dialect) { JdbcUtils.removeSchemaComment(conn, options, db) } } @@ -290,7 +341,12 @@ class JDBCTableCatalog extends TableCatalog cascade: Boolean): Boolean = namespace match { case Array(db) if namespaceExists(namespace) => JdbcUtils.withConnection(options) { conn => - JdbcUtils.classifyException(s"Failed drop name space: $db", dialect) { + JdbcUtils.classifyException( + errorClass = "FAILED_JDBC.DROP_NAMESPACE", + messageParameters = Map( + "url" -> options.url, + "namespace" -> toSQLId(db)), + dialect) { JdbcUtils.dropSchema(conn, options, db, cascade) true } @@ -330,4 +386,8 @@ class JDBCTableCatalog extends TableCatalog throw new NoSuchFunctionException(ident) } } + + private def toSQLId(ident: Identifier): String = { + toSQLId(ident.namespace.toSeq :+ ident.name) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 8975a015ee8e..4f81ee031d22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -144,15 +144,22 @@ private object DB2Dialect extends JdbcDialect { s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''" } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String]): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { // https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate - case "42893" => throw new NonEmptyNamespaceException(message, cause = Some(e)) - case _ => super.classifyException(message, e) + case "42893" => + throw NonEmptyNamespaceException( + namespace = messageParameters.get("namespace").toArray, + details = sqlException.getMessage, + cause = Some(e)) + case _ => super.classifyException(e, errorClass, messageParameters) } - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index d275f9c9cb1b..3c9bc0ed691b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -28,8 +28,7 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.util.quoteNameParts +import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.catalog.index.TableIndex @@ -181,7 +180,10 @@ private[sql] object H2Dialect extends JdbcDialect { (ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".") } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String]): AnalysisException = { e match { case exception: SQLException => // Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html @@ -192,15 +194,16 @@ private[sql] object H2Dialect extends JdbcDialect { val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r val name = regex.findFirstMatchIn(e.getMessage).get.group(1) val quotedName = org.apache.spark.sql.catalyst.util.quoteIdentifier(name) - throw new TableAlreadyExistsException(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + throw new TableAlreadyExistsException( + errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", messageParameters = Map("relationName" -> quotedName), cause = Some(e)) // TABLE_OR_VIEW_NOT_FOUND_1 case 42102 => - val quotedName = quoteNameParts(UnresolvedAttribute.parseAttributeName(message)) + val relationName = messageParameters.getOrElse("tableName", "") throw new NoSuchTableException( errorClass = "TABLE_OR_VIEW_NOT_FOUND", - messageParameters = Map("relationName" -> quotedName), + messageParameters = Map("relationName" -> relationName), cause = Some(e)) // SCHEMA_NOT_FOUND_1 case 90079 => @@ -210,25 +213,21 @@ private[sql] object H2Dialect extends JdbcDialect { throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND", messageParameters = Map("schemaName" -> quotedName)) // INDEX_ALREADY_EXISTS_1 - case 42111 => - // The message is: Failed to create index indexName in tableName - val regex = "(?s)Failed to create index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new IndexAlreadyExistsException( indexName = indexName, tableName = tableName, cause = Some(e)) // INDEX_NOT_FOUND_1 - case 42112 => - // The message is: Failed to drop index indexName in tableName - val regex = "(?s)Failed to drop index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) case _ => // do nothing } case _ => // do nothing } - super.classifyException(message, e) + super.classifyException(e, errorClass, messageParameters) } override def compileExpression(expr: Expression): Option[String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 5ba4e39e8ec1..4825568d88eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -630,15 +630,16 @@ abstract class JdbcDialect extends Serializable with Logging { /** * Gets a dialect exception, classifies it and wraps it by `AnalysisException`. - * @param message The error message to be placed to the returned exception. * @param e The dialect specific exception. + * @param errorClass The error class assigned in the case of an unclassified `e` + * @param messageParameters The message parameters of `errorClass` * @return `AnalysisException` or its sub-class. */ - def classifyException(message: String, e: Throwable): AnalysisException = { - new AnalysisException( - errorClass = "_LEGACY_ERROR_TEMP_3064", - messageParameters = Map("msg" -> message), - cause = Some(e)) + def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String]): AnalysisException = { + new AnalysisException(errorClass, messageParameters, cause = Some(e)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index ee649122ca80..f63a1abdce65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -190,14 +190,21 @@ private object MsSqlServerDialect extends JdbcDialect { if (limit > 0) s"TOP ($limit)" else "" } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String]): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { - case 3729 => throw new NonEmptyNamespaceException(message, cause = Some(e)) - case _ => super.classifyException(message, e) + case 3729 => + throw NonEmptyNamespaceException( + namespace = messageParameters.get("namespace").toArray, + details = sqlException.getMessage, + cause = Some(e)) + case _ => super.classifyException(e, errorClass, messageParameters) } - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index dd74c93bc2e1..af50a8e3e359 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -270,28 +270,26 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { indexMap.values.toArray } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String]): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { // ER_DUP_KEYNAME - case 1061 => - // The message is: Failed to create index indexName in tableName - val regex = "(?s)Failed to create index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) - throw new IndexAlreadyExistsException( - indexName = indexName, tableName = tableName, cause = Some(e)) - case 1091 => - // The message is: Failed to drop index indexName in tableName - val regex = "(?s)Failed to drop index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") + throw new IndexAlreadyExistsException(indexName, tableName, cause = Some(e)) + case 1091 if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index cff7bb5e06f0..4637a96039b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -225,40 +225,47 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { s"DROP INDEX ${quoteIdentifier(indexName)}" } - override def classifyException(message: String, e: Throwable): AnalysisException = { + // Message pattern defined by postgres specification + private final val pgAlreadyExistsRegex = """(?:.*)relation "(.*)" already exists""".r + + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String]): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { // https://www.postgresql.org/docs/14/errcodes-appendix.html case "42P07" => - // Message patterns defined at caller sides of spark - val indexRegex = "(?s)Failed to create index (.*) in (.*)".r - val renameRegex = "(?s)Failed table renaming from (.*) to (.*)".r - // Message pattern defined by postgres specification - val pgRegex = """(?:.*)relation "(.*)" already exists""".r - - message match { - case indexRegex(index, table) => - throw new IndexAlreadyExistsException( - indexName = index, tableName = table, cause = Some(e)) - case renameRegex(_, newTable) => - throw QueryCompilationErrors.tableAlreadyExistsError(newTable) - case _ if pgRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty => - val tableName = pgRegex.findFirstMatchIn(sqlException.getMessage).get.group(1) - throw QueryCompilationErrors.tableAlreadyExistsError(tableName) - case _ => super.classifyException(message, e) + if (errorClass == "FAILED_JDBC.CREATE_INDEX") { + throw new IndexAlreadyExistsException( + indexName = messageParameters("indexName"), + tableName = messageParameters("tableName"), + cause = Some(e)) + } else if (errorClass == "FAILED_JDBC.RENAME_TABLE") { + val newTable = messageParameters("newName") + throw QueryCompilationErrors.tableAlreadyExistsError(newTable) + } else { + val tblRegexp = pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage) + if (tblRegexp.nonEmpty) { + throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1)) + } else { + super.classifyException(e, errorClass, messageParameters) + } } - case "42704" => - // The message is: Failed to drop index indexName in tableName - val regex = "(?s)Failed to drop index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) - case "2BP01" => throw new NonEmptyNamespaceException(message, cause = Some(e)) - case _ => super.classifyException(message, e) + case "2BP01" => + throw NonEmptyNamespaceException( + namespace = messageParameters.get("namespace").toArray, + details = sqlException.getMessage, + cause = Some(e)) + case _ => super.classifyException(e, errorClass, messageParameters) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index eed64b873c45..1cd4077b4ec1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -267,10 +267,20 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { val expectedSchema = new StructType().add("C2", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) // Drop not existing column - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName DROP COLUMN bad_column") - }.getMessage - assert(msg.contains("Missing field bad_column in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName DROP COLUMN bad_column" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "bad_column", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- C2: integer (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, 51)) } // Drop a column to not existing table and namespace Seq( @@ -297,10 +307,21 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { .add("deptno", DoubleType, true, defaultMetadata) assert(t.schema === expectedSchema) // Update not existing column - val msg1 = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE DOUBLE") - }.getMessage - assert(msg1.contains("Missing field bad_column in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE DOUBLE" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "bad_column", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- ID: double (nullable = true) + | |-- deptno: double (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, 64)) // Update column to wrong type checkError( exception = intercept[ParseException] { @@ -335,10 +356,21 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { .add("deptno", IntegerType, true, defaultMetadata) assert(t.schema === expectedSchema) // Update nullability of not existing column - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT NULL") - }.getMessage - assert(msg.contains("Missing field bad_column in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT NULL" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "bad_column", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- ID: integer (nullable = true) + | |-- deptno: integer (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, 66)) } // Update column nullability in not existing table and namespace Seq( @@ -357,17 +389,29 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { val tableName = "h2.test.alt_table" withTable(tableName) { sql(s"CREATE TABLE $tableName (ID INTEGER)") - val exp = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName ALTER COLUMN ID COMMENT 'test'") - } - assert(exp.getErrorClass === "_LEGACY_ERROR_TEMP_1305") - assert("Unsupported TableChange (.*) in JDBC catalog\\.".r.pattern.matcher(exp.getMessage) - .matches()) + checkError( + exception = intercept[AnalysisException] { + sql(s"ALTER TABLE $tableName ALTER COLUMN ID COMMENT 'test'") + }, + errorClass = "_LEGACY_ERROR_TEMP_1305", + parameters = Map("change" -> + "org.apache.spark.sql.connector.catalog.TableChange\\$UpdateColumnComment.*"), + matchPVals = true) // Update comment for not existing column - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName ALTER COLUMN bad_column COMMENT 'test'") - }.getMessage - assert(msg.contains("Missing field bad_column in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column COMMENT 'test'" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "bad_column", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- ID: integer (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, 67)) } // Update column comments in not existing table and namespace Seq( @@ -393,10 +437,21 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { assert(t.schema === expectedSchema) withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3") - }.getMessage - assert(msg.contains("Missing field C2 in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "C2", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- c1: integer (nullable = true) + | |-- c2: integer (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, 51)) } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { @@ -409,10 +464,21 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName DROP COLUMN C3") - }.getMessage - assert(msg.contains("Missing field C3 in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName DROP COLUMN C3" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "C3", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- c1: integer (nullable = true) + | |-- c3: integer (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, sqlText.length - 1)) } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { @@ -423,10 +489,20 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE") - }.getMessage - assert(msg.contains("Missing field C1 in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "C1", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- c1: integer (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, sqlText.length - 1)) } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { @@ -437,10 +513,20 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val msg = intercept[AnalysisException] { - sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL") - }.getMessage - assert(msg.contains("Missing field C1 in table h2.test.alt_table")) + val sqlText = s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1331", + parameters = Map( + "fieldName" -> "C1", + "table" -> "h2.test.alt_table", + "schema" -> + """root + | |-- c1: double (nullable = true) + |""".stripMargin), + context = ExpectedContext(sqlText, 0, sqlText.length - 1)) } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { @@ -468,11 +554,15 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { test("CREATE TABLE with table property") { withTable("h2.test.new_table") { - val m = intercept[AnalysisException] { - sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" + - " TBLPROPERTIES('ENGINE'='tableEngineName')") - }.cause.get.getMessage - assert(m.contains("\"TABLEENGINENAME\" not found")) + checkError( + exception = intercept[AnalysisException] { + sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" + + " TBLPROPERTIES('ENGINE'='tableEngineName')") + }, + errorClass = "FAILED_JDBC.CREATE_TABLE", + parameters = Map( + "url" -> url, + "tableName" -> "`test`.`new_table`")) } } @@ -484,10 +574,14 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } test("SPARK-42904: CREATE TABLE with char/varchar with invalid char length") { - val e = intercept[AnalysisException]{ - sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))") - } - assert(e.getCause.getMessage.contains("1000000001")) + checkError( + exception = intercept[AnalysisException]{ + sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))") + }, + errorClass = "FAILED_JDBC.CREATE_TABLE", + parameters = Map( + "url" -> url, + "tableName" -> "`test`.`new_table`")) } test("SPARK-42955: Skip classifyException and wrap AnalysisException for SparkThrowable") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 0a66680edd63..5e04fca92f4b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -2924,8 +2924,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel }, errorClass = "INDEX_ALREADY_EXISTS", parameters = Map( - "indexName" -> "people_index", - "tableName" -> "test.people" + "indexName" -> "`people_index`", + "tableName" -> "`test`.`people`" ) ) assert(jdbcTable.indexExists("people_index")) @@ -2941,7 +2941,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel sql(s"DROP INDEX people_index ON TABLE h2.test.people") }, errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "people_index", "tableName" -> "test.people") + parameters = Map("indexName" -> "`people_index`", "tableName" -> "`test`.`people`") ) assert(jdbcTable.indexExists("people_index") == false) val indexes3 = jdbcTable.listIndexes() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org