[ https://issues.apache.org/jira/browse/SPARK-53842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18029184#comment-18029184 ]

Julian Klein edited comment on SPARK-53842 at 10/11/25 9:27 AM:
----------------------------------------------------------------

Hi [~lioritzhak],

I would _love_ to work on this, because I am super excited about query 
optimization and want to get my feet wet working on Spark. Since this would be 
my first issue for Spark, though, I might also have questions along the way.
I do have another thing to work on first, and I'll also have to set up my dev 
environment for Spark (which might take some time).
If that does not sound too off-putting, feel free to assign this to me. I'd be 
super excited to do this!

Best,
Julian



>  Enable Filter Push-Down for Pandas UDFs with an Immutable Column Hint
> ----------------------------------------------------------------------
>
>                 Key: SPARK-53842
>                 URL: https://issues.apache.org/jira/browse/SPARK-53842
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer, PySpark
>    Affects Versions: 3.5.3, 4.0.1
>            Reporter: Lior Itzhak
>            Priority: Major
>
> h3. Problem Description
> Pandas UDFs ({{mapInPandas}}, {{applyInPandas}}, etc.) are powerful 
> for custom data processing in PySpark. However, they currently act as a black 
> box to the Catalyst Optimizer. This prevents the optimizer from pushing down 
> filters on columns that pass through the UDF unmodified. As a result, 
> filtering operations occur _after_ the expensive UDF execution and associated 
> data shuffling, leading to significant performance degradation.
> This is especially common in pipelines where transformations are applied to 
> grouped data, and the grouping key itself is not modified within the UDF.
> *Example:*
> Consider the following DataFrame and Pandas UDFs:
> {code:java}
> import pandas as pd
> from typing import Iterator
> df = spark.createDataFrame(
>     [["A", 1], ["A", 1], ["B", 2]], 
>     schema=["id string", "value int"]
> )
> # UDF to modify the 'value' column
> def map_udf(pdfs: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
>     for pdf in pdfs:
>         pdf["value"] = pdf["value"] + 1
>         yield pdf
> # UDF to aggregate data by 'id'
> def agg_udf(pdf: pd.DataFrame) -> pd.DataFrame:
>     return pdf.groupby("id").agg(count=("value", "count"))
> # Apply the UDFs
> modified_df = (
>     df
>     .mapInPandas(map_udf, schema="id string,value int")
>     .groupby("id")
>     .applyInPandas(agg_udf, schema="id string,count int")
> )
> # Filter the result
> modified_df.where("id == 'A'").explain() {code}
> In this example, the {{id}} column is never modified by either UDF. However, 
> the filter on {{id}} is applied only after all transformations are complete.
> *Current Physical Plan:*
> The physical plan shows the {{Filter}} operation at the very top, processing 
> data that has already been scanned, shuffled, and processed by both Pandas 
> UDFs.
>  
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Filter (isnotnull(id#20) AND (id#20 = A))
>    +- FlatMapGroupsInPandas [id#13], agg_udf(id#13, value#14)#19, [id#20, count#21]
>       +- Sort [id#13 ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(id#13, 200), ENSURE_REQUIREMENTS, [plan_id=20]
>             +- Project [id#13, id#13, value#14]
>                +- MapInPandas map_udf(id string#8, value int#9L)#12, [id#13, value#14], false
>                   +- Scan ExistingRDD[id string#8,value int#9L]{code}
> This plan processes all data for both {{id = 'A'}} and {{id = 'B'}} through 
> the entire pipeline, even though the data for {{'B'}} is discarded at the end.
> h3. Proposed Solution
> We propose introducing a mechanism to *hint* to the Catalyst Optimizer that 
> specific columns within a Pandas UDF are immutable or pass through without 
> modification. This would allow the optimizer to safely push down filters on 
> these columns.
> This could be implemented as a new parameter on the {{mapInPandas}} / 
> {{applyInPandas}} calls, for example {{passthrough_cols}}:
>  
> {code:java}
> # Proposed API modification
> modified_df = (
>     df
>     .mapInPandas(
>         map_udf, 
>         schema="id string,value int",
>         passthrough_cols=["id"]  # New hint parameter
>     )
>     .groupby("id")
>     .applyInPandas(
>         agg_udf, 
>         schema="id string,count int",
>         passthrough_cols=["id"]  # New hint parameter
>     )
> )
> {code}
> With this hint, the optimizer could transform the physical plan to apply the 
> filter at the data source, _before_ any expensive operations.
> *Desired Physical Plan:*
>  
> {code:java}
> == Desired Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- FlatMapGroupsInPandas [id#13], agg_udf(id#13, value#14)#19, [id#20, count#21]
>    +- Sort [id#13 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(id#13, 200), ENSURE_REQUIREMENTS, [plan_id=20]
>          +- Project [id#13, id#13, value#14]
>             +- MapInPandas map_udf(id string#8, value int#9L)#12, [id#13, value#14], false
>                +- Filter (isnotnull(id#8) AND (id#8 = A))  // <-- FILTER PUSHED DOWN
>                   +- Scan ExistingRDD[id string#8,value int#9L]{code}
> This optimized plan would significantly reduce the amount of data sent to the 
> UDFs and shuffled across the network, resulting in major performance 
> improvements.
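> One way to approximate the desired plan today is to apply the predicate by hand before the UDFs, which is only possible when the predicate is known up front. A rough sketch against the example above (note that the source columns there are literally named {{id string}} and {{value int}}, hence the backticks):
> {code:java}
> # Manual workaround, shown for comparison: push the predicate down by hand when
> # it is already known while building the pipeline. The proposed passthrough_cols
> # hint would let Catalyst do this reordering automatically, including for
> # filters that are attached further downstream.
> prefiltered_df = (
>     df
>     .where("`id string` = 'A'")   # filter before mapInPandas and the shuffle
>     .mapInPandas(map_udf, schema="id string,value int")
>     .groupby("id")
>     .applyInPandas(agg_udf, schema="id string,count int")
> )
> prefiltered_df.explain()
> {code}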
> h3. Motivation & Justification
>  # *Performance:* In large-scale data processing pipelines, filtering data 
> early is one of the most effective optimization strategies. Enabling filter 
> push-down for Pandas UDFs would unlock substantial performance gains, 
> reducing I/O, network traffic, and computational load.
>  # *Common Use Case:* Developers often know with certainty that grouping keys 
> or other identifier columns are not modified within their UDFs. The proposed 
> hint provides a direct means of communicating this domain knowledge to the 
> optimizer.
>  # *Usability:* This feature would empower developers to optimize their 
> pipelines in scenarios where they cannot change an incoming plan and can only 
> apply transformations to a given DataFrame.
> h3. Optional: Runtime Validation
> To safeguard against incorrect usage of the hint, Spark could optionally 
> perform a runtime validation. This check would verify that the values in the 
> columns marked as {{passthrough_cols}} are indeed unchanged between the input 
> and output of the UDF. If a discrepancy is found (e.g., a value in the output 
> {{id}} column did not exist in the input {{id}} column for that batch), Spark 
> could raise an exception. While not entirely foolproof, this would cover most 
> grouping and mapping use cases and prevent subtle bugs.
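> A minimal sketch of what such a per-batch check might look like for an iterator-based UDF ({{mapInPandas}}) is shown below; the wrapper name and its integration point are purely illustrative and not an existing Spark API. Spark would presumably run an equivalent check inside its Python worker whenever {{passthrough_cols}} is supplied:
> {code:java}
> # Illustrative sketch only: wrap a mapInPandas-style UDF and verify that every
> # value emitted in the passthrough columns was seen in the input batches.
> import pandas as pd
> from typing import Callable, Iterator, List
> 
> PandasIterUDF = Callable[[Iterator[pd.DataFrame]], Iterator[pd.DataFrame]]
> 
> def wrap_with_passthrough_check(udf: PandasIterUDF, passthrough_cols: List[str]) -> PandasIterUDF:
>     def wrapped(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
>         # Values observed so far in the input, per passthrough column.
>         seen = {c: set() for c in passthrough_cols}
> 
>         def record(batch: pd.DataFrame) -> pd.DataFrame:
>             for c in passthrough_cols:
>                 seen[c].update(batch[c].tolist())
>             return batch
> 
>         for out in udf(record(b) for b in batches):
>             for c in passthrough_cols:
>                 unexpected = set(out[c].tolist()) - seen[c]
>                 if unexpected:
>                     raise ValueError(
>                         f"passthrough_cols violated: column '{c}' produced values "
>                         f"not present in the input, e.g. {list(unexpected)[:3]}"
>                     )
>             yield out
>     return wrapped
> {code}
> For grouped UDFs such as {{applyInPandas}}, the same idea would apply to the single input/output DataFrame pair of each group, where the check could additionally verify that the passthrough values match the group key.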


