This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4c306ff5162c [SPARK-54581][SQL] Making fetchsize option
case-insensitive for Postgres connector
4c306ff5162c is described below
commit 4c306ff5162ca738caad047b2a7740023cc43c57
Author: Marko Sisovic <[email protected]>
AuthorDate: Tue Dec 9 08:27:01 2025 +0800
[SPARK-54581][SQL] Making fetchsize option case-insensitive for Postgres
connector
### What changes were proposed in this pull request?
Adding a new `beforeFetch` overload to `JdbcDialect` that accepts the options
as `JDBCOptions` instead of `Map[String, String]`, and deprecating the old
overload starting from Spark version `4.2.0`. The [Spark
docs](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option)
state that options are case-insensitive, so this makes it easier for
dialects to respect that. Even if we have some edge case where we need the
original casing, we can access the original map ins [...]
### Why are the changes needed?
The option `fetchsize` requires another option, `autocommit`, to be set to
`false` for the Postgres connector. We have logic for this:
https://github.com/apache/spark/blob/415f50511463a8e73af77cf9f70cba4292c1331d/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala#L203-L205
However, this logic is case-sensitive and only works for the lowercase key
`fetchsize`. When passing `fetchSize`, for example, the correct fetch size is
still set on the Postgres driver, but the connection is not switched to
`autocommit -> false`, so the driver ignores the fetch size.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test: `PostgresDialectSuite`.
### Was this patch authored or co-authored using generative AI tooling?
Yes, partly generated-by: claude code.
Closes #53308 from marko-sisovic-db/msisovic/postgres-fetchsize-fix.
Authored-by: Marko Sisovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/execution/datasources/jdbc/JDBCRDD.scala | 3 +-
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 11 ++++
.../spark/sql/jdbc/PostgresDialectSuite.scala | 65 ++++++++++++++++++++++
3 files changed, 77 insertions(+), 2 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 47f5f180789e..8534a24d0110 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -305,8 +305,7 @@ class JDBCRDD(
val inputMetrics = context.taskMetrics().inputMetrics
val part = thePart.asInstanceOf[JDBCPartition]
conn = getConnection(part.idx)
- import scala.jdk.CollectionConverters._
- dialect.beforeFetch(conn, options.asProperties.asScala.toMap)
+ dialect.beforeFetch(conn, options)
// This executes a generic SQL statement (or PL/SQL block) before reading
// the table/query via JDBC. Use this feature to initialize the database
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index ce4c347cad34..875bfeb011bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -342,9 +342,20 @@ abstract class JdbcDialect extends Serializable with
Logging {
* @param connection The connection object
* @param properties The connection properties. This is passed through from
the relation.
*/
+ @deprecated("Use beforeFetch(Connection, JDBCOptions) instead", "4.2.0")
def beforeFetch(connection: Connection, properties: Map[String, String]):
Unit = {
}
+ /**
+ * Override connection specific properties to run before a select is made.
This is in place to
+ * allow dialects that need special treatment to optimize behavior.
+ * @param connection The connection object
+ * @param options The JDBC options for the connection.
+ */
+ def beforeFetch(connection: Connection, options: JDBCOptions): Unit = {
+ beforeFetch(connection, options.parameters)
+ }
+
/**
* Escape special characters in SQL string literals.
* @param value The string to be escaped.
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresDialectSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresDialectSuite.scala
new file mode 100644
index 000000000000..15682bcf68f1
--- /dev/null
+++
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresDialectSuite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.Connection
+
+import org.mockito.Mockito._
+import org.scalatestplus.mockito.MockitoSugar
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
+
+class PostgresDialectSuite extends SparkFunSuite with MockitoSugar {
+
+ private def createJDBCOptions(extraOptions: Map[String, String]):
JDBCOptions = {
+ new JDBCOptions(Map(
+ "url" -> "jdbc:postgresql://localhost:5432/test",
+ "dbtable" -> "test_table"
+ ) ++ extraOptions)
+ }
+
+ test("beforeFetch sets autoCommit=false with lowercase fetchsize") {
+ val conn = mock[Connection]
+ val dialect = PostgresDialect()
+ dialect.beforeFetch(conn, createJDBCOptions(Map("fetchsize" -> "100")))
+ verify(conn).setAutoCommit(false)
+ }
+
+ test("beforeFetch sets autoCommit=false with camelCase fetchSize") {
+ val conn = mock[Connection]
+ val dialect = PostgresDialect()
+ dialect.beforeFetch(conn, createJDBCOptions(Map("fetchSize" -> "100")))
+ verify(conn).setAutoCommit(false)
+ }
+
+ test("beforeFetch sets autoCommit=false with uppercase FETCHSIZE") {
+ val conn = mock[Connection]
+ val dialect = PostgresDialect()
+ dialect.beforeFetch(conn, createJDBCOptions(Map("FETCHSIZE" -> "100")))
+ verify(conn).setAutoCommit(false)
+ }
+
+ test("beforeFetch does not set autoCommit when fetchSize is 0") {
+ val conn = mock[Connection]
+ val dialect = PostgresDialect()
+ dialect.beforeFetch(conn, createJDBCOptions(Map("fetchsize" -> "0")))
+ verify(conn, never()).setAutoCommit(false)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]