This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 86f480a ARROW-1639: [Python] Serialize RangeIndex as metadata via
Table.from_pandas instead of converting to a column of integers
86f480a is described below
commit 86f480a2a246f0c885031aff8d21f68640dbd72a
Author: Wes McKinney <[email protected]>
AuthorDate: Wed Mar 13 12:17:08 2019 -0500
ARROW-1639: [Python] Serialize RangeIndex as metadata via Table.from_pandas
instead of converting to a column of integers
This ended up being much more difficult than anticipated due to the
spaghetti-like state (as the result of many hacks) of pyarrow/pandas_compat.py.
This is partly a performance and memory use optimization. It has a
consequence, though: tables concatenated from multiple pandas DataFrame
objects that were converted to Arrow will have some index data discarded. I
think this is OK, since the preservation of pandas indexes is generally
handled at the granularity of a single DataFrame. One always has the option of
calling `reset_index` to convert a RangeIndex into a regular column if that is
what is desired.
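As a minimal sketch of the new behavior (assuming pyarrow 0.13+ with this
patch applied), a DataFrame backed by a RangeIndex now round-trips without a
materialized index column, and `reset_index` remains the way to force one:

    import pandas as pd
    import pyarrow as pa

    df = pd.DataFrame({'a': [1, 2, 3, 4]},
                      index=pd.RangeIndex(0, 8, step=2, name='foo'))

    # The RangeIndex is tracked only in the schema metadata, so the table
    # contains just the single data column 'a'.
    table = pa.Table.from_pandas(df)
    assert len(table.schema) == 1

    # The round trip reconstructs the RangeIndex from the metadata.
    result = table.to_pandas()
    assert isinstance(result.index, pd.RangeIndex)
    assert result.index.name == 'foo'

    # To materialize the index as a real column, reset it first.
    table2 = pa.Table.from_pandas(df.reset_index())
    assert 'foo' in table2.schema.names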
This patch also implements proposed extensions to the serialized pandas
metadata to accommodate indexes-as-columns vs. indexes-represented-as-metadata,
as described in
https://github.com/pandas-dev/pandas/issues/25672
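To illustrate the extended format: an entry in 'index_columns' may now be
either a raw field name string (the pre-0.13 form) or a descriptor dict, and
the new Schema.pandas_metadata property returns the decoded metadata. The
values below are illustrative:

    # pre-0.13: the index was always written as an extra column
    'index_columns': ['__index_level_0__']

    # new: a RangeIndex is recorded as a 'range' descriptor with no
    # physical column
    'index_columns': [{'kind': 'range', 'name': None,
                       'start': 0, 'stop': 10000, 'step': 1}]

    # other index types still get a physical column, described by a
    # 'serialized' descriptor pointing at its field name
    'index_columns': [{'kind': 'serialized',
                       'field_name': '__index_level_0__'}]

    # reading the decoded metadata back from a table
    meta = table.schema.pandas_metadata  # dict, or None if absent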
Author: Wes McKinney <[email protected]>
Closes #3868 from wesm/ARROW-1639 and squashes the following commits:
ec929aebd <Wes McKinney> Add pandas_metadata attribute to pyarrow.Schema to
make interactions simpler
670dc6f70 <Wes McKinney> Add compatibility tests for pre-0.13 metadata. Add
Arrow version to pandas metadata
0ca1bfc58 <Wes McKinney> Add benchmark
9ba413106 <Wes McKinney> Serialize RangeIndex as metadata via
Table.from_pandas instead of converting to data column. This affects
serialize_pandas and writing to Parquet format
---
python/benchmarks/convert_pandas.py | 15 +
python/pyarrow/pandas_compat.py | 436 ++++++++++++++++++----------
python/pyarrow/parquet.py | 29 +-
python/pyarrow/tests/test_convert_pandas.py | 258 +++++++++++++---
python/pyarrow/tests/test_ipc.py | 4 +-
python/pyarrow/tests/test_parquet.py | 25 +-
python/pyarrow/tests/test_table.py | 2 +-
python/pyarrow/types.pxi | 13 +
8 files changed, 545 insertions(+), 237 deletions(-)
diff --git a/python/benchmarks/convert_pandas.py b/python/benchmarks/convert_pandas.py
index bb8d710..57a0518 100644
--- a/python/benchmarks/convert_pandas.py
+++ b/python/benchmarks/convert_pandas.py
@@ -90,3 +90,18 @@ class ZeroCopyPandasRead(object):
def time_deserialize_from_components(self):
pa.deserialize_components(self.as_components)
+
+
+class SerializeDeserializePandas(object):
+
+ def setup(self):
+ # 10 million length
+ n = 10000000
+ self.df = pd.DataFrame({'data': np.random.randn(n)})
+ self.serialized = pa.serialize_pandas(self.df)
+
+ def time_serialize_pandas(self):
+ pa.serialize_pandas(self.df)
+
+ def time_deserialize_pandas(self):
+ pa.deserialize_pandas(self.serialized)
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index 86e826f..10038de 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -28,7 +28,8 @@ import pandas as pd
import six
import pyarrow as pa
-from pyarrow.compat import builtin_pickle, DatetimeTZDtype, PY2, zip_longest  # noqa
+from pyarrow.compat import (builtin_pickle, DatetimeTZDtype, # noqa
+ PY2, zip_longest)
def infer_dtype(column):
@@ -185,7 +186,7 @@ def get_column_metadata(column, name, arrow_type, field_name):
}
-def construct_metadata(df, column_names, index_levels, index_column_names,
+def construct_metadata(df, column_names, index_levels, index_descriptors,
preserve_index, types):
"""Returns a dictionary containing enough metadata to reconstruct a pandas
DataFrame as an Arrow Table, including index columns.
@@ -194,73 +195,80 @@ def construct_metadata(df, column_names, index_levels, index_column_names,
----------
df : pandas.DataFrame
index_levels : List[pd.Index]
- presere_index : bool
+ index_descriptors : List[Dict]
+ preserve_index : bool
types : List[pyarrow.DataType]
Returns
-------
dict
"""
+ num_serialized_index_levels = len([descr for descr in index_descriptors
+ if descr['kind'] == 'serialized'])
# Use ntypes instead of Python shorthand notation [:-len(x)] as [:-0]
# behaves differently to what we want.
ntypes = len(types)
- df_types = types[:ntypes - len(index_levels)]
- index_types = types[ntypes - len(index_levels):]
-
- column_metadata = [
- get_column_metadata(
- df[col_name],
- name=sanitized_name,
- arrow_type=arrow_type,
- field_name=sanitized_name
- ) for col_name, sanitized_name, arrow_type in zip(
- df.columns, column_names, df_types
- )
- ]
-
+ df_types = types[:ntypes - num_serialized_index_levels]
+ index_types = types[ntypes - num_serialized_index_levels:]
+
+ column_metadata = []
+ for col_name, sanitized_name, arrow_type in zip(df.columns, column_names,
+ df_types):
+ metadata = get_column_metadata(df[col_name], name=sanitized_name,
+ arrow_type=arrow_type,
+ field_name=sanitized_name)
+ column_metadata.append(metadata)
+
+ index_column_metadata = []
if preserve_index:
- index_column_metadata = [
- get_column_metadata(
- level,
- name=level.name,
- arrow_type=arrow_type,
- field_name=field_name,
- ) for i, (level, arrow_type, field_name) in enumerate(
- zip(index_levels, index_types, index_column_names)
- )
- ]
+ for level, arrow_type, descriptor in zip(index_levels, index_types,
+ index_descriptors):
+ if descriptor['kind'] != 'serialized':
+ # The index is represented in a non-serialized fashion,
+ # e.g. RangeIndex
+ continue
+ metadata = get_column_metadata(level, name=level.name,
+ arrow_type=arrow_type,
+ field_name=descriptor['field_name'])
+ index_column_metadata.append(metadata)
column_indexes = []
for level in getattr(df.columns, 'levels', [df.columns]):
- string_dtype, extra_metadata = get_extension_dtype_info(level)
-
- pandas_type = get_logical_type_from_numpy(level)
- if pandas_type == 'unicode':
- assert not extra_metadata
- extra_metadata = {'encoding': 'UTF-8'}
-
- column_index = {
- 'name': level.name,
- 'field_name': level.name,
- 'pandas_type': pandas_type,
- 'numpy_type': string_dtype,
- 'metadata': extra_metadata,
- }
- column_indexes.append(column_index)
+ metadata = _get_simple_index_descriptor(level)
+ column_indexes.append(metadata)
else:
- index_column_names = index_column_metadata = column_indexes = []
+ index_descriptors = index_column_metadata = column_indexes = []
return {
b'pandas': json.dumps({
- 'index_columns': index_column_names,
+ 'index_columns': index_descriptors,
'column_indexes': column_indexes,
'columns': column_metadata + index_column_metadata,
+ 'creator': {
+ 'library': 'pyarrow',
+ 'version': pa.__version__
+ },
'pandas_version': pd.__version__
}).encode('utf8')
}
+def _get_simple_index_descriptor(level):
+ string_dtype, extra_metadata = get_extension_dtype_info(level)
+ pandas_type = get_logical_type_from_numpy(level)
+ if pandas_type == 'unicode':
+ assert not extra_metadata
+ extra_metadata = {'encoding': 'UTF-8'}
+ return {
+ 'name': level.name,
+ 'field_name': level.name,
+ 'pandas_type': pandas_type,
+ 'numpy_type': string_dtype,
+ 'metadata': extra_metadata,
+ }
+
+
def _column_name_to_strings(name):
"""Convert a column name (or level) to either a string or a recursive
collection of strings.
@@ -320,26 +328,12 @@ def _index_level_name(index, i, column_names):
def _get_columns_to_convert(df, schema, preserve_index, columns):
- if schema is not None and columns is not None:
- raise ValueError('Schema and columns arguments are mutually '
- 'exclusive, pass only one of them')
- elif schema is not None:
- columns = schema.names
- elif columns is not None:
- # columns is only for filtering, the function must keep the column
- # ordering of either the dataframe or the passed schema
- columns = [c for c in df.columns if c in columns]
- else:
- columns = df.columns
+ columns = _resolve_columns_of_interest(df, schema, columns)
column_names = []
- index_columns = []
- index_column_names = []
type = None
- if preserve_index:
- n = len(getattr(df.index, 'levels', [df.index]))
- index_columns.extend(df.index.get_level_values(i) for i in range(n))
+ index_levels = _get_index_level_values(df.index) if preserve_index else []
columns_to_convert = []
convert_types = []
@@ -361,23 +355,78 @@ def _get_columns_to_convert(df, schema, preserve_index, columns):
convert_types.append(type)
column_names.append(name)
- for i, column in enumerate(index_columns):
- columns_to_convert.append(column)
- convert_types.append(None)
- name = _index_level_name(column, i, column_names)
- index_column_names.append(name)
+ index_descriptors = []
+ index_column_names = []
+ for i, index_level in enumerate(index_levels):
+ name = _index_level_name(index_level, i, column_names)
+ if isinstance(index_level, pd.RangeIndex):
+ descr = _get_range_index_descriptor(index_level)
+ else:
+ columns_to_convert.append(index_level)
+ convert_types.append(None)
+ descr = {
+ 'kind': 'serialized',
+ 'field_name': name
+ }
+ index_column_names.append(name)
+ index_descriptors.append(descr)
+
+ all_names = column_names + index_column_names
+
+ # all_names : all of the columns in the resulting table including the data
+ # columns and serialized index columns
+ # column_names : the names of the data columns
+ # index_descriptors : descriptions of each index to be used for
+ # reconstruction
+ # index_levels : the extracted index level values
+ # columns_to_convert : assembled raw data (both data columns and indexes)
+ # to be converted to Arrow format
+ # columns_types : specified column types to use for coercion / casting
+ # during serialization, if a Schema was provided
+ return (all_names, column_names, index_descriptors, index_levels,
+ columns_to_convert, convert_types)
- names = column_names + index_column_names
- return (names, column_names, index_columns, index_column_names,
- columns_to_convert, convert_types)
+def _get_range_index_descriptor(level):
+ # TODO(wesm): Why are these non-public and is there a more public way to
+ # get them?
+ return {
+ 'kind': 'range',
+ 'name': level.name,
+ 'start': level._start,
+ 'stop': level._stop,
+ 'step': level._step
+ }
+
+
+def _get_index_level_values(index):
+ n = len(getattr(index, 'levels', [index]))
+ return [index.get_level_values(i) for i in range(n)]
+
+
+def _resolve_columns_of_interest(df, schema, columns):
+ if schema is not None and columns is not None:
+ raise ValueError('Schema and columns arguments are mutually '
+ 'exclusive, pass only one of them')
+ elif schema is not None:
+ columns = schema.names
+ elif columns is not None:
+ # columns is only for filtering, the function must keep the column
+ # ordering of either the dataframe or the passed schema
+ columns = [c for c in df.columns if c in columns]
+ else:
+ columns = df.columns
+
+ return columns
def dataframe_to_types(df, preserve_index, columns=None):
- names, column_names, index_columns, index_column_names, \
- columns_to_convert, _ = _get_columns_to_convert(
- df, None, preserve_index, columns
- )
+ (all_names,
+ column_names,
+ index_descriptors,
+ index_columns,
+ columns_to_convert,
+ _) = _get_columns_to_convert(df, None, preserve_index, columns)
types = []
# If pandas knows type, skip conversion
@@ -393,17 +442,20 @@ def dataframe_to_types(df, preserve_index, columns=None):
types.append(type_)
metadata = construct_metadata(df, column_names, index_columns,
- index_column_names, preserve_index, types)
+ index_descriptors, preserve_index, types)
- return names, types, metadata
+ return all_names, types, metadata
def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
safe=True):
- names, column_names, index_columns, index_column_names, \
- columns_to_convert, convert_types = _get_columns_to_convert(
- df, schema, preserve_index, columns
- )
+ (all_names,
+ column_names,
+ index_descriptors,
+ index_columns,
+ columns_to_convert,
+ convert_types) = _get_columns_to_convert(df, schema, preserve_index,
+ columns)
# NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether
# using a thread pool is worth it. Currently the heuristic is whether the
@@ -438,12 +490,10 @@ def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
types = [x.type for x in arrays]
- metadata = construct_metadata(
- df, column_names, index_columns, index_column_names, preserve_index,
- types
- )
-
- return names, arrays, metadata
+ metadata = construct_metadata(df, column_names, index_columns,
+ index_descriptors, preserve_index,
+ types)
+ return all_names, arrays, metadata
def get_datetimetz_type(values, dtype, type_):
@@ -551,95 +601,46 @@ def _make_datetimetz(tz):
def table_to_blockmanager(options, table, categories=None,
ignore_metadata=False):
-
- index_columns = []
- columns = []
+ all_columns = []
column_indexes = []
- index_arrays = []
- index_names = []
- schema = table.schema
- row_count = table.num_rows
- metadata = schema.metadata
-
- has_pandas_metadata = (not ignore_metadata and metadata is not None
- and b'pandas' in metadata)
+ pandas_metadata = table.schema.pandas_metadata
- if has_pandas_metadata:
- pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
- index_columns = pandas_metadata['index_columns']
- columns = pandas_metadata['columns']
+ if not ignore_metadata and pandas_metadata is not None:
+ all_columns = pandas_metadata['columns']
column_indexes = pandas_metadata.get('column_indexes', [])
+ index_descriptors = pandas_metadata['index_columns']
table = _add_any_metadata(table, pandas_metadata)
+ table, index = _reconstruct_index(table, index_descriptors,
+ all_columns)
+ else:
+ index = pd.RangeIndex(table.num_rows)
- block_table = table
-
- index_columns_set = frozenset(index_columns)
+ _check_data_column_metadata_consistency(all_columns)
+ blocks = _table_to_blocks(options, table, pa.default_memory_pool(),
+ categories)
+ columns = _deserialize_column_index(table, all_columns, column_indexes)
- # 0. 'field_name' is the name of the column in the arrow Table
- # 1. 'name' is the user-facing name of the column, that is, it came from
- # pandas
- # 2. 'field_name' and 'name' differ for index columns
- # 3. We fall back on c['name'] for backwards compatibility
- logical_index_names = [
- c['name'] for c in columns
- if c.get('field_name', c['name']) in index_columns_set
- ]
+ axes = [columns, index]
+ return _int.BlockManager(blocks, axes)
- # There must be the same number of field names and physical names
- # (fields in the arrow Table)
- assert len(logical_index_names) == len(index_columns_set)
+def _check_data_column_metadata_consistency(all_columns):
# It can never be the case in a released version of pyarrow that
# c['name'] is None *and* 'field_name' is not a key in the column metadata,
# because the change to allow c['name'] to be None and the change to add
# 'field_name' are in the same release (0.8.0)
assert all(
(c['name'] is None and 'field_name' in c) or c['name'] is not None
- for c in columns
+ for c in all_columns
)
- # Build up a list of index columns and names while removing those columns
- # from the original table
- for raw_name, logical_name in zip(index_columns, logical_index_names):
- i = schema.get_field_index(raw_name)
- if i != -1:
- col = table.column(i)
- col_pandas = col.to_pandas()
- values = col_pandas.values
- if hasattr(values, 'flags') and not values.flags.writeable:
- # ARROW-1054: in pandas 0.19.2, factorize will reject
- # non-writeable arrays when calling MultiIndex.from_arrays
- values = values.copy()
-
- if isinstance(col_pandas.dtype, DatetimeTZDtype):
- index_array = (pd.Series(values).dt.tz_localize('utc')
- .dt.tz_convert(col_pandas.dtype.tz))
- else:
- index_array = pd.Series(values, dtype=col_pandas.dtype)
- index_arrays.append(index_array)
- index_names.append(
- _backwards_compatible_index_name(raw_name, logical_name)
- )
- block_table = block_table.remove_column(
- block_table.schema.get_field_index(raw_name)
- )
-
- blocks = _table_to_blocks(options, block_table, pa.default_memory_pool(),
- categories)
-
- # Construct the row index
- if len(index_arrays) > 1:
- index = pd.MultiIndex.from_arrays(index_arrays, names=index_names)
- elif len(index_arrays) == 1:
- index = pd.Index(index_arrays[0], name=index_names[0])
- else:
- index = pd.RangeIndex(row_count)
+def _deserialize_column_index(block_table, all_columns, column_indexes):
column_strings = [x.name for x in block_table.itercolumns()]
- if columns:
+ if all_columns:
columns_name_dict = {
c.get('field_name', _column_name_to_strings(c['name'])): c['name']
- for c in columns
+ for c in all_columns
}
columns_values = [
columns_name_dict.get(name, name) for name in column_strings
@@ -664,14 +665,129 @@ def table_to_blockmanager(options, table, categories=None,
)
# if we're reconstructing the index
- if has_pandas_metadata:
+ if len(column_indexes) > 0:
columns = _reconstruct_columns_from_metadata(columns, column_indexes)
# ARROW-1751: flatten a single level column MultiIndex for pandas 0.21.0
columns = _flatten_single_level_multiindex(columns)
- axes = [columns, index]
- return _int.BlockManager(blocks, axes)
+ return columns
+
+
+def _sanitize_old_index_descr(descr):
+ if not isinstance(descr, dict):
+ # version < 0.13.0
+ return {
+ 'kind': 'serialized',
+ 'field_name': descr
+ }
+ else:
+ return descr
+
+
+def _reconstruct_index(table, index_descriptors, all_columns):
+ # 0. 'field_name' is the name of the column in the arrow Table
+ # 1. 'name' is the user-facing name of the column, that is, it came from
+ # pandas
+ # 2. 'field_name' and 'name' differ for index columns
+ # 3. We fall back on c['name'] for backwards compatibility
+ field_name_to_metadata = {
+ c.get('field_name', c['name']): c
+ for c in all_columns
+ }
+
+ # Build up a list of index columns and names while removing those columns
+ # from the original table
+ index_arrays = []
+ index_names = []
+ result_table = table
+ for descr in index_descriptors:
+ descr = _sanitize_old_index_descr(descr)
+ if descr['kind'] == 'serialized':
+ result_table, index_level, index_name = _extract_index_level(
+ table, result_table, descr, field_name_to_metadata)
+ if index_level is None:
+ # ARROW-1883: the serialized index column was not found
+ continue
+ elif descr['kind'] == 'range':
+ index_name = descr['name']
+ index_level = pd.RangeIndex(descr['start'], descr['stop'],
+ step=descr['step'], name=index_name)
+ if len(index_level) != len(table):
+ # Possibly the result of munged metadata
+ continue
+ else:
+ raise ValueError("Unrecognized index kind: {0}"
+ .format(descr['kind']))
+ index_arrays.append(index_level)
+ index_names.append(index_name)
+
+ # Reconstruct the row index
+ if len(index_arrays) > 1:
+ index = pd.MultiIndex.from_arrays(index_arrays, names=index_names)
+ elif len(index_arrays) == 1:
+ index = index_arrays[0]
+ if not isinstance(index, pd.Index):
+ # Box anything that wasn't boxed above
+ index = pd.Index(index, name=index_names[0])
+ else:
+ index = pd.RangeIndex(table.num_rows)
+
+ return result_table, index
+
+
+def _extract_index_level(table, result_table, descr,
+ field_name_to_metadata):
+ field_name = descr['field_name']
+ logical_name = field_name_to_metadata[field_name]['name']
+ index_name = _backwards_compatible_index_name(field_name, logical_name)
+ i = table.schema.get_field_index(field_name)
+
+ if i == -1:
+ # The serialized index column was removed by the user
+ return table, None, None
+
+ col = table.column(i)
+ col_pandas = col.to_pandas()
+ values = col_pandas.values
+ if hasattr(values, 'flags') and not values.flags.writeable:
+ # ARROW-1054: in pandas 0.19.2, factorize will reject
+ # non-writeable arrays when calling MultiIndex.from_arrays
+ values = values.copy()
+
+ if isinstance(col_pandas.dtype, DatetimeTZDtype):
+ index_level = (pd.Series(values).dt.tz_localize('utc')
+ .dt.tz_convert(col_pandas.dtype.tz))
+ else:
+ index_level = pd.Series(values, dtype=col_pandas.dtype)
+ result_table = result_table.remove_column(
+ result_table.schema.get_field_index(field_name)
+ )
+ return result_table, index_level, index_name
+
+
+def _get_serialized_index_names(index_descriptors, all_columns):
+ serialized_index_names = []
+ for descr in index_descriptors:
+ if not isinstance(descr, dict):
+ # version < 0.13.0
+ serialized_index_names.append(descr)
+ elif descr['kind'] != 'serialized':
+ continue
+
+ serialized_index_names.append(descr['field_name'])
+
+ index_columns_set = frozenset(serialized_index_names)
+
+ logical_index_names = [
+ c['name'] for c in all_columns
+ if c.get('field_name', c['name']) in index_columns_set
+ ]
+
+ # There must be the same number of field names and physical names
+ # (fields in the arrow Table)
+ assert len(logical_index_names) == len(index_columns_set)
+ return logical_index_names
def _backwards_compatible_index_name(raw_name, logical_name):
@@ -692,13 +808,17 @@ def _backwards_compatible_index_name(raw_name, logical_name):
* Part of :func:`~pyarrow.pandas_compat.table_to_blockmanager`
"""
# Part of table_to_blockmanager
- pattern = r'^__index_level_\d+__$'
- if raw_name == logical_name and re.match(pattern, raw_name) is not None:
+ if raw_name == logical_name and _is_generated_index_name(raw_name):
return None
else:
return logical_name
+def _is_generated_index_name(name):
+ pattern = r'^__index_level_\d+__$'
+ return re.match(pattern, name) is not None
+
+
_pandas_logical_type_map = {
'date': 'datetime64[D]',
'unicode': np.unicode_,
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index fe602bb..a149bcc 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -257,7 +257,9 @@ class ParquetFile(object):
index_columns = []
if indices is not None and index_columns:
- indices += map(self.reader.column_name_idx, index_columns)
+ indices += [self.reader.column_name_idx(descr['field_name'])
+ for descr in index_columns
+ if descr['kind'] == 'serialized']
return indices
@@ -1200,8 +1202,8 @@ def _mkdir_if_not_exists(fs, path):
assert fs.exists(path)
-def write_to_dataset(table, root_path, partition_cols=None,
- filesystem=None, preserve_index=True, **kwargs):
+def write_to_dataset(table, root_path, partition_cols=None, filesystem=None,
+ **kwargs):
"""
Wrapper around parquet.write_table for writing a Table to
Parquet format by partitions.
@@ -1232,8 +1234,6 @@ def write_to_dataset(table, root_path, partition_cols=None,
partition_cols : list,
Column names by which to partition the dataset
Columns are partitioned in the order they are given
- preserve_index : bool,
- Parameter for instantiating Table; preserve pandas index or not.
**kwargs : dict, kwargs for write_table function.
"""
fs, root_path = _get_filesystem_and_path(filesystem, root_path)
@@ -1241,7 +1241,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
_mkdir_if_not_exists(fs, root_path)
if partition_cols is not None and len(partition_cols) > 0:
- df = table.to_pandas()
+ df = table.to_pandas(ignore_metadata=True)
partition_keys = [df[col] for col in partition_cols]
data_df = df.drop(partition_cols, axis='columns')
data_cols = df.columns.drop(partition_cols)
@@ -1249,18 +1249,11 @@ def write_to_dataset(table, root_path, partition_cols=None,
raise ValueError('No data left to save outside partition columns')
subschema = table.schema
- # ARROW-4538: Remove index column from subschema in write_to_dataframe
- metadata = subschema.metadata
- has_pandas_metadata = (metadata is not None and b'pandas' in metadata)
- index_columns = []
- if has_pandas_metadata:
- pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
- index_columns = pandas_metadata['index_columns']
+
# ARROW-2891: Ensure the output_schema is preserved when writing a
# partitioned dataset
for col in table.schema.names:
- if (col.startswith('__index_level_') or col in partition_cols or
- col in index_columns):
+ if col in partition_cols:
subschema = subschema.remove(subschema.get_field_index(col))
for keys, subgroup in data_df.groupby(partition_keys):
@@ -1269,10 +1262,8 @@ def write_to_dataset(table, root_path, partition_cols=None,
subdir = '/'.join(
['{colname}={value}'.format(colname=name, value=val)
for name, val in zip(partition_cols, keys)])
- subtable = pa.Table.from_pandas(subgroup,
- preserve_index=preserve_index,
- schema=subschema,
- safe=False)
+ subtable = pa.Table.from_pandas(subgroup, preserve_index=False,
+ schema=subschema, safe=False)
prefix = '/'.join([root_path, subdir])
_mkdir_if_not_exists(fs, prefix)
outfile = guid() + '.parquet'
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index fe5b305..16e2a76 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -155,6 +155,33 @@ class TestConvertMetadata(object):
df.columns.names = ['a']
_check_pandas_roundtrip(df, preserve_index=True)
+ def test_range_index_shortcut(self):
+ # ARROW-1639
+ index_name = 'foo'
+ df = pd.DataFrame({'a': [1, 2, 3, 4]},
+ index=pd.RangeIndex(0, 8, step=2, name=index_name))
+
+ df2 = pd.DataFrame({'a': [4, 5, 6, 7]},
+ index=pd.RangeIndex(0, 4))
+
+ table = pa.Table.from_pandas(df)
+ table_no_index_name = pa.Table.from_pandas(df2)
+
+ # The RangeIndex is tracked in the metadata only
+ assert len(table.schema) == 1
+
+ result = table.to_pandas()
+ tm.assert_frame_equal(result, df)
+ assert isinstance(result.index, pd.RangeIndex)
+ assert result.index._step == 2
+ assert result.index.name == index_name
+
+ result2 = table_no_index_name.to_pandas()
+ tm.assert_frame_equal(result2, df2)
+ assert isinstance(result2.index, pd.RangeIndex)
+ assert result2.index._step == 1
+ assert result2.index.name is None
+
def test_multiindex_columns(self):
columns = pd.MultiIndex.from_arrays([
['one', 'two'], ['X', 'Y']
@@ -203,9 +230,7 @@ class TestConvertMetadata(object):
columns=['a', None, '__index_level_0__'],
)
t = pa.Table.from_pandas(df, preserve_index=True)
- raw_metadata = t.schema.metadata
-
- js = json.loads(raw_metadata[b'pandas'].decode('utf8'))
+ js = t.schema.pandas_metadata
col1, col2, col3, idx0, foo = js['columns']
@@ -218,11 +243,14 @@ class TestConvertMetadata(object):
assert col3['name'] == '__index_level_0__'
assert col3['name'] == col3['field_name']
- idx0_name, foo_name = js['index_columns']
+ idx0_descr, foo_descr = js['index_columns']
+ assert idx0_descr['kind'] == 'serialized'
+ idx0_name = idx0_descr['field_name']
assert idx0_name == '__index_level_0__'
assert idx0['field_name'] == idx0_name
assert idx0['name'] is None
+ foo_name = foo_descr['field_name']
assert foo_name == 'foo'
assert foo['field_name'] == foo_name
assert foo['name'] == foo_name
@@ -233,8 +261,7 @@ class TestConvertMetadata(object):
columns=pd.Index(list('def'), dtype='category')
)
t = pa.Table.from_pandas(df, preserve_index=True)
- raw_metadata = t.schema.metadata
- js = json.loads(raw_metadata[b'pandas'].decode('utf8'))
+ js = t.schema.pandas_metadata
column_indexes, = js['column_indexes']
assert column_indexes['name'] is None
@@ -251,8 +278,7 @@ class TestConvertMetadata(object):
columns=pd.Index(list('def'), name='stringz')
)
t = pa.Table.from_pandas(df, preserve_index=True)
- raw_metadata = t.schema.metadata
- js = json.loads(raw_metadata[b'pandas'].decode('utf8'))
+ js = t.schema.pandas_metadata
column_indexes, = js['column_indexes']
assert column_indexes['name'] == 'stringz'
@@ -278,8 +304,7 @@ class TestConvertMetadata(object):
)
)
t = pa.Table.from_pandas(df, preserve_index=True)
- raw_metadata = t.schema.metadata
- js = json.loads(raw_metadata[b'pandas'].decode('utf8'))
+ js = t.schema.pandas_metadata
column_indexes, = js['column_indexes']
assert column_indexes['name'] is None
@@ -369,10 +394,8 @@ class TestConvertMetadata(object):
def test_metadata_with_mixed_types(self):
df = pd.DataFrame({'data': [b'some_bytes', u'some_unicode']})
table = pa.Table.from_pandas(df)
- metadata = table.schema.metadata
- assert b'mixed' not in metadata[b'pandas']
-
- js = json.loads(metadata[b'pandas'].decode('utf8'))
+ js = table.schema.pandas_metadata
+ assert 'mixed' not in js
data_column = js['columns'][0]
assert data_column['pandas_type'] == 'bytes'
assert data_column['numpy_type'] == 'object'
@@ -392,10 +415,8 @@ class TestConvertMetadata(object):
df = pd.DataFrame({'data': [[1], [2, 3, 4], [5] * 7]})
schema = pa.schema([pa.field('data', type=pa.list_(pa.int64()))])
table = pa.Table.from_pandas(df, schema=schema)
- metadata = table.schema.metadata
- assert b'mixed' not in metadata[b'pandas']
-
- js = json.loads(metadata[b'pandas'].decode('utf8'))
+ js = table.schema.pandas_metadata
+ assert 'mixed' not in js
data_column = js['columns'][0]
assert data_column['pandas_type'] == 'list[int64]'
assert data_column['numpy_type'] == 'object'
@@ -408,10 +429,8 @@ class TestConvertMetadata(object):
]
})
table = pa.Table.from_pandas(expected)
- metadata = table.schema.metadata
- assert b'mixed' not in metadata[b'pandas']
-
- js = json.loads(metadata[b'pandas'].decode('utf8'))
+ js = table.schema.pandas_metadata
+ assert 'mixed' not in js
data_column = js['columns'][0]
assert data_column['pandas_type'] == 'decimal'
assert data_column['numpy_type'] == 'object'
@@ -419,19 +438,6 @@ class TestConvertMetadata(object):
def test_table_column_subset_metadata(self):
# ARROW-1883
- df = pd.DataFrame({
- 'a': [1, 2, 3],
- 'b': pd.date_range("2017-01-01", periods=3, tz='Europe/Brussels')})
- table = pa.Table.from_pandas(df)
-
- table_subset = table.remove_column(1)
- result = table_subset.to_pandas()
- tm.assert_frame_equal(result, df[['a']])
-
- table_subset2 = table_subset.remove_column(1)
- result = table_subset2.to_pandas()
- tm.assert_frame_equal(result, df[['a']])
-
# non-default index
for index in [
pd.Index(['a', 'b', 'c'], name='index'),
@@ -467,7 +473,7 @@ class TestConvertMetadata(object):
# type of empty lists
df = tbl.to_pandas()
tbl2 = pa.Table.from_pandas(df, preserve_index=True)
- md2 = json.loads(tbl2.schema.metadata[b'pandas'].decode('utf8'))
+ md2 = tbl2.schema.pandas_metadata
# Second roundtrip
df2 = tbl2.to_pandas()
@@ -489,13 +495,6 @@ class TestConvertMetadata(object):
'metadata': None,
'numpy_type': 'object',
'pandas_type': 'list[empty]',
- },
- {
- 'name': None,
- 'field_name': '__index_level_0__',
- 'metadata': None,
- 'numpy_type': 'int64',
- 'pandas_type': 'int64',
}
]
@@ -2529,3 +2528,176 @@ def test_table_from_pandas_columns_and_schema_are_mutually_exclusive():
with pytest.raises(ValueError):
pa.Table.from_pandas(df, schema=schema, columns=columns)
+
+
+# ----------------------------------------------------------------------
+# Legacy metadata compatibility tests
+
+
+def test_range_index_pre_0_12():
+ # Forward compatibility for metadata created from pandas.RangeIndex
+ # prior to pyarrow 0.13.0
+ a_values = [u'foo', u'bar', None, u'baz']
+ b_values = [u'a', u'a', u'b', u'b']
+ a_arrow = pa.array(a_values, type='utf8')
+ b_arrow = pa.array(b_values, type='utf8')
+
+ rng_index_arrow = pa.array([0, 2, 4, 6], type='int64')
+
+ gen_name_0 = '__index_level_0__'
+ gen_name_1 = '__index_level_1__'
+
+ # Case 1: named RangeIndex
+ e1 = pd.DataFrame({
+ 'a': a_values
+ }, index=pd.RangeIndex(0, 8, step=2, name='qux'))
+ t1 = pa.Table.from_arrays([a_arrow, rng_index_arrow],
+ names=['a', 'qux'])
+ t1 = t1.replace_schema_metadata({
+ b'pandas': json.dumps(
+ {'index_columns': ['qux'],
+ 'column_indexes': [{'name': None,
+ 'field_name': None,
+ 'pandas_type': 'unicode',
+ 'numpy_type': 'object',
+ 'metadata': {'encoding': 'UTF-8'}}],
+ 'columns': [{'name': 'a',
+ 'field_name': 'a',
+ 'pandas_type': 'unicode',
+ 'numpy_type': 'object',
+ 'metadata': None},
+ {'name': 'qux',
+ 'field_name': 'qux',
+ 'pandas_type': 'int64',
+ 'numpy_type': 'int64',
+ 'metadata': None}],
+ 'pandas_version': '0.23.4'}
+ )})
+ r1 = t1.to_pandas()
+ tm.assert_frame_equal(r1, e1)
+
+ # Case 2: named RangeIndex, but conflicts with an actual column
+ e2 = pd.DataFrame({
+ 'qux': a_values
+ }, index=pd.RangeIndex(0, 8, step=2, name='qux'))
+ t2 = pa.Table.from_arrays([a_arrow, rng_index_arrow],
+ names=['qux', gen_name_0])
+ t2 = t2.replace_schema_metadata({
+ b'pandas': json.dumps(
+ {'index_columns': [gen_name_0],
+ 'column_indexes': [{'name': None,
+ 'field_name': None,
+ 'pandas_type': 'unicode',
+ 'numpy_type': 'object',
+ 'metadata': {'encoding': 'UTF-8'}}],
+ 'columns': [{'name': 'a',
+ 'field_name': 'a',
+ 'pandas_type': 'unicode',
+ 'numpy_type': 'object',
+ 'metadata': None},
+ {'name': 'qux',
+ 'field_name': gen_name_0,
+ 'pandas_type': 'int64',
+ 'numpy_type': 'int64',
+ 'metadata': None}],
+ 'pandas_version': '0.23.4'}
+ )})
+ r2 = t2.to_pandas()
+ tm.assert_frame_equal(r2, e2)
+
+ # Case 3: unnamed RangeIndex
+ e3 = pd.DataFrame({
+ 'a': a_values
+ }, index=pd.RangeIndex(0, 8, step=2, name=None))
+ t3 = pa.Table.from_arrays([a_arrow, rng_index_arrow],
+ names=['a', gen_name_0])
+ t3 = t3.replace_schema_metadata({
+ b'pandas': json.dumps(
+ {'index_columns': [gen_name_0],
+ 'column_indexes': [{'name': None,
+ 'field_name': None,
+ 'pandas_type': 'unicode',
+ 'numpy_type': 'object',
+ 'metadata': {'encoding': 'UTF-8'}}],
+ 'columns': [{'name': 'a',
+ 'field_name': 'a',
+ 'pandas_type': 'unicode',
+ 'numpy_type': 'object',
+ 'metadata': None},
+ {'name': None,
+ 'field_name': gen_name_0,
+ 'pandas_type': 'int64',
+ 'numpy_type': 'int64',
+ 'metadata': None}],
+ 'pandas_version': '0.23.4'}
+ )})
+ r3 = t3.to_pandas()
+ tm.assert_frame_equal(r3, e3)
+
+ # Case 4: MultiIndex with named RangeIndex
+ e4 = pd.DataFrame({
+ 'a': a_values
+ }, index=[pd.RangeIndex(0, 8, step=2, name='qux'), b_values])
+ t4 = pa.Table.from_arrays([a_arrow, rng_index_arrow, b_arrow],
+ names=['a', 'qux', gen_name_1])
+ t4 = t4.replace_schema_metadata({
+ b'pandas': json.dumps(
+ {'index_columns': ['qux', gen_name_1],
+ 'column_indexes': [{'name': None,
+ 'field_name': None,
+ 'pandas_type': 'unicode',
+ 'numpy_type': 'object',
+ 'metadata': {'encoding': 'UTF-8'}}],
+ 'columns': [{'name': 'a',
+ 'field_name': 'a',
+ 'pandas_type': 'unicode',
+ 'numpy_type': 'object',
+ 'metadata': None},
+ {'name': 'qux',
+ 'field_name': 'qux',
+ 'pandas_type': 'int64',
+ 'numpy_type': 'int64',
+ 'metadata': None},
+ {'name': None,
+ 'field_name': gen_name_1,
+ 'pandas_type': 'unicode',
+ 'numpy_type': 'object',
+ 'metadata': None}],
+ 'pandas_version': '0.23.4'}
+ )})
+ r4 = t4.to_pandas()
+ tm.assert_frame_equal(r4, e4)
+
+    # Case 5: MultiIndex with unnamed RangeIndex
+ e5 = pd.DataFrame({
+ 'a': a_values
+ }, index=[pd.RangeIndex(0, 8, step=2, name=None), b_values])
+ t5 = pa.Table.from_arrays([a_arrow, rng_index_arrow, b_arrow],
+ names=['a', gen_name_0, gen_name_1])
+ t5 = t5.replace_schema_metadata({
+ b'pandas': json.dumps(
+ {'index_columns': [gen_name_0, gen_name_1],
+ 'column_indexes': [{'name': None,
+ 'field_name': None,
+ 'pandas_type': 'unicode',
+ 'numpy_type': 'object',
+ 'metadata': {'encoding': 'UTF-8'}}],
+ 'columns': [{'name': 'a',
+ 'field_name': 'a',
+ 'pandas_type': 'unicode',
+ 'numpy_type': 'object',
+ 'metadata': None},
+ {'name': None,
+ 'field_name': gen_name_0,
+ 'pandas_type': 'int64',
+ 'numpy_type': 'int64',
+ 'metadata': None},
+ {'name': None,
+ 'field_name': gen_name_1,
+ 'pandas_type': 'unicode',
+ 'numpy_type': 'object',
+ 'metadata': None}],
+ 'pandas_version': '0.23.4'}
+ )})
+ r5 = t5.to_pandas()
+ tm.assert_frame_equal(r5, e5)
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 67a91b9..7c0bb2f 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -173,7 +173,7 @@ def test_file_read_pandas(file_fixture):
reader = pa.ipc.open_file(file_contents)
result = reader.read_pandas()
- expected = pd.concat(frames)
+ expected = pd.concat(frames).reset_index(drop=True)
assert_frame_equal(result, expected)
@@ -314,7 +314,7 @@ def test_stream_read_pandas(stream_fixture):
reader = pa.ipc.open_stream(file_contents)
result = reader.read_pandas()
- expected = pd.concat(frames)
+ expected = pd.concat(frames).reset_index(drop=True)
assert_frame_equal(result, expected)
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 77b9ead..ccc01f4 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -153,12 +153,12 @@ def test_pandas_parquet_2_0_rountrip(tempdir, chunk_size):
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df)
- assert b'pandas' in arrow_table.schema.metadata
+ assert arrow_table.schema.pandas_metadata is not None
_write_table(arrow_table, filename, version="2.0",
coerce_timestamps='ms', chunk_size=chunk_size)
table_read = pq.read_pandas(filename)
- assert b'pandas' in table_read.schema.metadata
+ assert table_read.schema.pandas_metadata is not None
assert arrow_table.schema.metadata == table_read.schema.metadata
@@ -280,7 +280,10 @@ def test_pandas_parquet_custom_metadata(tempdir):
assert b'pandas' in metadata
js = json.loads(metadata[b'pandas'].decode('utf8'))
- assert js['index_columns'] == ['__index_level_0__']
+ assert js['index_columns'] == [{'kind': 'range',
+ 'name': None,
+ 'start': 0, 'stop': 10000,
+ 'step': 1}]
def test_pandas_parquet_column_multiindex(tempdir):
@@ -292,7 +295,7 @@ def test_pandas_parquet_column_multiindex(tempdir):
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df)
- assert b'pandas' in arrow_table.schema.metadata
+ assert arrow_table.schema.pandas_metadata is not None
_write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms')
@@ -306,7 +309,7 @@ def test_pandas_parquet_2_0_rountrip_read_pandas_no_index_written(tempdir):
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df, preserve_index=False)
- js = json.loads(arrow_table.schema.metadata[b'pandas'].decode('utf8'))
+ js = arrow_table.schema.pandas_metadata
assert not js['index_columns']
# ARROW-2170
# While index_columns should be empty, columns needs to be filled still.
@@ -315,7 +318,7 @@ def test_pandas_parquet_2_0_rountrip_read_pandas_no_index_written(tempdir):
_write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms')
table_read = pq.read_pandas(filename)
- js = json.loads(table_read.schema.metadata[b'pandas'].decode('utf8'))
+ js = table_read.schema.pandas_metadata
assert not js['index_columns']
assert arrow_table.schema.metadata == table_read.schema.metadata
@@ -561,6 +564,7 @@ def make_sample_file(table_or_df):
def test_parquet_metadata_api():
df = alltypes_sample(size=10000)
df = df.reindex(columns=sorted(df.columns))
+ df.index = np.random.randint(0, 1000000, size=len(df))
fileh = make_sample_file(df)
ncols = len(df.columns)
@@ -1909,13 +1913,10 @@ def _test_write_to_dataset_with_partitions(base_path,
'nan': [pd.np.nan] * 10,
'date': np.arange('2017-01-01', '2017-01-11',
dtype='datetime64[D]')})
- # ARROW-4538
- output_df.index.name = index_name
-
cols = output_df.columns.tolist()
partition_by = ['group1', 'group2']
output_table = pa.Table.from_pandas(output_df, schema=schema, safe=False,
- preserve_index=True)
+ preserve_index=False)
pq.write_to_dataset(output_table, base_path, partition_by,
filesystem=filesystem)
@@ -1936,10 +1937,6 @@ def _test_write_to_dataset_with_partitions(base_path,
dataset_cols = set(dataset.schema.to_arrow_schema().names)
assert dataset_cols == set(output_table.schema.names)
- # ARROW-4538
- if index_name is not None:
- assert index_name in dataset_cols
-
input_table = dataset.read()
input_df = input_table.to_pandas()
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index afdfd42..073a7c1 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -493,7 +493,7 @@ def test_recordbatchlist_to_pandas():
table = pa.Table.from_batches([batch1, batch2])
result = table.to_pandas()
- data = pd.concat([data1, data2])
+ data = pd.concat([data1, data2]).reset_index(drop=True)
assert_frame_equal(data, result)
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 1de4fac..db71292 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -629,6 +629,19 @@ cdef class Schema:
return hash((tuple(self), self.metadata))
@property
+ def pandas_metadata(self):
+ """
+ Return deserialized-from-JSON pandas metadata field (if it exists)
+ """
+ metadata = self.metadata
+ key = b'pandas'
+ if metadata is None or key not in metadata:
+ return None
+
+ import json
+ return json.loads(metadata[key].decode('utf8'))
+
+ @property
def names(self):
"""
The schema's field names.