[ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157128#comment-17157128
 ] 

Martin Loncaric edited comment on SPARK-31356 at 7/19/20, 1:07 AM:
-------------------------------------------------------------------

Actually, there seem to be 4 separate performance issues:
1. an unnecessary serialize + deserialize
2. an unnecessary map
3. an unnecessary appendColumns when the groupByKey function just returns a 
subset of columns (though this is hard to get around in a type-safe way)
4. the RDD API is actually roughly 2x faster, so there might be even more 
room to improve aggregations


was (Author: mwlon):
Actually, there seem to be 3 separate performance issues:
1. an unnecessary appendColumns when the groupByKey function just returns a 
subset of columns (though this is hard to get around in a type-safe way)
2. an unnecessary serialize + deserialize
3. the RDD API is actually roughly 2x faster. It seems there's a lot of room 
to improve aggregations

> reduceGroupValues function for KeyValueGroupedDataset
> -----------------------------------------------------
>
>                 Key: SPARK-31356
>                 URL: https://issues.apache.org/jira/browse/SPARK-31356
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like 
> this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the optimized plan unfortunately ends up with an unnecessary 
> implicit serialization during the aggregation step, followed by 
> {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
> case, there are 2 things we can improve with a specialized 
> {{.reduceGroupValues}}:
> 1. avoid the extra serialization (baked into the AggregationIterator 
> implementations) and the subsequent deserialization
> 2. avoid requiring a {{.map}}, and the time it takes to unpack tuples, by 
> emitting only the values
> Proposal:
> Create an {{AggregationIteratorBase}} superclass that can emit general 
> {{InternalRow}}s instead of just {{UnsafeRow}}s.
> Create a new {{AggregationIteratorBase}} implementation called 
> {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing 
> only the values, instead of serializing them into {{UnsafeRow}}s, in {{Final}} 
> or {{Complete}} mode. Since we don't need to emit the keys, which are 
> serialized, this is not too complicated. To make use of this, have 
> {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a 
> {{CatalystSerde.serialize}}.
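To make the difference in output shape concrete, here is a minimal sketch that models the semantics on plain Scala collections rather than Spark, so it runs without a cluster. `GroupedReduceSketch`, `reduceGroups` and `reduceGroupValues` below are hypothetical local stand-ins, not the Dataset API: the first mirrors today's `groupByKey(...).reduceGroups(...)` returning `(K, V)` pairs that callers then `.map(_._2)`, and the second mirrors the proposed method that emits only the reduced values. It illustrates only what is returned; it says nothing about the serialization costs discussed above.

```scala
// Local-collection model of the two patterns; no Spark classes are used.
object GroupedReduceSketch {
  // Models ds.groupByKey(key).reduceGroups(f): one (key, reducedValue) pair
  // per group, so callers typically follow up with .map(_._2).
  def reduceGroups[T, K](xs: Seq[T])(key: T => K)(f: (T, T) => T): Seq[(K, T)] =
    xs.groupBy(key).toSeq.map { case (k, group) => (k, group.reduce(f)) }

  // Models the hypothetical reduceGroupValues: the same reduction, but only
  // the reduced values are emitted, so there is no tuple to build and unpack.
  def reduceGroupValues[T, K](xs: Seq[T])(key: T => K)(f: (T, T) => T): Seq[T] =
    xs.groupBy(key).values.map(_.reduce(f)).toSeq
}
```

For example, grouping `Seq(Rec(1, "a"), Rec(2, "a"), Rec(5, "b"))` by `_.y` and summing `x` gives `Rec(3, "a")` and `Rec(5, "b")` from both the first method followed by `.map(_._2)` and the second method directly; group order is unspecified, as with Spark.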



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
