Repository: arrow Updated Branches: refs/heads/master 166f0a871 -> ee78cdcb1
ARROW-1503: [Python] Add default serialization context, callbacks for pandas.Series/DataFrame The performance is a bit slower than it could be because we do not have native handling of pyarrow.Buffer (per ARROW-1522). That would allow us to skip the `to_pybytes` copy portion. Author: Wes McKinney <[email protected]> Closes #1192 from wesm/ARROW-1503 and squashes the following commits: a36f97d [Wes McKinney] Add default serialization context and add serialization callbacks for pandas Series, DataFrame 5ff10f4 [Wes McKinney] stubs for handling Series, DataFrame more efficiently by default in serialization code paths Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ee78cdcb Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ee78cdcb Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ee78cdcb Branch: refs/heads/master Commit: ee78cdcb1c475a05df9cd9de63358e80ba280a63 Parents: 166f0a8 Author: Wes McKinney <[email protected]> Authored: Tue Oct 10 12:21:55 2017 -0700 Committer: Philipp Moritz <[email protected]> Committed: Tue Oct 10 12:21:55 2017 -0700 ---------------------------------------------------------------------- python/pyarrow/ipc.py | 47 +++++++++++++++++++++++++++++++++++ python/pyarrow/serialization.pxi | 17 +++++++++++-- python/pyarrow/tests/test_ipc.py | 17 ++++++++++++- 3 files changed, 78 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/ee78cdcb/python/pyarrow/ipc.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index f264f08..1223673 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -187,3 +187,50 @@ def deserialize_pandas(buf, nthreads=None): reader = pa.RecordBatchStreamReader(buffer_reader) table = reader.read_all() return table.to_pandas(nthreads=nthreads) + + +# 
---------------------------------------------------------------------- +# Set up default serialization context + +def _serialize_pandas_series(s): + import pandas as pd + # TODO: serializing Series without extra copy + serialized = serialize_pandas(pd.DataFrame({s.name: s})) + return { + 'type': 'Series', + 'data': serialized.to_pybytes() + } + + +def _serialize_pandas_dataframe(df): + return { + 'type': 'DataFrame', + 'data': serialize_pandas(df).to_pybytes() + } + + +def _deserialize_callback_pandas(data): + deserialized = deserialize_pandas(data['data']) + type_ = data['type'] + if type_ == 'Series': + return deserialized[deserialized.columns[0]] + elif type_ == 'DataFrame': + return deserialized + else: + raise ValueError(type_) + + +try: + import pandas as pd + lib._default_serialization_context.register_type( + pd.Series, 'pandas.Series', + custom_serializer=_serialize_pandas_series, + custom_deserializer=_deserialize_callback_pandas) + + lib._default_serialization_context.register_type( + pd.DataFrame, 'pandas.DataFrame', + custom_serializer=_serialize_pandas_dataframe, + custom_deserializer=_deserialize_callback_pandas) +except ImportError: + # no pandas + pass http://git-wip-us.apache.org/repos/asf/arrow/blob/ee78cdcb/python/pyarrow/serialization.pxi ---------------------------------------------------------------------- diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi index aa1a6a4..4e9ab8e 100644 --- a/python/pyarrow/serialization.pxi +++ b/python/pyarrow/serialization.pxi @@ -137,6 +137,10 @@ cdef class SerializationContext: obj.__dict__.update(serialized_obj) return obj + +_default_serialization_context = SerializationContext() + + cdef class SerializedPyObject: """ Arrow-serialized representation of Python object @@ -174,6 +178,9 @@ cdef class SerializedPyObject: """ cdef PyObject* result + if context is None: + context = _default_serialization_context + with nogil: check_status(DeserializeObject(context, self.data, 
<PyObject*> self.base, &result)) @@ -202,7 +209,8 @@ def serialize(object value, SerializationContext context=None): value: object Python object for the sequence that is to be serialized. context : SerializationContext - Custom serialization and deserialization context + Custom serialization and deserialization context, uses a default + context with some standard type handlers if not specified Returns ------- @@ -210,6 +218,10 @@ def serialize(object value, SerializationContext context=None): """ cdef SerializedPyObject serialized = SerializedPyObject() wrapped_value = [value] + + if context is None: + context = _default_serialization_context + with nogil: check_status(SerializeObject(context, wrapped_value, &serialized.data)) return serialized @@ -225,7 +237,8 @@ def serialize_to(object value, sink, SerializationContext context=None): sink: NativeFile or file-like File the sequence will be written to. context : SerializationContext - Custom serialization and deserialization context + Custom serialization and deserialization context, uses a default + context with some standard type handlers if not specified """ serialized = serialize(value, context) serialized.write_to(sink) http://git-wip-us.apache.org/repos/asf/arrow/blob/ee78cdcb/python/pyarrow/tests/test_ipc.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index fcde582..68c0c80 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -22,7 +22,8 @@ import threading import numpy as np -from pandas.util.testing import assert_frame_equal +from pandas.util.testing import (assert_frame_equal, + assert_series_equal) import pandas as pd from pyarrow.compat import unittest @@ -429,6 +430,20 @@ def test_serialize_pandas_no_preserve_index(): assert_frame_equal(result, df) +def test_serialize_with_pandas_objects(): + df = pd.DataFrame({'a': [1, 2, 3]}, index=[1, 2, 3]) + + data = { + 
'a_series': df['a'], + 'a_frame': df + } + + serialized = pa.serialize(data).to_buffer() + deserialized = pa.deserialize(serialized) + assert_frame_equal(deserialized['a_frame'], df) + assert_series_equal(deserialized['a_series'], df['a']) + + def test_schema_batch_serialize_methods(): nrows = 5 df = pd.DataFrame({
