[
https://issues.apache.org/jira/browse/BEAM-5392?focusedWorklogId=189030&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-189030
]
ASF GitHub Bot logged work on BEAM-5392:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Jan/19 16:06
Start Date: 23/Jan/19 16:06
Worklog Time Spent: 10m
Work Description: mareksimunek commented on pull request #7601:
[BEAM-5392] GroupByKey optimized for non-merging windows
URL: https://github.com/apache/beam/pull/7601
- Uses Spark's `repartitionAndSortWithinPartitions` instead of `groupByKey`, so
that all records for a key no longer need to be held in memory at once.
- For non-merging windows, the window itself is put into the key, resulting in
smaller groupings.
Tested on our job with 64 TB of shuffle read in one stage.
More info: https://issues.apache.org/jira/browse/BEAM-5392
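The window-in-key idea above can be sketched in plain Java (a hypothetical illustration with invented names such as `Element` and `groupByKeyAndWindow`, not the runner's actual code): keying each element by the pair (key, window) instead of by the key alone splits one large per-key group into several smaller per-window groups.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical sketch of window-in-key grouping for non-merging windows.
// Names here are invented for illustration, not taken from the Spark runner.
public class CompositeKeyDemo {

    // A shuffled element: user key, assigned (non-merging) window id, payload.
    record Element(String key, int window, int value) {}

    // Group by the composite (key, window) pair rather than by key alone,
    // so each group holds only one window's values for a given key.
    static Map<Map.Entry<String, Integer>, List<Integer>> groupByKeyAndWindow(
            List<Element> in) {
        return in.stream().collect(Collectors.groupingBy(
                e -> Map.entry(e.key(), e.window()),
                Collectors.mapping(Element::value, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Element> elems = List.of(
                new Element("a", 0, 1), new Element("a", 0, 2),
                new Element("a", 1, 3), new Element("b", 0, 4));
        Map<Map.Entry<String, Integer>, List<Integer>> grouped =
                groupByKeyAndWindow(elems);
        // Key "a" splits into two smaller groups, one per window.
        System.out.println(grouped.get(Map.entry("a", 0))); // [1, 2]
        System.out.println(grouped.get(Map.entry("a", 1))); // [3]
    }
}
```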
------------------------
Follow this checklist to help us incorporate your contribution quickly and
easily:
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA
issue, if applicable. This will automatically link the pull request to the
issue.
- [ ] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
It will help us expedite review of your Pull Request if you tag someone
(e.g. `@username`) to look at it.
Post-Commit Tests Status (on master branch)
------------------------------------------------------------------------------------------------
Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
--- | --- | --- | --- | --- | --- | --- | ---
Go | [Build](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | --- | --- | --- | ---
Java | [Build](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [Build](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [Build](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [Build](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[Build](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[Build](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [Build](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [Build](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [Build](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)
Python | [Build](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/) | --- | [Build](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[Build](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | --- | --- | --- | ---
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 189030)
Time Spent: 10m
Remaining Estimate: 0h
> GroupByKey on Spark: All values for a single key need to fit in-memory at once
> ------------------------------------------------------------------------------
>
> Key: BEAM-5392
> URL: https://issues.apache.org/jira/browse/BEAM-5392
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 2.6.0
> Reporter: David Moravek
> Assignee: David Moravek
> Priority: Major
> Labels: performance
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Currently, when using GroupByKey, all values for a single key need to fit
> in-memory at once.
>
> The following issues need to be addressed:
> a) We cannot use Spark's _groupByKey_, because it requires all values for a
> single key to fit in memory (it is implemented as a "list combiner")
> b) _ReduceFnRunner_ iterates over the values multiple times in order to also
> group by window
>
> Solution:
>
> In the Dataflow Worker code, there are optimized versions of _ReduceFnRunner_
> that can take advantage of the elements for a single key being sorted by
> timestamp.
>
> We can use Spark's {{repartitionAndSortWithinPartitions}} in order to meet
> this constraint.
>
> For non-merging windows, we can put the window itself into the key, resulting
> in smaller groupings.
>
> This approach was already tested at ~100 TB input scale on Spark 2.3.x
> (custom Spark runner).
>
> I'll submit a patch once the Dataflow Worker code donation is complete.
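The single-pass processing the issue describes can be sketched in plain Java (an illustration under assumed names such as `ShuffleRecord` and `sumPerKey`, not the Spark runner's actual code): once the shuffle delivers a partition's records sorted by key, and by timestamp within a key, one sequential scan can aggregate each key with constant state instead of materializing the full list of its values.

```java
import java.util.*;

// Hypothetical sketch: process a partition whose records arrive sorted by
// (key, timestamp), as repartitionAndSortWithinPartitions would deliver them.
// Only O(1) running state is held per key, never a key's full value list.
public class SortedPartitionScan {

    record ShuffleRecord(String key, long timestamp, long value) {}

    // Sum values per key in a single pass over the sorted input.
    static Map<String, Long> sumPerKey(List<ShuffleRecord> sortedPartition) {
        Map<String, Long> out = new LinkedHashMap<>();
        String currentKey = null;
        long sum = 0;
        for (ShuffleRecord r : sortedPartition) {
            if (!r.key().equals(currentKey)) {
                // Key boundary reached: flush the previous key's aggregate.
                if (currentKey != null) out.put(currentKey, sum);
                currentKey = r.key();
                sum = 0;
            }
            sum += r.value(); // values arrive in timestamp order
        }
        if (currentKey != null) out.put(currentKey, sum);
        return out;
    }

    public static void main(String[] args) {
        List<ShuffleRecord> sorted = List.of(
                new ShuffleRecord("a", 1, 10), new ShuffleRecord("a", 2, 5),
                new ShuffleRecord("b", 1, 7));
        System.out.println(sumPerKey(sorted)); // {a=15, b=7}
    }
}
```

A real runner would feed each group's values to _ReduceFnRunner_ rather than summing them, but the memory argument is the same: per-key state replaces a per-key in-memory list.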
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)