[ 
https://issues.apache.org/jira/browse/ARROW-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16814535#comment-16814535
 ] 

Matthew Rocklin commented on ARROW-5144:
----------------------------------------

Hi everyone, 

This is pretty critical for Dask usage.  Anyone trying to use PyArrow 0.13 in a 
Dask workflow will break pretty hard here.  This isn't something that we can 
work around easily on our side.  It would be great to know if this is likely to 
be resolved quickly, or if we should warn users strongly away from 0.13.

In general, I recommend serialization tests for any project that wants to 
interact with distributed computing libraries in Python.  Often this means a 
test like the following for every type that a parallel computing framework 
might need to send between processes:

{code}
import pickle

def test_serialization():
    obj = MyObj()  # placeholder: any type a framework may need to send
    obj2 = pickle.loads(pickle.dumps(obj))  # round trip through pickle

    assert obj == obj2
{code}
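
A slightly more thorough variant, as a minimal sketch: round-trip through every 
available pickle protocol, since not every framework uses the default one.  The 
helper name roundtrip_all_protocols is hypothetical, and the sample objects are 
standard-library stand-ins for whatever types your project exposes.

```python
import pickle
from datetime import date
from fractions import Fraction


def roundtrip_all_protocols(obj):
    """Pickle and unpickle obj once per protocol; return the list of copies."""
    copies = []
    for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
        copies.append(pickle.loads(pickle.dumps(obj, protocol=protocol)))
    return copies


def test_serialization_all_protocols():
    # Stand-in objects; replace with instances of your own public types.
    for obj in [Fraction(1, 3), date(2000, 1, 1), {"columns": ["a", "b"]}]:
        for copy in roundtrip_all_protocols(obj):
            assert copy == obj


test_serialization_all_protocols()
```

The equality check only helps if the type defines a meaningful __eq__; 
otherwise compare the attributes that matter after the round trip.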

> ParquetDataset and ParquetDatasetPiece not serializable
> -------------------------------------------------------
>
>                 Key: ARROW-5144
>                 URL: https://issues.apache.org/jira/browse/ARROW-5144
>             Project: Apache Arrow
>          Issue Type: Bug
>    Affects Versions: 0.13.0
>         Environment: osx python36/conda cloudpickle 0.8.1
> arrow-cpp                 0.13.0           py36ha71616b_0    conda-forge
> pyarrow                   0.13.0           py36hb37e6aa_0    conda-forge
>            Reporter: Martin Durant
>            Priority: Major
>
> Since 0.13.0, parquet instances are no longer serialisable, which means that 
> dask.distributed cannot pass them between processes in order to load parquet 
> in parallel.
> Example:
> ```
> >>> import cloudpickle
> >>> import pyarrow.parquet as pq
> >>> pf = pq.ParquetDataset('nation.impala.parquet')
> >>> cloudpickle.dumps(pf)
> ~/anaconda/envs/py36/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dumps(obj, protocol)
>     893     try:
>     894         cp = CloudPickler(file, protocol=protocol)
> --> 895         cp.dump(obj)
>     896         return file.getvalue()
>     897     finally:
> ~/anaconda/envs/py36/lib/python3.6/site-packages/cloudpickle/cloudpickle.py in dump(self, obj)
>     266         self.inject_addons()
>     267         try:
> --> 268             return Pickler.dump(self, obj)
>     269         except RuntimeError as e:
>     270             if 'recursion' in e.args[0]:
> ~/anaconda/envs/py36/lib/python3.6/pickle.py in dump(self, obj)
>     407         if self.proto >= 4:
>     408             self.framer.start_framing()
> --> 409         self.save(obj)
>     410         self.write(STOP)
>     411         self.framer.end_framing()
> ~/anaconda/envs/py36/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
>     519
>     520         # Save the reduce() output and finally memoize the object
> --> 521         self.save_reduce(obj=obj, *rv)
>     522
>     523     def persistent_id(self, obj):
> ~/anaconda/envs/py36/lib/python3.6/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
>     632
>     633         if state is not None:
> --> 634             save(state)
>     635             write(BUILD)
>     636
> ~/anaconda/envs/py36/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
>     474         f = self.dispatch.get(t)
>     475         if f is not None:
> --> 476             f(self, obj) # Call unbound method with explicit self
>     477             return
>     478
> ~/anaconda/envs/py36/lib/python3.6/pickle.py in save_dict(self, obj)
>     819
>     820         self.memoize(obj)
> --> 821         self._batch_setitems(obj.items())
>     822
>     823     dispatch[dict] = save_dict
> ~/anaconda/envs/py36/lib/python3.6/pickle.py in _batch_setitems(self, items)
>     845                 for k, v in tmp:
>     846                     save(k)
> --> 847                     save(v)
>     848                 write(SETITEMS)
>     849             elif n:
> ~/anaconda/envs/py36/lib/python3.6/pickle.py in save(self, obj, save_persistent_id)
>     494             reduce = getattr(obj, "__reduce_ex__", None)
>     495             if reduce is not None:
> --> 496                 rv = reduce(self.proto)
>     497             else:
>     498                 reduce = getattr(obj, "__reduce__", None)
> ~/anaconda/envs/py36/lib/python3.6/site-packages/pyarrow/_parquet.cpython-36m-darwin.so in pyarrow._parquet.ParquetSchema.__reduce_cython__()
> TypeError: no default __reduce__ due to non-trivial __cinit__
> ```
> The indicated schema instance is also referenced by the ParquetDatasetPiece 
> objects.
> ref: https://github.com/dask/distributed/issues/2597
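
For context on the TypeError in the traceback above: the message indicates that 
the extension type gets no automatically generated pickling support because of 
its __cinit__, so it would need an explicit __reduce__.  The sketch below 
illustrates that idea in pure Python only.  SchemaHandle is a hypothetical 
class, its lock merely stands in for a native handle that cannot be pickled, 
and this is not pyarrow's actual fix.

```python
import pickle
import threading


class SchemaHandle:
    """Hypothetical stand-in for a type that wraps a native resource."""

    def __init__(self, names):
        self.names = tuple(names)
        # A lock cannot be pickled; it plays the role of a C++ pointer here.
        self._lock = threading.Lock()

    def __eq__(self, other):
        return isinstance(other, SchemaHandle) and self.names == other.names

    def __reduce__(self):
        # Tell pickle to rebuild the object from constructor arguments
        # instead of copying its internal (unpicklable) state.
        return (SchemaHandle, (self.names,))


schema = SchemaHandle(["a", "b"])
restored = pickle.loads(pickle.dumps(schema))
assert restored == schema
assert restored is not schema
```

Rebuilding from constructor arguments keeps the pickled payload small and lets 
the receiving process create its own native resource.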



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
