[ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martin Loncaric updated SPARK-31356:
------------------------------------
Description:
Problem: in the Datasets API, it is a very common pattern to do something like this
whenever a complex reduce function is needed:
{code:scala}
ds
.groupByKey(_.y)
.reduceGroups((a, b) => {...})
.map(_._2)
{code}
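For concreteness, here is a minimal self-contained sketch of that pattern; the {{Event}} case class, the data, and the reduce function are illustrative assumptions, not taken from any particular application:
{code:scala}
import org.apache.spark.sql.SparkSession

// Illustrative record type; any case class with a grouping key works the same way.
case class Event(y: String, x: Int)

val spark = SparkSession.builder().master("local[*]").appName("reduce-groups-pattern").getOrCreate()
import spark.implicits._

val ds = Seq(Event("a", 1), Event("a", 3), Event("b", 2)).toDS()

// Keep one "winning" Event per key, then throw the key away.
val latest = ds
  .groupByKey(_.y)
  .reduceGroups((a, b) => if (a.x >= b.x) a else b)
  .map(_._2)

latest.show()
{code}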
However, the {{.map(_._2)}} step (taking the values and throwing the keys away)
unfortunately often ends up as an unnecessary serialization during the aggregation
step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) +
SerializeFromObject}} in the optimized logical plan (this can be inspected as
sketched after the list below). In this example, it would be more ideal to either
skip the deserialization/serialization entirely or use a {{Project (from (K, V) =>
V)}}. Even manually replacing the {{.map}} with a {{.select(...).as[T]}} is quite
tricky, because
* the columns are complicated, like {{[value,
ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
* it breaks the nice type checking of Datasets
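The redundant round-trip can be seen by printing the plans for the illustrative {{latest}} Dataset from the sketch above; the operators to look for are the ones quoted in this description, though the exact output varies by Spark version:
{code:scala}
// explain(true) prints the parsed, analyzed, and optimized logical plans plus the
// physical plan; per the description above, the optimized logical plan for this
// pattern ends in DeserializeToObject -> MapElements -> SerializeFromObject on top
// of the Aggregate, instead of a simple Project that just drops the key column.
latest.explain(true)

// The optimized logical plan is also available programmatically:
println(latest.queryExecution.optimizedPlan.treeString)
{code}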
Proposal:
Change the {{KeyValueGroupedDataset.aggUntyped}} method to (like
{{KeyValueGroupedDataset.cogroup}}) append both an {{Aggregate}} node and a
{{SerializeFromObject}} node, so that the Optimizer can eliminate the
serialization when it is redundant. Change aggregations to emit deserialized
results.
I had 2 ideas for what we could change: either add a new {{.reduceGroupValues}}
method that projects to only the necessary columns (a rough sketch of its shape
follows the list below), or make this improvement. I thought this would be a
better solution because
* it will improve the performance of existing Spark applications with no
modifications
* feature growth is undesirable
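For reference, the user-facing shape of that alternative would roughly have been something like the sketch below. The name {{reduceGroupValues}} and its placement as an extension method are hypothetical; the sketch only captures the intended semantics ({{reduceGroups}} followed by {{.map(_._2)}}) and would not by itself avoid the extra serialization:
{code:scala}
import org.apache.spark.sql.{Dataset, Encoder, KeyValueGroupedDataset}

object ReduceGroupValuesSyntax {
  // Hypothetical helper: reduce each group and keep only the reduced values.
  // Semantically this is just reduceGroups + map(_._2); the proposal is about
  // making exactly this kind of map cheap, not about the method itself.
  implicit class ReduceValuesOps[K, V](grouped: KeyValueGroupedDataset[K, V]) {
    def reduceGroupValues(f: (V, V) => V)(implicit enc: Encoder[V]): Dataset[V] =
      grouped.reduceGroups(f).map(_._2)
  }
}

// Usage with the illustrative Event dataset from the first sketch:
// import ReduceGroupValuesSyntax._
// val latest = ds.groupByKey(_.y).reduceGroupValues((a, b) => if (a.x >= b.x) a else b)
{code}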
Uncertainties:
Affects Version: I'm not sure - if I submit a PR soon, can we get this into
3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
Complications: Are there any hazards in splitting Aggregation into Aggregation
+ SerializeFromObject that I'm not aware of?
> KeyValueGroupedDataset method to reduce and take values only
> ------------------------------------------------------------
>
> Key: SPARK-31356
> URL: https://issues.apache.org/jira/browse/SPARK-31356
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Martin Loncaric
> Priority: Major
>