[ https://issues.apache.org/jira/browse/SPARK-27684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen updated SPARK-27684:
-------------------------------
Description:
I believe that we can reduce ScalaUDF overheads when operating over primitive
types.
In [ScalaUDF's
doGenCode|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala#L991]
we have logic to convert UDF input values from Catalyst's internal types to
Scala types (for example, this is used to convert UTF8Strings to Java
Strings). Similarly, we convert UDF return values back to Catalyst types.
However, UDF input argument conversion is effectively a no-op for primitive
types because {{CatalystTypeConverters.createToScalaConverter()}} returns
{{identity}} in those cases. UDF result conversion is a little trickier because
{{createToCatalystConverter()}} returns [a
function|https://github.com/apache/spark/blob/5a8aad01c2aaf0ceef8e9a3cfabbd2e88c8d9f0d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L413]
that unwraps {{Option[Primitive]}} results, but it may be that such
Option-boxed primitives can never actually reach this converter via ScalaUDF.
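To make the primitive case concrete, here is a toy model in plain Java (no
Spark dependency) of the two converters described above. {{PrimitiveConverters}},
{{TO_SCALA}} and {{TO_CATALYST}} are hypothetical names, and
{{java.util.Optional}} stands in for {{scala.Option}}; it is a sketch of the
behavior for primitive types only, not Spark's actual implementation.
{code:java}
import java.util.Optional;
import java.util.function.Function;

// Toy model (not Spark code) of the converters used for primitive types.
// java.util.Optional stands in for scala.Option.
final class PrimitiveConverters {
  // Input side: for primitives the "to Scala" converter is plain identity.
  static final Function<Object, Object> TO_SCALA = Function.identity();

  // Output side: the "to Catalyst" converter only unwraps an Optional result;
  // any other value is passed through unchanged.
  static final Function<Object, Object> TO_CATALYST =
      v -> (v instanceof Optional) ? ((Optional<?>) v).orElse(null) : v;

  public static void main(String[] args) {
    Object boxed = 42L;
    System.out.println(TO_SCALA.apply(boxed) == boxed);      // true: same object back
    System.out.println(TO_CATALYST.apply(boxed) == boxed);   // true: plain value passes through
    System.out.println(TO_CATALYST.apply(Optional.of(7)));   // 7
    System.out.println(TO_CATALYST.apply(Optional.empty())); // null
  }
}
{code}
For a non-Optional primitive both calls return their argument untouched, which
is why invoking them per row is pure overhead.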
These unnecessary no-op conversions could be quite expensive because each call
involves an index into the {{references}} array to get the converters, a second
index into the converters array to get the correct converter for a specific
input argument, and, finally, the converter invocation itself:
{code:java}
Object project_arg_0 = false ? null : ((scala.Function1[]) references[1] /* converters */)[0].apply(project_value_3);{code}
In these cases, I believe that we can reduce lookup / invocation overheads by
modifying the ScalaUDF code generation to eliminate the conversion calls for
primitives and directly assign the unconverted result, e.g.
{code:java}
Object project_arg_0 = false ? null : project_value_3;{code}
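The two generated-code shapes can be imitated in plain Java to show that they
yield the same value while the first does extra work on every call. This is a
sketch, assuming {{java.util.function.Function}} as a stand-in for
{{scala.Function1}}; {{ArgAssignment}}, {{withConverter}} and {{direct}} are
hypothetical names, and the converters-at-index-1 layout just mirrors the quoted
snippet.
{code:java}
import java.util.function.Function;

// Plain-Java imitation of the two generated-code shapes quoted above.
// java.util.function.Function stands in for scala.Function1.
final class ArgAssignment {
  // Current shape: load references[1], cast it to a converter array, load
  // element 0, then make an interface call, even though the converter is
  // identity for primitives.
  @SuppressWarnings({"unchecked", "rawtypes"})
  static Object withConverter(Object[] references, Object value) {
    return ((Function[]) references[1])[0].apply(value);
  }

  // Proposed shape: assign the unconverted value directly.
  static Object direct(Object value) {
    return value;
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  public static void main(String[] args) {
    // Slot 1 holds the converters array, as in the quoted snippet.
    Object[] references = { null, new Function[] { Function.identity() } };
    Object projectValue3 = 42L;
    System.out.println(withConverter(references, projectValue3) == projectValue3); // true
    System.out.println(direct(projectValue3) == projectValue3);                    // true
  }
}
{code}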
To cleanly handle the case where we have a multi-argument UDF accepting a
mixture of primitive and non-primitive types, we might be able to keep the
{{converters}} array the same size (so indexes stay the same) but skip the
converters for primitive arguments (i.e. {{converters}} would be sparse, with
unused entries at the positions of primitive arguments).
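A minimal sketch of how code generation could decide, per argument, whether to
emit a converter call, assuming a hypothetical {{genArgAssignments}} helper that
receives a primitive flag and the child's value variable for each argument. The
converter index is the argument position, so primitive slots simply leave their
{{converters}} entry unused. The {{false ? null :}} prefix is copied from the
quoted snippet; real codegen would substitute each child's null check.
{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed rule: emit a converter call only for
// non-primitive arguments. genArgAssignments is an illustrative name, not a
// ScalaUDF method.
final class SparseConverterCodegen {
  static List<String> genArgAssignments(boolean[] isPrimitive, String[] valueVars) {
    List<String> lines = new ArrayList<>();
    for (int i = 0; i < isPrimitive.length; i++) {
      // The converter index is the argument position, so indexes stay stable and
      // the converters[i] entry for a primitive argument is simply never read.
      String rhs = isPrimitive[i]
          ? valueVars[i]
          : "((scala.Function1[]) references[1] /* converters */)[" + i + "].apply("
              + valueVars[i] + ")";
      // "false ? null :" mirrors the quoted snippet; real codegen would insert
      // the child's isNull expression here.
      lines.add("Object project_arg_" + i + " = false ? null : " + rhs + ";");
    }
    return lines;
  }

  public static void main(String[] args) {
    // A two-argument UDF taking (Int, String): only argument 1 needs a converter.
    List<String> code = genArgAssignments(
        new boolean[] {true, false},
        new String[] {"project_value_3", "project_value_4"});
    code.forEach(System.out::println);
    // Prints:
    // Object project_arg_0 = false ? null : project_value_3;
    // Object project_arg_1 = false ? null : ((scala.Function1[]) references[1] /* converters */)[1].apply(project_value_4);
  }
}
{code}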
I spotted this optimization while trying to construct some quick benchmarks to
measure UDF invocation overheads. For example:
{code:java}
spark.udf.register("identity", (x: Int) => x)
sql("select id, id * 2, id * 3 from range(1000 * 1000 * 1000)").rdd.count() // ~52 seconds
sql("select identity(id), identity(id * 2), identity(id * 3) from range(1000 * 1000 * 1000)").rdd.count() // ~84 seconds{code}
I'm curious to see whether the optimization suggested here can close this
performance gap. It'd also be a good idea to construct more principled
microbenchmarks covering multi-argument UDFs, projections involving multiple
UDFs over different input and output types, etc.
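Taking the numbers above at face value, the gap is about 84 s - 52 s = 32 s for
3 x 10^9 UDF calls (three calls per row over 10^9 rows), or roughly 10.7 ns per
call if the whole gap were attributed to the UDF path. That figure lumps the
converter lookups together with every other cost the UDF path adds. As a crude
first step toward isolating just the converter indirection, outside Spark, the
following hand-rolled Java loop compares it with direct assignment.
{{ConverterOverheadBench}} is a hypothetical name; this is not JMH, its numbers
are noisy and JIT-sensitive, and a proper harness (plus Spark-level benchmarks
over multi-argument UDFs) would be the more principled approach.
{code:java}
import java.util.function.Function;

// Crude, hand-rolled timing loop (not JMH): sums boxed longs either through an
// identity converter fetched from a references-style array, or directly.
// HotSpot may inline the identity call, so treat results as a smoke test only.
final class ConverterOverheadBench {
  @SuppressWarnings({"unchecked", "rawtypes"})
  static long viaConverter(Object[] references, int n) {
    long sum = 0;
    for (int i = 0; i < n; i++) {
      // Same shape as the generated code: two array loads, a cast, an interface call.
      Object v = ((Function[]) references[1])[0].apply((long) i);
      sum += (Long) v;
    }
    return sum;
  }

  static long direct(int n) {
    long sum = 0;
    for (int i = 0; i < n; i++) {
      Object v = (long) i; // unconverted value, boxed like a UDF argument
      sum += (Long) v;
    }
    return sum;
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  public static void main(String[] args) {
    Object[] references = { null, new Function[] { Function.identity() } };
    int n = 50_000_000;
    for (int warmup = 0; warmup < 5; warmup++) { // let the JIT compile both loops
      viaConverter(references, n);
      direct(n);
    }
    long t0 = System.nanoTime();
    long a = viaConverter(references, n);
    long t1 = System.nanoTime();
    long b = direct(n);
    long t2 = System.nanoTime();
    if (a != b) throw new AssertionError("checksums differ");
    System.out.printf("via converter: %.2f ns/op%n", (t1 - t0) / (double) n);
    System.out.printf("direct:        %.2f ns/op%n", (t2 - t1) / (double) n);
  }
}
{code}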
> Reduce ScalaUDF conversion overheads for primitives
> ---------------------------------------------------
>
> Key: SPARK-27684
> URL: https://issues.apache.org/jira/browse/SPARK-27684
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Josh Rosen
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)