jto opened a new pull request, #32440: URL: https://github.com/apache/beam/pull/32440
## Context

Flink will drop support for the DataSet API in 2.0, which should be released by EOY, so it is quite important for Beam to support DataStream well.

## The PR

This PR improves the performance of batch jobs executed with `--useDatastreamForBatch` by porting the following optimizations, which are already present in `FlinkBatchTransformTranslators` but missing from `FlinkStreamingTransformTranslators`:

- Limit the max size of source splits. Similar to https://github.com/apache/beam/pull/28045
- Pre-combine before shuffle (both reduce by key and GBK).
- Disable bundling in batch mode (except for pre-combine). Lower the default bundle size, since the new behavior puts pressure on the heap.

It also implements the following optimizations:

- Use a "lazy" split enumerator to distribute splits dynamically rather than eagerly. This new enumerator greatly reduces skew, as each slot pulls a new split to consume only when it has finished its current work.
- Set the default `maxParallelism` to `parallelism`, as the total number of splits is equal to `maxParallelism`. Again, this reduces skew.
- Make `ToKeyedWorkItem` part of `DoFnOperator`, which reduces the size of the job graph and avoids unnecessary inter-task communication.
- Force a common slot-sharing group on all bounded IOs. This emulates the behavior of the DataSet API, which again improves performance, especially when data is shuffled several times while the partitioning keys are unchanged (for example, if the job does `GBK -> map -> CombinePerKey`). Add a flag to control this feature (defaults to active).
- Other minor optimizations removing repeated serde work.

## Benchmarks

The patched version was tested against a few of Spotify's production batch workflows. All settings were left unchanged except for the following:

- passed `--useDatastreamForBatch=true`
- set `jobmanager.scheduler: default` (otherwise DataStream defaults to the adaptive scheduler).

| | | Beam 2.56 - dataset | Beam 2.56 - datastream | | Beam 2.56 - datastream patched | |
| ----- | --------: | ------------------: | ---------------------: | -----: | -----------------------------: | ------: |
| job | # workers | execution time | execution time | % diff vs. dataset | execution time | % diff vs. dataset |
| Job 1 | 350 | 2:19:00 | fails after 4h29min | - | 1:43:00 | -25.90% |
| Job 2 | 160 | 0:23:00 | 0:35:00 | 52.17% | 0:22:36 | -1.74% |
| Job 3 | 200 | 0:53:08 | 1:34:39 | 78.14% | failed | - |
| Job 4 | 160 | 2:31:20 | 4:27:00 | 76.43% | 2:19:35 | -7.76% |
| Job 5 | 1 | 0:43:00 | not tested | - | 0:38:00 | -11.63% |
| Job 6 | 300 | 2:58:51 | not tested | - | running | |

## Note

Job 3 fails with a stack overflow exception because of [a bug in an old version of Kryo](https://github.com/EsotericSoftware/kryo/issues/341). I believe this is because the job uses `taskmanager.runtime.large-record-handler: true`, and it should be fixed in Flink 2.0 since Kryo is upgraded to a more recent version.
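
For readers who want to reproduce the `% diff` columns of the benchmark table: the reported values are consistent with a relative change in execution time against the Beam 2.56 dataset run. The following is a minimal sketch under that assumption; the class and method names (`PercentDiff`, `toSeconds`, `percentDiff`, `format`) are hypothetical helpers written for this illustration and are not part of the PR.

```java
import java.util.Locale;

// Illustrative helper only: recomputes "% diff" as the relative change
// of a run's execution time against the dataset baseline (an assumption
// that matches the figures reported in the table).
class PercentDiff {

    // Parses an "H:MM:SS" duration into a number of seconds.
    public static long toSeconds(String hms) {
        String[] parts = hms.split(":");
        return Long.parseLong(parts[0]) * 3600
            + Long.parseLong(parts[1]) * 60
            + Long.parseLong(parts[2]);
    }

    // Relative change of candidate versus baseline, in percent.
    // Negative values mean the candidate run was faster.
    public static double percentDiff(String baseline, String candidate) {
        long b = toSeconds(baseline);
        long c = toSeconds(candidate);
        return 100.0 * (c - b) / b;
    }

    // Formats a percentage with two decimals, as in the table.
    public static String format(double pct) {
        return String.format(Locale.ROOT, "%.2f%%", pct);
    }

    public static void main(String[] args) {
        // Job 1: dataset vs. patched datastream.
        System.out.println(format(percentDiff("2:19:00", "1:43:00")));
        // Job 4: dataset vs. unpatched datastream.
        System.out.println(format(percentDiff("2:31:20", "4:27:00")));
        // Job 4: dataset vs. patched datastream.
        System.out.println(format(percentDiff("2:31:20", "2:19:35")));
    }
}
```

Under this reading, a negative value in the patched column means the patched datastream run finished faster than the dataset run with the same number of workers.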

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
