cisaacstern opened a new issue, #29365:
URL: https://github.com/apache/beam/issues/29365

   ### What happened?
   
   As released in `2.51.0`, the DaskRunner's GBK implementation only works 
under certain conditions (e.g., if the key type is an integer). It does not 
work for many commonly-used patterns, including:
   - The use of string keys; e.g., the official [seasonal produce 
example](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/).
 (A minimal reproducer for this failure mode is provided in 
https://github.com/apache/beam/pull/27618#issuecomment-1791940297.)
   - This code path in `assert_that`:
   
https://github.com/apache/beam/blob/07d5d534aa8e0f0066e5182a3f17fd9b8a29287c/sdks/python/apache_beam/testing/util.py#L298
 
https://github.com/apache/beam/blob/07d5d534aa8e0f0066e5182a3f17fd9b8a29287c/sdks/python/apache_beam/transforms/util.py#L258
   
   While both of these are problematic, the latter is especially so: given the 
ubiquity of `assert_that` in the tests, it more or less means we can't 
test (or, therefore, develop) the DaskRunner at all until this issue is fixed.
   
   Over in https://github.com/cisaacstern/beam-daskrunner-gbk-bugs/pull/1, I 
have developed a relatively comprehensive demonstration of both of these 
failure modes. Based on experiments there, it looks like the root cause of this 
issue relates to the partitioning of the underlying Dask Bag collection. As 
evidence of this, when the following implementation of `Create` is mocked into 
`2.51.0` during the test session, both of these GBK-related bugs are resolved 
(copied from 
[here](https://github.com/cisaacstern/beam-daskrunner-gbk-bugs/pull/1/files#diff-d592c74a98ca45c4162bc084f81d9032e19730168fc05e7f42b7824bb3061a3aR40-R49)):
   
   ```python
   import dask.bag as db
   from apache_beam.runners.dask.transform_evaluator import Create as DaskCreate
   from apache_beam.runners.dask.transform_evaluator import OpInput

   class UnpartitionedCreate(DaskCreate):
       """The DaskRunner GBK bug(s) demonstrated by this test module are (somehow)
       related to the partitioning of the dask bag collection. The following
       object is mocked into the DaskRunner in certain test cases below to
       demonstrate that if the dask bag collection is unpartitioned (i.e.,
       consists of only a single partition), then the GBK bug(s) are resolved.
       """
       def apply(self, input_bag: OpInput) -> db.Bag:
           # Build the bag as the stock Create does, then collapse it to a
           # single partition.
           partitioned = super().apply(input_bag)
           return partitioned.repartition(npartitions=1)
   ```
   
   I am nearly certain that this issue is not with `dask.bag.Bag.groupby`, 
which AFAICT works fine, e.g., with partitioned collections containing string 
keys. Rather, this appears to have something to do with how the DaskRunner is 
wrapping partitioned collections.
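
   As a rough illustration of the claim that `dask.bag.Bag.groupby` handles string keys on partitioned collections, the sketch below groups the same data with three partitions and with one, then compares the results. The sample data and the `group_with_dask_bag` helper are hypothetical, and running on the synchronous scheduler is an assumption made here to keep the example in a single process. It is not something the experiments above specify.

```python
import dask.bag as db

# Hypothetical string-keyed (key, value) pairs.
pairs = [
    ("spring", "strawberry"),
    ("spring", "carrot"),
    ("summer", "corn"),
    ("fall", "apple"),
    ("summer", "tomato"),
    ("winter", "potato"),
]


def first(item):
    """Return the key of a (key, value) pair."""
    return item[0]


def collect_values(group):
    """Turn (key, [(key, value), ...]) into (key, sorted list of values)."""
    key, items = group
    return key, sorted(value for _, value in items)


def group_with_dask_bag(items, npartitions):
    """Group (key, value) pairs by key using a bag with `npartitions` partitions."""
    bag = db.from_sequence(items, npartitions=npartitions)
    grouped = bag.groupby(first).map(collect_values)
    # Assumption: single-process scheduler, chosen only for this sketch.
    return dict(grouped.compute(scheduler="synchronous"))


partitioned = group_with_dask_bag(pairs, npartitions=3)
single = group_with_dask_bag(pairs, npartitions=1)
```

   If the two dictionaries match, the grouping itself is not sensitive to the partition count in this setting. That would be consistent with the problem sitting in how the DaskRunner wraps the bag rather than in `groupby`.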
   
   (Note: I have tried to develop an MRE for the `assert_that` failure mode 
that does not involve `assert_that`, but have thus far come up empty-handed. 
Initially, I thought that it might have to do with using `None` as a key, which 
happens there, but the linked test cases show that, at least in a minimal 
example, the `DaskRunner`'s GBK _can_ handle `None` keys.)
   
   I am happy to take this issue, as it is currently blocking me from finishing 
#27618. I may need to enlist the support of some more Dask-savvy folks in 
getting to the bottom of this; perhaps @jacobtomlinson will be keen!
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

