cisaacstern opened a new issue, #29365: URL: https://github.com/apache/beam/issues/29365
### What happened?

As released in `2.51.0`, the DaskRunner's GBK implementation only works under certain conditions (e.g., if the key type is an integer). It does not work for many commonly-used patterns, including:

- The use of string keys, e.g. the official [seasonal produce example](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/). (A minimal reproducer for this failure mode is provided in https://github.com/apache/beam/pull/27618#issuecomment-1791940297.)
- This code path in `assert_that`:
  https://github.com/apache/beam/blob/07d5d534aa8e0f0066e5182a3f17fd9b8a29287c/sdks/python/apache_beam/testing/util.py#L298
  https://github.com/apache/beam/blob/07d5d534aa8e0f0066e5182a3f17fd9b8a29287c/sdks/python/apache_beam/transforms/util.py#L258

While both of these are problematic, the latter is especially so because, given the ubiquity of `assert_that` in the tests, it more or less means we can't test (or, therefore, develop) the DaskRunner at all until this issue is fixed.

Over in https://github.com/cisaacstern/beam-daskrunner-gbk-bugs/pull/1, I have developed a relatively comprehensive demonstration of both of these failure modes. Based on experiments there, it looks like the root cause of this issue relates to the partitioning of the underlying Dask Bag collection. As evidence of this, when the following implementation of `Create` is mocked into `2.51.0` during the test session, both of these GBK-related bugs are resolved (copied from [here](https://github.com/cisaacstern/beam-daskrunner-gbk-bugs/pull/1/files#diff-d592c74a98ca45c4162bc084f81d9032e19730168fc05e7f42b7824bb3061a3aR40-R49)):

```python
import dask.bag as db

from apache_beam.runners.dask.transform_evaluator import Create as DaskCreate, OpInput


class UnpartitionedCreate(DaskCreate):
    """The DaskRunner GBK bug(s) demonstrated by this test module are (somehow)
    related to the partitioning of the dask bag collection.

    The following object is mocked into the DaskRunner in certain test cases
    below to demonstrate that if the dask bag collection is unpartitioned
    (i.e., consists of only a single partition), then the GBK bug(s) are
    resolved.
    """

    def apply(self, input_bag: OpInput) -> db.Bag:
        partitioned = super().apply(input_bag)
        return partitioned.repartition(npartitions=1)
```

I am nearly certain that this issue is not with `dask.bag.Bag.groupby`, which AFAICT works fine, e.g., with partitioned collections containing string keys. Rather, this appears to have something to do with how the DaskRunner is wrapping partitioned collections.

(Note: I have tried to develop an MRE for the `assert_that` failure mode that does not involve `assert_that`, but have thus far come up empty-handed. Initially, I thought that it might have to do with using `None` as a key, which happens there, but the linked test cases show that, at least in a minimal example, the DaskRunner's GBK _can_ handle `None` keys.)

I am happy to take this issue, as it is currently blocking me from finishing #27618. I may need to enlist the support of some more Dask-savvy folks in getting to the bottom of this; perhaps @jacobtomlinson will be keen!

### Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

### Issue Components

- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
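To make the partitioning hypothesis above concrete, here is a pure-Python sketch (not DaskRunner code; `group_per_partition` is a hypothetical helper I made up for illustration) of the suspected failure mode: if grouping happens within each partition independently, with no global shuffle, then keys whose values span partitions come out split, while a single partition behaves like a correct global GBK:

```python
from collections import defaultdict


def group_per_partition(partitions):
    """Group (key, value) pairs within each partition independently,
    with NO cross-partition shuffle -- the suspected failure mode."""
    out = []
    for part in partitions:
        groups = defaultdict(list)
        for k, v in part:
            groups[k].append(v)
        out.extend(groups.items())
    return out


# Two partitions whose keys overlap, as a partitioned Create might produce.
partitions = [
    [("spring", "strawberry"), ("summer", "carrot")],
    [("spring", "eggplant"), ("fall", "tomato")],
]

broken = group_per_partition(partitions)
# "spring" appears twice, because its values live in different partitions.
assert sorted(k for k, _ in broken) == ["fall", "spring", "spring", "summer"]

# Collapsing to a single partition (what UnpartitionedCreate does via
# repartition(npartitions=1)) makes per-partition grouping coincide with
# a correct global GBK.
single = group_per_partition([[kv for part in partitions for kv in part]])
assert sorted(single) == [
    ("fall", ["tomato"]),
    ("spring", ["strawberry", "eggplant"]),
    ("summer", ["carrot"]),
]
```

This is only an analogy for why `repartition(npartitions=1)` would mask the bug; the actual DaskRunner code paths involved are the ones linked above.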
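In support of the claim that `dask.bag.Bag.groupby` itself is not the problem, here is a minimal standalone check (assuming `dask` is installed; `shuffle="tasks"` is passed only to use the task-based shuffle rather than the disk-based one) showing it grouping string keys correctly across a partitioned collection:

```python
import dask.bag as db

# A partitioned bag with string keys whose values span partitions.
bag = db.from_sequence(
    [("spring", "strawberry"), ("summer", "carrot"),
     ("spring", "eggplant"), ("fall", "tomato")],
    npartitions=2,
)

grouped = dict(bag.groupby(lambda kv: kv[0], shuffle="tasks").compute())

# Each key appears exactly once, with all of its pairs, despite partitioning.
assert sorted(grouped) == ["fall", "spring", "summer"]
assert sorted(v for _, v in grouped["spring"]) == ["eggplant", "strawberry"]
```

If this holds, the bug is in how the DaskRunner wraps and partitions the collections it feeds to `groupby`, not in `groupby` itself.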
