This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9f91bd3e80 [GLUTEN-9076][VL] Prioritize offloading supported hive udf
in ColumnarPartialProject (#9077)
9f91bd3e80 is described below
commit 9f91bd3e8085f144806432b406a7f2bb4e89bdfc
Author: WangGuangxin <[email protected]>
AuthorDate: Mon Mar 24 02:19:09 2025 -0700
[GLUTEN-9076][VL] Prioritize offloading supported hive udf in
ColumnarPartialProject (#9077)
---
.../execution/ColumnarPartialProjectExec.scala | 11 +++--
.../spark/sql/hive/VeloxHiveUDFTransformer.scala | 30 ++++++++-----
.../spark/sql/execution/GlutenHiveUDFSuite.scala | 51 ++++++++++++++++++++++
3 files changed, 79 insertions(+), 13 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
index 9d6077ec62..bf98320a07 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan,
UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.hive.HiveUdfUtil
+import org.apache.spark.sql.hive.{HiveUdfUtil, VeloxHiveUDFTransformer}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import scala.collection.mutable.ListBuffer
@@ -275,7 +275,10 @@ object ColumnarPartialProjectExec {
if (expr == null) return false
expr match {
case _: ScalaUDF => true
- case h if HiveUdfUtil.isHiveUdf(h) => true
+ case h
+ if HiveUdfUtil.isHiveUdf(h) &&
+ !VeloxHiveUDFTransformer.isHiveUDFSupportsTransform(h) =>
+ true
case p => p.children.exists(c => containsUDF(c))
}
}
@@ -306,7 +309,9 @@ object ColumnarPartialProjectExec {
expr match {
case u: ScalaUDF =>
replaceByAlias(u, replacedAliasUdf)
- case h if HiveUdfUtil.isHiveUdf(h) =>
+ case h
+ if HiveUdfUtil.isHiveUdf(h) &&
+ !VeloxHiveUDFTransformer.isHiveUDFSupportsTransform(h) =>
replaceByAlias(h, replacedAliasUdf)
case au @ Alias(_: ScalaUDF, _) =>
val replaceIndex = replacedAliasUdf.indexWhere(r => r.exprId ==
au.exprId)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/hive/VeloxHiveUDFTransformer.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/hive/VeloxHiveUDFTransformer.scala
index b3524e20f0..f6ebc847c0 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/hive/VeloxHiveUDFTransformer.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/hive/VeloxHiveUDFTransformer.scala
@@ -17,24 +17,18 @@
package org.apache.spark.sql.hive
import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.expression.{ExpressionConverter,
ExpressionTransformer}
+import org.apache.gluten.expression.{ExpressionConverter,
ExpressionTransformer, UDFMappings}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.expression.UDFResolver
+import java.util.Locale
+
object VeloxHiveUDFTransformer {
def replaceWithExpressionTransformer(
expr: Expression,
attributeSeq: Seq[Attribute]): ExpressionTransformer = {
- val (udfName, udfClassName) = expr match {
- case s: HiveSimpleUDF =>
- (s.name.stripPrefix("default."), s.funcWrapper.functionClassName)
- case g: HiveGenericUDF =>
- (g.name.stripPrefix("default."), g.funcWrapper.functionClassName)
- case _ =>
- throw new GlutenNotSupportException(
- s"Expression $expr is not a HiveSimpleUDF or HiveGenericUDF")
- }
+ val (udfName, udfClassName) = getHiveUDFNameAndClassName(expr)
if (UDFResolver.UDFNames.contains(udfClassName)) {
val udfExpression = UDFResolver
@@ -46,4 +40,20 @@ object VeloxHiveUDFTransformer {
HiveUDFTransformer.genTransformerFromUDFMappings(udfName, expr,
attributeSeq)
}
}
+
+  /**
+   * Returns whether a Hive UDF expression can be transformed natively. This holds when either
+   * its implementing class is registered in `UDFResolver.UDFNames`, or its name (lower-cased
+   * with `Locale.ROOT`) has an entry in `UDFMappings.hiveUDFMap`.
+   *
+   * `ColumnarPartialProjectExec` uses this as a pattern guard so that such UDFs are not
+   * pulled into the partial project. Because it is a predicate, any expression that is not
+   * a `HiveSimpleUDF` or `HiveGenericUDF` yields `false` rather than throwing.
+   */
+  def isHiveUDFSupportsTransform(expr: Expression): Boolean = expr match {
+    case _: HiveSimpleUDF | _: HiveGenericUDF =>
+      val (udfName, udfClassName) = getHiveUDFNameAndClassName(expr)
+      UDFResolver.UDFNames.contains(udfClassName) ||
+      UDFMappings.hiveUDFMap.contains(udfName.toLowerCase(Locale.ROOT))
+    case _ => false
+  }
+
+  /**
+   * Extracts the UDF name and the implementing class name from a Hive UDF expression. Any
+   * leading "default." database qualifier is stripped from the name.
+   *
+   * @throws GlutenNotSupportException if `expr` is neither a HiveSimpleUDF nor a HiveGenericUDF
+   */
+  private def getHiveUDFNameAndClassName(expr: Expression): (String, String) = {
+    def normalized(qualifiedName: String, implClass: String): (String, String) =
+      (qualifiedName.stripPrefix("default."), implClass)
+
+    expr match {
+      case simpleUdf: HiveSimpleUDF =>
+        normalized(simpleUdf.name, simpleUdf.funcWrapper.functionClassName)
+      case genericUdf: HiveGenericUDF =>
+        normalized(genericUdf.name, genericUdf.funcWrapper.functionClassName)
+      case unsupported =>
+        throw new GlutenNotSupportException(
+          s"Expression $unsupported is not a HiveSimpleUDF or HiveGenericUDF")
+    }
+  }
}
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
index c8404fb93b..8423b9d4d9 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution
import org.apache.gluten.execution.ColumnarPartialProjectExec
+import org.apache.gluten.expression.UDFMappings
import org.apache.gluten.udf.CustomerUDF
import org.apache.spark.SparkConf
@@ -200,4 +201,54 @@ class GlutenHiveUDFSuite extends GlutenQueryTest with
SQLTestUtils {
checkOperatorMatch[ColumnarPartialProjectExec](df)
}
}
+
+  test("UDFMapping should prioritize over ColumnarPartialProject when both applicable") {
+    withTempFunction("udf_substr") {
+      withTempFunction("udf_substr2") {
+        withTempFunction("udf_sort_array") {
+          spark.sql(s"""
+                       |CREATE TEMPORARY FUNCTION udf_sort_array AS
+                       |'org.apache.hadoop.hive.ql.udf.generic.GenericUDFSortArray';
+                       |""".stripMargin)
+          // Map hive udf "udf_substr" to velox function "substring". UDFMappings.hiveUDFMap is
+          // global state, so remember any previous entry and restore it afterwards to avoid
+          // leaking this test-only mapping into other tests.
+          val previousMapping = UDFMappings.hiveUDFMap.put("udf_substr", "substring")
+          try {
+            Seq("udf_substr", "udf_substr2").foreach {
+              testudf =>
+                spark.sql(s"""CREATE TEMPORARY FUNCTION $testudf AS
+                             |'org.apache.hadoop.hive.ql.udf.UDFSubstr';
+                             |""".stripMargin)
+
+                val df = spark.sql(s"""
+                                      |select
+                                      | l_partkey,
+                                      | udf_sort_array(array(10, l_orderkey, 1)),
+                                      | $testudf(l_comment, 1, 5)
+                                      |FROM lineitem WHERE l_partkey <= 5 and l_orderkey <1000
+                                      |""".stripMargin)
+                val executedPlan = getExecutedPlan(df)
+                checkGlutenOperatorMatch[ColumnarPartialProjectExec](df)
+                val partialProject = executedPlan
+                  .collectFirst { case p: ColumnarPartialProjectExec => p }
+                  .getOrElse(fail(s"ColumnarPartialProjectExec not found in plan:\n$executedPlan"))
+                val numPartialProjected =
+                  partialProject.output.count(_.name.startsWith("_SparkPartialProject"))
+
+                if (testudf == "udf_substr") {
+                  // udf_substr can be transformed natively through UDFMappings,
+                  // so only udf_sort_array should be partially projected.
+                  assert(numPartialProjected == 1)
+                } else {
+                  // Neither udf_sort_array nor udf_substr2 can be transformed natively,
+                  // so both should be partially projected.
+                  assert(numPartialProjected == 2)
+                }
+            }
+          } finally {
+            previousMapping match {
+              case Some(mapping) => UDFMappings.hiveUDFMap.put("udf_substr", mapping)
+              case None => UDFMappings.hiveUDFMap.remove("udf_substr")
+            }
+          }
+        }
+      }
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]