This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b47fedbdfc [GLUTEN-9076][VL][FOLLOWUP] Simplify code of HiveUDF (#9127)
b47fedbdfc is described below
commit b47fedbdfc63786f255e6646bc77af101525acaf
Author: WangGuangxin <[email protected]>
AuthorDate: Mon Mar 24 18:47:54 2025 -0700
[GLUTEN-9076][VL][FOLLOWUP] Simplify code of HiveUDF (#9127)
---
.../execution/ColumnarPartialProjectExec.scala | 16 ++++++------
.../org/apache/spark/sql/hive/HiveUdfUtil.scala | 30 ----------------------
.../spark/sql/hive/VeloxHiveUDFTransformer.scala | 23 +++++++----------
.../spark/sql/execution/GlutenHiveUDFSuite.scala | 2 +-
.../apache/spark/sql/hive/HiveUDFTransformer.scala | 20 ++++++++-------
5 files changed, 29 insertions(+), 62 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
index bf98320a07..e97385b7cf 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
-import org.apache.spark.sql.hive.{HiveUdfUtil, VeloxHiveUDFTransformer}
+import org.apache.spark.sql.hive.{HiveUDFTransformer, VeloxHiveUDFTransformer}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import scala.collection.mutable.ListBuffer
@@ -271,14 +271,16 @@ object ColumnarPartialProjectExec {
val projectPrefix = "_SparkPartialProject"
+ /** Check if it's a hive udf but not transformable */
+ private def containsUnsupportedHiveUDF(h: Expression): Boolean = {
+ HiveUDFTransformer.isHiveUDF(h) && !VeloxHiveUDFTransformer.isSupportedHiveUDF(h)
+ }
+
private def containsUDF(expr: Expression): Boolean = {
if (expr == null) return false
expr match {
case _: ScalaUDF => true
- case h
- if HiveUdfUtil.isHiveUdf(h) &&
- !VeloxHiveUDFTransformer.isHiveUDFSupportsTransform(h) =>
- true
+ case h if containsUnsupportedHiveUDF(h) => true
case p => p.children.exists(c => containsUDF(c))
}
}
@@ -309,9 +311,7 @@ object ColumnarPartialProjectExec {
expr match {
case u: ScalaUDF =>
replaceByAlias(u, replacedAliasUdf)
- case h
- if HiveUdfUtil.isHiveUdf(h) &&
- !VeloxHiveUDFTransformer.isHiveUDFSupportsTransform(h) =>
+ case h if containsUnsupportedHiveUDF(h) =>
replaceByAlias(h, replacedAliasUdf)
case au @ Alias(_: ScalaUDF, _) =>
val replaceIndex = replacedAliasUdf.indexWhere(r => r.exprId == au.exprId)
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/hive/HiveUdfUtil.scala b/backends-velox/src/main/scala/org/apache/spark/sql/hive/HiveUdfUtil.scala
deleted file mode 100644
index 7686455896..0000000000
--- a/backends-velox/src/main/scala/org/apache/spark/sql/hive/HiveUdfUtil.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-
-object HiveUdfUtil {
- def isHiveUdf(expr: Expression): Boolean = expr match {
- case _: HiveSimpleUDF => true
- case _: HiveGenericUDF => true
- case _: HiveUDAFFunction => true
- case _: HiveGenericUDTF => true
- case _ => false
- }
-
-}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/hive/VeloxHiveUDFTransformer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/hive/VeloxHiveUDFTransformer.scala
index f6ebc847c0..09fdac530a 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/hive/VeloxHiveUDFTransformer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/hive/VeloxHiveUDFTransformer.scala
@@ -16,7 +16,6 @@
*/
package org.apache.spark.sql.hive
-import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.expression.{ExpressionConverter, ExpressionTransformer, UDFMappings}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -28,7 +27,7 @@ object VeloxHiveUDFTransformer {
def replaceWithExpressionTransformer(
expr: Expression,
attributeSeq: Seq[Attribute]): ExpressionTransformer = {
- val (udfName, udfClassName) = getHiveUDFNameAndClassName(expr)
+ val (udfName, udfClassName) = HiveUDFTransformer.getHiveUDFNameAndClassName(expr)
if (UDFResolver.UDFNames.contains(udfClassName)) {
val udfExpression = UDFResolver
@@ -41,19 +40,15 @@ object VeloxHiveUDFTransformer {
}
}
- def isHiveUDFSupportsTransform(expr: Expression): Boolean = {
- val (udfName, udfClassName) = getHiveUDFNameAndClassName(expr)
+ /**
+ * Check whether the input hive udf expression is supported to transform. It maybe transformed by
+ * [[VeloxHiveUDFTransformer]] or [[HiveUDFTransformer]].
+ */
+ def isSupportedHiveUDF(expr: Expression): Boolean = {
+ val (udfName, udfClassName) = HiveUDFTransformer.getHiveUDFNameAndClassName(expr)
+ // Transformable by VeloxHiveUDFTransformer
UDFResolver.UDFNames.contains(udfClassName) ||
+ // Transformable by HiveUDFTransformer
UDFMappings.hiveUDFMap.contains(udfName.toLowerCase(Locale.ROOT))
}
-
- private def getHiveUDFNameAndClassName(expr: Expression): (String, String) = expr match {
- case s: HiveSimpleUDF =>
- (s.name.stripPrefix("default."), s.funcWrapper.functionClassName)
- case g: HiveGenericUDF =>
- (g.name.stripPrefix("default."), g.funcWrapper.functionClassName)
- case _ =>
- throw new GlutenNotSupportException(
- s"Expression $expr is not a HiveSimpleUDF or HiveGenericUDF")
- }
}
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
index 8423b9d4d9..a82e251ccb 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/GlutenHiveUDFSuite.scala
@@ -202,7 +202,7 @@ class GlutenHiveUDFSuite extends GlutenQueryTest with SQLTestUtils {
}
}
- test("UDFMapping should prioritize over ColumnarPartialProject when both applicable") {
+ test("prioritize offloading supported hive udf in ColumnarPartialProject") {
withTempFunction("udf_substr") {
withTempFunction("udf_substr2") {
withTempFunction("udf_sort_array") {
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveUDFTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveUDFTransformer.scala
index 52739aaca4..4caf4fb43e 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveUDFTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveUDFTransformer.scala
@@ -31,18 +31,20 @@ object HiveUDFTransformer {
}
}
+ def getHiveUDFNameAndClassName(expr: Expression): (String, String) = expr match {
+ case s: HiveSimpleUDF =>
+ (s.name.stripPrefix("default."), s.funcWrapper.functionClassName)
+ case g: HiveGenericUDF =>
+ (g.name.stripPrefix("default."), g.funcWrapper.functionClassName)
+ case _ =>
+ throw new GlutenNotSupportException(
+ s"Expression $expr is not a HiveSimpleUDF or HiveGenericUDF")
+ }
+
def replaceWithExpressionTransformer(
expr: Expression,
attributeSeq: Seq[Attribute]): ExpressionTransformer = {
- val udfName = expr match {
- case s: HiveSimpleUDF =>
- s.name.stripPrefix("default.")
- case g: HiveGenericUDF =>
- g.name.stripPrefix("default.")
- case _ =>
- throw new GlutenNotSupportException(
- s"Expression $expr is not a HiveSimpleUDF or HiveGenericUDF")
- }
+ val (udfName, _) = getHiveUDFNameAndClassName(expr)
genTransformerFromUDFMappings(udfName, expr, attributeSeq)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]