Stateful/non-deterministic UDFs are not evaluated in the optimizer stage. For
deterministic UDFs, each invocation is expected to return the same result, so
the optimizer is free to evaluate them early.

Before the Spark 2.3 release, we assumed all UDFs were deterministic and
stateless. In the recent Spark 2.3 release, we allow users to mark the
determinism of UDFs. Thus, in your case, if you want the UDF to be executed
for each row (and not during optimization), please define it as
non-deterministic.
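For example, a UDF can be marked non-deterministic like this (a minimal
sketch assuming a Spark 2.3+ spark-shell session, where `spark.implicits._`
is already in scope):

```scala
import org.apache.spark.sql.functions.udf

// Marking the UDF as non-deterministic tells the optimizer it may not
// fold or pre-evaluate it during planning (e.g. via ConvertToLocalRelation);
// the side effect below should then only fire per row at execution time.
val myUDF = udf((x: String) => {
  println("UDF evaled")
  "result"
}).asNondeterministic()

val df = Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))
// Inspecting df.queryExecution.optimizedPlan should no longer
// trigger the println.
```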

In the next release, we might introduce an interface that lets users prevent
the optimizer from running specific rules. That might help your case as well.
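As an illustration, such an interface could look like the
`spark.sql.optimizer.excludedRules` configuration (which did ship later, in
Spark 2.4); this is a sketch, not a guarantee of the final API:

```scala
// Sketch: excluding a specific optimizer rule by its fully qualified
// class name, so ConvertToLocalRelation no longer runs during planning.
// Assumes an active SparkSession named `spark`.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
```

Note that Spark treats some rules as required for correctness and will not
exclude them even if listed.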


2018-06-08 13:22 GMT-07:00 Li Jin <ice.xell...@gmail.com>:

> Sorry, I am confused now... My UDF gets executed for each row anyway
> (because I am using withColumn and want to execute the UDF for each row).
> The difference is that with the optimization "ConvertToLocalRelation" it
> gets executed for each row on the driver in the optimization stage?
>
> On Fri, Jun 8, 2018 at 3:57 PM, Herman van Hövell tot Westerflier <
> her...@databricks.com> wrote:
>
>> But that is still cheaper than executing that expensive UDF for each row
>> in your dataset right?
>>
>> On Fri, Jun 8, 2018 at 9:51 PM Li Jin <ice.xell...@gmail.com> wrote:
>>
>>> I see. Thanks for the clarification. It's not a big issue, but I am
>>> surprised that my UDF can be executed in the planning phase. If my UDF is
>>> doing something expensive, that could get weird.
>>>
>>>
>>>
>>> On Fri, Jun 8, 2018 at 3:44 PM, Reynold Xin <r...@databricks.com> wrote:
>>>
>>>> But from the user's perspective, optimization is not run right? So it
>>>> is still lazy.
>>>>
>>>>
>>>> On Fri, Jun 8, 2018 at 12:35 PM Li Jin <ice.xell...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Sorry for the long email title. I am a bit surprised to find that the
>>>>> current optimizer rule "ConvertToLocalRelation" causes expressions to be
>>>>> eager-evaluated in planning phase, this can be demonstrated with the
>>>>> following code:
>>>>>
>>>>> scala> val myUDF = udf((x: String) => { println("UDF evaled");
>>>>> "result" })
>>>>>
>>>>> myUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
>>>>> UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
>>>>>
>>>>>
>>>>> scala> val df = Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))
>>>>>
>>>>> df: org.apache.spark.sql.DataFrame = [UDF(s): string]
>>>>>
>>>>>
>>>>> scala> println(df.queryExecution.optimizedPlan)
>>>>>
>>>>> UDF evaled
>>>>>
>>>>> LocalRelation [UDF(s)#9]
>>>>>
>>>>> This is somewhat unexpected to me because of Spark's lazy execution
>>>>> model.
>>>>>
>>>>> I am wondering if this behavior is by design?
>>>>>
>>>>> Thanks!
>>>>> Li
>>>>>
>>>>>
>>>>>
>>>
>
