[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martin Loncaric updated SPARK-31356:
------------------------------------

Description:

Problem: in the Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups((aVal, bVal) => outVal)
  .map(_._2)
{code}

However, the optimized plan unfortunately ends up with an unnecessary implicit serialization during the aggregation step, followed by {{DeserializeToObject + MapElements}} (from {{(K, V)}} to {{V}}). For this common use case, there are 2 things we can improve with a specialized {{reduceGroupValues}}:

1. avoid the extra serialization (baked into the {{AggregationIterator}} implementations) and deserialization
2. avoid requiring a {{.map}} and the time it takes to unpack tuples, by emitting only the values

Proposal:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroupValues((aVal, bVal) => outVal)
{code}

Create an {{AggregationIteratorBase}} superclass that can emit general {{InternalRow}}s instead of just {{UnsafeRow}}s. Create a new {{AggregationIteratorBase}} implementation called {{ObjectValuesAggregationIterator}} that, in {{Final}} or {{Complete}} mode, emits {{InternalRow}}s containing only the values instead of serializing them into {{UnsafeRow}}s. Since we don't need to emit the keys, which are the part that gets serialized, this is not too complicated. To make use of this, have {{reduceGroupValues}} return an {{Aggregate}} wrapped in a {{CatalystSerde.serialize}}.

> reduceGroupValues function for KeyValueGroupedDataset
> -----------------------------------------------------
>
>                 Key: SPARK-31356
>                 URL: https://issues.apache.org/jira/browse/SPARK-31356
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Martin Loncaric
>            Priority: Major
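For illustration, the intended semantics of the proposed operator can be modeled with plain Scala collections, with no Spark involved. This is a hedged sketch, not the actual Spark API: the {{Record}} case class and its {{y}}/{{amount}} fields are hypothetical stand-ins. The point is that {{reduceGroupValues}} would behave like {{reduceGroups}} followed by {{.map(_._2)}}, just without materializing the intermediate {{(K, V)}} pairs:

```scala
// Hypothetical example type; any record with a grouping field would do.
case class Record(y: String, amount: Int)

object ReduceGroupValuesSketch {
  // Existing pattern: reduceGroups yields (key, value) pairs,
  // and a trailing map drops the keys.
  def viaReduceGroups(ds: Seq[Record]): Seq[Record] =
    ds.groupBy(_.y)
      .map { case (_, vs) => vs.reduce((a, b) => Record(a.y, a.amount + b.amount)) }
      .toSeq

  // Proposed pattern: the reduced values are emitted directly, with no
  // intermediate (key, value) pair to serialize and then unpack.
  def viaReduceGroupValues(ds: Seq[Record]): Seq[Record] =
    ds.groupBy(_.y)
      .values
      .map(_.reduce((a, b) => Record(a.y, a.amount + b.amount)))
      .toSeq
}
```

Both functions produce the same multiset of values; the proposed API would simply let Spark skip the key serialization and the tuple-unpacking {{MapElements}} step in the physical plan.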