cisaacstern commented on PR #27618: URL: https://github.com/apache/beam/pull/27618#issuecomment-1791940297
Some quick observations from the latest work. I'm seeing at least two issues related to GBK on the DaskRunner. The latter is definitely present in the `2.51.0` release (not just this PR); I'll need to double-check whether that's true of the former as well. Eventually some of this (particularly the latter issue, which does appear to be a bug in `2.51.0`) can become standalone issue(s):

1. Not all elements of a PCollection of size > 1 are correctly materialized for comparison in the `assert_that` context. This manifests as "missing elements" errors, when in fact the "missing" elements _do_ exist in the PCollection; they are just not materialized correctly for the comparison. This can affect any PCollection of size > 1 (including those for which the user has not invoked GBK), because GBK is used internally in `assert_that`. It is also apparent in the more obvious case where GBK has been invoked in the user's pipeline. In either case, a simple `beam.Map(print)` of the PCollection _does_ reveal all expected elements, yet not all of them are available to the `matcher` function in `assert_that` when the comparison is made there (a minimal sketch of this failure mode follows the console output below). In the `assert_that` case, the code path is:

   https://github.com/apache/beam/blob/07d5d534aa8e0f0066e5182a3f17fd9b8a29287c/sdks/python/apache_beam/testing/util.py#L298

   https://github.com/apache/beam/blob/07d5d534aa8e0f0066e5182a3f17fd9b8a29287c/sdks/python/apache_beam/transforms/util.py#L258

2. In attempting to debug the above, I was using the [seasonal produce example](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/) from the Beam docs, which led me to discover that the DaskRunner seems unable to run GBK with string keys... but other keys, such as ints, work fine! For example:

   ```python
   # str_keys_gbk.py
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.runners.dask.dask_runner import DaskRunner
   from distributed import Client

   collections = dict(
       str_keys=[("a", 0), ("a", 1), ("b", 2), ("b", 3)],
       int_keys=[(0, 0), (0, 1), (1, 2), (1, 3)],
   )

   if __name__ == "__main__":
       with Client() as c:
           dask_opts = PipelineOptions([f"--dask_client_address={c.scheduler.address}"])
           for runner, options in zip(["DirectRunner", DaskRunner()], [None, dask_opts]):
               for k, v in collections.items():
                   print(f"Running {k} collection on {runner}")
                   with beam.Pipeline(runner=runner, options=options) as p:
                       p | beam.Create(v) | beam.GroupByKey() | beam.Map(print)
   ```

   ```console
   ➜ python -W ignore str_keys_gbk.py
   Running str_keys collection on DirectRunner
   ('a', [0, 1])
   ('b', [2, 3])
   Running int_keys collection on DirectRunner
   (0, [0, 1])
   (1, [2, 3])
   Running str_keys collection on <apache_beam.runners.dask.dask_runner.DaskRunner object at 0x12dd241f0>
   ('a', [1])
   ('b', [3])
   ('b', [2])
   ('a', [0])
   Running int_keys collection on <apache_beam.runners.dask.dask_runner.DaskRunner object at 0x12dd241f0>
   (1, [2, 3])
   (0, [0, 1])
   ```

   (The warning I'm ignoring here is just a Dask cluster teardown issue, not related.)
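Here is a minimal sketch of the issue-1 failure mode, assuming the DaskRunner's default client setup (no `--dask_client_address` passed); the expected "missing elements" `BeamAssertException` is inferred from the description above, not independently verified:

```python
# assert_that_repro.py (hypothetical repro for issue 1, not from the report)
import apache_beam as beam
from apache_beam.runners.dask.dask_runner import DaskRunner
from apache_beam.testing.util import assert_that, equal_to

if __name__ == "__main__":
    with beam.Pipeline(runner=DaskRunner()) as p:
        # A PCollection of size > 1, with no user-invoked GBK.
        pcoll = p | beam.Create([0, 1, 2, 3])
        # A simple print reveals all four elements...
        pcoll | "Print" >> beam.Map(print)
        # ...but assert_that uses GBK internally, so per issue 1 not all
        # elements may reach the matcher, raising BeamAssertException.
        assert_that(pcoll, equal_to([0, 1, 2, 3]))
```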
