cisaacstern commented on PR #27618:
URL: https://github.com/apache/beam/pull/27618#issuecomment-1778113411

   Getting closer. To confirm, the following change (which I haven't yet 
pushed) is definitely required to get computable graphs:
   
   ```diff
   --- a/sdks/python/apache_beam/runners/dask/dask_runner.py
   +++ b/sdks/python/apache_beam/runners/dask/dask_runner.py
   @@ -183,6 +183,6 @@ class DaskRunner(BundleBasedDirectRunner):
    
        dask_visitor = self.to_dask_bag_visitor()
        pipeline.visit(dask_visitor)
   -    opt_graph = dask.optimize(dask_visitor.bags.values())
   +    opt_graph = dask.optimize(*list(dask_visitor.bags.values()))
        futures = client.compute(opt_graph)
        return DaskRunnerResult(client, futures)
   ```
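   To see why the unpacking matters: `dask.optimize` takes each collection as its own positional argument, so passing the dict view as a single argument hands it one opaque object instead of the individual bags. Below is a minimal, hypothetical stand-in (not dask's actual implementation) that mimics only that calling convention:

   ```python
   # Hypothetical stand-in mimicking dask.optimize's calling convention:
   # each collection is expected as its own positional argument, and a
   # tuple of the same shape comes back.
   def optimize(*collections):
       return tuple(collections)

   bags = {"stage_a": [1], "stage_b": [2]}

   # Passing the dict view as ONE argument: optimize sees a single
   # "collection" (the dict_values object), not the two bags inside it.
   wrong = optimize(bags.values())

   # Unpacking hands each bag to optimize individually.
   right = optimize(*list(bags.values()))

   print(len(wrong), len(right))  # 1 2
   ```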
   
   With this change, very basic pipelines do run end-to-end without error, but any pipeline with a stage that includes a _**deeply nested function or object definition**_ raises some form of:

   
   ```python-traceback
     File ".../distributed/distributed/client.py", line 3159, in 
_graph_to_futures
       header, frames = serialize(ToPickle(dsk), on_error="raise")
     File ".../distributed/distributed/protocol/serialize.py", line 378, in 
serialize
       raise TypeError(msg, str_x) from exc
   TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: 
HighLevelGraph with 2 layers.\n<dask.highlevelgraph.HighLevelGraph object at 
0x159b6d990>\n 0. 5800487232\n 1. bf0ad4e590a57329d7d581b1a8a01c79\n>')
   ```
   
   This is with the latest `distributed==2023.10.0`. Deeply nested function/object definitions are common throughout the Python SDK and, significantly, appear in the definition of `assert_that`, which is likely why `assert_that` is failing.
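   For intuition on why nesting matters here: the stdlib pickler serializes functions by reference (module + qualified name), so a function defined inside another function simply cannot be looked up and pickling fails. A minimal sketch, using a hypothetical `make_matcher` as a stand-in for the kind of closure `assert_that` builds internally:

   ```python
   import pickle

   # Hypothetical stand-in for a transform that closes over a locally
   # defined function, as assert_that does internally.
   def make_matcher(expected):
       def matcher(actual):  # nested function: invisible to plain pickle
           return actual == expected
       return matcher

   fn = make_matcher([1, 2, 3])
   try:
       pickle.dumps(fn)
       picklable = True
   except (pickle.PicklingError, AttributeError):
       # CPython raises "Can't pickle local object 'make_matcher.<locals>.matcher'"
       picklable = False

   print(picklable)  # False: plain pickle cannot serialize the nested function
   ```

   cloudpickle serializes such functions by value rather than by reference, which is why routing the graph through cloudpickle might sidestep this.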
   
   According to the dask docs 
[here](https://distributed.dask.org/en/stable/serialization.html#defaults):
   
   > Computational tasks like f(x) that are defined and serialized on client 
processes and deserialized and run on worker processes. These are serialized 
using a fixed scheme decided on by those libraries. Today this is a combination 
of pickle and cloudpickle.
   
   So it seems that if we can get this serialization onto the cloudpickle code path, it might work? (IIUC, cloudpickle handles deeply nested objects better than pickle.) AFAICT I am not hitting the cloudpickle code path here, but some further digging is necessary:
   
   - Can I make an MRE that demonstrates this pickling issue using vanilla Dask (no Beam)?
   - If so, is it resolved by using cloudpickle?
   - And finally, is the current attempt to serialize the HighLevelGraph indeed missing the cloudpickle path, and if so, how can we get it onto that path?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
