[ 
https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Loncaric updated SPARK-31356:
------------------------------------
    Description: 
Problem: in the Datasets API, it is a very common pattern to do something like this 
whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups(((aKey, aVal), (bKey, bVal)) => outVal)
  .map(_._2)
{code}
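For intuition, the same shape of computation can be modeled with plain Scala collections (a toy model only, not Spark; {{Point}} and its fields are hypothetical illustration types):

{code:scala}
// Toy model of groupByKey(_.y) + reduceGroups + map(_._2) on collections.
case class Point(y: String, v: Int)

val ds = Seq(Point("a", 1), Point("a", 2), Point("b", 3))

val reduced: Seq[Point] =
  ds.groupBy(_.y)                       // like groupByKey(_.y)
    .map { case (_, group) =>
      // reduceGroups pairs each key with its reduced value...
      val pair = (group.head.y, group.reduce((a, b) => Point(a.y, a.v + b.v)))
      pair._2                           // ...and map(_._2) immediately discards the key
    }
    .toSeq
{code}

The tuple is built only to be unpacked one step later, which is the overhead this ticket targets.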

However, the optimized plan unfortunately ends up with an unnecessary 
implicit serialization during the aggregation step, followed by 
{{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use 
case, there are 2 things a specialized {{.reduceGroupValues}} could improve:

1. avoid the extra serialization (baked into the {{AggregationIterator}} 
implementations) and deserialization
2. emit only the values, avoiding the {{.map}} call and the time spent 
unpacking tuples

Proposal:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroupValues((aVal, bVal) => outVal)
{code}
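A toy, non-Spark model of what the proposed operator would compute (the helper below is a hypothetical illustration of the semantics, not the proposed implementation):

{code:scala}
case class Point(y: String, v: Int)

// Hypothetical stand-in for the proposed operator: reduce each group and
// emit only the reduced values, never materializing (key, value) pairs.
def reduceGroupValues[K, V](data: Seq[V])(key: V => K)(f: (V, V) => V): Seq[V] =
  data.groupBy(key).valuesIterator.map(_.reduce(f)).toSeq

val out = reduceGroupValues(Seq(Point("a", 1), Point("a", 2), Point("b", 3)))(_.y)(
  (a, b) => Point(a.y, a.v + b.v))
{code}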
Create an {{AggregationIteratorBase}} superclass that can emit general 
{{InternalRow}}s instead of just {{UnsafeRow}}s.
Create a new {{AggregationIteratorBase}} implementation called 
{{ObjectValuesAggregationIterator}} that, in {{Final}} or {{Complete}} mode, 
emits {{InternalRow}}s containing only the values instead of serializing them 
into {{UnsafeRow}}s. Since we don't need to emit the keys, which are 
serialized, this is not too complicated. To make use of this, have 
{{.reduceGroupValues}} return an {{Aggregate}} wrapped in 
{{CatalystSerde.serialize}}.
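As a rough sketch of the iterator idea (all names below are toy stand-ins, not actual Spark {{AggregationIterator}} internals):

{code:scala}
// Toy sketch: an iterator over pre-grouped data that emits reduced values
// directly, with no key column and no extra serialize/deserialize round-trip.
class ValuesReduceIterator[V](groups: Iterator[Seq[V]], reduce: (V, V) => V)
    extends Iterator[V] {
  def hasNext: Boolean = groups.hasNext
  def next(): V = groups.next().reduce(reduce)   // emit the value only
}

val it = new ValuesReduceIterator[Int](Iterator(Seq(1, 2), Seq(3)), _ + _)
val emitted = it.toList
{code}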


> reduceGroupValues function for KeyValueGroupedDataset
> -----------------------------------------------------
>
>                 Key: SPARK-31356
>                 URL: https://issues.apache.org/jira/browse/SPARK-31356
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Martin Loncaric
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
