[ 
https://issues.apache.org/jira/browse/SPARK-27684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-27684:
-------------------------------
    Description: 
I believe that we can reduce ScalaUDF overheads when operating over primitive 
types.

In [ScalaUDF's 
doGenCode|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala#L991]
 we have logic to convert UDF function input types from Catalyst internal types 
to Scala types (for example, this is used to convert UTF8Strings to Java 
Strings). Similarly, we convert UDF return types.

However, UDF input argument conversion is effectively a no-op for primitive 
types because {{CatalystTypeConverters.createToScalaConverter()}} returns 
{{identity}} in those cases. UDF result conversion is a little tricker because 
{{createToCatalystConverter()}} returns [a 
function|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L413]
 that handles {{Option[Primitive]}}, but it might be the case that the 
Option-boxing is unusable via ScalaUDF (in which case the conversion truly is 
an {{identity}} no-op).

These unnecessary no-op conversions could be quite expensive because each call 
involves an index into the {{references}} array to get the converters, a second 
index into the converters array to get the correct converter for the nth input 
argument, and, finally, the converter invocation itself:
{code:java}
Object project_arg_0 = false ? null : ((scala.Function1[]) references[1] /* 
converters */)[0].apply(project_value_3);{code}
In these cases, I believe that we can reduce lookup / invocation overheads by 
modifying the ScalaUDF code generation to eliminate the conversion calls for 
primitives and directly assign the unconverted result, e.g.
{code:java}
Object project_arg_0 = false ? null : project_value_3;{code}
To cleanly handle the case where we have a multi-argument UDF accepting a 
mixture of primitive and non-primitive types, we might be able to keep the 
{{converters}} array the same size (so indexes stay the same) but omit the 
invocation of the converters for the primitive arguments (e.g. {{converters}} 
is sparse / contains unused entries in case of primitives).

I spotted this optimization while trying to construct some quick benchmarks to 
measure UDF invocation overheads. For example:
{code:java}
spark.udf.register("identity", (x: Int) => x)
sql("select id, id * 2, id * 3 from range(1000 * 1000 * 1000)").rdd.count() // 
~ 52 seconds
sql("select identity(id), identity(id * 2), identity(id * 3) from range(1000 * 
1000 * 1000)").rdd.count() // ~84 seconds{code}
I'm curious to see whether the optimization suggested here can close this 
performance gap. It'd also be a good idea to construct more principled 
microbenchmarks covering multi-argument UDFs, projections involving multiple 
UDFs over different input and output types, etc.

 

  was:
I believe that we can reduce ScalaUDF overheads when operating over primitive 
types.

In [ScalaUDF's 
doGenCode|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala#L991]
 we have logic to convert UDF function input types from Catalyst internal types 
to Scala types (for example, this is used to convert UTF8Strings to Java 
Strings). Similarly, we convert UDF return types.

However, UDF input argument conversion is effectively a no-op for primitive 
types because {{CatalystTypeConverters.createToScalaConverter()}} returns 
{{identity}} in those cases. UDF result conversion is a little tricker because 
{{createToCatalystConverter()}} returns [a 
function|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L413]
 that handles {{Option[Primitive]}}, but it might be the case that the 
Option-boxing is unusable via ScalaUDF (in which case the conversion truly is 
an {{identity}} no-op).

These unnecessary no-op conversions could be quite expensive because each call 
involves an index into the {{references}} array to get the converters, a second 
index into the converters array to get the correct converter for the nth input 
argument, and, finally, the converter invocation itself:
{code:java}
Object project_arg_0 = false ? null : ((scala.Function1[]) references[1] /* 
converters */)[0].apply(project_value_3);{code}
In these cases, I believe that we can reduce lookup / invocation overheads by 
modifying the ScalaUDF code generation to eliminate the conversion calls for 
primitives and directly assign the unconverted result, e.g.
{code:java}
Object project_arg_0 = false ? null : project_value_3;{code}
To cleanly handle the case where we have a multi-argument UDF accepting a 
mixture of primitive and non-primitive types, we might be able to keep the 
{{converters}} array the same size (so indexes stay the same) but omit the use 
of the converters (e.g. {{converters}} is sparse / contains unused entries in 
case of primitives).

I spotted this optimization while trying to construct some quick benchmarks to 
measure UDF invocation overheads. For example:
{code:java}
spark.udf.register("identity", (x: Int) => x)
sql("select id, id * 2, id * 3 from range(1000 * 1000 * 1000)").rdd.count() // 
~ 52 seconds
sql("select identity(id), identity(id * 2), identity(id * 3) from range(1000 * 
1000 * 1000)").rdd.count() // ~84 seconds{code}
I'm curious to see whether the optimization suggested here can close this 
performance gap. It'd also be a good idea to construct more principled 
microbenchmarks covering multi-argument UDFs, projections involving multiple 
UDFs over different input and output types, etc.

 


> Reduce ScalaUDF conversion overheads for primitives
> ---------------------------------------------------
>
>                 Key: SPARK-27684
>                 URL: https://issues.apache.org/jira/browse/SPARK-27684
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Josh Rosen
>            Priority: Major
>
> I believe that we can reduce ScalaUDF overheads when operating over primitive 
> types.
> In [ScalaUDF's 
> doGenCode|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala#L991]
>  we have logic to convert UDF function input types from Catalyst internal 
> types to Scala types (for example, this is used to convert UTF8Strings to 
> Java Strings). Similarly, we convert UDF return types.
> However, UDF input argument conversion is effectively a no-op for primitive 
> types because {{CatalystTypeConverters.createToScalaConverter()}} returns 
> {{identity}} in those cases. UDF result conversion is a little tricker 
> because {{createToCatalystConverter()}} returns [a 
> function|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L413]
>  that handles {{Option[Primitive]}}, but it might be the case that the 
> Option-boxing is unusable via ScalaUDF (in which case the conversion truly is 
> an {{identity}} no-op).
> These unnecessary no-op conversions could be quite expensive because each 
> call involves an index into the {{references}} array to get the converters, a 
> second index into the converters array to get the correct converter for the 
> nth input argument, and, finally, the converter invocation itself:
> {code:java}
> Object project_arg_0 = false ? null : ((scala.Function1[]) references[1] /* 
> converters */)[0].apply(project_value_3);{code}
> In these cases, I believe that we can reduce lookup / invocation overheads by 
> modifying the ScalaUDF code generation to eliminate the conversion calls for 
> primitives and directly assign the unconverted result, e.g.
> {code:java}
> Object project_arg_0 = false ? null : project_value_3;{code}
> To cleanly handle the case where we have a multi-argument UDF accepting a 
> mixture of primitive and non-primitive types, we might be able to keep the 
> {{converters}} array the same size (so indexes stay the same) but omit the 
> invocation of the converters for the primitive arguments (e.g. {{converters}} 
> is sparse / contains unused entries in case of primitives).
> I spotted this optimization while trying to construct some quick benchmarks 
> to measure UDF invocation overheads. For example:
> {code:java}
> spark.udf.register("identity", (x: Int) => x)
> sql("select id, id * 2, id * 3 from range(1000 * 1000 * 1000)").rdd.count() 
> // ~ 52 seconds
> sql("select identity(id), identity(id * 2), identity(id * 3) from range(1000 
> * 1000 * 1000)").rdd.count() // ~84 seconds{code}
> I'm curious to see whether the optimization suggested here can close this 
> performance gap. It'd also be a good idea to construct more principled 
> microbenchmarks covering multi-argument UDFs, projections involving multiple 
> UDFs over different input and output types, etc.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to