[ https://issues.apache.org/jira/browse/SPARK-27245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrea Rota resolved SPARK-27245.
---------------------------------
    Resolution: Duplicate

Duplicate issue; the behaviour is by design, as the optimizer may choose to 
collapse Projects and call a UDF more than once. SPARK-17728 suggests a 
workaround using explode to isolate part of the DAG from the optimizer's 
point of view.
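For reference, a minimal sketch of the explode-based workaround discussed in SPARK-17728, applied to the example below (the local session setup is illustrative, not part of the original report): wrapping the UDF result in an array and exploding it inserts a Generate node that the optimizer does not collapse through, so the UDF is evaluated once per row.

{code:python}
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.master("local[1]").getOrCreate()

foo_udf = f.udf(lambda x: 1, IntegerType())

# explode(array(...)) acts as a barrier between the UDF call and the
# downstream projections, preventing the duplicated BatchEvalPython calls.
df = spark.createDataFrame([['bar']]) \
        .withColumn('result', f.explode(f.array(foo_udf(f.col('_1'))))) \
        .withColumn('a', f.col('result')) \
        .withColumn('b', f.col('result'))

df.explain()
{code}

With this form, BatchEvalPython should list the lambda only once in the physical plan, and columns a and b reuse the exploded result.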

> Optimizer repeat Python UDF calls
> ---------------------------------
>
>                 Key: SPARK-27245
>                 URL: https://issues.apache.org/jira/browse/SPARK-27245
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.3.1, 2.3.2, 2.4.0
>         Environment: Tested both on Linux and Windows, on my computer and on 
> Databricks.
> Spark version: 2.3.1
> Python version: 3.6.5 (v3.6.5:f59c0932b4, Mar 28 2018, 17:00:18) 
> I tried different releases of Spark too (2.4.0, 2.3.2), the behaviour 
> persists.
>            Reporter: Andrea Rota
>            Priority: Major
>              Labels: optimizer, performance, planner, pyspark, udf
>
> The physical plan shown by the .explain() method reveals an inefficient way of 
> calling Python UDFs in PySpark.
> This behaviour takes place under these circumstances:
>  * PySpark API
>  * At least one operation in the DAG uses the result of the Python UDF
> My expectation is that the optimizer should call the Python UDF once with 
> BatchEvalPython and then reuse the result across the following steps.
> Instead, the optimizer calls the same UDF n times, with the same parameters, 
> within the same BatchEvalPython, and only uses one of the result columns 
> (PythonUDF2#16) while discarding the others.
> I believe this can lead to poor performance due to the large data exchange 
> with the Python processes and the additional calls.
> Example code:
> {code:python}
> import pyspark.sql.functions as f
> from pyspark.sql.types import IntegerType
>
> foo_udf = f.udf(lambda x: 1, IntegerType())
> df = spark.createDataFrame([['bar']]) \
>         .withColumn('result', foo_udf(f.col('_1'))) \
>         .withColumn('a', f.col('result')) \
>         .withColumn('b', f.col('result'))
> df.explain()
> {code}
> {code}
> == Physical Plan ==
> *(1) Project [_1#0, pythonUDF2#16 AS result#2, pythonUDF2#16 AS a#5, 
> pythonUDF2#16 AS b#9]
> +- BatchEvalPython [<lambda>(_1#0), <lambda>(_1#0), <lambda>(_1#0)], [_1#0, 
> pythonUDF0#14, pythonUDF1#15, pythonUDF2#16]
>    +- Scan ExistingRDD[_1#0]
> {code}
> Full code on Gist: 
> [https://gist.github.com/andrearota/f77b6a293421a3f26dd5d2fb0a04046e]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
