This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new a08b3fdeff7a [SPARK-52159][SQL] Properly handle table existence check for jdbc dialects
a08b3fdeff7a is described below

commit a08b3fdeff7a6d0da835b4f4f71f1c14191dec22
Author: milastdbx <milan.stefano...@databricks.com>
AuthorDate: Mon May 19 11:38:55 2025 +0200

[SPARK-52159][SQL] Properly handle table existence check for jdbc dialects

### What changes were proposed in this pull request?

In this PR, I propose that we rethrow the exception raised during a table existence check in a JDBC dialect whenever that exception is not related to the table or schema not being found. Currently, all exceptions get swallowed and the method returns false. From the system's perspective, it is as if the table does not exist, which is the wrong message (e.g., we can report that a table does not exist even when the actual failure was a network error). This issue is mostly exposed when the TableCatalog API is used:
```
override def loadTable(ident: Identifier): Table = {
  if (!tableExists(ident)) {
    throw QueryCompilationErrors.noSuchTableError(ident)
  }
```

Error code links:
- DB2: https://www.ibm.com/docs/en/db2-for-zos/12.0.0?topic=codes-error-sql
- Derby: https://db.apache.org/derby/docs/10.4/ref/rrefexcept71493.html
- Databricks: https://docs.databricks.com/aws/en/error-messages/sqlstates
- H2: https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
- SQL Server: https://learn.microsoft.com/en-us/sql/relational-databases/errors-events/database-engine-events-and-errors-0-to-999?view=sql-server-ver16
- MySQL: https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
- PostgreSQL: https://www.postgresql.org/docs/current/errcodes-appendix.html

As implemented today, tableExists cannot throw anything, so every exception gets converted to noSuchTableError, which is wrong.

### Does this PR introduce _any_ user-facing change?

Yes. Users will get proper error messages instead of a misleading "table or view not found" error when the existence check fails for an unrelated reason.

### How was this patch tested?

A new test in V2JDBCTest (see the diff below).

### Was this patch authored or co-authored using generative AI tooling?

Generated by: COPILOT

Closes #50835 from milastdbx/sparkHandleTableNotFound.
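[Editor's note] A minimal, self-contained sketch of the behavioral change described above, not the actual Spark code: `runQuery` and the hard-coded SQLSTATE values are hypothetical stand-ins. The old `Try { ... }.isSuccess` pattern maps every failure to "table missing", while the new pattern maps only dialect-recognized "object not found" errors to false and rethrows everything else:

```scala
import java.sql.SQLException
import scala.util.{Failure, Success, Try}

object TableExistsSketch {
  // Old behavior: any failure -- even a transient network error -- is
  // swallowed and reads as "the table does not exist".
  def tableExistsOld(runQuery: () => Unit): Boolean =
    Try(runQuery()).isSuccess

  // New behavior: only errors the dialect classifies as "object not found"
  // return false; anything else is rethrown so the real cause surfaces.
  def tableExistsNew(runQuery: () => Unit)(isObjectNotFound: SQLException => Boolean): Boolean =
    Try(runQuery()) match {
      case Success(_) => true
      case Failure(e: SQLException) if isObjectNotFound(e) => false
      case Failure(e) => throw e
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical network failure: SQLSTATE 08S01 ("communication link failure").
    val networkFailure: () => Unit = () => throw new SQLException("connection reset", "08S01")

    println(tableExistsOld(networkFailure)) // prints false -- misleading "no such table"

    // With the new pattern the same failure propagates instead of being swallowed:
    // tableExistsNew(networkFailure)(_.getSQLState == "42P01")  // throws SQLException
  }
}
```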
Authored-by: milastdbx <milan.stefano...@databricks.com>
Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala  | 26 ++++++++++++++++++++++
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 17 ++++++++------
 .../org/apache/spark/sql/jdbc/DB2Dialect.scala     |  4 ++++
 .../apache/spark/sql/jdbc/DatabricksDialect.scala  |  6 ++++-
 .../org/apache/spark/sql/jdbc/DerbyDialect.scala   |  8 ++++++-
 .../org/apache/spark/sql/jdbc/H2Dialect.scala      |  4 ++++
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |  5 ++++-
 .../apache/spark/sql/jdbc/MsSqlServerDialect.scala |  4 ++++
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   |  4 ++++
 .../org/apache/spark/sql/jdbc/OracleDialect.scala  |  5 +++++
 .../apache/spark/sql/jdbc/PostgresDialect.scala    |  6 +++++
 .../apache/spark/sql/jdbc/SnowflakeDialect.scala   |  5 +++++
 .../apache/spark/sql/jdbc/TeradataDialect.scala    |  6 ++++-
 13 files changed, 89 insertions(+), 11 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 51862ae1535c..b7dc397464df 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -1050,6 +1050,32 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     }
   }
 
+  test("SPARK-48618: Test table does not exists error") {
+    val tbl = s"$catalogName.tbl1"
+    val sqlStatement = s"SELECT * FROM $tbl"
+    val startPos = sqlStatement.indexOf(tbl)
+
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (col1 INT, col2 INT)")
+      sql(s"INSERT INTO $tbl VALUES (1, 2)")
+      val df = sql(sqlStatement)
+      val row = df.collect()
+      assert(row.length === 1)
+
+      // Drop the table
+      sql(s"DROP TABLE IF EXISTS $tbl")
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(sqlStatement).collect()
+        },
+        condition = "TABLE_OR_VIEW_NOT_FOUND",
+        parameters = Map("relationName" -> s"`$catalogName`.`tbl1`"),
+        context = ExpectedContext(tbl, startPos, startPos + tbl.length - 1)
+      )
+    }
+  }
+
   def testDatetime(tbl: String): Unit = {}
 
   test("scan with filter push-down with date time functions") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 8112cf1c80ef..0077012e2b0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -26,7 +26,7 @@ import java.util
 import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException, TaskContext}
@@ -64,18 +64,21 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def tableExists(conn: Connection, options: JdbcOptionsInWrite): Boolean = {
     val dialect = JdbcDialects.get(options.url)
 
-    // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
-    // SQL database systems using JDBC meta data calls, considering "table" could also include
-    // the database name. Query used to find table exists can be overridden by the dialects.
-    Try {
+    val executionResult = Try {
       val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.table))
       try {
         statement.setQueryTimeout(options.queryTimeout)
-        statement.executeQuery()
+        statement.executeQuery() // Success means table exists (query executed without error)
       } finally {
         statement.close()
       }
-    }.isSuccess
+    }
+
+    executionResult match {
+      case Success(_) => true
+      case Failure(e: SQLException) if dialect.isObjectNotFoundException(e) => false
+      case Failure(e) => throw e // Re-throw unexpected exceptions
+    }
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 427275592ada..b748975eef65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -47,6 +47,10 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getErrorCode == -204
+  }
+
   class DB2SQLBuilder extends JDBCSQLBuilder {
 
     override def visitAggregateFunction(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
index 1aa2282f4a84..cb3cfecd940b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.Connection
+import java.sql.{Connection, SQLException}
 
 import scala.collection.mutable.ArrayBuilder
 
@@ -31,6 +31,10 @@ private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCErro
     url.startsWith("jdbc:databricks")
   }
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getSQLState == "42P01" || e.getSQLState == "42704"
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     sqlType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
index 7b65a01b5e70..f4e6e25f58db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.Types
+import java.sql.{SQLException, Types}
 import java.util.Locale
 
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -38,6 +38,12 @@ private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError {
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getSQLState.equalsIgnoreCase("42Y07") ||
+    e.getSQLState.equalsIgnoreCase("42X05") ||
+    e.getSQLState.equalsIgnoreCase("X0X05")
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     if (sqlType == Types.REAL) Option(FloatType) else None
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 82f6f5c6264c..956e7c05cd5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -57,6 +57,10 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    Set(42102, 42103, 42104, 90079).contains(e.getErrorCode)
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     sqlType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 7719d6a67053..d6fe564c1520 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{Connection, Date, Driver, ResultSetMetaData, Statement, Timestamp}
+import java.sql.{Connection, Date, Driver, ResultSetMetaData, SQLException, Statement, Timestamp}
 import java.time.{Instant, LocalDate, LocalDateTime}
 import java.util
 import java.util.ServiceLoader
@@ -758,6 +758,9 @@ abstract class JdbcDialect extends Serializable with Logging {
     throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3182")
   }
 
+  @Since("4.1.0")
+  def isObjectNotFoundException(e: SQLException): Boolean = true
+
   /**
    * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
    * @param e The dialect specific exception.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 531e5d4f0f3a..7efdc52f35be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -38,6 +38,10 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getErrorCode == 208
+  }
+
   // Microsoft SQL Server does not have the boolean type.
   // Compile the boolean value to the bit data type instead.
   // scalastyle:off line.size.limit
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 4323fa4ed99b..5b894e71619a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -50,6 +50,10 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getErrorCode == 1146
+  }
+
   class MySQLSQLBuilder extends JDBCSQLBuilder {
 
     override def visitExtract(extract: Extract): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 851b0e04d5e5..236d9469a58d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -49,6 +49,11 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getMessage.contains("ORA-00942") ||
+    e.getMessage.contains("ORA-39165")
+  }
+
   class OracleSQLBuilder extends JDBCSQLBuilder {
 
     override def visitAggregateFunction(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index b4cd5f578ccd..73b10f72e21b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -54,6 +54,12 @@ private case class PostgresDialect()
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getSQLState == "42P01" ||
+    e.getSQLState == "3F000" ||
+    e.getSQLState == "42704"
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     sqlType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
index a443a798db7c..d4ac21a45300 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
+import java.sql.SQLException
 import java.util.Locale
 
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
@@ -26,6 +27,10 @@ private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:snowflake")
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getSQLState == "002003"
+  }
+
   override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
     case BooleanType =>
       // By default, BOOLEAN is mapped to BIT(1).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
index 322b259485f5..bbdab81201fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.Types
+import java.sql.{SQLException, Types}
 import java.util.Locale
 
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -39,6 +39,10 @@ private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getErrorCode == 3807
+  }
+
   override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
     case StringType => Some(JdbcType("VARCHAR(255)", java.sql.Types.VARCHAR))
     case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org