[ 
https://issues.apache.org/jira/browse/SPARK-39481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556219#comment-17556219
 ] 

Yuming Wang commented on SPARK-39481:
-------------------------------------

Copy the code from description:
{code:python}
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

df = spark.createDataFrame(
    [
        [1, 'one'],
        [2, 'two'],
        [3, 'three'],
    ],
    'int_col int, string_col string',
)

@F.pandas_udf('int')
def copy_int_col(s):
    return s

df = df.withColumn('int_col_copy', copy_int_col(df['int_col']))
df = df.filter(F.col('int_col_copy') >= 3)

df.explain(True)

{code}


> Pandas UDF executed twice if used in projection followed by filter
> ------------------------------------------------------------------
>
>                 Key: SPARK-39481
>                 URL: https://issues.apache.org/jira/browse/SPARK-39481
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 3.2.1
>            Reporter: Timothy Dijamco
>            Priority: Minor
>
> In this scenario, a Pandas UDF will be executed twice:
>  # Projection that applies a Pandas UDF
>  # Filter
> In the {{explain}} output of the example below, the Optimized Logical Plan 
> and Physical Plan contain {{ArrowEvalPython}} twice:
> {code:python}
> from pyspark.sql import SparkSession
> import pyspark.sql.functions as F
> spark = SparkSession.builder.master('local[1]').getOrCreate()
> df = spark.createDataFrame(
>     [
>         [1, 'one'],
>         [2, 'two'],
>         [3, 'three'],
>     ],
>     'int_col int, string_col string',
> )
> @F.pandas_udf('int')
> def copy_int_col(s):
>     return s
> df = df.withColumn('int_col_copy', copy_int_col(df['int_col']))
> df = df.filter(F.col('int_col_copy') >= 3)
> df.explain(True)
> {code}
> {code:java}
> == Parsed Logical Plan ==
> 'Filter ('int_col_copy >= 3)
> +- Project [int_col#322, string_col#323, copy_int_col(int_col#322) AS 
> int_col_copy#327]
>    +- LogicalRDD [int_col#322, string_col#323], false
> == Analyzed Logical Plan ==
> int_col: int, string_col: string, int_col_copy: int
> Filter (int_col_copy#327 >= 3)
> +- Project [int_col#322, string_col#323, copy_int_col(int_col#322) AS 
> int_col_copy#327]
>    +- LogicalRDD [int_col#322, string_col#323], false
> == Optimized Logical Plan ==
> Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
> +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
>    +- Project [int_col#322, string_col#323]
>       +- Filter (pythonUDF0#331 >= 3)
>          +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
>             +- LogicalRDD [int_col#322, string_col#323], false
> == Physical Plan ==
> *(3) Project [int_col#322, string_col#323, pythonUDF0#332 AS int_col_copy#327]
> +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#332], 200
>    +- *(2) Project [int_col#322, string_col#323]
>       +- *(2) Filter (pythonUDF0#331 >= 3)
>          +- ArrowEvalPython [copy_int_col(int_col#322)], [pythonUDF0#331], 200
>             +- *(1) Scan ExistingRDD[int_col#322,string_col#323]
> {code}
> If the Pandas UDF is marked as non-deterministic (e.g. {{{}copy_int_col = 
> copy_int_col.asNondeterministic(){}}}), then it is not executed twice.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to