cisaacstern commented on PR #27618:
URL: https://github.com/apache/beam/pull/27618#issuecomment-1791940297

   Some quick observations from the latest work. I'm seeing at least two issues related to GBK on the DaskRunner. The second is definitely present in the `2.51.0` release (not just on this PR); I'll need to double-check whether that's true of the first as well. Eventually some of this (particularly the second issue, which does appear to be a bug in `2.51.0`) can become standalone issue(s):
   
   1. Not all elements of a PCollection of size > 1 are correctly materialized for comparison in the `assert_that` context. This manifests as "missing elements" errors, even though the elements reported missing _do_ exist in the PCollection; they are simply not materialized correctly for the comparison. This can affect any PCollection of size > 1 (including ones for which the user has not invoked GBK), because GBK is used internally by `assert_that`. It is also apparent in the more obvious case where GBK has been invoked in the user's pipeline. In either case, a simple `beam.Map(print)` of the PCollection _does_ reveal all expected elements, yet not all elements appear to be available to the `matcher` function in `assert_that` when the comparison is made there. (A minimal repro sketch follows after this list.) In the `assert_that` case, the code path is:
   
   https://github.com/apache/beam/blob/07d5d534aa8e0f0066e5182a3f17fd9b8a29287c/sdks/python/apache_beam/testing/util.py#L298
   
   https://github.com/apache/beam/blob/07d5d534aa8e0f0066e5182a3f17fd9b8a29287c/sdks/python/apache_beam/transforms/util.py#L258
   2. In attempts to debug the above, I was using the [seasonal produce example](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/) from the Beam docs, which led me to discover that the DaskRunner seems to be unable to run GBK with string keys... but other key types, such as ints, work fine! For example:
   
        ```python
        # str_keys_gbk.py

        import apache_beam as beam
        from apache_beam.options.pipeline_options import PipelineOptions
        from apache_beam.runners.dask.dask_runner import DaskRunner
        from distributed import Client

        # Two identical groupings, differing only in key type.
        collections = dict(
            str_keys=[("a", 0), ("a", 1), ("b", 2), ("b", 3)],
            int_keys=[(0, 0), (0, 1), (1, 2), (1, 3)],
        )

        if __name__ == "__main__":
            with Client() as c:
                dask_opts = PipelineOptions([f"--dask_client_address={c.scheduler.address}"])
                for runner, options in zip(["DirectRunner", DaskRunner()], [None, dask_opts]):
                    for k, v in collections.items():
                        print(f"Running {k} collection on {runner}")
                        # Pass the zipped options through to the pipeline so the
                        # DaskRunner connects to the client created above.
                        with beam.Pipeline(runner=runner, options=options) as p:
                            p | beam.Create(v) | beam.GroupByKey() | beam.Map(print)
        ```
        ```console
        ➜ python -W ignore str_keys_gbk.py
        Running str_keys collection on DirectRunner
        ('a', [0, 1])
        ('b', [2, 3])
        Running int_keys collection on DirectRunner
        (0, [0, 1])
        (1, [2, 3])
        Running str_keys collection on <apache_beam.runners.dask.dask_runner.DaskRunner object at 0x12dd241f0>
        ('a', [1])
        ('b', [3])
        ('b', [2])
        ('a', [0])
        Running int_keys collection on <apache_beam.runners.dask.dask_runner.DaskRunner object at 0x12dd241f0>
        (1, [2, 3])
        (0, [0, 1])
        ```
        (The warning I'm ignoring here is just a Dask cluster teardown issue, not related.)
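   
   To make issue 1 concrete, here's a minimal sketch of the kind of repro I mean, reusing the same `DaskRunner`/`Client` setup as the script above. The filename and the element values are illustrative, not taken from an actual failing test in this PR:
   
   ```python
   # assert_that_repro.py (illustrative sketch for issue 1)

   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.runners.dask.dask_runner import DaskRunner
   from apache_beam.testing.util import assert_that, equal_to
   from distributed import Client

   if __name__ == "__main__":
       with Client() as c:
           opts = PipelineOptions([f"--dask_client_address={c.scheduler.address}"])
           with beam.Pipeline(runner=DaskRunner(), options=opts) as p:
               # A multi-element PCollection; note there is no user-invoked GBK.
               pcoll = p | beam.Create([1, 2, 3])
               # assert_that applies GBK internally, so this can fail with a
               # "missing elements" error on the DaskRunner, even though
               # `pcoll | beam.Map(print)` shows all three elements.
               assert_that(pcoll, equal_to([1, 2, 3]))
   ```
   
   If that reproduces as described, it supports the internal-GBK explanation for the "missing elements" errors, since the same failure appears without any GBK in the user's pipeline.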
   
   

