This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 712b9d2 ARROW-1784: [Python] Enable zero-copy serialization, deserialization of pandas.DataFrame via components
712b9d2 is described below
commit 712b9d2c98d82b6f3a37ba8a2944bfda6f186994
Author: Wes McKinney <[email protected]>
AuthorDate: Wed Dec 6 14:10:23 2017 -0500
ARROW-1784: [Python] Enable zero-copy serialization, deserialization of pandas.DataFrame via components
This patch adds a serialization path for pandas.DataFrame (and Series) that
decomposes the internal BlockManager into a dictionary structure that can be
serialized to the zero-copy component representation from ARROW-1783, and then
reconstructed similarly.
The impact of this is that when a DataFrame contains no data that requires
pickling, the reconstruction is zero-copy. I will post some benchmarks to
illustrate the impact; the performance improvements are remarkable, with
nearly a 1000x speedup on a large DataFrame.
As follow-up work, we will need more efficient serialization of the different
pandas Index types; we should create a new JIRA for this.
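
For illustration, here is a minimal sketch of the round trip this patch
enables, using only names added in the patch below (hedged: the exact
contents of the component dictionary are an internal detail):

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'a': [1.0, 2.0, 3.0]})
    context = pa.pandas_serialization_context

    # Decompose into a dict of metadata plus zero-copy buffer references
    components = context.serialize(df).to_components()

    # Reconstruct; zero-copy when no block requires pickling
    restored = context.deserialize_components(components)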
Author: Wes McKinney <[email protected]>
Closes #1390 from wesm/ARROW-1784 and squashes the following commits:
21adbe7d [Wes McKinney] Do not test with IntervalIndex in pandas < 0.21, since manylinux1 is pinned at 0.20.1
939c02bb [Wes McKinney] Add pandas serialization test for periods, intervals
4b4c776c [Wes McKinney] Code comment, add more serialization docs for pandas / component serialization
1ac073c3 [Wes McKinney] Complete component-based serializer for pandas.DataFrame
6b01746d [Wes McKinney] Begin refactoring
---
python/doc/source/index.rst | 12 +-
python/doc/source/ipc.rst | 77 +++++++++
python/manylinux1/README.md | 2 +-
python/pyarrow/pandas_compat.py | 184 +++++++++++++-------
python/pyarrow/serialization.pxi | 24 +++
python/pyarrow/serialization.py | 121 +++++++++----
python/pyarrow/tests/test_convert_pandas.py | 255 ++++++++++++++++------------
7 files changed, 468 insertions(+), 207 deletions(-)
diff --git a/python/doc/source/index.rst b/python/doc/source/index.rst
index b933d23..c35f20b 100644
--- a/python/doc/source/index.rst
+++ b/python/doc/source/index.rst
@@ -18,10 +18,14 @@
Apache Arrow (Python)
=====================
-Arrow is a columnar in-memory analytics layer designed to accelerate big data.
-It houses a set of canonical in-memory representations of flat and hierarchical
-data along with multiple language-bindings for structure manipulation. It also
-provides IPC and common algorithm implementations.
+Apache Arrow is a cross-language development platform for in-memory data. It
+specifies a standardized language-independent columnar memory format for flat
+and hierarchical data, organized for efficient analytic operations on modern
+hardware. It also provides computational libraries and zero-copy streaming
+messaging and interprocess communication.
+
+The Arrow Python bindings have first-class integration with NumPy, pandas, and
+built-in Python objects.
This is the documentation of the Python API of Apache Arrow. For more details
on the format and other language bindings see
diff --git a/python/doc/source/ipc.rst b/python/doc/source/ipc.rst
index 17fe84e..6842cb5 100644
--- a/python/doc/source/ipc.rst
+++ b/python/doc/source/ipc.rst
@@ -256,6 +256,83 @@ Lastly, we use this context as an additional argument to
``pyarrow.serialize``:
buf = pa.serialize(val, context=context).to_buffer()
restored_val = pa.deserialize(buf, context=context)
+The ``SerializationContext`` also has convenience methods ``serialize`` and
+``deserialize``, so these are equivalent statements:
+
+.. code-block:: python
+
+ buf = context.serialize(val).to_buffer()
+ restored_val = context.deserialize(buf)
+
+Component-based Serialization
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+For serializing Python objects containing some number of NumPy arrays, Arrow
+buffers, or other data types, it may be desirable to transport their serialized
+representation without having to produce an intermediate copy using the
+``to_buffer`` method. To motivate this, suppose we have a list of NumPy arrays:
+
+.. ipython:: python
+
+ import numpy as np
+ data = [np.random.randn(10, 10) for i in range(5)]
+
+The call ``pa.serialize(data)`` does not copy the memory inside each of these
+NumPy arrays. This serialized representation can then be decomposed into a
+dictionary containing a sequence of ``pyarrow.Buffer`` objects that hold the
+metadata for each array and references to the memory inside the arrays. To do
+this, use the ``to_components`` method:
+
+.. ipython:: python
+
+ serialized = pa.serialize(data)
+ components = serialized.to_components()
+
+The particular details of the output of ``to_components`` are not too
+important. The objects in the ``'data'`` field are ``pyarrow.Buffer`` objects,
+which are zero-copy convertible to Python ``memoryview`` objects:
+
+.. ipython:: python
+
+ memoryview(components['data'][0])
+
+A memoryview can be converted back to a ``Buffer`` with ``pyarrow.frombuffer``:
+
+.. ipython:: python
+
+ mv = memoryview(components['data'][0])
+ buf = pa.frombuffer(mv)
+
+An object can be reconstructed from its component-based representation using
+``deserialize_components``:
+
+.. ipython:: python
+
+ restored_data = pa.deserialize_components(components)
+ restored_data[0]
+
+``deserialize_components`` is also available as a method on
+``SerializationContext`` objects.
+
+Serializing pandas Objects
+--------------------------
+
+We provide a serialization context that has optimized handling of pandas
+objects like ``DataFrame`` and ``Series``. This is the
+``pyarrow.pandas_serialization_context`` member. Combined with component-based
+serialization above, this enables zero-copy transport of pandas DataFrame
+objects not containing any Python objects:
+
+.. ipython:: python
+
+ import pandas as pd
+ df = pd.DataFrame({'a': [1, 2, 3, 4, 5]})
+ context = pa.pandas_serialization_context
+ serialized_df = context.serialize(df)
+ df_components = serialized_df.to_components()
+ original_df = context.deserialize_components(df_components)
+ original_df
+
Feather Format
--------------
diff --git a/python/manylinux1/README.md b/python/manylinux1/README.md
index a74f7a2..3d462ff 100644
--- a/python/manylinux1/README.md
+++ b/python/manylinux1/README.md
@@ -37,7 +37,7 @@ git clone ../../ arrow
# Build the native baseimage
docker build -t arrow-base-x86_64 -f Dockerfile-x86_64 .
# Build the python packages
-docker run --rm -t -i -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh
+docker run --shm-size=2g --rm -t -i -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh
# Now the new packages are located in the dist/ folder
ls -l dist/
```
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index a50ef96..8459ec3 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -20,6 +20,7 @@ import collections
import json
import re
+import pandas.core.internals as _int
import numpy as np
import pandas as pd
@@ -348,25 +349,85 @@ def get_datetimetz_type(values, dtype, type_):
return values, type_
+# ----------------------------------------------------------------------
+# Converting pandas.DataFrame to a dict containing only NumPy arrays or other
+# objects friendly to pyarrow.serialize
-def make_datetimetz(tz):
+
+def dataframe_to_serialized_dict(frame):
+ block_manager = frame._data
+
+ blocks = []
+ axes = [ax for ax in block_manager.axes]
+
+ for block in block_manager.blocks:
+ values = block.values
+ block_data = {}
+
+ if isinstance(block, _int.DatetimeTZBlock):
+ block_data['timezone'] = values.tz.zone
+ values = values.values
+ elif isinstance(block, _int.CategoricalBlock):
+ block_data.update(dictionary=values.categories,
+ ordered=values.ordered)
+ values = values.codes
+
+ block_data.update(
+ placement=block.mgr_locs.as_array,
+ block=values
+ )
+ blocks.append(block_data)
+
+ return {
+ 'blocks': blocks,
+ 'axes': axes
+ }
+
+
+def serialized_dict_to_dataframe(data):
+ reconstructed_blocks = [_reconstruct_block(block)
+ for block in data['blocks']]
+
+ block_mgr = _int.BlockManager(reconstructed_blocks, data['axes'])
+ return pd.DataFrame(block_mgr)
+
+
+def _reconstruct_block(item):
+ # Construct the individual blocks converting dictionary types to pandas
+ # categorical types and Timestamps-with-timezones types to the proper
+ # pandas Blocks
+
+ block_arr = item['block']
+ placement = item['placement']
+ if 'dictionary' in item:
+ cat = pd.Categorical.from_codes(block_arr,
+ categories=item['dictionary'],
+ ordered=item['ordered'])
+ block = _int.make_block(cat, placement=placement,
+ klass=_int.CategoricalBlock,
+ fastpath=True)
+ elif 'timezone' in item:
+ dtype = _make_datetimetz(item['timezone'])
+ block = _int.make_block(block_arr, placement=placement,
+ klass=_int.DatetimeTZBlock,
+ dtype=dtype, fastpath=True)
+ else:
+ block = _int.make_block(block_arr, placement=placement)
+
+ return block
+
+
+def _make_datetimetz(tz):
from pyarrow.compat import DatetimeTZDtype
return DatetimeTZDtype('ns', tz=tz)
-def backwards_compatible_index_name(raw_name, logical_name):
- pattern = r'^__index_level_\d+__$'
- if raw_name == logical_name and re.match(pattern, raw_name) is not None:
- return None
- else:
- return logical_name
+# ----------------------------------------------------------------------
+# Converting pyarrow.Table efficiently to pandas.DataFrame
def table_to_blockmanager(options, table, memory_pool, nthreads=1,
categoricals=None):
- import pandas.core.internals as _int
- import pyarrow.lib as lib
-
index_columns = []
columns = []
column_indexes = []
@@ -405,37 +466,13 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1,
index_arrays.append(pd.Series(values, dtype=col_pandas.dtype))
index_names.append(
- backwards_compatible_index_name(raw_name, logical_name)
+ _backwards_compatible_index_name(raw_name, logical_name)
)
block_table = block_table.remove_column(
block_table.schema.get_field_index(raw_name)
)
- # Convert an arrow table to Block from the internal pandas API
- result = lib.table_to_blocks(options, block_table, nthreads, memory_pool)
-
- # Construct the individual blocks converting dictionary types to pandas
- # categorical types and Timestamps-with-timezones types to the proper
- # pandas Blocks
- blocks = []
- for item in result:
- block_arr = item['block']
- placement = item['placement']
- if 'dictionary' in item:
- cat = pd.Categorical(block_arr,
- categories=item['dictionary'],
- ordered=item['ordered'], fastpath=True)
- block = _int.make_block(cat, placement=placement,
- klass=_int.CategoricalBlock,
- fastpath=True)
- elif 'timezone' in item:
- dtype = make_datetimetz(item['timezone'])
- block = _int.make_block(block_arr, placement=placement,
- klass=_int.DatetimeTZBlock,
- dtype=dtype, fastpath=True)
- else:
- block = _int.make_block(block_arr, placement=placement)
- blocks.append(block)
+ blocks = _table_to_blocks(options, block_table, nthreads, memory_pool)
# Construct the row index
if len(index_arrays) > 1:
@@ -477,31 +514,7 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1,
# if we're reconstructing the index
if has_pandas_metadata:
-
- # Get levels and labels, and provide sane defaults if the index has a
- # single level to avoid if/else spaghetti.
- levels = getattr(columns, 'levels', None) or [columns]
- labels = getattr(columns, 'labels', None) or [
- pd.RangeIndex(len(level)) for level in levels
- ]
-
- # Convert each level to the dtype provided in the metadata
- levels_dtypes = [
- (level, col_index.get('numpy_type', level.dtype))
- for level, col_index in zip_longest(
- levels, column_indexes, fillvalue={}
- )
- ]
- new_levels = [
- _level if _level.dtype == _dtype else _level.astype(_dtype)
- for _level, _dtype in levels_dtypes
- ]
-
- columns = pd.MultiIndex(
- levels=new_levels,
- labels=labels,
- names=columns.names
- )
+ columns = _reconstruct_columns_from_metadata(columns, column_indexes)
# ARROW-1751: flatten a single level column MultiIndex for pandas 0.21.0
columns = _flatten_single_level_multiindex(columns)
@@ -510,6 +523,55 @@ def table_to_blockmanager(options, table, memory_pool, nthreads=1,
return _int.BlockManager(blocks, axes)
+def _backwards_compatible_index_name(raw_name, logical_name):
+ # Part of table_to_blockmanager
+ pattern = r'^__index_level_\d+__$'
+ if raw_name == logical_name and re.match(pattern, raw_name) is not None:
+ return None
+ else:
+ return logical_name
+
+
+def _reconstruct_columns_from_metadata(columns, column_indexes):
+ # Part of table_to_blockmanager
+
+ # Get levels and labels, and provide sane defaults if the index has a
+ # single level to avoid if/else spaghetti.
+ levels = getattr(columns, 'levels', None) or [columns]
+ labels = getattr(columns, 'labels', None) or [
+ pd.RangeIndex(len(level)) for level in levels
+ ]
+
+ # Convert each level to the dtype provided in the metadata
+ levels_dtypes = [
+ (level, col_index.get('numpy_type', level.dtype))
+ for level, col_index in zip_longest(
+ levels, column_indexes, fillvalue={}
+ )
+ ]
+ new_levels = [
+ _level if _level.dtype == _dtype else _level.astype(_dtype)
+ for _level, _dtype in levels_dtypes
+ ]
+
+ return pd.MultiIndex(
+ levels=new_levels,
+ labels=labels,
+ names=columns.names
+ )
+
+
+def _table_to_blocks(options, block_table, nthreads, memory_pool):
+ # Part of table_to_blockmanager
+
+ # Convert an arrow table to Block from the internal pandas API
+ result = pa.lib.table_to_blocks(options, block_table, nthreads,
+ memory_pool)
+
+ # Defined above
+ return [_reconstruct_block(item) for item in result]
+
+
def _flatten_single_level_multiindex(index):
if isinstance(index, pd.MultiIndex) and index.nlevels == 1:
levels, = index.levels
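
A brief usage sketch of the two helpers added above; these operate on pandas
internals (the BlockManager), so the exact dictionary contents may vary
across pandas versions:

    import pandas as pd
    import pyarrow.pandas_compat as pdcompat

    df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})

    # Decompose the BlockManager into NumPy arrays plus placement metadata
    serialized = pdcompat.dataframe_to_serialized_dict(df)

    # Rebuild an equivalent DataFrame from the dict
    restored = pdcompat.serialized_dict_to_dataframe(serialized)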
diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi
index faf164b..cbc5e3b 100644
--- a/python/pyarrow/serialization.pxi
+++ b/python/pyarrow/serialization.pxi
@@ -152,6 +152,30 @@ cdef class SerializationContext:
obj.__dict__.update(serialized_obj)
return obj
+ def serialize(self, obj):
+ """
+ Call pyarrow.serialize and pass this SerializationContext
+ """
+ return serialize(obj, context=self)
+
+ def serialize_to(self, object value, sink):
+ """
+ Call pyarrow.serialize_to and pass this SerializationContext
+ """
+ return serialize_to(value, sink, context=self)
+
+ def deserialize(self, what):
+ """
+ Call pyarrow.deserialize and pass this SerializationContext
+ """
+ return deserialize(what, context=self)
+
+ def deserialize_components(self, what):
+ """
+ Call pyarrow.deserialize_components and pass this SerializationContext
+ """
+ return deserialize_components(what, context=self)
+
_default_serialization_context = SerializationContext()
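
The convenience methods added above shorten the context-passing pattern from
the docs; each call below is equivalent to the module-level function with
``context=`` passed explicitly (a sketch, using the pandas context added in
this patch as an example):

    import pyarrow as pa

    context = pa.pandas_serialization_context

    # Same as pa.serialize([1, 2, 3], context=context)
    buf = context.serialize([1, 2, 3]).to_buffer()

    # Same as pa.deserialize(buf, context=context)
    restored = context.deserialize(buf)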
diff --git a/python/pyarrow/serialization.py b/python/pyarrow/serialization.py
index 08e6cce..b6d2b02 100644
--- a/python/pyarrow/serialization.py
+++ b/python/pyarrow/serialization.py
@@ -43,15 +43,19 @@ def _deserialize_numpy_array_list(data):
return np.array(data[0], dtype=np.dtype(data[1]))
-def _serialize_numpy_array_pickle(obj):
- pickled = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
+def _pickle_to_buffer(x):
+ pickled = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
return frombuffer(pickled)
-def _deserialize_numpy_array_pickle(data):
+def _load_pickle_from_buffer(data):
return pickle.loads(memoryview(data))
+_serialize_numpy_array_pickle = _pickle_to_buffer
+_deserialize_numpy_array_pickle = _load_pickle_from_buffer
+
+
def register_default_serialization_handlers(serialization_context):
# ----------------------------------------------------------------------
@@ -108,38 +112,6 @@ def register_default_serialization_handlers(serialization_context):
custom_deserializer=_deserialize_numpy_array_list)
# ----------------------------------------------------------------------
- # Set up serialization for pandas Series and DataFrame
-
- try:
- import pandas as pd
-
- def _serialize_pandas_series(obj):
- return serialize_pandas(pd.DataFrame({obj.name: obj}))
-
- def _deserialize_pandas_series(data):
- deserialized = deserialize_pandas(data)
- return deserialized[deserialized.columns[0]]
-
- def _serialize_pandas_dataframe(obj):
- return serialize_pandas(obj)
-
- def _deserialize_pandas_dataframe(data):
- return deserialize_pandas(data)
-
- serialization_context.register_type(
- pd.Series, 'pd.Series',
- custom_serializer=_serialize_pandas_series,
- custom_deserializer=_deserialize_pandas_series)
-
- serialization_context.register_type(
- pd.DataFrame, 'pd.DataFrame',
- custom_serializer=_serialize_pandas_dataframe,
- custom_deserializer=_deserialize_pandas_dataframe)
- except ImportError:
- # no pandas
- pass
-
- # ----------------------------------------------------------------------
# Set up serialization for pytorch tensors
try:
@@ -165,8 +137,87 @@ def register_default_serialization_handlers(serialization_context):
register_default_serialization_handlers(_default_serialization_context)
+
+# ----------------------------------------------------------------------
+# pandas-specific serialization matters
+
+
pandas_serialization_context = _default_serialization_context.clone()
+
+def _register_pandas_arrow_handlers(context):
+ try:
+ import pandas as pd
+ except ImportError:
+ return
+
+ def _serialize_pandas_series(obj):
+ return serialize_pandas(pd.DataFrame({obj.name: obj}))
+
+ def _deserialize_pandas_series(data):
+ deserialized = deserialize_pandas(data)
+ return deserialized[deserialized.columns[0]]
+
+ def _serialize_pandas_dataframe(obj):
+ return serialize_pandas(obj)
+
+ def _deserialize_pandas_dataframe(data):
+ return deserialize_pandas(data)
+
+ context.register_type(
+ pd.Series, 'pd.Series',
+ custom_serializer=_serialize_pandas_series,
+ custom_deserializer=_deserialize_pandas_series)
+
+ context.register_type(
+ pd.DataFrame, 'pd.DataFrame',
+ custom_serializer=_serialize_pandas_dataframe,
+ custom_deserializer=_deserialize_pandas_dataframe)
+
+
+def _register_custom_pandas_handlers(context):
+    # ARROW-1784: faster path when the data only needs to be read back by pandas
+
+ try:
+ import pandas as pd
+ except ImportError:
+ return
+
+ import pyarrow.pandas_compat as pdcompat
+
+ def _serialize_pandas_dataframe(obj):
+ return pdcompat.dataframe_to_serialized_dict(obj)
+
+ def _deserialize_pandas_dataframe(data):
+ return pdcompat.serialized_dict_to_dataframe(data)
+
+ def _serialize_pandas_series(obj):
+ return _serialize_pandas_dataframe(pd.DataFrame({obj.name: obj}))
+
+ def _deserialize_pandas_series(data):
+ deserialized = _deserialize_pandas_dataframe(data)
+ return deserialized[deserialized.columns[0]]
+
+ context.register_type(
+ pd.Series, 'pd.Series',
+ custom_serializer=_serialize_pandas_series,
+ custom_deserializer=_deserialize_pandas_series)
+
+ context.register_type(
+ pd.Index, 'pd.Index',
+ custom_serializer=_pickle_to_buffer,
+ custom_deserializer=_load_pickle_from_buffer)
+
+ context.register_type(
+ pd.DataFrame, 'pd.DataFrame',
+ custom_serializer=_serialize_pandas_dataframe,
+ custom_deserializer=_deserialize_pandas_dataframe)
+
+
+_register_pandas_arrow_handlers(_default_serialization_context)
+_register_custom_pandas_handlers(pandas_serialization_context)
+
+
pandas_serialization_context.register_type(
np.ndarray, 'np.array',
custom_serializer=_serialize_numpy_array_pickle,
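
Since the custom pandas context also registers handlers for pd.Series and
pd.Index, a Series round-trips through the same fast path; a short sketch
(note that _serialize_pandas_series wraps the Series in a single-column
DataFrame, so the Series should carry a name):

    import pandas as pd
    import pyarrow as pa

    s = pd.Series([1.0, 2.0, 3.0], name='x')
    ctx = pa.pandas_serialization_context

    buf = ctx.serialize(s).to_buffer()
    restored = ctx.deserialize(buf)  # comes back as a pd.Series named 'x'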
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index a43651e..e94ee46 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -60,70 +60,73 @@ def _alltypes_example(size=100):
})
-class TestPandasConversion(object):
+def _check_pandas_roundtrip(df, expected=None, nthreads=1,
+ expected_schema=None,
+ check_dtype=True, schema=None,
+ preserve_index=False,
+ as_batch=False):
+ klass = pa.RecordBatch if as_batch else pa.Table
+ table = klass.from_pandas(df, schema=schema,
+ preserve_index=preserve_index,
+ nthreads=nthreads)
+
+ result = table.to_pandas(nthreads=nthreads)
+ if expected_schema:
+ assert table.schema.equals(expected_schema)
+ if expected is None:
+ expected = df
+ tm.assert_frame_equal(result, expected, check_dtype=check_dtype,
+ check_index_type=('equiv' if preserve_index
+ else False))
+
+
+def _check_series_roundtrip(s, type_=None):
+ arr = pa.array(s, from_pandas=True, type=type_)
+
+ result = pd.Series(arr.to_pandas(), name=s.name)
+ if patypes.is_timestamp(arr.type) and arr.type.tz is not None:
+ result = (result.dt.tz_localize('utc')
+ .dt.tz_convert(arr.type.tz))
+
+ tm.assert_series_equal(s, result)
+
+
+def _check_array_roundtrip(values, expected=None, mask=None,
+ type=None):
+ arr = pa.array(values, from_pandas=True, mask=mask, type=type)
+ result = arr.to_pandas()
+
+ values_nulls = pd.isnull(values)
+ if mask is None:
+ assert arr.null_count == values_nulls.sum()
+ else:
+ assert arr.null_count == (mask | values_nulls).sum()
+
+ if mask is None:
+ tm.assert_series_equal(pd.Series(result), pd.Series(values),
+ check_names=False)
+ else:
+ expected = pd.Series(np.ma.masked_array(values, mask=mask))
+ tm.assert_series_equal(pd.Series(result), expected,
+ check_names=False)
+
+
+def _check_array_from_pandas_roundtrip(np_array):
+ arr = pa.array(np_array, from_pandas=True)
+ result = arr.to_pandas()
+ npt.assert_array_equal(result, np_array)
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
- def _check_pandas_roundtrip(self, df, expected=None, nthreads=1,
- expected_schema=None,
- check_dtype=True, schema=None,
- preserve_index=False,
- as_batch=False):
- klass = pa.RecordBatch if as_batch else pa.Table
- table = klass.from_pandas(df, schema=schema,
- preserve_index=preserve_index,
- nthreads=nthreads)
-
- result = table.to_pandas(nthreads=nthreads)
- if expected_schema:
- assert table.schema.equals(expected_schema)
- if expected is None:
- expected = df
- tm.assert_frame_equal(result, expected, check_dtype=check_dtype,
- check_index_type=('equiv' if preserve_index
- else False))
-
- def _check_series_roundtrip(self, s, type_=None):
- arr = pa.array(s, from_pandas=True, type=type_)
-
- result = pd.Series(arr.to_pandas(), name=s.name)
- if patypes.is_timestamp(arr.type) and arr.type.tz is not None:
- result = (result.dt.tz_localize('utc')
- .dt.tz_convert(arr.type.tz))
-
- tm.assert_series_equal(s, result)
-
- def _check_array_roundtrip(self, values, expected=None, mask=None,
- type=None):
- arr = pa.array(values, from_pandas=True, mask=mask, type=type)
- result = arr.to_pandas()
-
- values_nulls = pd.isnull(values)
- if mask is None:
- assert arr.null_count == values_nulls.sum()
- else:
- assert arr.null_count == (mask | values_nulls).sum()
-
- if mask is None:
- tm.assert_series_equal(pd.Series(result), pd.Series(values),
- check_names=False)
- else:
- expected = pd.Series(np.ma.masked_array(values, mask=mask))
- tm.assert_series_equal(pd.Series(result), expected,
- check_names=False)
+
+class TestPandasConversion(object):
def test_all_none_objects(self):
df = pd.DataFrame({'a': [None, None, None]})
- self._check_pandas_roundtrip(df)
+ _check_pandas_roundtrip(df)
def test_all_none_category(self):
df = pd.DataFrame({'a': [None, None, None]})
df['a'] = df['a'].astype('category')
- self._check_pandas_roundtrip(df)
+ _check_pandas_roundtrip(df)
def test_non_string_columns(self):
df = pd.DataFrame({0: [1, 2, 3]})
@@ -133,14 +136,14 @@ class TestPandasConversion(object):
def test_column_index_names_are_preserved(self):
df = pd.DataFrame({'data': [1, 2, 3]})
df.columns.names = ['a']
- self._check_pandas_roundtrip(df, preserve_index=True)
+ _check_pandas_roundtrip(df, preserve_index=True)
def test_multiindex_columns(self):
columns = pd.MultiIndex.from_arrays([
['one', 'two'], ['X', 'Y']
])
df = pd.DataFrame([(1, 'a'), (2, 'b'), (3, 'c')], columns=columns)
- self._check_pandas_roundtrip(df, preserve_index=True)
+ _check_pandas_roundtrip(df, preserve_index=True)
def test_multiindex_columns_with_dtypes(self):
columns = pd.MultiIndex.from_arrays(
@@ -151,11 +154,11 @@ class TestPandasConversion(object):
names=['level_1', 'level_2'],
)
df = pd.DataFrame([(1, 'a'), (2, 'b'), (3, 'c')], columns=columns)
- self._check_pandas_roundtrip(df, preserve_index=True)
+ _check_pandas_roundtrip(df, preserve_index=True)
def test_integer_index_column(self):
df = pd.DataFrame([(1, 'a'), (2, 'b'), (3, 'c')])
- self._check_pandas_roundtrip(df, preserve_index=True)
+ _check_pandas_roundtrip(df, preserve_index=True)
def test_categorical_column_index(self):
# I *really* hope no one uses category dtypes for single level column
@@ -203,7 +206,7 @@ class TestPandasConversion(object):
df['a'] = df.a.astype('category')
df = df.set_index('a')
- self._check_pandas_roundtrip(df, preserve_index=True)
+ _check_pandas_roundtrip(df, preserve_index=True)
def test_float_no_nulls(self):
data = {}
@@ -218,7 +221,7 @@ class TestPandasConversion(object):
df = pd.DataFrame(data)
schema = pa.schema(fields)
- self._check_pandas_roundtrip(df, expected_schema=schema)
+ _check_pandas_roundtrip(df, expected_schema=schema)
def test_zero_copy_success(self):
result = pa.array([0, 1, 2]).to_pandas(zero_copy_only=True)
@@ -312,8 +315,8 @@ class TestPandasConversion(object):
expected = pd.DataFrame({'floats': pd.to_numeric(arr)})
field = pa.field('floats', pa.float64())
schema = pa.schema([field])
- self._check_pandas_roundtrip(df, expected=expected,
- expected_schema=schema)
+ _check_pandas_roundtrip(df, expected=expected,
+ expected_schema=schema)
def test_int_object_nulls(self):
arr = np.array([None, 1, np.int64(3)] * 5, dtype=object)
@@ -321,8 +324,8 @@ class TestPandasConversion(object):
expected = pd.DataFrame({'ints': pd.to_numeric(arr)})
field = pa.field('ints', pa.int64())
schema = pa.schema([field])
- self._check_pandas_roundtrip(df, expected=expected,
- expected_schema=schema)
+ _check_pandas_roundtrip(df, expected=expected,
+ expected_schema=schema)
def test_integer_no_nulls(self):
data = OrderedDict()
@@ -347,7 +350,7 @@ class TestPandasConversion(object):
df = pd.DataFrame(data)
schema = pa.schema(fields)
- self._check_pandas_roundtrip(df, expected_schema=schema)
+ _check_pandas_roundtrip(df, expected_schema=schema)
def test_integer_with_nulls(self):
# pandas requires upcast to float dtype
@@ -395,7 +398,7 @@ class TestPandasConversion(object):
df = pd.DataFrame({'bools': np.random.randn(num_values) > 0})
field = pa.field('bools', pa.bool_())
schema = pa.schema([field])
- self._check_pandas_roundtrip(df, expected_schema=schema)
+ _check_pandas_roundtrip(df, expected_schema=schema)
def test_boolean_nulls(self):
# pandas requires upcast to object dtype
@@ -425,7 +428,7 @@ class TestPandasConversion(object):
df = pd.DataFrame({'bools': arr})
field = pa.field('bools', pa.bool_())
schema = pa.schema([field])
- self._check_pandas_roundtrip(df, expected_schema=schema)
+ _check_pandas_roundtrip(df, expected_schema=schema)
def test_all_nulls_cast_numeric(self):
arr = np.array([None], dtype=object)
@@ -445,7 +448,7 @@ class TestPandasConversion(object):
field = pa.field('strings', pa.string())
schema = pa.schema([field])
- self._check_pandas_roundtrip(df, expected_schema=schema)
+ _check_pandas_roundtrip(df, expected_schema=schema)
def test_bytes_to_binary(self):
values = [u('qux'), b'foo', None, 'bar', 'qux', np.nan]
@@ -456,7 +459,7 @@ class TestPandasConversion(object):
values2 = [b'qux', b'foo', None, b'bar', b'qux', np.nan]
expected = pd.DataFrame({'strings': values2})
- self._check_pandas_roundtrip(df, expected)
+ _check_pandas_roundtrip(df, expected)
@pytest.mark.large_memory
def test_bytes_exceed_2gb(self):
@@ -499,7 +502,7 @@ class TestPandasConversion(object):
})
field = pa.field('datetime64', pa.timestamp('ns'))
schema = pa.schema([field])
- self._check_pandas_roundtrip(
+ _check_pandas_roundtrip(
df,
expected_schema=schema,
)
@@ -514,7 +517,7 @@ class TestPandasConversion(object):
})
field = pa.field('datetime64', pa.timestamp('ns'))
schema = pa.schema([field])
- self._check_pandas_roundtrip(
+ _check_pandas_roundtrip(
df,
expected_schema=schema,
)
@@ -529,9 +532,9 @@ class TestPandasConversion(object):
})
df['datetime64'] = (df['datetime64'].dt.tz_localize('US/Eastern')
.to_frame())
- self._check_pandas_roundtrip(df)
+ _check_pandas_roundtrip(df)
- self._check_series_roundtrip(df['datetime64'])
+ _check_series_roundtrip(df['datetime64'])
# drop-in a null and ns instead of ms
df = pd.DataFrame({
@@ -545,7 +548,7 @@ class TestPandasConversion(object):
df['datetime64'] = (df['datetime64'].dt.tz_localize('US/Eastern')
.to_frame())
- self._check_pandas_roundtrip(df)
+ _check_pandas_roundtrip(df)
def test_datetime64_to_date32(self):
# ARROW-1718
@@ -647,13 +650,13 @@ class TestPandasConversion(object):
def test_column_of_arrays(self):
df, schema = dataframe_with_arrays()
- self._check_pandas_roundtrip(df, schema=schema, expected_schema=schema)
+ _check_pandas_roundtrip(df, schema=schema, expected_schema=schema)
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
assert table.schema.equals(schema)
for column in df.columns:
field = schema.field_by_name(column)
- self._check_array_roundtrip(df[column], type=field.type)
+ _check_array_roundtrip(df[column], type=field.type)
def test_column_of_arrays_to_py(self):
# Test regression in ARROW-1199 not caught in above test
@@ -674,13 +677,13 @@ class TestPandasConversion(object):
def test_column_of_lists(self):
df, schema = dataframe_with_lists()
- self._check_pandas_roundtrip(df, schema=schema, expected_schema=schema)
+ _check_pandas_roundtrip(df, schema=schema, expected_schema=schema)
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
assert table.schema.equals(schema)
for column in df.columns:
field = schema.field_by_name(column)
- self._check_array_roundtrip(df[column], type=field.type)
+ _check_array_roundtrip(df[column], type=field.type)
def test_column_of_lists_chunked(self):
# ARROW-1357
@@ -732,7 +735,7 @@ class TestPandasConversion(object):
arr = df['int64'].values[::3]
assert arr.strides[0] != 8
- self._check_array_roundtrip(arr)
+ _check_array_roundtrip(arr)
def test_nested_lists_all_none(self):
data = np.array([[None, None], None], dtype=object)
@@ -751,8 +754,8 @@ class TestPandasConversion(object):
def test_threaded_conversion(self):
df = _alltypes_example()
- self._check_pandas_roundtrip(df, nthreads=2)
- self._check_pandas_roundtrip(df, nthreads=2, as_batch=True)
+ _check_pandas_roundtrip(df, nthreads=2)
+ _check_pandas_roundtrip(df, nthreads=2, as_batch=True)
def test_category(self):
repeats = 5
@@ -770,7 +773,7 @@ class TestPandasConversion(object):
'strings': v1 * repeats,
'strings2': v1 * repeats,
'strings3': v3 * repeats})
- self._check_pandas_roundtrip(df)
+ _check_pandas_roundtrip(df)
arrays = [
pd.Categorical(v1 * repeats),
@@ -778,7 +781,7 @@ class TestPandasConversion(object):
pd.Categorical(v3 * repeats)
]
for values in arrays:
- self._check_array_roundtrip(values)
+ _check_array_roundtrip(values)
def test_mixed_types_fails(self):
data = pd.DataFrame({'a': ['a', 1, 2.0]})
@@ -825,9 +828,9 @@ class TestPandasConversion(object):
df = pd.DataFrame(case, columns=columns)
col = df['a']
- self._check_pandas_roundtrip(df)
- self._check_array_roundtrip(col)
- self._check_array_roundtrip(col, mask=strided_mask)
+ _check_pandas_roundtrip(df)
+ _check_array_roundtrip(col)
+ _check_array_roundtrip(col, mask=strided_mask)
def test_decimal_32_from_pandas(self):
expected = pd.DataFrame({
@@ -987,11 +990,6 @@ class TestPandasConversion(object):
tm.assert_frame_equal(df, expected_df)
- def _check_array_from_pandas_roundtrip(self, np_array):
- arr = pa.array(np_array, from_pandas=True)
- result = arr.to_pandas()
- npt.assert_array_equal(result, np_array)
-
def test_numpy_datetime64_columns(self):
datetime64_ns = np.array([
'2007-07-13T01:23:34.123456789',
@@ -999,7 +997,7 @@ class TestPandasConversion(object):
'2006-01-13T12:34:56.432539784',
'2010-08-13T05:46:57.437699912'],
dtype='datetime64[ns]')
- self._check_array_from_pandas_roundtrip(datetime64_ns)
+ _check_array_from_pandas_roundtrip(datetime64_ns)
datetime64_us = np.array([
'2007-07-13T01:23:34.123456',
@@ -1007,7 +1005,7 @@ class TestPandasConversion(object):
'2006-01-13T12:34:56.432539',
'2010-08-13T05:46:57.437699'],
dtype='datetime64[us]')
- self._check_array_from_pandas_roundtrip(datetime64_us)
+ _check_array_from_pandas_roundtrip(datetime64_us)
datetime64_ms = np.array([
'2007-07-13T01:23:34.123',
@@ -1015,7 +1013,7 @@ class TestPandasConversion(object):
'2006-01-13T12:34:56.432',
'2010-08-13T05:46:57.437'],
dtype='datetime64[ms]')
- self._check_array_from_pandas_roundtrip(datetime64_ms)
+ _check_array_from_pandas_roundtrip(datetime64_ms)
datetime64_s = np.array([
'2007-07-13T01:23:34',
@@ -1023,7 +1021,7 @@ class TestPandasConversion(object):
'2006-01-13T12:34:56',
'2010-08-13T05:46:57'],
dtype='datetime64[s]')
- self._check_array_from_pandas_roundtrip(datetime64_s)
+ _check_array_from_pandas_roundtrip(datetime64_s)
def test_numpy_datetime64_day_unit(self):
datetime64_d = np.array([
@@ -1032,7 +1030,7 @@ class TestPandasConversion(object):
'2006-01-15',
'2010-08-19'],
dtype='datetime64[D]')
- self._check_array_from_pandas_roundtrip(datetime64_d)
+ _check_array_from_pandas_roundtrip(datetime64_d)
def test_all_nones(self):
def _check_series(s):
@@ -1079,8 +1077,8 @@ class TestPandasConversion(object):
pa.field('c', pa.int64())
])
- self._check_pandas_roundtrip(df, schema=partial_schema,
- expected_schema=expected_schema)
+ _check_pandas_roundtrip(df, schema=partial_schema,
+ expected_schema=expected_schema)
def test_structarray(self):
ints = pa.array([None, 2, 3], type=pa.int64())
@@ -1115,7 +1113,7 @@ class TestPandasConversion(object):
pa.field('nested_strs', pa.list_(pa.list_(pa.string())))
])
- self._check_pandas_roundtrip(df, expected_schema=expected_schema)
+ _check_pandas_roundtrip(df, expected_schema=expected_schema)
def test_infer_numpy_array(self):
data = OrderedDict([
@@ -1129,7 +1127,7 @@ class TestPandasConversion(object):
pa.field('ints', pa.list_(pa.int64()))
])
- self._check_pandas_roundtrip(df, expected_schema=expected_schema)
+ _check_pandas_roundtrip(df, expected_schema=expected_schema)
def test_metadata_with_mixed_types(self):
df = pd.DataFrame({'data': [b'some_bytes', u'some_unicode']})
@@ -1184,12 +1182,12 @@ class TestPandasConversion(object):
def test_table_batch_empty_dataframe(self):
df = pd.DataFrame({})
- self._check_pandas_roundtrip(df)
- self._check_pandas_roundtrip(df, as_batch=True)
+ _check_pandas_roundtrip(df)
+ _check_pandas_roundtrip(df, as_batch=True)
df2 = pd.DataFrame({}, index=[0, 1, 2])
- self._check_pandas_roundtrip(df2, preserve_index=True)
- self._check_pandas_roundtrip(df2, as_batch=True, preserve_index=True)
+ _check_pandas_roundtrip(df2, preserve_index=True)
+ _check_pandas_roundtrip(df2, as_batch=True, preserve_index=True)
def test_array_from_pandas_date_with_mask(self):
m = np.array([True, False, True])
@@ -1231,6 +1229,51 @@ class TestPandasConversion(object):
type=pa.list_(t())).equals(result)
+def _fully_loaded_dataframe_example():
+ from distutils.version import LooseVersion
+
+ index = pd.MultiIndex.from_arrays([
+ pd.date_range('2000-01-01', periods=5).repeat(2),
+ np.tile(np.array(['foo', 'bar'], dtype=object), 5)
+ ])
+
+ c1 = pd.date_range('2000-01-01', periods=10)
+ data = {
+ 0: c1,
+ 1: c1.tz_localize('utc'),
+ 2: c1.tz_localize('US/Eastern'),
+ 3: c1[::2].tz_localize('utc').repeat(2).astype('category'),
+ 4: ['foo', 'bar'] * 5,
+ 5: pd.Series(['foo', 'bar'] * 5).astype('category').values,
+ 6: [True, False] * 5,
+ 7: np.random.randn(10),
+ 8: np.random.randint(0, 100, size=10),
+ 9: pd.period_range('2013', periods=10, freq='M')
+ }
+
+ if LooseVersion(pd.__version__) >= '0.21':
+ # There is an issue with pickling IntervalIndex in pandas 0.20.x
+ data[10] = pd.interval_range(start=1, freq=1, periods=10)
+
+ return pd.DataFrame(data, index=index)
+
+
+def _check_serialize_components_roundtrip(df):
+ ctx = pa.pandas_serialization_context
+
+ components = ctx.serialize(df).to_components()
+ deserialized = ctx.deserialize_components(components)
+
+ tm.assert_frame_equal(df, deserialized)
+
+
+def test_serialize_deserialize_pandas():
+ # ARROW-1784, serialize and deserialize DataFrame by decomposing
+ # BlockManager
+ df = _fully_loaded_dataframe_example()
+ _check_serialize_components_roundtrip(df)
+
+
def _pytime_from_micros(val):
microseconds = val % 1000000
val //= 1000000
--
To stop receiving notification emails like this one, please contact
[email protected].