chuckwondo opened a new issue, #28558:
URL: https://github.com/apache/beam/issues/28558

   ### What happened?
   
   When constructing a `Pipeline`, the option `pickle_library` is checked 
during construction, raising a `ValueError` when the value supplied is not one 
of the allowed values ("default", "dill", or "cloudpickle").
   
   Unfortunately, that option is then ignored when it comes time to actually _use_ pickling.
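   The construction-time check behaves roughly like the following sketch (names here are hypothetical; the real validation lives inside Beam's pipeline/options setup):

   ```python
   # Hypothetical sketch of the construction-time validation described
   # above; the real check is inside Beam's Pipeline/options handling.
   ALLOWED_PICKLE_LIBRARIES = ("default", "dill", "cloudpickle")

   def validate_pickle_library(value):
       if value not in ALLOWED_PICKLE_LIBRARIES:
           raise ValueError(
               "Invalid pickle_library %r; expected one of %s"
               % (value, ", ".join(ALLOWED_PICKLE_LIBRARIES)))
       return value
   ```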
   
   I was able to produce a [`PicklingError` via a pipeline that uses a `MultiZarrToZarr` preprocessor](https://github.com/pangeo-forge/pangeo-forge-recipes/issues/616), like so (abridged from the linked issue, but illustrative):
   
   ```python
   from kerchunk.combine import drop
   
   ...
   
   with beam.Pipeline() as pipeline:
       _ = pipeline | (
           beam.Create(pattern.items())
           | OpenWithKerchunk(file_type=pattern.file_type)
           | CombineReferences(
               concat_dims=["time"],
               identical_dims=["lat", "lon", "channel"],
               mzz_kwargs={"preprocess": drop("lst_unc_sys")},
           )
           | WriteCombinedReference(target_root, store_name)
       )
   ```
   
   The error produced was the following:
   
   ```plain
     File "apache_beam/coders/coder_impl.py", line 270, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
     File ".../python3.11/site-packages/apache_beam/coders/coders.py", line 869, in <lambda>
       lambda x: dumps(x, protocol), pickle.loads)
                 ^^^^^^^^^^^^^^^^^^
   AttributeError: Can't pickle local object 'drop.<locals>.preproc'
   ```
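   The underlying failure is easy to reproduce with the standard library alone: `drop` returns a closure, and `pickle` cannot serialize locally defined (nested) functions by reference, whereas `cloudpickle` serializes them by value. A minimal stand-in (the `preproc` body below is invented, only mimicking `kerchunk.combine.drop`):

   ```python
   import pickle

   def drop(field):
       # Invented stand-in for kerchunk.combine.drop: it returns a
       # locally defined function, which pickle cannot serialize.
       def preproc(refs):
           return {k: v for k, v in refs.items() if field not in k}
       return preproc

   try:
       pickle.dumps(drop("lst_unc_sys"))
   except (AttributeError, pickle.PicklingError) as err:
       print(err)  # Can't pickle local object 'drop.<locals>.preproc'
   ```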
   
   It took quite a bit of digging, but I discovered that I should be able to 
address this issue by setting the following pipeline options:
   
   - `save_main_session=True`
   - `pickle_library="cloudpickle"`
   
   Therefore I tried this:
   
   ```python
   from apache_beam.options.pipeline_options import PipelineOptions
   from kerchunk.combine import drop
   
   ...
   
    options = PipelineOptions(pickle_library="cloudpickle", save_main_session=True)
   
   with beam.Pipeline(options=options) as pipeline:
       _ = pipeline | (
           beam.Create(pattern.items())
           | OpenWithKerchunk(file_type=pattern.file_type)
           | CombineReferences(
               concat_dims=["time"],
               identical_dims=["lat", "lon", "channel"],
               mzz_kwargs={"preprocess": drop("lst_unc_sys")},
           )
           | WriteCombinedReference(str(target_root), store_name)
       )
   ```
   
   Unfortunately, this produced the _identical_ error.
   
   After more digging, I discovered that the problem is with the [`PickleCoder` class's `_create_impl` method](https://github.com/apache/beam/blob/v2.50.0/sdks/python/apache_beam/coders/coders.py#L863-L869) (v2.50.0):
   
   ```python
   class PickleCoder(_PickleCoderBase):
     """Coder using Python's pickle functionality."""
     def _create_impl(self):
       dumps = pickle.dumps
       protocol = pickle.HIGHEST_PROTOCOL
       return coder_impl.CallbackCoderImpl(
           lambda x: dumps(x, protocol), pickle.loads)
   
     ...
   ```
   
   Specifically, the method refers directly to `pickle.dumps` and `pickle.loads` from the standard `pickle` module (i.e., `import pickle` at the top of `coders.py`), rather than to `dumps` and `loads` from the SDK's `pickler` module (`apache_beam.internal.pickler`), which is where the library selected via the pipeline options described above actually takes effect.
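   To illustrate why the direct reference matters, here is a simplified, hypothetical sketch of the indirection the `pickler` module provides (all names below are invented, not Beam's actual code): a caller that goes through the module-level `dumps`/`loads` follows a runtime backend swap, while a caller that captured `pickle.dumps` at definition time never sees it.

   ```python
   import pickle

   class _PicklerModule:
       """Hypothetical stand-in for apache_beam.internal.pickler:
       dumps/loads delegate to a backend that can be swapped at runtime."""
       def __init__(self):
           self._dumps, self._loads = pickle.dumps, pickle.loads

       def set_library(self, dumps_fn, loads_fn):
           # Called when pipeline options select e.g. "cloudpickle".
           self._dumps, self._loads = dumps_fn, loads_fn

       def dumps(self, obj):
           return self._dumps(obj)

       def loads(self, data):
           return self._loads(data)

   pickler = _PicklerModule()

   # PickleCoder-style: binds pickle.dumps directly and never sees
   # a later set_library() call.
   direct_dumps = pickle.dumps

   # Swap the backend, as pickle_library="cloudpickle" is meant to do.
   pickler.set_library(lambda obj: b"SWAPPED", pickle.loads)

   print(pickler.dumps(42))  # → b'SWAPPED' (follows the swap)
   print(direct_dumps(42))   # still ordinary pickle bytes
   ```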
   
   When I added the import of `pickler` and locally modified the `_create_impl` 
method as follows, my pipeline ran without error:
   
   ```python
   from apache_beam.internal import pickler
   
   ...
   
   class PickleCoder(_PickleCoderBase):
     """Coder using Python's pickle functionality."""
     def _create_impl(self):
       return coder_impl.CallbackCoderImpl(pickler.dumps, pickler.loads)
   
     ...
   ```
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

