[ 
https://issues.apache.org/jira/browse/SPARK-42115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-42115:
------------------------------------

    Assignee:     (was: Apache Spark)

> Push down limit through Python UDFs
> -----------------------------------
>
>                 Key: SPARK-42115
>                 URL: https://issues.apache.org/jira/browse/SPARK-42115
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 3.4.0
>            Reporter: Hyukjin Kwon
>            Priority: Major
>
> {code}
> from pyspark.sql.functions import udf
> spark.range(10).write.mode("overwrite").parquet("/tmp/abc")
> @udf(returnType='string')
> def my_udf(arg):
>     return arg
> df = spark.read.parquet("/tmp/abc")
> df.limit(10).withColumn("prediction", my_udf(df["id"])).explain()
> {code}
> The snippet above is an example. Since Python UDFs are executed
> asynchronously, pushing the limit below them benefits performance.
> On 3.4.0 the limit stays above the UDF in the physical plan:
> {code}
> == Physical Plan ==
> CollectLimit 10
> +- *(2) Project [id#3L, pythonUDF0#10 AS prediction#6]
>    +- BatchEvalPython [my_udf(id#3L)#5], [pythonUDF0#10]
>       +- *(1) ColumnarToRow
>          +- FileScan parquet [id#3L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/abc], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
> {code}
> This is a regression from Spark 3.3.1, where the limit was applied below the UDF:
> {code}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [id#3L, pythonUDF0#10 AS prediction#6]
>    +- BatchEvalPython [my_udf(id#3L)#5], [pythonUDF0#10]
>       +- GlobalLimit 10
>          +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
>             +- LocalLimit 10
>                +- FileScan parquet [id#3L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/abc], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
> {code}
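
The difference between the two plans quoted above comes down to whether a
limit operator sits underneath BatchEvalPython. The sketch below checks
that from plan text alone, without a Spark session. The names parse_plan,
limit_below_python_udf and LIMIT_OPS are hypothetical helpers and not part
of Spark; the plan strings are trimmed copies of the plans in the report,
and the parser assumes the usual 3-space "+- " / ":- " tree layout, so it
is a rough illustration rather than a robust plan parser.

```python
import re

# Operator names taken from the two plans in the report.
LIMIT_OPS = {"CollectLimit", "GlobalLimit", "LocalLimit"}

# Trimmed copies of the plans from the report (FileScan details dropped).
PLAN_3_4_0 = """== Physical Plan ==
CollectLimit 10
+- *(2) Project [id#3L, pythonUDF0#10 AS prediction#6]
   +- BatchEvalPython [my_udf(id#3L)#5], [pythonUDF0#10]
      +- *(1) ColumnarToRow
         +- FileScan parquet [id#3L]
"""

PLAN_3_3_1 = """== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#3L, pythonUDF0#10 AS prediction#6]
   +- BatchEvalPython [my_udf(id#3L)#5], [pythonUDF0#10]
      +- GlobalLimit 10
         +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
            +- LocalLimit 10
               +- FileScan parquet [id#3L]
"""


def parse_plan(plan_text):
    """Return (depth, operator_name) pairs, top of the tree first."""
    nodes = []
    for line in plan_text.splitlines():
        if not line.strip() or line.startswith("=="):
            continue  # skip blank lines and the "== Physical Plan ==" header
        # Assumption: the first "+- " or ":- " marks the node's tree position,
        # and each nesting level adds 3 columns of indentation.
        marker = re.search(r"[+:]- ", line)
        depth = 0 if marker is None else marker.start() // 3 + 1
        body = line if marker is None else line[marker.end():]
        body = re.sub(r"^\*\(\d+\) ", "", body)  # drop a codegen prefix like "*(2) "
        nodes.append((depth, body.split()[0]))
    return nodes


def limit_below_python_udf(plan_text):
    """True if any limit operator is a descendant of a BatchEvalPython node."""
    nodes = parse_plan(plan_text)
    for i, (depth, name) in enumerate(nodes):
        if name != "BatchEvalPython":
            continue
        for child_depth, child_name in nodes[i + 1:]:
            if child_depth <= depth:
                break  # left the subtree under BatchEvalPython
            if child_name in LIMIT_OPS:
                return True
    return False


if __name__ == "__main__":
    print("3.3.1 limit below UDF:", limit_below_python_udf(PLAN_3_3_1))
    print("3.4.0 limit below UDF:", limit_below_python_udf(PLAN_3_4_0))
```

Note that DataFrame.explain() prints the plan rather than returning it, so
capturing the text from a live session is left out of this sketch.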



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
