This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a08b3fdeff7a [SPARK-52159][SQL] Properly handle table existence check for JDBC dialects
a08b3fdeff7a is described below

commit a08b3fdeff7a6d0da835b4f4f71f1c14191dec22
Author: milastdbx <milan.stefano...@databricks.com>
AuthorDate: Mon May 19 11:38:55 2025 +0200

    [SPARK-52159][SQL] Properly handle table existence check for JDBC dialects
    
    ### What changes were proposed in this pull request?
    
    In this PR, I propose that we rethrow the exception raised during the table
    existence check in a JDBC dialect whenever that exception is not caused by the
    table or schema not being found. Currently, all exceptions are swallowed and the
    method returns false, so from the system's perspective it is as if the table
    does not exist, which is a misleading result (e.g., we can report that a table
    does not exist even when the real cause was a network failure).
    
    This issue is mostly exposed when the TableCatalog API is used:
    
    ```
      override def loadTable(ident: Identifier): Table = {
        if (!tableExists(ident)) {
          throw QueryCompilationErrors.noSuchTableError(ident)
        }
    
    ```
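    
    With the current implementation, any failure inside tableExists makes loadTable
    report a missing table. A minimal sketch of the proposed handling (it mirrors the
    JdbcUtils.scala change in the diff below; the simplified signature here is for
    illustration only):
    
    ```
      import java.sql.{Connection, SQLException}
      import scala.util.{Failure, Success, Try}
    
      def tableExists(conn: Connection, dialect: JdbcDialect, table: String): Boolean = {
        val executionResult = Try {
          val statement = conn.prepareStatement(dialect.getTableExistsQuery(table))
          try statement.executeQuery() finally statement.close()
        }
        executionResult match {
          case Success(_) => true   // the query ran, so the table exists
          case Failure(e: SQLException) if dialect.isObjectNotFoundException(e) => false
          case Failure(e) => throw e  // e.g. a network failure surfaces instead of "no such table"
        }
      }
    ```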
    
    Error code links:
     - https://www.ibm.com/docs/en/db2-for-zos/12.0.0?topic=codes-error-sql
     - https://db.apache.org/derby/docs/10.4/ref/rrefexcept71493.html
     - https://docs.databricks.com/aws/en/error-messages/sqlstates
     - https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
     - https://learn.microsoft.com/en-us/sql/relational-databases/errors-events/database-engine-events-and-errors-0-to-999?view=sql-server-ver16
     - https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
     - https://www.postgresql.org/docs/current/errcodes-appendix.html
    
    As implemented today, tableExists cannot throw anything, so every exception
    gets converted to noSuchTableError, which is wrong.
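    
    With this change, each dialect classifies its own "object not found" errors via
    the new isObjectNotFoundException hook. As a sketch, a hypothetical third-party
    dialect (MyDialect and the jdbc:mydb URL are made up; the SQLSTATE values shown
    are the PostgreSQL ones this patch uses in PostgresDialect) could plug in like this:
    
    ```
      import java.sql.SQLException
    
      private case class MyDialect() extends JdbcDialect {
        override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")
    
        // Only these states count as "table/schema not found";
        // any other SQLException now propagates out of tableExists.
        override def isObjectNotFoundException(e: SQLException): Boolean =
          e.getSQLState == "42P01" || e.getSQLState == "3F000" || e.getSQLState == "42704"
      }
    ```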
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Users will get proper error messages: failures unrelated to a missing
    table (e.g., network errors) are no longer reported as "table does not exist".
    
    ### How was this patch tested?
    
    Added a test to V2JDBCTest verifying that querying a dropped table raises
    TABLE_OR_VIEW_NOT_FOUND.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated by: Copilot
    
    Closes #50835 from milastdbx/sparkHandleTableNotFound.
    
    Authored-by: milastdbx <milan.stefano...@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala  | 26 ++++++++++++++++++++++
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 17 ++++++++------
 .../org/apache/spark/sql/jdbc/DB2Dialect.scala     |  4 ++++
 .../apache/spark/sql/jdbc/DatabricksDialect.scala  |  6 ++++-
 .../org/apache/spark/sql/jdbc/DerbyDialect.scala   |  8 ++++++-
 .../org/apache/spark/sql/jdbc/H2Dialect.scala      |  4 ++++
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |  5 ++++-
 .../apache/spark/sql/jdbc/MsSqlServerDialect.scala |  4 ++++
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   |  4 ++++
 .../org/apache/spark/sql/jdbc/OracleDialect.scala  |  5 +++++
 .../apache/spark/sql/jdbc/PostgresDialect.scala    |  6 +++++
 .../apache/spark/sql/jdbc/SnowflakeDialect.scala   |  5 +++++
 .../apache/spark/sql/jdbc/TeradataDialect.scala    |  6 ++++-
 13 files changed, 89 insertions(+), 11 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 51862ae1535c..b7dc397464df 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -1050,6 +1050,32 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     }
   }
 
+  test("SPARK-48618: Test table does not exists error") {
+    val tbl = s"$catalogName.tbl1"
+    val sqlStatement = s"SELECT * FROM $tbl"
+    val startPos = sqlStatement.indexOf(tbl)
+
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl (col1 INT, col2 INT)")
+      sql(s"INSERT INTO $tbl VALUES (1, 2)")
+      val df = sql(sqlStatement)
+      val row = df.collect()
+      assert(row.length === 1)
+
+      // Drop the table
+      sql(s"DROP TABLE IF EXISTS $tbl")
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(sqlStatement).collect()
+        },
+        condition = "TABLE_OR_VIEW_NOT_FOUND",
+        parameters = Map("relationName" -> s"`$catalogName`.`tbl1`"),
+        context = ExpectedContext(tbl, startPos, startPos + tbl.length - 1)
+      )
+    }
+  }
+
   def testDatetime(tbl: String): Unit = {}
 
   test("scan with filter push-down with date time functions") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 8112cf1c80ef..0077012e2b0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -26,7 +26,7 @@ import java.util
 import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException, TaskContext}
@@ -64,18 +64,21 @@ object JdbcUtils extends Logging with SQLConfHelper {
   def tableExists(conn: Connection, options: JdbcOptionsInWrite): Boolean = {
     val dialect = JdbcDialects.get(options.url)
 
-    // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
-    // SQL database systems using JDBC meta data calls, considering "table" could also include
-    // the database name. Query used to find table exists can be overridden by the dialects.
-    Try {
+    val executionResult = Try {
       val statement = conn.prepareStatement(dialect.getTableExistsQuery(options.table))
       try {
         statement.setQueryTimeout(options.queryTimeout)
-        statement.executeQuery()
+        statement.executeQuery()  // Success means table exists (query executed without error)
       } finally {
         statement.close()
       }
-    }.isSuccess
+    }
+
+    executionResult match {
+      case Success(_) => true
+      case Failure(e: SQLException) if dialect.isObjectNotFoundException(e) => false
+      case Failure(e) => throw e  // Re-throw unexpected exceptions
+    }
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 427275592ada..b748975eef65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -47,6 +47,10 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getErrorCode == -204
+  }
+
   class DB2SQLBuilder extends JDBCSQLBuilder {
 
     override def visitAggregateFunction(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
index 1aa2282f4a84..cb3cfecd940b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.Connection
+import java.sql.{Connection, SQLException}
 
 import scala.collection.mutable.ArrayBuilder
 
@@ -31,6 +31,10 @@ private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCErro
     url.startsWith("jdbc:databricks")
   }
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getSQLState == "42P01" || e.getSQLState == "42704"
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     sqlType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
index 7b65a01b5e70..f4e6e25f58db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.Types
+import java.sql.{SQLException, Types}
 import java.util.Locale
 
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -38,6 +38,12 @@ private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError {
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getSQLState.equalsIgnoreCase("42Y07") ||
+      e.getSQLState.equalsIgnoreCase("42X05") ||
+      e.getSQLState.equalsIgnoreCase("X0X05")
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     if (sqlType == Types.REAL) Option(FloatType) else None
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 82f6f5c6264c..956e7c05cd5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -57,6 +57,10 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    Set(42102, 42103, 42104, 90079).contains(e.getErrorCode)
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     sqlType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 7719d6a67053..d6fe564c1520 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{Connection, Date, Driver, ResultSetMetaData, Statement, Timestamp}
+import java.sql.{Connection, Date, Driver, ResultSetMetaData, SQLException, Statement, Timestamp}
 import java.time.{Instant, LocalDate, LocalDateTime}
 import java.util
 import java.util.ServiceLoader
@@ -758,6 +758,9 @@ abstract class JdbcDialect extends Serializable with Logging {
     throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3182")
   }
 
+  @Since("4.1.0")
+  def isObjectNotFoundException(e: SQLException): Boolean = true
+
   /**
    * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
    * @param e The dialect specific exception.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 531e5d4f0f3a..7efdc52f35be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -38,6 +38,10 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getErrorCode == 208
+  }
+
   // Microsoft SQL Server does not have the boolean type.
   // Compile the boolean value to the bit data type instead.
   // scalastyle:off line.size.limit
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 4323fa4ed99b..5b894e71619a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -50,6 +50,10 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getErrorCode == 1146
+  }
+
   class MySQLSQLBuilder extends JDBCSQLBuilder {
 
     override def visitExtract(extract: Extract): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 851b0e04d5e5..236d9469a58d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -49,6 +49,11 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getMessage.contains("ORA-00942") ||
+      e.getMessage.contains("ORA-39165")
+  }
+
   class OracleSQLBuilder extends JDBCSQLBuilder {
 
     override def visitAggregateFunction(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index b4cd5f578ccd..73b10f72e21b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -54,6 +54,12 @@ private case class PostgresDialect()
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getSQLState == "42P01" ||
+      e.getSQLState == "3F000" ||
+      e.getSQLState == "42704"
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     sqlType match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
index a443a798db7c..d4ac21a45300 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
+import java.sql.SQLException
 import java.util.Locale
 
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
@@ -26,6 +27,10 @@ private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:snowflake")
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getSQLState == "002003"
+  }
+
   override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
     case BooleanType =>
       // By default, BOOLEAN is mapped to BIT(1).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
index 322b259485f5..bbdab81201fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.Types
+import java.sql.{SQLException, Types}
 import java.util.Locale
 
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -39,6 +39,10 @@ private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)
 
+  override def isObjectNotFoundException(e: SQLException): Boolean = {
+    e.getErrorCode == 3807
+  }
+
   override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
     case StringType => Some(JdbcType("VARCHAR(255)", java.sql.Types.VARCHAR))
     case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))

