This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4b862685b3d4 [SPARK-52082][PYTHON][DOCS] Improve ExtractPythonUDF docs
4b862685b3d4 is described below

commit 4b862685b3d499d2c366ae101747e5b73775fe08
Author: Ben Hurdelhey <ben.hurdel...@databricks.com>
AuthorDate: Mon Jul 7 14:07:36 2025 +0900

    [SPARK-52082][PYTHON][DOCS] Improve ExtractPythonUDF docs

    ### What changes were proposed in this pull request?

    - Renames two methods in ExtractPythonUDFs and adds docstrings explaining the parallel fusing and chaining concepts.

    ### Why are the changes needed?

    - In my experience, new developers find the planning code hard to understand without sufficient explanations. The current method naming is confusing: `canChainUDF` is actually used to decide whether parallel UDF invocations like `udf1(), udf2()` are eligible for fusing (see the PySpark sketch after the diffstat below).

    ### Does this PR introduce _any_ user-facing change?

    - No

    ### How was this patch tested?

    - Existing tests

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #50867 from benrobby/SPARK-52082.

    Authored-by: Ben Hurdelhey <ben.hurdel...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/execution/python/ExtractPythonUDFs.scala | 63 +++++++++++++++++++---
 1 file changed, 56 insertions(+), 7 deletions(-)
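As a quick illustration of the two shapes this change documents, here is a minimal PySpark sketch (not part of the patch; udf1, udf2, and the column x are made-up names). When the UDFs share the same eval type, both shapes can be fused into a single Python evaluation node, which should appear as one BatchEvalPython operator in the explain() output.

    # A minimal PySpark sketch (assumed names, not from the patch): the two UDF
    # shapes that ExtractPythonUDFs can fuse into one Python evaluation node.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, udf

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(5).withColumnRenamed("id", "x")

    # Two plain scalar Python UDFs with the same eval type.
    udf1 = udf(lambda v: v + 1, "long")
    udf2 = udf(lambda v: v * 2, "long")

    # Parallel (sibling) invocations over the same input: udf1(x), udf2(x).
    parallel = df.select(udf1(col("x")), udf2(col("x")))

    # Nested invocation: udf1(udf2(x)).
    nested = df.select(udf1(udf2(col("x"))))

    # Either shape should show up as a single BatchEvalPython node in the
    # physical plan, since both UDFs share the same eval type.
    parallel.explain()
    nested.explain()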
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index a42e5d3e6c97..4c6e38746464 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -169,16 +169,63 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with Logging {
     e.exists(PythonUDF.isScalarPythonUDF)
   }

+  /**
+   * Return true if we should extract the current expression, including all of its current
+   * children (the UDF expression and all others), to a logical node.
+   * The children of the expression can be UDF expressions; this would be nested chaining.
+   * If child UDF expressions were already extracted before, then this will just extract
+   * the current UDF expression, so they will end up in separate logical nodes. The child
+   * expressions will have been transformed to Attribute expressions referencing the child plan
+   * node's output.
+   *
+   * Return false if there is no single continuous chain of UDFs that can be extracted:
+   * - if there are other expressions in between, return false. In the
+   *   example below, the caller will have to extract bar(baz()) separately first:
+   *   Query: foo(1 + bar(baz()))
+   *   Plan:
+   *   - PythonUDF (foo)
+   *     - Project
+   *       - PythonUDF (bar)
+   *         - PythonUDF (baz)
+   * - if the eval types of the UDF expressions in the chain differ, return false.
+   * - if a UDF has more than one child, e.g. foo(bar(), baz()), return false.
+   * If we return false here, the expectation is that the recursive calls of
+   * collectEvaluableUDFsFromExpressions will then visit the children and extract them first to
+   * separate nodes.
+   */
   @scala.annotation.tailrec
-  private def canEvaluateInPython(e: PythonUDF): Boolean = {
+  private def shouldExtractUDFExpressionTree(e: PythonUDF): Boolean = {
     e.children match {
-      // single PythonUDF child could be chained and evaluated in Python
-      case Seq(u: PythonUDF) => correctEvalType(e) == correctEvalType(u) && canEvaluateInPython(u)
+      case Seq(child: PythonUDF) => correctEvalType(e) == correctEvalType(child) &&
+        shouldExtractUDFExpressionTree(child)
       // Python UDF can't be evaluated directly in JVM
       case children => !children.exists(hasScalarPythonUDF)
     }
   }

+  /**
+   * We use the following terminology:
+   * - chaining is the act of combining multiple UDFs into a single logical node. This can be
+   *   accomplished in different cases, for example:
+   *   - parallel chaining: if the UDFs are siblings, e.g., foo(x), bar(x),
+   *     where multiple independent UDFs are evaluated together over the same input
+   *   - nested chaining: if the UDFs are nested, e.g., foo(bar(...)),
+   *     where the output of one UDF feeds into the next in a sequential pipeline
+   *
+   * collectEvaluableUDFsFromExpressions returns a list of UDF expressions that can be planned
+   * together into one plan node. collectEvaluableUDFsFromExpressions will be called multiple
+   * times by recursive calls of extract(plan), until no more evaluable UDFs are found.
+   *
+   * As an example, consider the following expression tree:
+   * udf1(udf2(udf3(x)), udf4(x)), where all UDFs are PythonUDFs of the same eval type.
+   * We can only fuse UDFs of the same eval type, and never UDFs of SQL_SCALAR_PANDAS_ITER_UDF.
+   * The following UDF expressions will be returned:
+   * - First, we will return Seq(udf3, udf4), as these two UDFs must be evaluated first.
+   *   We return both in one Seq, as it is possible to do parallel fusing for udf3 and udf4.
+   * - As we can only chain UDFs with exactly one child, we will not fuse udf2 with its children.
+   *   But we can chain udf1 and udf2, so a later call to collectEvaluableUDFsFromExpressions
+   *   will return Seq(udf1, udf2).
+   */
   private def collectEvaluableUDFsFromExpressions(expressions: Seq[Expression]): Seq[PythonUDF] = {
     // If first UDF is SQL_SCALAR_PANDAS_ITER_UDF or SQL_SCALAR_ARROW_ITER_UDF,
     // then only return this UDF,
@@ -187,7 +234,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with Logging {

     var firstVisitedScalarUDFEvalType: Option[Int] = None

-    def canChainUDF(evalType: Int): Boolean = {
+    def canChainWithParallelUDFs(evalType: Int): Boolean = {
       if (evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF ||
           evalType == PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF) {
         false
@@ -197,12 +244,14 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with Logging {
     }

     def collectEvaluableUDFs(expr: Expression): Seq[PythonUDF] = expr match {
-      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
+      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf)
+        && shouldExtractUDFExpressionTree(udf)
         && firstVisitedScalarUDFEvalType.isEmpty =>
         firstVisitedScalarUDFEvalType = Some(correctEvalType(udf))
         Seq(udf)
-      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
-        && canChainUDF(correctEvalType(udf)) =>
+      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf)
+        && shouldExtractUDFExpressionTree(udf)
+        && canChainWithParallelUDFs(correctEvalType(udf)) =>
         Seq(udf)
       case e => e.children.flatMap(collectEvaluableUDFs)
     }
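To make the two-pass extraction described in the new collectEvaluableUDFsFromExpressions docstring concrete, here is a hypothetical PySpark sketch of the udf1(udf2(udf3(x)), udf4(x)) example (all names are illustrative, not from the patch). With four plain scalar UDFs of the same eval type, the physical plan is expected to contain two Python evaluation nodes: one for udf3 and udf4 (fused in parallel), and one for udf1 and udf2 (a nested chain), because udf1 takes two arguments and cannot be fused with both of its children.

    # Hypothetical PySpark sketch of the docstring's udf1(udf2(udf3(x)), udf4(x))
    # example; udf1..udf4 and the column x are made-up names.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, udf

    spark = SparkSession.builder.getOrCreate()
    df = spark.range(5).withColumnRenamed("id", "x")

    udf1 = udf(lambda a, b: a + b, "long")  # two children, so it cannot chain with both
    udf2 = udf(lambda v: v + 1, "long")
    udf3 = udf(lambda v: v * 2, "long")
    udf4 = udf(lambda v: v - 1, "long")

    q = df.select(udf1(udf2(udf3(col("x"))), udf4(col("x"))))

    # Expected: two Python evaluation nodes in the physical plan, one evaluating
    # udf3 and udf4 together, and a second evaluating udf2 and udf1 on top.
    q.explain()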
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org