This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ebd2b78f87fa [SPARK-46727][SQL] Port `classifyException()` in JDBC
dialects on error classes
ebd2b78f87fa is described below
commit ebd2b78f87fa6086c41d5e6bcade5efeefac75d0
Author: Max Gekk <[email protected]>
AuthorDate: Tue Jan 16 17:47:09 2024 +0300
[SPARK-46727][SQL] Port `classifyException()` in JDBC dialects on error
classes
### What changes were proposed in this pull request?
In the PR, I propose to port the existing `classifyException()` method
which accepts a description to a new one with an error class, added by
https://github.com/apache/spark/pull/44358. The modified JDBC dialects are:
DB2, H2, Oracle, MS SQL Server, MySQL and PostgreSQL.
### Why are the changes needed?
The old method `classifyException()`, which accepts only a `description`,
has already been deprecated by ...
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By existing integration tests, and the modified test suite:
```
$ build/sbt "test:testOnly *JDBCV2Suite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44739 from MaxGekk/port-jdbc-classifyException.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 4 +-
.../org/apache/spark/sql/jdbc/DB2Dialect.scala | 17 +++++--
.../org/apache/spark/sql/jdbc/H2Dialect.scala | 34 ++++++-------
.../apache/spark/sql/jdbc/MsSqlServerDialect.scala | 17 +++++--
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 29 ++++++-----
.../apache/spark/sql/jdbc/PostgresDialect.scala | 56 ++++++++++++----------
.../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 6 +--
7 files changed, 91 insertions(+), 72 deletions(-)
diff --git
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index d1d247967b4b..bae274788212 100644
---
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession
with DockerIntegrationFu
sql(s"CREATE index i1 ON $catalogName.new_table (col1)")
},
errorClass = "INDEX_ALREADY_EXISTS",
- parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
+ parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
)
sql(s"DROP index i1 ON $catalogName.new_table")
@@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession
with DockerIntegrationFu
sql(s"DROP index i1 ON $catalogName.new_table")
},
errorClass = "INDEX_NOT_FOUND",
- parameters = Map("indexName" -> "i1", "tableName" -> "new_table")
+ parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`")
)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index d5a132c7dd48..f745e466ed9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -144,16 +144,23 @@ private object DB2Dialect extends JdbcDialect {
s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''"
}
- override def classifyException(message: String, e: Throwable):
AnalysisException = {
+ override def classifyException(
+ e: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String],
+ description: String): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getSQLState match {
// https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate
- case "42893" => throw NonEmptyNamespaceException(
- namespace = Array.empty, details = message, cause = Some(e))
- case _ => super.classifyException(message, e)
+ case "42893" =>
+ throw NonEmptyNamespaceException(
+ namespace = messageParameters.get("namespace").toArray,
+ details = sqlException.getMessage,
+ cause = Some(e))
+ case _ => super.classifyException(e, errorClass, messageParameters,
description)
}
- case _ => super.classifyException(message, e)
+ case _ => super.classifyException(e, errorClass, messageParameters,
description)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index ae3a3addf7bf..cd151f790adf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -28,8 +28,7 @@ import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException,
NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException,
TableAlreadyExistsException, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.util.quoteNameParts
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException,
NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException,
TableAlreadyExistsException}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.catalog.index.TableIndex
@@ -195,7 +194,11 @@ private[sql] object H2Dialect extends JdbcDialect {
(ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".")
}
- override def classifyException(message: String, e: Throwable):
AnalysisException = {
+ override def classifyException(
+ e: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String],
+ description: String): AnalysisException = {
e match {
case exception: SQLException =>
// Error codes are from
https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
@@ -206,15 +209,16 @@ private[sql] object H2Dialect extends JdbcDialect {
val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r
val name = regex.findFirstMatchIn(e.getMessage).get.group(1)
val quotedName =
org.apache.spark.sql.catalyst.util.quoteIdentifier(name)
- throw new TableAlreadyExistsException(errorClass =
"TABLE_OR_VIEW_ALREADY_EXISTS",
+ throw new TableAlreadyExistsException(
+ errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
messageParameters = Map("relationName" -> quotedName),
cause = Some(e))
// TABLE_OR_VIEW_NOT_FOUND_1
case 42102 =>
- val quotedName =
quoteNameParts(UnresolvedAttribute.parseAttributeName(message))
+ val relationName = messageParameters.getOrElse("tableName", "")
throw new NoSuchTableException(
errorClass = "TABLE_OR_VIEW_NOT_FOUND",
- messageParameters = Map("relationName" -> quotedName),
+ messageParameters = Map("relationName" -> relationName),
cause = Some(e))
// SCHEMA_NOT_FOUND_1
case 90079 =>
@@ -224,25 +228,21 @@ private[sql] object H2Dialect extends JdbcDialect {
throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND",
messageParameters = Map("schemaName" -> quotedName))
// INDEX_ALREADY_EXISTS_1
- case 42111 =>
- // The message is: Failed to create index indexName in tableName
- val regex = "(?s)Failed to create index (.*) in (.*)".r
- val indexName = regex.findFirstMatchIn(message).get.group(1)
- val tableName = regex.findFirstMatchIn(message).get.group(2)
+ case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
+ val indexName = messageParameters("indexName")
+ val tableName = messageParameters("tableName")
throw new IndexAlreadyExistsException(
indexName = indexName, tableName = tableName, cause = Some(e))
// INDEX_NOT_FOUND_1
- case 42112 =>
- // The message is: Failed to drop index indexName in tableName
- val regex = "(?s)Failed to drop index (.*) in (.*)".r
- val indexName = regex.findFirstMatchIn(message).get.group(1)
- val tableName = regex.findFirstMatchIn(message).get.group(2)
+ case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" =>
+ val indexName = messageParameters("indexName")
+ val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause =
Some(e))
case _ => // do nothing
}
case _ => // do nothing
}
- super.classifyException(message, e)
+ super.classifyException(e, errorClass, messageParameters, description)
}
override def compileExpression(expr: Expression): Option[String] = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 9776cff3f7c8..aaee6be24e61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -190,15 +190,22 @@ private object MsSqlServerDialect extends JdbcDialect {
if (limit > 0) s"TOP ($limit)" else ""
}
- override def classifyException(message: String, e: Throwable):
AnalysisException = {
+ override def classifyException(
+ e: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String],
+ description: String): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
- case 3729 => throw NonEmptyNamespaceException(
- namespace = Array.empty, details = message, cause = Some(e))
- case _ => super.classifyException(message, e)
+ case 3729 =>
+ throw NonEmptyNamespaceException(
+ namespace = messageParameters.get("namespace").toArray,
+ details = sqlException.getMessage,
+ cause = Some(e))
+ case _ => super.classifyException(e, errorClass, messageParameters,
description)
}
- case _ => super.classifyException(message, e)
+ case _ => super.classifyException(e, errorClass, messageParameters,
description)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index dd74c93bc2e1..cbed1d1e6384 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -270,28 +270,27 @@ private case object MySQLDialect extends JdbcDialect with
SQLConfHelper {
indexMap.values.toArray
}
- override def classifyException(message: String, e: Throwable):
AnalysisException = {
+ override def classifyException(
+ e: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String],
+ description: String): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
// ER_DUP_KEYNAME
- case 1061 =>
- // The message is: Failed to create index indexName in tableName
- val regex = "(?s)Failed to create index (.*) in (.*)".r
- val indexName = regex.findFirstMatchIn(message).get.group(1)
- val tableName = regex.findFirstMatchIn(message).get.group(2)
- throw new IndexAlreadyExistsException(
- indexName = indexName, tableName = tableName, cause = Some(e))
- case 1091 =>
- // The message is: Failed to drop index indexName in tableName
- val regex = "(?s)Failed to drop index (.*) in (.*)".r
- val indexName = regex.findFirstMatchIn(message).get.group(1)
- val tableName = regex.findFirstMatchIn(message).get.group(2)
+ case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" =>
+ val indexName = messageParameters("indexName")
+ val tableName = messageParameters("tableName")
+ throw new IndexAlreadyExistsException(indexName, tableName, cause
= Some(e))
+ case 1091 if errorClass == "FAILED_JDBC.DROP_INDEX" =>
+ val indexName = messageParameters("indexName")
+ val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause =
Some(e))
- case _ => super.classifyException(message, e)
+ case _ => super.classifyException(e, errorClass, messageParameters,
description)
}
case unsupported: UnsupportedOperationException => throw unsupported
- case _ => super.classifyException(message, e)
+ case _ => super.classifyException(e, errorClass, messageParameters,
description)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 901e66e5efcb..3eb065a5d4f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -225,42 +225,48 @@ private object PostgresDialect extends JdbcDialect with
SQLConfHelper {
s"DROP INDEX ${quoteIdentifier(indexName)}"
}
- override def classifyException(message: String, e: Throwable):
AnalysisException = {
+ // Message pattern defined by postgres specification
+ private final val pgAlreadyExistsRegex = """(?:.*)relation "(.*)" already
exists""".r
+
+ override def classifyException(
+ e: Throwable,
+ errorClass: String,
+ messageParameters: Map[String, String],
+ description: String): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getSQLState match {
// https://www.postgresql.org/docs/14/errcodes-appendix.html
case "42P07" =>
- // Message patterns defined at caller sides of spark
- val indexRegex = "(?s)Failed to create index (.*) in (.*)".r
- val renameRegex = "(?s)Failed table renaming from (.*) to (.*)".r
- // Message pattern defined by postgres specification
- val pgRegex = """(?:.*)relation "(.*)" already exists""".r
-
- message match {
- case indexRegex(index, table) =>
- throw new IndexAlreadyExistsException(
- indexName = index, tableName = table, cause = Some(e))
- case renameRegex(_, newTable) =>
- throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
- case _ if
pgRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty =>
- val tableName =
pgRegex.findFirstMatchIn(sqlException.getMessage).get.group(1)
- throw QueryCompilationErrors.tableAlreadyExistsError(tableName)
- case _ => super.classifyException(message, e)
+ if (errorClass == "FAILED_JDBC.CREATE_INDEX") {
+ throw new IndexAlreadyExistsException(
+ indexName = messageParameters("indexName"),
+ tableName = messageParameters("tableName"),
+ cause = Some(e))
+ } else if (errorClass == "FAILED_JDBC.RENAME_TABLE") {
+ val newTable = messageParameters("newName")
+ throw QueryCompilationErrors.tableAlreadyExistsError(newTable)
+ } else {
+ val tblRegexp =
pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage)
+ if (tblRegexp.nonEmpty) {
+ throw
QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1))
+ } else {
+ super.classifyException(e, errorClass, messageParameters,
description)
+ }
}
- case "42704" =>
- // The message is: Failed to drop index indexName in tableName
- val regex = "(?s)Failed to drop index (.*) in (.*)".r
- val indexName = regex.findFirstMatchIn(message).get.group(1)
- val tableName = regex.findFirstMatchIn(message).get.group(2)
+ case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" =>
+ val indexName = messageParameters("indexName")
+ val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause =
Some(e))
case "2BP01" =>
throw NonEmptyNamespaceException(
- namespace = Array.empty, details = message, cause = Some(e))
- case _ => super.classifyException(message, e)
+ namespace = messageParameters.get("namespace").toArray,
+ details = sqlException.getMessage,
+ cause = Some(e))
+ case _ => super.classifyException(e, errorClass, messageParameters,
description)
}
case unsupported: UnsupportedOperationException => throw unsupported
- case _ => super.classifyException(message, e)
+ case _ => super.classifyException(e, errorClass, messageParameters,
description)
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 05b3787d0ff2..a3990f3cfbb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -2980,8 +2980,8 @@ class JDBCV2Suite extends QueryTest with
SharedSparkSession with ExplainSuiteHel
},
errorClass = "INDEX_ALREADY_EXISTS",
parameters = Map(
- "indexName" -> "people_index",
- "tableName" -> "test.people"
+ "indexName" -> "`people_index`",
+ "tableName" -> "`test`.`people`"
)
)
assert(jdbcTable.indexExists("people_index"))
@@ -2997,7 +2997,7 @@ class JDBCV2Suite extends QueryTest with
SharedSparkSession with ExplainSuiteHel
sql(s"DROP INDEX people_index ON TABLE h2.test.people")
},
errorClass = "INDEX_NOT_FOUND",
- parameters = Map("indexName" -> "people_index", "tableName" ->
"test.people")
+ parameters = Map("indexName" -> "`people_index`", "tableName" ->
"`test`.`people`")
)
assert(jdbcTable.indexExists("people_index") == false)
val indexes3 = jdbcTable.listIndexes()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]