Martin Loncaric created SPARK-31356:
---------------------------------------

             Summary: KeyValueGroupedDataset method to reduce and take values only
                 Key: SPARK-31356
                 URL: https://issues.apache.org/jira/browse/SPARK-31356
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Martin Loncaric


Problem: in the Datasets API, it is a very common pattern to do something like 
this whenever a complex reduce function is needed:

{code:scala}
ds
  .groupByKey(_.y)
  .reduceGroups((a, b) => {...})
  .map(_._2)
{code}

However, the {{.map(_._2)}} step (taking values and throwing keys away) 
unfortunately often ends up as an unnecessary serialization during the 
aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) 
=> V) + SerializeFromObject}} in the optimized logical plan. In this example, 
something like {{Project (from (K, V) => V)}} would be preferable. Even 
manually doing a {{.select(...).as[T]}} to replace the {{.map}} is quite 
tricky, because
* the columns are complicated, like {{[value, 
ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
* it breaks the nice type checking of Datasets

Proposal:
Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like 
{{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and a 
{{SerializeFromObject}} node so that the Optimizer can eliminate the 
serialization when it is redundant. Change aggregations to emit deserialized 
results.

I had 2 ideas for what we could change: either add a new method such as 
{{.reduceGroupValues}} that projects to only the necessary columns, or make the 
improvement proposed above. I think the latter is the better solution because
* it will improve the performance of existing Spark applications with no 
modifications
* feature growth is undesirable

Uncertainties:
Affects Version: I'm not sure - if I submit a PR soon, can we get this into 
3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
Complications: Are there any hazards in splitting Aggregation into Aggregation 
+ SerializeFromObject that I'm not aware of?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
