cisaacstern commented on code in PR #27618:
URL: https://github.com/apache/beam/pull/27618#discussion_r1357577926
##########
sdks/python/apache_beam/runners/dask/dask_runner.py:
##########
@@ -177,6 +183,6 @@ def run_pipeline(self, pipeline, options):
dask_visitor = self.to_dask_bag_visitor()
pipeline.visit(dask_visitor)
-
- futures = client.compute(list(dask_visitor.bags.values()))
+ opt_graph = dask.optimize(list(dask_visitor.bags.values()))
Review Comment:
In
https://github.com/apache/beam/pull/27618/commits/305699bbbdde9b555ecd88c4bedc0b312a268a53,
I've cast `dask_visitor.bags.values()` to `list` (as is done in master),
because I found that, without the cast, the subsequent call to `client.compute`
would silently do nothing: no error is raised, but none of the tasks run.
The **Details** below contain the closest thing I have to an MRE of why
casting to `list()` is necessary here. What I still don't understand is why the
tests in `dask_runner_test.py` still appear to pass without casting to `list()`
🤔.
<details>
```pycon
In [1]: import dask.bag; import distributed
In [2]: b = dask.bag.from_sequence([1, 2, 3]).map(lambda x: x*2).map(print)
In [3]: opt_b = dask.optimize(b)
In [4]: with distributed.Client() as c:
   ...:     futures = c.compute(opt_b)
   ...:     distributed.wait(futures)
   ...:
2
4
6
In [5]: bdict = {"bag": b}
In [6]: opt_bdict_values = dask.optimize(bdict.values())
In [7]: with distributed.Client() as c:
   ...:     futures = c.compute(opt_bdict_values)
   ...:     distributed.wait(futures)
   ...:
In [8]: opt_bdict_values_aslist = dask.optimize(list(bdict.values()))
In [9]: with distributed.Client() as c:
   ...:     futures = c.compute(opt_bdict_values_aslist)
   ...:     distributed.wait(futures)
   ...:
2
4
6
```
</details>
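One plausible mechanism, offered as an assumption rather than a verified reading of the dask/distributed source: both `dask.optimize` and `Client.compute` seem to look for dask collections only inside specific container types (lists, tuples, sets, dicts and, for `Client.compute`, iterators). A `dict_values` view is none of these, so it would be passed through as an opaque object and nothing would get scheduled, which would match the empty output of `In [7]`. The stdlib-only sketch below models that type check. `FakeBag` and `is_traversed` are hypothetical names for illustration, not dask APIs.

```python
from collections.abc import Iterator


class FakeBag:
    """Hypothetical placeholder for a dask collection such as dask.bag.Bag."""


def is_traversed(obj):
    # Assumed rule (not copied from dask): containers of these types are
    # unpacked when searching for dask collections; anything else is
    # treated as an opaque literal and returned unchanged.
    return isinstance(obj, (list, tuple, set, dict, Iterator))


bags = {"bag": FakeBag()}

# A dict_values view is iterable, but it is neither a list nor an iterator,
# so under the assumed rule the bag inside it is never found.
print(type(bags.values()).__name__, is_traversed(bags.values()))  # dict_values False

# Casting to list() makes the bag visible to the traversal.
print(type(list(bags.values())).__name__, is_traversed(list(bags.values())))  # list True
```

If this assumption holds, `tuple(...)` would work as well as `list(...)`; `list(...)` simply matches what master already does.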