cisaacstern commented on PR #27618:
URL: https://github.com/apache/beam/pull/27618#issuecomment-1778113411
Getting closer. To confirm, the following change (which I haven't yet
pushed) is definitely required to get computable graphs:
```diff
--- a/sdks/python/apache_beam/runners/dask/dask_runner.py
+++ b/sdks/python/apache_beam/runners/dask/dask_runner.py
@@ -183,6 +183,6 @@ class DaskRunner(BundleBasedDirectRunner):
dask_visitor = self.to_dask_bag_visitor()
pipeline.visit(dask_visitor)
- opt_graph = dask.optimize(dask_visitor.bags.values())
+ opt_graph = dask.optimize(*list(dask_visitor.bags.values()))
futures = client.compute(opt_graph)
return DaskRunnerResult(client, futures)
```
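For context, here is a minimal vanilla-Dask sketch (toy bags, names are illustrative and not from the PR) of why the unpacking matters: `dask.optimize` appears to treat a `dict_values` view passed as a single positional argument as one opaque non-dask object, whereas unpacking hands each bag to the optimizer individually:

```python
import dask
import dask.bag as db

# Two toy bags standing in for dask_visitor.bags.values()
bags = {
    "a": db.from_sequence([1, 2, 3]).map(lambda x: x + 1),
    "b": db.from_sequence([4, 5, 6]).map(lambda x: x * 2),
}

# dask.optimize(bags.values()) would pass one dict_values object, which
# the traversal machinery does not recognize as a container of dask
# collections; unpacking passes each bag as its own argument and returns
# a tuple of optimized bags that can be handed to client.compute():
opt = dask.optimize(*bags.values())
print([b.compute(scheduler="synchronous") for b in opt])  # [[2, 3, 4], [8, 10, 12]]
```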
With this change, very basic pipelines do run end-to-end without error, but
any pipeline with a stage that includes a _**deeply-nested function or object
definition**_ raises some form of:
```python-traceback
File ".../distributed/distributed/client.py", line 3159, in
_graph_to_futures
header, frames = serialize(ToPickle(dsk), on_error="raise")
File ".../distributed/distributed/protocol/serialize.py", line 378, in
serialize
raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle:
HighLevelGraph with 2 layers.\n<dask.highlevelgraph.HighLevelGraph object at
0x159b6d990>\n 0. 5800487232\n 1. bf0ad4e590a57329d7d581b1a8a01c79\n>')
```
This is with the latest `distributed==2023.10.0`. Deeply nested
function/object definitions are common throughout the Python SDK and,
significantly, appear in the definition of `assert_that`, which seems to be
why `assert_that` is failing.
According to the dask docs
[here](https://distributed.dask.org/en/stable/serialization.html#defaults):
> Computational tasks like f(x) that are defined and serialized on client
processes and deserialized and run on worker processes. These are serialized
using a fixed scheme decided on by those libraries. Today this is a combination
of pickle and cloudpickle.
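As a quick illustration of that difference (a hypothetical toy closure, not actual Beam code): plain `pickle` refuses locally defined functions, while `cloudpickle` serializes them by value:

```python
import pickle
import cloudpickle

def make_matcher(expected):
    # A nested, locally defined function, similar in spirit to the
    # matchers that assert_that constructs internally.
    def matcher(actual):
        return actual == expected
    return matcher

fn = make_matcher(42)

# Plain pickle cannot serialize a function defined inside another function:
try:
    pickle.dumps(fn)
except Exception as exc:
    print("pickle failed:", type(exc).__name__)

# cloudpickle serializes the closure by value, so it round-trips fine:
restored = pickle.loads(cloudpickle.dumps(fn))
print(restored(42))  # True
```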
So it seems like if we can get this serialization onto the cloudpickle code
path, it might work? (Since IIUC cloudpickle handles deeply nested objects
better than pickle.) AFAICT, I am not hitting the cloudpickle code path here,
but some further digging is necessary:
- Can I make an MRE that demonstrates this pickling issue using vanilla
Dask (no Beam)?
- If so, is it resolved with cloudpickle?
- And finally, is the current attempt to serialize the HighLevelGraph indeed
missing the cloudpickle path, and if so, how can we get it onto that path?
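On the last point, one way to probe this (a sketch, assuming `distributed`'s internal layout as of recent releases) is to round-trip a nested closure through `distributed.protocol.pickle.dumps`/`loads`, the wrapper that falls back to cloudpickle when plain pickle raises; whether `ToPickle(HighLevelGraph)` actually reaches that fallback is exactly what needs digging:

```python
# Round-trip a locally defined function through distributed's own pickle
# wrapper (distributed/protocol/pickle.py), which tries plain pickle and
# falls back to cloudpickle on failure:
from distributed.protocol.pickle import dumps, loads

def make_nested():
    def inner(x):
        return x + 1
    return inner

roundtripped = loads(dumps(make_nested()))
print(roundtripped(1))  # 2
```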
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.