chuckwondo opened a new issue, #28558:
URL: https://github.com/apache/beam/issues/28558
### What happened?
When constructing a `Pipeline`, the `pickle_library` option is validated,
raising a `ValueError` if the supplied value is not one of the allowed values
(`"default"`, `"dill"`, or `"cloudpickle"`).
Unfortunately, the option is then ignored when it comes time to actually _use_ pickling.
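For concreteness, here is a minimal sketch of that asymmetry (the invalid value below is just an illustration):
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# An unsupported value fails fast, at pipeline construction...
try:
    beam.Pipeline(options=PipelineOptions(pickle_library="not_a_pickler"))
except ValueError as err:
    print(err)

# ...but a supported value such as "cloudpickle" is not actually honored
# later, when coders pickle user objects (the bug reported below).
```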
I was able to produce a [`PicklingError` via a pipeline that uses a
`MultiZarrToZarr`
preprocessor](https://github.com/pangeo-forge/pangeo-forge-recipes/issues/616),
like so (abridged from the linked issue, but illustrative):
```python
from kerchunk.combine import drop
...
with beam.Pipeline() as pipeline:
    _ = pipeline | (
        beam.Create(pattern.items())
        | OpenWithKerchunk(file_type=pattern.file_type)
        | CombineReferences(
            concat_dims=["time"],
            identical_dims=["lat", "lon", "channel"],
            mzz_kwargs={"preprocess": drop("lst_unc_sys")},
        )
        | WriteCombinedReference(target_root, store_name)
    )
```
The error produced was the following:
```plain
  File "apache_beam/coders/coder_impl.py", line 270, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
  File ".../python3.11/site-packages/apache_beam/coders/coders.py", line 869, in <lambda>
    lambda x: dumps(x, protocol), pickle.loads)
              ^^^^^^^^^^^^^^^^^^
AttributeError: Can't pickle local object 'drop.<locals>.preproc'
```
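The root cause is that the standard library's `pickle` cannot serialize local
(closure) functions, which is exactly what kerchunk's `drop` returns (hence
`drop.<locals>.preproc` in the message), whereas `cloudpickle` serializes them
by value. A quick illustration, where `drop_like` is a hypothetical stand-in
for `drop`, not kerchunk's actual code:
```python
import pickle
import cloudpickle

def drop_like(name):
    # Returns a closure, much as kerchunk.combine.drop does.
    def preproc(refs):
        return {k: v for k, v in refs.items() if name not in k}
    return preproc

fn = drop_like("lst_unc_sys")
cloudpickle.dumps(fn)  # OK: closures are serialized by value
pickle.dumps(fn)       # AttributeError: Can't pickle local object
                       # 'drop_like.<locals>.preproc'
```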
It took quite a bit of digging, but I discovered that I should be able to
address this issue by setting the following pipeline options:
- `save_main_session=True`
- `pickle_library="cloudpickle"`
Therefore I tried this:
```python
from apache_beam.options.pipeline_options import PipelineOptions
from kerchunk.combine import drop
...
options = PipelineOptions(pickle_library="cloudpickle", save_main_session=True)

with beam.Pipeline(options=options) as pipeline:
    _ = pipeline | (
        beam.Create(pattern.items())
        | OpenWithKerchunk(file_type=pattern.file_type)
        | CombineReferences(
            concat_dims=["time"],
            identical_dims=["lat", "lon", "channel"],
            mzz_kwargs={"preprocess": drop("lst_unc_sys")},
        )
        | WriteCombinedReference(str(target_root), store_name)
    )
```
Unfortunately, this produced the _identical_ error.
After a bit of digging, I discovered that the problem is with the
[`PickleCoder` class's `_create_impl`
method](https://github.com/apache/beam/blob/v2.50.0/sdks/python/apache_beam/coders/coders.py#L863-L869)
(v2.50.0):
```python
class PickleCoder(_PickleCoderBase):
  """Coder using Python's pickle functionality."""
  def _create_impl(self):
    dumps = pickle.dumps
    protocol = pickle.HIGHEST_PROTOCOL
    return coder_impl.CallbackCoderImpl(
        lambda x: dumps(x, protocol), pickle.loads)
  ...
```
Specifically, the method refers directly to `pickle.dumps` and `pickle.loads`
from the standard-library `pickle` module (i.e., `import pickle` appears at the
top of `coders.py`), rather than to the SDK's `apache_beam.internal.pickler`
module, which is where the library selected via the `pickle_library` pipeline
option actually takes effect.
When I added the import of `pickler` and locally modified the `_create_impl`
method as follows, my pipeline ran without error:
```python
from apache_beam.internal import pickler
...

class PickleCoder(_PickleCoderBase):
  """Coder using Python's pickle functionality."""
  def _create_impl(self):
    return coder_impl.CallbackCoderImpl(pickler.dumps, pickler.loads)
  ...
```
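With that change, the coder round-trips through `apache_beam.internal.pickler`,
whose module-level `dumps`/`loads` delegate to whichever backend was selected
via `pickler.set_library` (which `Pipeline` calls based on the `pickle_library`
option). A sketch, assuming the v2.50.0 `pickler` API:
```python
from apache_beam.internal import pickler

# Select the backend the same way pickle_library="cloudpickle" does,
# then round-trip a closure that the stdlib pickle cannot handle.
pickler.set_library(pickler.USE_CLOUDPICKLE)

def make_adder(n):
    def add(x):
        return x + n
    return add

restored = pickler.loads(pickler.dumps(make_adder(1)))
print(restored(41))  # prints 42
```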
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner