cisaacstern commented on code in PR #27618:
URL: https://github.com/apache/beam/pull/27618#discussion_r1357577926


##########
sdks/python/apache_beam/runners/dask/dask_runner.py:
##########
@@ -177,6 +183,6 @@ def run_pipeline(self, pipeline, options):
 
     dask_visitor = self.to_dask_bag_visitor()
     pipeline.visit(dask_visitor)
-
-    futures = client.compute(list(dask_visitor.bags.values()))
+    opt_graph = dask.optimize(list(dask_visitor.bags.values()))

Review Comment:
   In 
https://github.com/apache/beam/pull/27618/commits/305699bbbdde9b555ecd88c4bedc0b312a268a53,
 I've cast `dask_visitor.bags.values()` to `list` (as is done in master), 
because I found that the subsequent call to `client.compute` would silently 
fail otherwise.
   
   The **Details** below contain the closest thing I have to an MRE of why 
casting to `list()` is necessary here. What I still don't understand is why the 
tests in `dask_runner_test.py` appear to pass without casting to `list()` 
🤔 .  
   
   <details>
   
   ```pycon
   In [1]: import dask.bag; import distributed
   
   In [2]: b = dask.bag.from_sequence([1, 2, 3]).map(lambda x: x*2).map(print)
   
   In [3]: opt_b = dask.optimize(b)
   
   In [4]: with distributed.Client() as c:
      ...:     futures = c.compute(opt_b)
      ...:     distributed.wait(futures)
      ...: 
   2
   4
   6
   
   In [5]: bdict = {"bag": b}
   
   In [6]: opt_bdict_values = dask.optimize(bdict.values())
   
   In [7]: with distributed.Client() as c:
      ...:     futures = c.compute(opt_bdict_values)
      ...:     distributed.wait(futures)
      ...: 
   
   In [8]: opt_bdict_values_aslist = dask.optimize(list(bdict.values()))
   
   In [9]: with distributed.Client() as c:
      ...:     futures = c.compute(opt_bdict_values_aslist)
      ...:     distributed.wait(futures)
      ...: 
   2
   4
   6
   ```
   </details>
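
One possible explanation for the MRE above, offered as a hypothesis rather than a confirmed diagnosis: as far as I can tell, `dask.optimize` only descends into plain built-in containers (`list`, `tuple`, `set`, `dict`) when looking for collections, so a `dict_values` view would be treated as an opaque object and handed back unchanged, leaving `client.compute` with nothing to run. The toy model below is not dask's code; `FakeBag` and `find_collections` are hypothetical names that only illustrate how an exact-type container check can skip a `dict_values` view.

```python
# Toy model only: this is NOT dask's implementation. FakeBag and
# find_collections are hypothetical names used to illustrate how a traversal
# that dispatches on exact container type can skip a dict_values view.


class FakeBag:
    """Stand-in for a dask collection such as a bag."""

    def __init__(self, name):
        self.name = name


def find_collections(*args):
    """Collect FakeBag objects, descending only into exact built-in containers."""
    found = []

    def visit(obj):
        if isinstance(obj, FakeBag):
            found.append(obj)
        elif type(obj) in (list, tuple, set):
            for item in obj:
                visit(item)
        elif type(obj) is dict:
            for value in obj.values():
                visit(value)
        # Anything else, including a dict_values view, is treated as opaque.

    for arg in args:
        visit(arg)
    return found


bags = {"bag": FakeBag("b")}

# The dict_values view is not one of the recognised container types.
print(len(find_collections(bags.values())))

# Casting to list makes the bag visible to the traversal.
print(len(find_collections(list(bags.values()))))
```

If this is what happens in dask, it would match `In [7]` printing nothing and `In [9]` printing the doubled values, but it does not explain why the tests pass without the cast.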



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

