This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 808b3acfd5bd [SPARK-50666][SQL] Support hint for reading in JDBC data 
source
808b3acfd5bd is described below

commit 808b3acfd5bd05357567ec5e138c7ab6f4ed09bd
Author: Wei Guo <[email protected]>
AuthorDate: Mon Jan 27 08:52:28 2025 -0800

    [SPARK-50666][SQL] Support hint for reading in JDBC data source
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add a hint option for JDBC data source. This option is used 
to specify the hint for reading. It will apply only if the underlying DBMS 
supports the hint feature. Currently, this option is only supported by 
OracleDialect, MySQLDialect and DatabricksDialect.
    
    ### Why are the changes needed?
    
    It's useful for performance tuning when reading from DBMS.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Passed GA and added a new test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49564 from wayneguow/jdbc_hint.
    
    Authored-by: Wei Guo <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit fef1b2375c3074cb3b53d5c29df1aa27c269469c)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  6 +++
 docs/sql-data-sources-jdbc.md                      |  8 ++++
 pom.xml                                            | 21 ++++++++++
 .../spark/sql/errors/QueryExecutionErrors.scala    |  6 +++
 sql/core/pom.xml                                   | 15 ++++++++
 .../execution/datasources/jdbc/JDBCOptions.scala   | 10 +++++
 .../org/apache/spark/sql/jdbc/DB2Dialect.scala     |  2 +-
 .../apache/spark/sql/jdbc/DatabricksDialect.scala  |  2 +
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |  2 +
 .../spark/sql/jdbc/JdbcSQLQueryBuilder.scala       | 15 +++++++-
 .../apache/spark/sql/jdbc/MsSqlServerDialect.scala |  4 +-
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   |  4 +-
 .../org/apache/spark/sql/jdbc/OracleDialect.scala  |  6 ++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 45 +++++++++++++++++++++-
 14 files changed, 138 insertions(+), 8 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 5b08a31a05e4..3bac6638f706 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1701,6 +1701,12 @@
     ],
     "sqlState" : "42822"
   },
+  "HINT_UNSUPPORTED_FOR_JDBC_DIALECT" : {
+    "message" : [
+      "The option `hint` is not supported for <jdbcDialect> in JDBC data 
source. Supported dialects are `MySQLDialect`, `OracleDialect` and 
`DatabricksDialect`."
+    ],
+    "sqlState" : "42822"
+  },
   "HLL_INVALID_INPUT_SKETCH_BUFFER" : {
     "message" : [
       "Invalid call to <function>; only valid HLL sketch buffers are supported 
as inputs (such as those produced by the `hll_sketch_agg` function)."
diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md
index 9ffd96cd40ee..eca8fb9c6c43 100644
--- a/docs/sql-data-sources-jdbc.md
+++ b/docs/sql-data-sources-jdbc.md
@@ -374,6 +374,14 @@ logging into the data sources.
     </td>
     <td>read</td>
   </tr>
+  <tr>
+    <td><code>hint</code></td>
+    <td>(none)</td>
+    <td>
+      This option is used to specify the hint for reading. The supported hint 
format is a variant of C-style comments: it needs to start with `/*+ ` and end 
with ` */`. Currently, this option is only supported in MySQLDialect, 
OracleDialect and DatabricksDialect.
+    </td>
+    <td>read</td>
+  </tr>
 </table>
 
 Note that kerberos authentication with keytab is not always supported by the 
JDBC driver.<br>
diff --git a/pom.xml b/pom.xml
index efe5b9285c52..d1b17d2147ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -334,6 +334,9 @@
     <db2.jcc.version>11.5.9.0</db2.jcc.version>
     <mssql.jdbc.version>12.8.1.jre11</mssql.jdbc.version>
     <ojdbc17.version>23.6.0.24.10</ojdbc17.version>
+    <databricks.jdbc.version>2.7.1</databricks.jdbc.version>
+    <snowflake.jdbc.version>3.21.0</snowflake.jdbc.version>
+    <terajdbc.version>20.00.00.39</terajdbc.version>
     <!-- Used for SBT build to retrieve the Spark version -->
     <spark.version>${project.version}</spark.version>
   </properties>
@@ -1350,6 +1353,24 @@
         <version>${ojdbc17.version}</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>com.databricks</groupId>
+        <artifactId>databricks-jdbc</artifactId>
+        <version>${databricks.jdbc.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>net.snowflake</groupId>
+        <artifactId>snowflake-jdbc</artifactId>
+        <version>${snowflake.jdbc.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>com.teradata.jdbc</groupId>
+        <artifactId>terajdbc</artifactId>
+        <version>${terajdbc.version}</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-recipes</artifactId>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 5755ad38fb29..3b2e68ea32e2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -980,6 +980,12 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
       messageParameters = Map("content" -> content))
   }
 
+  def hintUnsupportedForJdbcDialectError(jdbcDialect: String): 
SparkIllegalArgumentException = {
+    new SparkIllegalArgumentException(
+      errorClass = "HINT_UNSUPPORTED_FOR_JDBC_DIALECT",
+      messageParameters = Map("jdbcDialect" -> jdbcDialect))
+  }
+
   def unsupportedArrayElementTypeBasedOnBinaryError(dt: DataType): 
SparkIllegalArgumentException = {
     new SparkIllegalArgumentException(
       errorClass = "_LEGACY_ERROR_TEMP_2084",
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 47c9ca0ea7a1..b9a0c9b7dd6c 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -222,6 +222,21 @@
       <artifactId>derbytools</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.databricks</groupId>
+      <artifactId>databricks-jdbc</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.snowflake</groupId>
+      <artifactId>snowflake-jdbc</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.teradata.jdbc</groupId>
+      <artifactId>terajdbc</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-avro</artifactId>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index e7a4c9b258c1..9c56ad0433a4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -249,6 +249,15 @@ class JDBCOptions(
       .map(_.toBoolean)
       .getOrElse(SQLConf.get.timestampType == TimestampNTZType)
 
+  val hint = {
+    parameters.get(JDBC_HINT_STRING).map(value => {
+      require(value.matches("(?s)^/\\*\\+ .* \\*/$"),
+        s"Invalid value `$value` for option `$JDBC_HINT_STRING`." +
+          s" It should start with `/*+ ` and end with ` */`.")
+      s"$value "
+    }).getOrElse("")
+  }
+
   override def hashCode: Int = this.parameters.hashCode()
 
   override def equals(other: Any): Boolean = other match {
@@ -321,4 +330,5 @@ object JDBCOptions {
   val JDBC_CONNECTION_PROVIDER = newOption("connectionProvider")
   val JDBC_PREPARE_QUERY = newOption("prepareQuery")
   val JDBC_PREFER_TIMESTAMP_NTZ = newOption("preferTimestampNTZ")
+  val JDBC_HINT_STRING = newOption("hint")
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index f33e64c859fb..01028f553324 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -207,7 +207,7 @@ private case class DB2Dialect() extends JdbcDialect with 
SQLConfHelper with NoLe
       val offsetClause = dialect.getOffsetClause(offset)
 
       options.prepareQuery +
-        s"SELECT $columnList FROM ${options.tableOrQuery} $tableSampleClause" +
+        s"SELECT $hintClause$columnList FROM ${options.tableOrQuery} 
$tableSampleClause" +
         s" $whereClause $groupByClause $orderByClause $offsetClause 
$limitClause"
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
index af77f8575dd8..1aa2282f4a84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
@@ -63,6 +63,8 @@ private case class DatabricksDialect() extends JdbcDialect 
with NoLegacyJDBCErro
     s"TABLESAMPLE (${(sample.upperBound - sample.lowerBound) * 100}) 
REPEATABLE (${sample.seed})"
   }
 
+  override def supportsHint: Boolean = true
+
   // Override listSchemas to run "show schemas" as a PreparedStatement instead 
of
   // invoking getMetaData.getSchemas as it may not work correctly in older 
versions of the driver.
   override def schemasExists(conn: Connection, options: JDBCOptions, schema: 
String): Boolean = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 69121e33c592..03541bd892a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -826,6 +826,8 @@ abstract class JdbcDialect extends Serializable with 
Logging {
   def getTableSample(sample: TableSampleInfo): String =
     throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3183")
 
+  def supportsHint: Boolean = false
+
   /**
    * Return the DB-specific quoted and fully qualified table name
    */
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcSQLQueryBuilder.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcSQLQueryBuilder.scala
index 6113da3d4e8f..f8b50f496080 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcSQLQueryBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcSQLQueryBuilder.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.jdbc
 
 import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, 
JDBCPartition}
 import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
 
@@ -67,6 +68,18 @@ class JdbcSQLQueryBuilder(dialect: JdbcDialect, options: 
JDBCOptions) {
    */
   protected var tableSampleClause: String = ""
 
+  /**
+   * A hint sample clause representing query hints.
+   */
+  protected val hintClause: String = {
+    if (options.hint == "" || dialect.supportsHint) {
+      options.hint
+    } else {
+      throw QueryExecutionErrors.hintUnsupportedForJdbcDialectError(
+        dialect.getClass.getSimpleName)
+    }
+  }
+
   /**
    * The columns names that following dialect's SQL syntax.
    * e.g. The column name is the raw name or quoted name.
@@ -161,7 +174,7 @@ class JdbcSQLQueryBuilder(dialect: JdbcDialect, options: 
JDBCOptions) {
     val offsetClause = dialect.getOffsetClause(offset)
 
     options.prepareQuery +
-      s"SELECT $columnList FROM ${options.tableOrQuery} $tableSampleClause" +
+      s"SELECT $hintClause$columnList FROM ${options.tableOrQuery} 
$tableSampleClause" +
       s" $whereClause $groupByClause $orderByClause $limitClause $offsetClause"
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index e2a5671a7c28..cf943fc6c7c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -249,8 +249,8 @@ private case class MsSqlServerDialect() extends JdbcDialect 
with NoLegacyJDBCErr
       val limitClause = dialect.getLimitClause(limit)
 
       options.prepareQuery +
-        s"SELECT $limitClause $columnList FROM ${options.tableOrQuery} 
$tableSampleClause" +
-        s" $whereClause $groupByClause $orderByClause"
+        s"SELECT $hintClause$limitClause $columnList FROM 
${options.tableOrQuery}" +
+        s" $tableSampleClause $whereClause $groupByClause $orderByClause"
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index dd0118d87599..f93ba1f67d93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -406,7 +406,7 @@ private case class MySQLDialect() extends JdbcDialect with 
SQLConfHelper with No
       }
 
       options.prepareQuery +
-        s"SELECT $columnList FROM ${options.tableOrQuery} $tableSampleClause" +
+        s"SELECT $hintClause$columnList FROM ${options.tableOrQuery} 
$tableSampleClause" +
         s" $whereClause $groupByClause $orderByClause $limitOrOffsetStmt"
     }
   }
@@ -417,4 +417,6 @@ private case class MySQLDialect() extends JdbcDialect with 
SQLConfHelper with No
   override def supportsLimit: Boolean = true
 
   override def supportsOffset: Boolean = true
+
+  override def supportsHint: Boolean = true
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 114b524dcd96..a1540c4489b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -222,8 +222,8 @@ private case class OracleDialect() extends JdbcDialect with 
SQLConfHelper with N
     extends JdbcSQLQueryBuilder(dialect, options) {
 
     override def build(): String = {
-      val selectStmt = s"SELECT $columnList FROM ${options.tableOrQuery} 
$tableSampleClause" +
-        s" $whereClause $groupByClause $orderByClause"
+      val selectStmt = s"SELECT $hintClause$columnList FROM 
${options.tableOrQuery}" +
+        s" $tableSampleClause $whereClause $groupByClause $orderByClause"
       val finalSelectStmt = if (limit > 0) {
         if (offset > 0) {
           // Because the rownum is calculated when the value is returned,
@@ -255,6 +255,8 @@ private case class OracleDialect() extends JdbcDialect with 
SQLConfHelper with N
 
   override def supportsOffset: Boolean = true
 
+  override def supportsHint: Boolean = true
+
   override def classifyException(
       e: Throwable,
       errorClass: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index d4481dd3716a..9a2ccd4785b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -29,7 +29,7 @@ import scala.util.Random
 import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
 
-import org.apache.spark.{SparkException, SparkSQLException}
+import org.apache.spark.{SparkException, SparkIllegalArgumentException, 
SparkSQLException}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Observation, 
QueryTest, Row}
 import org.apache.spark.sql.catalyst.{analysis, TableIdentifier}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -2206,4 +2206,47 @@ class JDBCSuite extends QueryTest with 
SharedSparkSession {
       JdbcUtils.schemaString(dialect, schema, caseSensitive = false, Some("b 
boolean"))
     assert(schemaStr === """"b" NUMBER(1) """)
   }
+
+  test("SPARK-50666: reading hint test") {
+    // hint format check
+    Seq("INDEX(test idx1) */", "/*+ INDEX(test idx1)", "").foreach { hint =>
+      val e = intercept[IllegalArgumentException] {
+        val options = new JDBCOptions(Map("url" -> url, "dbtable" -> "test",
+          "hint" -> hint))
+      }.getMessage
+      assert(e.contains(s"Invalid value `$hint` for option `hint`." +
+        s" It should start with `/*+ ` and end with ` */`."))
+    }
+
+    // dialect supported check
+    val baseParameters = CaseInsensitiveMap(
+      Map("dbtable" -> "test", "hint" -> "/*+ INDEX(test idx1) */"))
+    // supported
+    Seq(
+      "jdbc:oracle:thin:@//host:port",
+      "jdbc:mysql://host:port",
+      "jdbc:databricks://host:port").foreach { url =>
+      val options = new JDBCOptions(baseParameters + ("url" -> url))
+      val dialect = JdbcDialects.get(url)
+      assert(dialect.getJdbcSQLQueryBuilder(options)
+        .withColumns(Array("a", "b"))
+        .build().trim() == "SELECT /*+ INDEX(test idx1) */ a,b FROM test")
+    }
+    // not supported
+    Seq(
+      "jdbc:db2://host:port", "jdbc:derby:memory", "jdbc:h2://host:port",
+      "jdbc:sqlserver://host:port", "jdbc:postgresql://host:5432/postgres",
+      "jdbc:snowflake://host:443?account=test", 
"jdbc:teradata://host:port").foreach { url =>
+      val options = new JDBCOptions(baseParameters + ("url" -> url))
+      val dialect = JdbcDialects.get(url)
+      checkError(
+        exception = intercept[SparkIllegalArgumentException] {
+          dialect.getJdbcSQLQueryBuilder(options)
+            .withColumns(Array("a", "b"))
+            .build().trim()
+        },
+        condition = "HINT_UNSUPPORTED_FOR_JDBC_DIALECT",
+        parameters = Map("jdbcDialect" -> dialect.getClass.getSimpleName))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to