This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f91bd3e80 [GLUTEN-9076][VL] Prioritize offloading supported hive udf 
in ColumnarPartialProject (#9077)
9f91bd3e80 is described below

commit 9f91bd3e8085f144806432b406a7f2bb4e89bdfc
Author: WangGuangxin <[email protected]>
AuthorDate: Mon Mar 24 02:19:09 2025 -0700

    [GLUTEN-9076][VL] Prioritize offloading supported hive udf in 
ColumnarPartialProject (#9077)
---
 .../execution/ColumnarPartialProjectExec.scala     | 11 +++--
 .../spark/sql/hive/VeloxHiveUDFTransformer.scala   | 30 ++++++++-----
 .../spark/sql/execution/GlutenHiveUDFSuite.scala   | 51 ++++++++++++++++++++++
 3 files changed, 79 insertions(+), 13 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
index 9d6077ec62..bf98320a07 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, 
UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.hive.HiveUdfUtil
+import org.apache.spark.sql.hive.{HiveUdfUtil, VeloxHiveUDFTransformer}
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 import scala.collection.mutable.ListBuffer
@@ -275,7 +275,10 @@ object ColumnarPartialProjectExec {
     if (expr == null) return false
     expr match {
       case _: ScalaUDF => true
-      case h if HiveUdfUtil.isHiveUdf(h) => true
+      case h
+          if HiveUdfUtil.isHiveUdf(h) &&
+            !VeloxHiveUDFTransformer.isHiveUDFSupportsTransform(h) =>
+        true
       case p => p.children.exists(c => containsUDF(c))
     }
   }
@@ -306,7 +309,9 @@ object ColumnarPartialProjectExec {
     expr match {
       case u: ScalaUDF =>
         replaceByAlias(u, replacedAliasUdf)
-      case h if HiveUdfUtil.isHiveUdf(h) =>
+      case h
+          if HiveUdfUtil.isHiveUdf(h) &&
+            !VeloxHiveUDFTransformer.isHiveUDFSupportsTransform(h) =>
         replaceByAlias(h, replacedAliasUdf)
       case au @ Alias(_: ScalaUDF, _) =>
         val replaceIndex = replacedAliasUdf.indexWhere(r => r.exprId == 
au.exprId)
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/hive/VeloxHiveUDFTransformer.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/hive/VeloxHiveUDFTransformer.scala
index b3524e20f0..f6ebc847c0 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/hive/VeloxHiveUDFTransformer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/hive/VeloxHiveUDFTransformer.scala
@@ -17,24 +17,18 @@
 package org.apache.spark.sql.hive
 
 import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.expression.{ExpressionConverter, 
ExpressionTransformer}
+import org.apache.gluten.expression.{ExpressionConverter, 
ExpressionTransformer, UDFMappings}
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.expression.UDFResolver
 
+import java.util.Locale
+
 object VeloxHiveUDFTransformer {
   def replaceWithExpressionTransformer(
       expr: Expression,
       attributeSeq: Seq[Attribute]): ExpressionTransformer = {
-    val (udfName, udfClassName) = expr match {
-      case s: HiveSimpleUDF =>
-        (s.name.stripPrefix("default."), s.funcWrapper.functionClassName)
-      case g: HiveGenericUDF =>
-        (g.name.stripPrefix("default."), g.funcWrapper.functionClassName)
-      case _ =>
-        throw new GlutenNotSupportException(
-          s"Expression $expr is not a HiveSimpleUDF or HiveGenericUDF")
-    }
+    val (udfName, udfClassName) = getHiveUDFNameAndClassName(expr)
 
     if (UDFResolver.UDFNames.contains(udfClassName)) {
       val udfExpression = UDFResolver
@@ -46,4 +40,20 @@ object VeloxHiveUDFTransformer {
       HiveUDFTransformer.genTransformerFromUDFMappings(udfName, expr, 
attributeSeq)
     }
   }
+
+  def isHiveUDFSupportsTransform(expr: Expression): Boolean = {
+    val (udfName, udfClassName) = getHiveUDFNameAndClassName(expr)
+    UDFResolver.UDFNames.contains(udfClassName) ||
+    UDFMappings.hiveUDFMap.contains(udfName.toLowerCase(Locale.ROOT))
+  }
+
+  private def getHiveUDFNameAndClassName(expr: Expression): (String, String) = 
expr match {
+    case s: HiveSimpleUDF =>
+      (s.name.stripPrefix("default."), s.funcWrapper.functionClassName)
+    case g: HiveGenericUDF =>
+      (g.name.stripPrefix("default."), g.funcWrapper.functionClassName)
+    case _ =>
+      throw new GlutenNotSupportException(
+        s"Expression $expr is not a HiveSimpleUDF or HiveGenericUDF")
+  }
 }
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
index c8404fb93b..8423b9d4d9 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.execution.ColumnarPartialProjectExec
+import org.apache.gluten.expression.UDFMappings
 import org.apache.gluten.udf.CustomerUDF
 
 import org.apache.spark.SparkConf
@@ -200,4 +201,54 @@ class GlutenHiveUDFSuite extends GlutenQueryTest with 
SQLTestUtils {
       checkOperatorMatch[ColumnarPartialProjectExec](df)
     }
   }
+
+  test("UDFMapping should prioritize over ColumnarPartialProject when both 
applicable") {
+    withTempFunction("udf_substr") {
+      withTempFunction("udf_substr2") {
+        withTempFunction("udf_sort_array") {
+          spark.sql(s"""
+                       |CREATE TEMPORARY FUNCTION udf_sort_array AS
+                       
|'org.apache.hadoop.hive.ql.udf.generic.GenericUDFSortArray';
+                       |""".stripMargin)
+          // Mapping hive udf "udf_substr" to velox function "substring"
+          UDFMappings.hiveUDFMap.put("udf_substr", "substring")
+          Seq("udf_substr", "udf_substr2").foreach {
+            testudf =>
+              spark.sql(s"""CREATE TEMPORARY FUNCTION $testudf AS
+                           |'org.apache.hadoop.hive.ql.udf.UDFSubstr';
+                           |""".stripMargin)
+
+              val df = spark.sql(s"""
+                                    |select
+                                    |  l_partkey,
+                                    |  udf_sort_array(array(10, l_orderkey, 
1)),
+                                    |  $testudf(l_comment, 1, 5)
+                                    |FROM lineitem WHERE l_partkey <= 5 and 
l_orderkey <1000
+                                    |""".stripMargin)
+              val executedPlan = getExecutedPlan(df)
+              checkGlutenOperatorMatch[ColumnarPartialProjectExec](df)
+              val partialProject = executedPlan
+                .filter {
+                  _ match {
+                    case _: ColumnarPartialProjectExec => true
+                    case _ => false
+                  }
+                }
+                .head
+                .asInstanceOf[ColumnarPartialProjectExec]
+
+              if (testudf == "udf_substr") {
+                // Since udf_substr is supported to transform,
+                // then we should only partial project udf_sort_array.
+                
assert(partialProject.output.count(_.name.startsWith("_SparkPartialProject")) 
== 1)
+              } else {
+                // Since both udf_sort_array and udf_substr2 are not supported 
to transform,
+                // then we should partial project udf_sort_array and 
udf_substr2.
+                
assert(partialProject.output.count(_.name.startsWith("_SparkPartialProject")) 
== 2)
+              }
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to