Hi all,

Running an expensive deterministic UDF that returns a complex type and then referencing its result multiple times causes Spark to evaluate the UDF multiple times per row. This has been reported and discussed before: SPARK-18748, SPARK-17728

    import org.apache.spark.sql.functions.udf
    import spark.implicits._

    // placeholder body for illustration; the real f is an expensive computation
    val f: Int => Array[Int] = i => Array(i, i * i)
    val udfF = udf(f)
    df
      .select($"id", udfF($"id").as("array"))
      .select($"array"(0).as("array0"), $"array"(1).as("array1"))

A common approach to make Spark evaluate the UDF only once is to cache the intermediate result right after projecting the UDF:

    df
      .select($"id", udfF($"id").as("array"))
      .cache()
      .select($"array"(0).as("array0"), $"array"(1).as("array1"))

There are scenarios where this intermediate result is too big for the cluster to cache. Caching solely to work around the optimizer is also bad design.

The best approach is to mark the UDF as non-deterministic. Spark then optimizes the query so that the UDF is called only once per row, which is exactly what you want.

    val udfF = udf(f).asNondeterministic()
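
With the accumulator sketch from above, the effect is directly observable (same assumptions as before):

    val countedOnce = countedF.asNondeterministic()
    calls.reset()
    spark.range(10).toDF("id")
      .select(countedOnce($"id".cast("int")).as("array"))
      .select($"array"(0).as("array0"), $"array"(1).as("array1"))
      .collect()
    println(calls.value)  // expect 10: one evaluation per row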

*However, declaring a UDF non-deterministic when it clearly is deterministic is counter-intuitive and makes your code harder to read.*

Spark should provide a better way to flag such a UDF. Calling it expensive would be a better name here.

    val udfF = udf(f).asExpensive()

I understand that deterministic is a notion that Expression provides, and there is no equivalent to expensive that is understood by the optimizer. However, asExpensive() could simply set ScalaUDF.udfDeterministic = deterministic && !expensive, implementing the best available approach behind a better name.
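
Until something like this exists in Spark itself, a user-land stand-in is possible. A minimal sketch (ExpensiveUdf and asExpensive are hypothetical names; it merely delegates to asNondeterministic()):

    import org.apache.spark.sql.expressions.UserDefinedFunction

    // hypothetical helper until Spark provides asExpensive() natively
    implicit class ExpensiveUdf(wrapped: UserDefinedFunction) {
      // documents the intent (expensive, yet deterministic) while delegating
      // to the only mechanism the optimizer understands today
      def asExpensive(): UserDefinedFunction = wrapped.asNondeterministic()
    }

    val udfF = udf(f).asExpensive()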

What are your thoughts on asExpensive()?

Regards,
Enrico
