This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 97efe0efb26 [SPARK-39162][SQL] Jdbc dialect should decide which
function could be pushed down
97efe0efb26 is described below
commit 97efe0efb2665833910e13eb7bae16cc1ad4e0fa
Author: Jiaan Geng <[email protected]>
AuthorDate: Sat May 14 16:28:21 2022 -0700
[SPARK-39162][SQL] Jdbc dialect should decide which function could be
pushed down
### What changes were proposed in this pull request?
Regardless of whether the functions are ANSI-compliant or not, most databases'
support for them is uncertain.
So we should add a new API into `JdbcDialect` so that each JDBC dialect decides
which functions can be pushed down.
### Why are the changes needed?
Make function push-down more flexible.
### Does this PR introduce _any_ user-facing change?
'No'.
New feature.
### How was this patch tested?
Existing tests.
Closes #36521 from beliefer/SPARK-39162.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: huaxingao <[email protected]>
---
.../spark/sql/errors/QueryCompilationErrors.scala | 4 ----
.../org/apache/spark/sql/jdbc/H2Dialect.scala | 28 ++++------------------
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 19 +++++++++++++++
3 files changed, 23 insertions(+), 28 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 3b167eeb417..efb4389ec50 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2404,10 +2404,6 @@ object QueryCompilationErrors extends QueryErrorsBase {
"Sinks cannot request distribution and ordering in continuous execution
mode")
}
- def noSuchFunctionError(database: String, funcInfo: String): Throwable = {
- new AnalysisException(s"$database does not support function: $funcInfo")
- }
-
// Return a more descriptive error message if the user tries to nest a
DEFAULT column reference
// inside some other expression (such as DEFAULT + 1) in an INSERT INTO
command's VALUES list;
// this is not allowed.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 56cadbe8e2c..4a88203ec59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -20,13 +20,9 @@ package org.apache.spark.sql.jdbc
import java.sql.{SQLException, Types}
import java.util.Locale
-import scala.util.control.NonFatal
-
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException,
NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc,
GeneralAggregateFunc}
-import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType,
DecimalType, ShortType, StringType}
@@ -34,27 +30,11 @@ private object H2Dialect extends JdbcDialect {
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")
- class H2SQLBuilder extends JDBCSQLBuilder {
- override def visitSQLFunction(funcName: String, inputs: Array[String]):
String = {
- funcName match {
- case "WIDTH_BUCKET" =>
- val functionInfo = super.visitSQLFunction(funcName, inputs)
- throw QueryCompilationErrors.noSuchFunctionError("H2", functionInfo)
- case _ => super.visitSQLFunction(funcName, inputs)
- }
- }
- }
+ private val supportedFunctions =
+ Set("ABS", "COALESCE", "LN", "EXP", "POWER", "SQRT", "FLOOR", "CEIL")
- override def compileExpression(expr: Expression): Option[String] = {
- val h2SQLBuilder = new H2SQLBuilder()
- try {
- Some(h2SQLBuilder.build(expr))
- } catch {
- case NonFatal(e) =>
- logWarning("Error occurs while compiling V2 expression", e)
- None
- }
- }
+ override def isSupportedFunction(funcName: String): Boolean =
+ supportedFunctions.contains(funcName)
override def compileAggregate(aggFunction: AggregateFunc): Option[String] = {
super.compileAggregate(aggFunction).orElse(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 0ef23073a27..e1883e4e7f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -240,8 +240,27 @@ abstract class JdbcDialect extends Serializable with
Logging{
getJDBCType(dataType).map(_.databaseTypeDefinition).getOrElse(dataType.typeName)
s"CAST($l AS $databaseTypeDefinition)"
}
+
+ override def visitSQLFunction(funcName: String, inputs: Array[String]):
String = {
+ if (isSupportedFunction(funcName)) {
+ s"""$funcName(${inputs.mkString(", ")})"""
+ } else {
+ // The framework will catch the error and give up the push-down.
+ // Please see `JdbcDialect.compileExpression(expr: Expression)` for
more details.
+ throw new UnsupportedOperationException(
+ s"${this.getClass.getSimpleName} does not support function:
$funcName")
+ }
+ }
}
+ /**
+ * Returns whether the database supports function.
+ * @param funcName Upper-cased function name
+ * @return True if the database supports function.
+ */
+ @Since("3.3.0")
+ def isSupportedFunction(funcName: String): Boolean = false
+
/**
* Converts V2 expression to String representing a SQL expression.
* @param expr The V2 expression to be converted.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]