[
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157128#comment-17157128
]
Martin Loncaric edited comment on SPARK-31356 at 7/19/20, 1:07 AM:
-------------------------------------------------------------------
Actually, there seem to be 4 separate performance issues:
1. unnecessary serialize + deserialize
2. unnecessary map
3. unnecessary appendColumns when the groupByKey function just returns a
subset of columns (though this is hard to get around in a type-safe way)
4. the RDD API is roughly 2x faster overall, so there might be even more room
to improve aggregations
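A minimal sketch for comparing the two routes locally, assuming a local
SparkSession. The names Rec, merge and ReduceRoutes are hypothetical and exist
only for this illustration. It prints the plans for the Dataset route so the
extra steps in items 1 and 2 can be inspected, and it runs the equivalent RDD
reduceByKey for item 4. It does not measure anything by itself.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Hypothetical record type, used only for this illustration.
final case class Rec(y: Int, v: Long)

object ReduceRoutes {
  // Explicit encoders, so the sketch does not rely on spark.implicits._
  implicit val recEnc: Encoder[Rec] = Encoders.product[Rec]
  implicit val keyEnc: Encoder[Int] = Encoders.scalaInt

  // Hypothetical reduce function: sums v within each key.
  val merge: (Rec, Rec) => Rec = (a, b) => Rec(a.y, a.v + b.v)

  // Dataset route, as in the issue description: group, reduce, drop the key.
  def viaDataset(ds: Dataset[Rec]): Dataset[Rec] =
    ds.groupByKey((r: Rec) => r.y).reduceGroups(merge).map((kv: (Int, Rec)) => kv._2)

  // RDD route: the same reduction expressed with reduceByKey.
  def viaRdd(ds: Dataset[Rec]): Set[Rec] =
    ds.rdd.keyBy(_.y).reduceByKey(merge).values.collect().toSet

  def run(): (Set[Rec], Set[Rec]) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("reduce-routes-sketch")
      .getOrCreate()
    try {
      val ds = spark.createDataset(Seq(Rec(1, 10L), Rec(1, 5L), Rec(2, 7L)))
      val reduced = viaDataset(ds)
      reduced.explain(true) // prints the parsed, analyzed, optimized and physical plans
      (reduced.collect().toSet, viaRdd(ds))
    } finally spark.stop()
  }
}
```

Both routes should return the same set of records. Any timing comparison would
need a realistic data size and a proper benchmark harness.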
was (Author: mwlon):
Actually, there seem to be 3 separate performance issues:
1. unnecessary appendColumns when groupByKey function just returns a subset of
columns (though this is hard to get around in a type safe way)
2. unnecessary serialize + deserialize
3. actually the RDD's API is roughly a whole 2x faster. It seems there's a lot
of room to improve aggregations
> reduceGroupValues function for KeyValueGroupedDataset
> -----------------------------------------------------
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Martin Loncaric
> Priority: Major
>
> Problem: in the Dataset API, it is a very common pattern to do something
> like this whenever a complex reduce function is needed:
> {code:scala}
> ds
> .groupByKey(_.y)
> .reduceGroups((a, b) => {...})
> .map(_._2)
> {code}
> However, the optimized plan unfortunately ends up with an unnecessary
> implicit serialization during the aggregation step, followed by
> {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use
> case, there are 2 things we can improve with a specialized
> {{.reduceGroupValues}}:
> 1. avoid the extra serialization (baked into the AggregationIterator
> implementations) and deserialization
> 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by
> emitting the values only
> Proposal:
> Create an {{AggregationIteratorBase}} superclass that can emit general
> {{InternalRow}}s instead of just {{UnsafeRow}}s.
> Create a new {{AggregationIteratorBase}} implementation called
> {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing
> only the values instead of serializing them into {{UnsafeRow}}s in {{Final}}
> or {{Complete}} mode. Since we don't need to emit the keys, which are
> serialized, this is not too complicated. To make use of this, have
> {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a
> {{CatalystSerde.serialize}}.
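A minimal sketch of the API shape the proposal describes, assuming it is
modelled as an extension method on top of the existing public API.
reduceGroupValues is not part of Spark. The names ReduceGroupValuesSyntax,
ReduceGroupValuesOps and ReduceGroupValuesDemo are hypothetical. Because this
version is just reduceGroups followed by map, it shows only the intended
signature and result, not the proposed optimization; the serialization and the
MapElements step are still present in its plan.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, KeyValueGroupedDataset, SparkSession}

object ReduceGroupValuesSyntax {
  // Hypothetical extension: NOT a Spark API, only a model of the proposed method.
  implicit class ReduceGroupValuesOps[K, V](grouped: KeyValueGroupedDataset[K, V]) {
    // Reduces each group with f and returns only the reduced values (no keys).
    def reduceGroupValues(f: (V, V) => V)(implicit enc: Encoder[V]): Dataset[V] =
      grouped.reduceGroups(f).map((kv: (K, V)) => kv._2)
  }
}

object ReduceGroupValuesDemo {
  import ReduceGroupValuesSyntax._

  def run(): Set[(String, Int)] = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("reduce-group-values-sketch")
      .getOrCreate()
    // Explicit encoders, so the sketch does not rely on spark.implicits._
    implicit val pairEnc: Encoder[(String, Int)] =
      Encoders.tuple(Encoders.STRING, Encoders.scalaInt)
    implicit val keyEnc: Encoder[String] = Encoders.STRING
    try {
      val ds = spark.createDataset(Seq(("a", 1), ("a", 2), ("b", 5)))
      ds.groupByKey((p: (String, Int)) => p._1)
        .reduceGroupValues((x, y) => (x._1, x._2 + y._2)) // sum the Int per key
        .collect()
        .toSet
    } finally spark.stop()
  }
}
```

A call site then reads ds.groupByKey(...).reduceGroupValues(...) with no
trailing .map(_._2), which is the usage the description aims for.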