This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b862685b3d4 [SPARK-52082][PYTHON][DOCS] Improve ExtractPythonUDF docs
4b862685b3d4 is described below

commit 4b862685b3d499d2c366ae101747e5b73775fe08
Author: Ben Hurdelhey <ben.hurdel...@databricks.com>
AuthorDate: Mon Jul 7 14:07:36 2025 +0900

    [SPARK-52082][PYTHON][DOCS] Improve ExtractPythonUDF docs
    
    ### What changes were proposed in this pull request?
    
    - renames two methods in ExtractPythonUDFs, and adds docstrings explaining 
the parallel fusing and chaining concepts
    
    ### Why are the changes needed?
    
    - in my experience, new developers find the planning code hard to 
understand without sufficient explanations. The current method naming is 
confusing, as the `canChainUDF` is actually used select eligibility to fuse 
parallel udf invocations like `udf1(), udf2()`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    - No
    
    ### How was this patch tested?
    
    - Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50867 from benrobby/SPARK-52082.
    
    Authored-by: Ben Hurdelhey <ben.hurdel...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/execution/python/ExtractPythonUDFs.scala   | 63 +++++++++++++++++++---
 1 file changed, 56 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index a42e5d3e6c97..4c6e38746464 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -169,16 +169,63 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with 
Logging {
     e.exists(PythonUDF.isScalarPythonUDF)
   }
 
+  /**
+   * Return true if we should extract the current expression, including all of 
its current
+   * children (including UDF expression, and all others), to a logical node.
+   * The children of the expression can be UDF expressions, this would be 
nested chaining.
+   * If child UDF expressions were already extracted before, then this will 
just extract
+   * the current UDF expression, so they will end up in separate logical 
nodes. The child
+   * expressions will have been transformed to Attribute expressions 
referencing the child plan
+   * node's output.
+   *
+   * Return false if there is no single continuous chain of UDFs that can be 
extracted:
+   * - if there are other expression in-between, return false. In
+   *   below example, the caller will have to extract bar(baz()) separately 
first:
+   *   Query: foo(1 + bar(baz()))
+   *   Plan:
+   *   - PythonUDF (foo)
+   *      - Project
+   *         - PythonUDF (bar)
+   *           - PythonUDF (baz)
+   * - if the eval types of the UDF expressions in the chain differ, return 
false.
+   * - if a UDF has more than one child, e.g. foo(bar(), baz()), return false
+   * If we return false here, the expectation is that the recursive calls of
+   * collectEvaluableUDFsFromExpressions will then visit the children and 
extract them first to
+   * separate nodes.
+   */
   @scala.annotation.tailrec
-  private def canEvaluateInPython(e: PythonUDF): Boolean = {
+  private def shouldExtractUDFExpressionTree(e: PythonUDF): Boolean = {
     e.children match {
-      // single PythonUDF child could be chained and evaluated in Python
-      case Seq(u: PythonUDF) => correctEvalType(e) == correctEvalType(u) && 
canEvaluateInPython(u)
+      case Seq(child: PythonUDF) => correctEvalType(e) == 
correctEvalType(child) &&
+        shouldExtractUDFExpressionTree(child)
       // Python UDF can't be evaluated directly in JVM
       case children => !children.exists(hasScalarPythonUDF)
     }
   }
 
+  /**
+   * We use the following terminology:
+   * - chaining is the act of combining multiple UDFs into a single logical 
node. This can be
+   *   accomplished in different cases, for example:
+   *   - parallel chaining: if the UDFs are siblings, e.g., foo(x), bar(x),
+   *     where multiple independent UDFs are evaluated together over the same 
input
+   *   - nested chaining: if the UDFs are nested, e.g., foo(bar(...)),
+   *     where the output of one UDF feeds into the next in a sequential 
pipeline
+   *
+   * collectEvaluableUDFsFromExpressions returns a list of UDF expressions 
that can be planned
+   * together into one plan node. collectEvaluableUDFsFromExpressions will be 
called multiple times
+   * by recursive calls of extract(plan), until no more evaluable UDFs are 
found.
+   *
+   * As an example, consider the following expression tree:
+   * udf1(udf2(udf3(x)), udf4(x))), where all UDFs are PythonUDFs of the same 
evaltype.
+   * We can only fuse UDFs of the same eval type, and never UDFs of 
SQL_SCALAR_PANDAS_ITER_UDF.
+   * The following udf expressions will be returned:
+   * - First, we will return Seq(udf3, udf4), as these two UDFs must be 
evaluated first.
+   *   We return both in one Seq, as it is possible to do parallel fusing for 
udf3 an udf4.
+   * - As we can only chain UDFs with exactly one child, we will not fuse udf2 
with its children.
+   *   But we can chain udf1 and udf2, so a later call to 
collectEvaluableUDFsFromExpressions will
+   *   return Seq(udf1, udf2).
+   */
   private def collectEvaluableUDFsFromExpressions(expressions: 
Seq[Expression]): Seq[PythonUDF] = {
     // If first UDF is SQL_SCALAR_PANDAS_ITER_UDF or SQL_SCALAR_ARROW_ITER_UDF,
     // then only return this UDF,
@@ -187,7 +234,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with 
Logging {
 
     var firstVisitedScalarUDFEvalType: Option[Int] = None
 
-    def canChainUDF(evalType: Int): Boolean = {
+    def canChainWithParallelUDFs(evalType: Int): Boolean = {
       if (evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF ||
         evalType == PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF) {
         false
@@ -197,12 +244,14 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with 
Logging {
     }
 
     def collectEvaluableUDFs(expr: Expression): Seq[PythonUDF] = expr match {
-      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && 
canEvaluateInPython(udf)
+      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf)
+        && shouldExtractUDFExpressionTree(udf)
         && firstVisitedScalarUDFEvalType.isEmpty =>
         firstVisitedScalarUDFEvalType = Some(correctEvalType(udf))
         Seq(udf)
-      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && 
canEvaluateInPython(udf)
-        && canChainUDF(correctEvalType(udf)) =>
+      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf)
+        && shouldExtractUDFExpressionTree(udf)
+        && canChainWithParallelUDFs(correctEvalType(udf)) =>
         Seq(udf)
       case e => e.children.flatMap(collectEvaluableUDFs)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to