This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 86f480a  ARROW-1639: [Python] Serialize RangeIndex as metadata via Table.from_pandas instead of converting to a column of integers
86f480a is described below

commit 86f480a2a246f0c885031aff8d21f68640dbd72a
Author: Wes McKinney <[email protected]>
AuthorDate: Wed Mar 13 12:17:08 2019 -0500

    ARROW-1639: [Python] Serialize RangeIndex as metadata via Table.from_pandas instead of converting to a column of integers
    
    This ended up being much more difficult than anticipated due to the
    spaghetti-like state of pyarrow/pandas_compat.py (the result of many
    accumulated hacks).
    
    This is partly a performance and memory-use optimization. It does have a
    consequence, though: tables converted from multiple pandas DataFrame
    objects will have some index data discarded when they are concatenated.
    I think this is acceptable, since preservation of pandas indexes is
    generally handled at the granularity of a single DataFrame. One always
    has the option of calling `reset_index` to convert a RangeIndex to a
    regular data column if that is what is desired.
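    
    As a small illustrative sketch (not part of this patch; behavior as
    exercised by the new test_range_index_shortcut test), a default
    RangeIndex now lives only in the schema metadata, and reset_index()
    remains the way to force a materialized index column:
    
        import pandas as pd
        import pyarrow as pa
    
        df = pd.DataFrame({'a': [1, 2, 3, 4]})
    
        t1 = pa.Table.from_pandas(df)
        assert len(t1.schema) == 1   # RangeIndex tracked only in metadata
    
        t2 = pa.Table.from_pandas(df.reset_index())
        assert len(t2.schema) == 2   # 'index' is now a real int64 column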
    
    This patch also implements proposed extensions to the serialized pandas
    metadata to accommodate indexes-as-columns vs.
    indexes-represented-as-metadata, as described in
    
    https://github.com/pandas-dev/pandas/issues/25672
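    
    As a sketch, the extended metadata for a frame with a default RangeIndex
    looks roughly like this after the change (shape taken from the tests
    added below; the values shown are illustrative):
    
        table = pa.Table.from_pandas(pd.DataFrame({'a': range(10000)}))
        table.schema.pandas_metadata['index_columns']
        # [{'kind': 'range', 'name': None, 'start': 0, 'stop': 10000,
        #   'step': 1}]
        # Metadata written before 0.13 stored a list of field names instead,
        # e.g. ['__index_level_0__'], which to_pandas() still understands.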
    
    Author: Wes McKinney <[email protected]>
    
    Closes #3868 from wesm/ARROW-1639 and squashes the following commits:
    
    ec929aebd <Wes McKinney> Add pandas_metadata attribute to pyarrow.Schema to make interactions simpler
    670dc6f70 <Wes McKinney> Add compatibility tests for pre-0.13 metadata. Add Arrow version to pandas metadata
    0ca1bfc58 <Wes McKinney> Add benchmark
    9ba413106 <Wes McKinney> Serialize RangeIndex as metadata via Table.from_pandas instead of converting to data column. This affects serialize_pandas and writing to Parquet format
---
 python/benchmarks/convert_pandas.py         |  15 +
 python/pyarrow/pandas_compat.py             | 436 ++++++++++++++++++----------
 python/pyarrow/parquet.py                   |  29 +-
 python/pyarrow/tests/test_convert_pandas.py | 258 +++++++++++++---
 python/pyarrow/tests/test_ipc.py            |   4 +-
 python/pyarrow/tests/test_parquet.py        |  25 +-
 python/pyarrow/tests/test_table.py          |   2 +-
 python/pyarrow/types.pxi                    |  13 +
 8 files changed, 545 insertions(+), 237 deletions(-)

diff --git a/python/benchmarks/convert_pandas.py b/python/benchmarks/convert_pandas.py
index bb8d710..57a0518 100644
--- a/python/benchmarks/convert_pandas.py
+++ b/python/benchmarks/convert_pandas.py
@@ -90,3 +90,18 @@ class ZeroCopyPandasRead(object):
 
     def time_deserialize_from_components(self):
         pa.deserialize_components(self.as_components)
+
+
+class SerializeDeserializePandas(object):
+
+    def setup(self):
+        # 10 million length
+        n = 10000000
+        self.df = pd.DataFrame({'data': np.random.randn(n)})
+        self.serialized = pa.serialize_pandas(self.df)
+
+    def time_serialize_pandas(self):
+        pa.serialize_pandas(self.df)
+
+    def time_deserialize_pandas(self):
+        pa.deserialize_pandas(self.serialized)
diff --git a/python/pyarrow/pandas_compat.py b/python/pyarrow/pandas_compat.py
index 86e826f..10038de 100644
--- a/python/pyarrow/pandas_compat.py
+++ b/python/pyarrow/pandas_compat.py
@@ -28,7 +28,8 @@ import pandas as pd
 import six
 
 import pyarrow as pa
-from pyarrow.compat import builtin_pickle, DatetimeTZDtype, PY2, zip_longest  # noqa
+from pyarrow.compat import (builtin_pickle, DatetimeTZDtype,  # noqa
+                            PY2, zip_longest)
 
 
 def infer_dtype(column):
@@ -185,7 +186,7 @@ def get_column_metadata(column, name, arrow_type, field_name):
     }
 
 
-def construct_metadata(df, column_names, index_levels, index_column_names,
+def construct_metadata(df, column_names, index_levels, index_descriptors,
                        preserve_index, types):
     """Returns a dictionary containing enough metadata to reconstruct a pandas
     DataFrame as an Arrow Table, including index columns.
@@ -194,73 +195,80 @@ def construct_metadata(df, column_names, index_levels, index_column_names,
     ----------
     df : pandas.DataFrame
     index_levels : List[pd.Index]
-    presere_index : bool
+    index_descriptors : List[Dict]
+    preserve_index : bool
     types : List[pyarrow.DataType]
 
     Returns
     -------
     dict
     """
+    num_serialized_index_levels = len([descr for descr in index_descriptors
+                                       if descr['kind'] == 'serialized'])
     # Use ntypes instead of Python shorthand notation [:-len(x)] as [:-0]
     # behaves differently to what we want.
     ntypes = len(types)
-    df_types = types[:ntypes - len(index_levels)]
-    index_types = types[ntypes - len(index_levels):]
-
-    column_metadata = [
-        get_column_metadata(
-            df[col_name],
-            name=sanitized_name,
-            arrow_type=arrow_type,
-            field_name=sanitized_name
-        ) for col_name, sanitized_name, arrow_type in zip(
-            df.columns, column_names, df_types
-        )
-    ]
-
+    df_types = types[:ntypes - num_serialized_index_levels]
+    index_types = types[ntypes - num_serialized_index_levels:]
+
+    column_metadata = []
+    for col_name, sanitized_name, arrow_type in zip(df.columns, column_names,
+                                                    df_types):
+        metadata = get_column_metadata(df[col_name], name=sanitized_name,
+                                       arrow_type=arrow_type,
+                                       field_name=sanitized_name)
+        column_metadata.append(metadata)
+
+    index_column_metadata = []
     if preserve_index:
-        index_column_metadata = [
-            get_column_metadata(
-                level,
-                name=level.name,
-                arrow_type=arrow_type,
-                field_name=field_name,
-            ) for i, (level, arrow_type, field_name) in enumerate(
-                zip(index_levels, index_types, index_column_names)
-            )
-        ]
+        for level, arrow_type, descriptor in zip(index_levels, index_types,
+                                                 index_descriptors):
+            if descriptor['kind'] != 'serialized':
+                # The index is represented in a non-serialized fashion,
+                # e.g. RangeIndex
+                continue
+            metadata = get_column_metadata(level, name=level.name,
+                                           arrow_type=arrow_type,
+                                           field_name=descriptor['field_name'])
+            index_column_metadata.append(metadata)
 
         column_indexes = []
 
         for level in getattr(df.columns, 'levels', [df.columns]):
-            string_dtype, extra_metadata = get_extension_dtype_info(level)
-
-            pandas_type = get_logical_type_from_numpy(level)
-            if pandas_type == 'unicode':
-                assert not extra_metadata
-                extra_metadata = {'encoding': 'UTF-8'}
-
-            column_index = {
-                'name': level.name,
-                'field_name': level.name,
-                'pandas_type': pandas_type,
-                'numpy_type': string_dtype,
-                'metadata': extra_metadata,
-            }
-            column_indexes.append(column_index)
+            metadata = _get_simple_index_descriptor(level)
+            column_indexes.append(metadata)
     else:
-        index_column_names = index_column_metadata = column_indexes = []
+        index_descriptors = index_column_metadata = column_indexes = []
 
     return {
         b'pandas': json.dumps({
-            'index_columns': index_column_names,
+            'index_columns': index_descriptors,
             'column_indexes': column_indexes,
             'columns': column_metadata + index_column_metadata,
+            'creator': {
+                'library': 'pyarrow',
+                'version': pa.__version__
+            },
             'pandas_version': pd.__version__
         }).encode('utf8')
     }
 
 
+def _get_simple_index_descriptor(level):
+    string_dtype, extra_metadata = get_extension_dtype_info(level)
+    pandas_type = get_logical_type_from_numpy(level)
+    if pandas_type == 'unicode':
+        assert not extra_metadata
+        extra_metadata = {'encoding': 'UTF-8'}
+    return {
+        'name': level.name,
+        'field_name': level.name,
+        'pandas_type': pandas_type,
+        'numpy_type': string_dtype,
+        'metadata': extra_metadata,
+    }
+
+
 def _column_name_to_strings(name):
     """Convert a column name (or level) to either a string or a recursive
     collection of strings.
@@ -320,26 +328,12 @@ def _index_level_name(index, i, column_names):
 
 
 def _get_columns_to_convert(df, schema, preserve_index, columns):
-    if schema is not None and columns is not None:
-        raise ValueError('Schema and columns arguments are mutually '
-                         'exclusive, pass only one of them')
-    elif schema is not None:
-        columns = schema.names
-    elif columns is not None:
-        # columns is only for filtering, the function must keep the column
-        # ordering of either the dataframe or the passed schema
-        columns = [c for c in df.columns if c in columns]
-    else:
-        columns = df.columns
+    columns = _resolve_columns_of_interest(df, schema, columns)
 
     column_names = []
-    index_columns = []
-    index_column_names = []
     type = None
 
-    if preserve_index:
-        n = len(getattr(df.index, 'levels', [df.index]))
-        index_columns.extend(df.index.get_level_values(i) for i in range(n))
+    index_levels = _get_index_level_values(df.index) if preserve_index else []
 
     columns_to_convert = []
     convert_types = []
@@ -361,23 +355,78 @@ def _get_columns_to_convert(df, schema, preserve_index, columns):
         convert_types.append(type)
         column_names.append(name)
 
-    for i, column in enumerate(index_columns):
-        columns_to_convert.append(column)
-        convert_types.append(None)
-        name = _index_level_name(column, i, column_names)
-        index_column_names.append(name)
+    index_descriptors = []
+    index_column_names = []
+    for i, index_level in enumerate(index_levels):
+        name = _index_level_name(index_level, i, column_names)
+        if isinstance(index_level, pd.RangeIndex):
+            descr = _get_range_index_descriptor(index_level)
+        else:
+            columns_to_convert.append(index_level)
+            convert_types.append(None)
+            descr = {
+                'kind': 'serialized',
+                'field_name': name
+            }
+            index_column_names.append(name)
+        index_descriptors.append(descr)
+
+    all_names = column_names + index_column_names
+
+    # all_names : all of the columns in the resulting table including the data
+    # columns and serialized index columns
+    # column_names : the names of the data columns
+    # index_descriptors : descriptions of each index to be used for
+    # reconstruction
+    # index_levels : the extracted index level values
+    # columns_to_convert : assembled raw data (both data columns and indexes)
+    # to be converted to Arrow format
+    # convert_types : specified column types to use for coercion / casting
+    # during serialization, if a Schema was provided
+    return (all_names, column_names, index_descriptors, index_levels,
+            columns_to_convert, convert_types)
 
-    names = column_names + index_column_names
 
-    return (names, column_names, index_columns, index_column_names,
-            columns_to_convert, convert_types)
+def _get_range_index_descriptor(level):
+    # TODO(wesm): Why are these non-public and is there a more public way to
+    # get them?
+    return {
+        'kind': 'range',
+        'name': level.name,
+        'start': level._start,
+        'stop': level._stop,
+        'step': level._step
+    }
+
+
+def _get_index_level_values(index):
+    n = len(getattr(index, 'levels', [index]))
+    return [index.get_level_values(i) for i in range(n)]
+
+
+def _resolve_columns_of_interest(df, schema, columns):
+    if schema is not None and columns is not None:
+        raise ValueError('Schema and columns arguments are mutually '
+                         'exclusive, pass only one of them')
+    elif schema is not None:
+        columns = schema.names
+    elif columns is not None:
+        # columns is only for filtering, the function must keep the column
+        # ordering of either the dataframe or the passed schema
+        columns = [c for c in df.columns if c in columns]
+    else:
+        columns = df.columns
+
+    return columns
 
 
 def dataframe_to_types(df, preserve_index, columns=None):
-    names, column_names, index_columns, index_column_names, \
-        columns_to_convert, _ = _get_columns_to_convert(
-            df, None, preserve_index, columns
-        )
+    (all_names,
+     column_names,
+     index_descriptors,
+     index_columns,
+     columns_to_convert,
+     _) = _get_columns_to_convert(df, None, preserve_index, columns)
 
     types = []
     # If pandas knows type, skip conversion
@@ -393,17 +442,20 @@ def dataframe_to_types(df, preserve_index, columns=None):
         types.append(type_)
 
     metadata = construct_metadata(df, column_names, index_columns,
-                                  index_column_names, preserve_index, types)
+                                  index_descriptors, preserve_index, types)
 
-    return names, types, metadata
+    return all_names, types, metadata
 
 
 def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
                         safe=True):
-    names, column_names, index_columns, index_column_names, \
-        columns_to_convert, convert_types = _get_columns_to_convert(
-            df, schema, preserve_index, columns
-        )
+    (all_names,
+     column_names,
+     index_descriptors,
+     index_columns,
+     columns_to_convert,
+     convert_types) = _get_columns_to_convert(df, schema, preserve_index,
+                                              columns)
 
     # NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether
     # using a thread pool is worth it. Currently the heuristic is whether the
@@ -438,12 +490,10 @@ def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
 
     types = [x.type for x in arrays]
 
-    metadata = construct_metadata(
-        df, column_names, index_columns, index_column_names, preserve_index,
-        types
-    )
-
-    return names, arrays, metadata
+    metadata = construct_metadata(df, column_names, index_columns,
+                                  index_descriptors, preserve_index,
+                                  types)
+    return all_names, arrays, metadata
 
 
 def get_datetimetz_type(values, dtype, type_):
@@ -551,95 +601,46 @@ def _make_datetimetz(tz):
 
 def table_to_blockmanager(options, table, categories=None,
                           ignore_metadata=False):
-
-    index_columns = []
-    columns = []
+    all_columns = []
     column_indexes = []
-    index_arrays = []
-    index_names = []
-    schema = table.schema
-    row_count = table.num_rows
-    metadata = schema.metadata
-
-    has_pandas_metadata = (not ignore_metadata and metadata is not None
-                           and b'pandas' in metadata)
+    pandas_metadata = table.schema.pandas_metadata
 
-    if has_pandas_metadata:
-        pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
-        index_columns = pandas_metadata['index_columns']
-        columns = pandas_metadata['columns']
+    if not ignore_metadata and pandas_metadata is not None:
+        all_columns = pandas_metadata['columns']
         column_indexes = pandas_metadata.get('column_indexes', [])
+        index_descriptors = pandas_metadata['index_columns']
         table = _add_any_metadata(table, pandas_metadata)
+        table, index = _reconstruct_index(table, index_descriptors,
+                                          all_columns)
+    else:
+        index = pd.RangeIndex(table.num_rows)
 
-    block_table = table
-
-    index_columns_set = frozenset(index_columns)
+    _check_data_column_metadata_consistency(all_columns)
+    blocks = _table_to_blocks(options, table, pa.default_memory_pool(),
+                              categories)
+    columns = _deserialize_column_index(table, all_columns, column_indexes)
 
-    # 0. 'field_name' is the name of the column in the arrow Table
-    # 1. 'name' is the user-facing name of the column, that is, it came from
-    #    pandas
-    # 2. 'field_name' and 'name' differ for index columns
-    # 3. We fall back on c['name'] for backwards compatibility
-    logical_index_names = [
-        c['name'] for c in columns
-        if c.get('field_name', c['name']) in index_columns_set
-    ]
+    axes = [columns, index]
+    return _int.BlockManager(blocks, axes)
 
-    # There must be the same number of field names and physical names
-    # (fields in the arrow Table)
-    assert len(logical_index_names) == len(index_columns_set)
 
+def _check_data_column_metadata_consistency(all_columns):
     # It can never be the case in a released version of pyarrow that
     # c['name'] is None *and* 'field_name' is not a key in the column metadata,
     # because the change to allow c['name'] to be None and the change to add
     # 'field_name' are in the same release (0.8.0)
     assert all(
         (c['name'] is None and 'field_name' in c) or c['name'] is not None
-        for c in columns
+        for c in all_columns
     )
 
-    # Build up a list of index columns and names while removing those columns
-    # from the original table
-    for raw_name, logical_name in zip(index_columns, logical_index_names):
-        i = schema.get_field_index(raw_name)
-        if i != -1:
-            col = table.column(i)
-            col_pandas = col.to_pandas()
-            values = col_pandas.values
-            if hasattr(values, 'flags') and not values.flags.writeable:
-                # ARROW-1054: in pandas 0.19.2, factorize will reject
-                # non-writeable arrays when calling MultiIndex.from_arrays
-                values = values.copy()
-
-            if isinstance(col_pandas.dtype, DatetimeTZDtype):
-                index_array = (pd.Series(values).dt.tz_localize('utc')
-                               .dt.tz_convert(col_pandas.dtype.tz))
-            else:
-                index_array = pd.Series(values, dtype=col_pandas.dtype)
-            index_arrays.append(index_array)
-            index_names.append(
-                _backwards_compatible_index_name(raw_name, logical_name)
-            )
-            block_table = block_table.remove_column(
-                block_table.schema.get_field_index(raw_name)
-            )
-
-    blocks = _table_to_blocks(options, block_table, pa.default_memory_pool(),
-                              categories)
-
-    # Construct the row index
-    if len(index_arrays) > 1:
-        index = pd.MultiIndex.from_arrays(index_arrays, names=index_names)
-    elif len(index_arrays) == 1:
-        index = pd.Index(index_arrays[0], name=index_names[0])
-    else:
-        index = pd.RangeIndex(row_count)
 
+def _deserialize_column_index(block_table, all_columns, column_indexes):
     column_strings = [x.name for x in block_table.itercolumns()]
-    if columns:
+    if all_columns:
         columns_name_dict = {
             c.get('field_name', _column_name_to_strings(c['name'])): c['name']
-            for c in columns
+            for c in all_columns
         }
         columns_values = [
             columns_name_dict.get(name, name) for name in column_strings
@@ -664,14 +665,129 @@ def table_to_blockmanager(options, table, categories=None,
         )
 
     # if we're reconstructing the index
-    if has_pandas_metadata:
+    if len(column_indexes) > 0:
         columns = _reconstruct_columns_from_metadata(columns, column_indexes)
 
     # ARROW-1751: flatten a single level column MultiIndex for pandas 0.21.0
     columns = _flatten_single_level_multiindex(columns)
 
-    axes = [columns, index]
-    return _int.BlockManager(blocks, axes)
+    return columns
+
+
+def _sanitize_old_index_descr(descr):
+    if not isinstance(descr, dict):
+        # version < 0.13.0
+        return {
+            'kind': 'serialized',
+            'field_name': descr
+        }
+    else:
+        return descr
+
+
+def _reconstruct_index(table, index_descriptors, all_columns):
+    # 0. 'field_name' is the name of the column in the arrow Table
+    # 1. 'name' is the user-facing name of the column, that is, it came from
+    #    pandas
+    # 2. 'field_name' and 'name' differ for index columns
+    # 3. We fall back on c['name'] for backwards compatibility
+    field_name_to_metadata = {
+        c.get('field_name', c['name']): c
+        for c in all_columns
+    }
+
+    # Build up a list of index columns and names while removing those columns
+    # from the original table
+    index_arrays = []
+    index_names = []
+    result_table = table
+    for descr in index_descriptors:
+        descr = _sanitize_old_index_descr(descr)
+        if descr['kind'] == 'serialized':
+            result_table, index_level, index_name = _extract_index_level(
+                table, result_table, descr, field_name_to_metadata)
+            if index_level is None:
+                # ARROW-1883: the serialized index column was not found
+                continue
+        elif descr['kind'] == 'range':
+            index_name = descr['name']
+            index_level = pd.RangeIndex(descr['start'], descr['stop'],
+                                        step=descr['step'], name=index_name)
+            if len(index_level) != len(table):
+                # Possibly the result of munged metadata
+                continue
+        else:
+            raise ValueError("Unrecognized index kind: {0}"
+                             .format(descr['kind']))
+        index_arrays.append(index_level)
+        index_names.append(index_name)
+
+    # Reconstruct the row index
+    if len(index_arrays) > 1:
+        index = pd.MultiIndex.from_arrays(index_arrays, names=index_names)
+    elif len(index_arrays) == 1:
+        index = index_arrays[0]
+        if not isinstance(index, pd.Index):
+            # Box anything that wasn't boxed above
+            index = pd.Index(index, name=index_names[0])
+    else:
+        index = pd.RangeIndex(table.num_rows)
+
+    return result_table, index
+
+
+def _extract_index_level(table, result_table, descr,
+                         field_name_to_metadata):
+    field_name = descr['field_name']
+    logical_name = field_name_to_metadata[field_name]['name']
+    index_name = _backwards_compatible_index_name(field_name, logical_name)
+    i = table.schema.get_field_index(field_name)
+
+    if i == -1:
+        # The serialized index column was removed by the user
+        return table, None, None
+
+    col = table.column(i)
+    col_pandas = col.to_pandas()
+    values = col_pandas.values
+    if hasattr(values, 'flags') and not values.flags.writeable:
+        # ARROW-1054: in pandas 0.19.2, factorize will reject
+        # non-writeable arrays when calling MultiIndex.from_arrays
+        values = values.copy()
+
+    if isinstance(col_pandas.dtype, DatetimeTZDtype):
+        index_level = (pd.Series(values).dt.tz_localize('utc')
+                       .dt.tz_convert(col_pandas.dtype.tz))
+    else:
+        index_level = pd.Series(values, dtype=col_pandas.dtype)
+    result_table = result_table.remove_column(
+        result_table.schema.get_field_index(field_name)
+    )
+    return result_table, index_level, index_name
+
+
+def _get_serialized_index_names(index_descriptors, all_columns):
+    serialized_index_names = []
+    for descr in index_descriptors:
+        if not isinstance(descr, dict):
+            # version < 0.13.0: descr is the raw field name
+            serialized_index_names.append(descr)
+        elif descr['kind'] != 'serialized':
+            continue
+        else:
+            serialized_index_names.append(descr['field_name'])
+
+    index_columns_set = frozenset(serialized_index_names)
+
+    logical_index_names = [
+        c['name'] for c in all_columns
+        if c.get('field_name', c['name']) in index_columns_set
+    ]
+
+    # There must be the same number of field names and physical names
+    # (fields in the arrow Table)
+    assert len(logical_index_names) == len(index_columns_set)
+    return logical_index_names
 
 
 def _backwards_compatible_index_name(raw_name, logical_name):
@@ -692,13 +808,17 @@ def _backwards_compatible_index_name(raw_name, logical_name):
     * Part of :func:`~pyarrow.pandas_compat.table_to_blockmanager`
     """
     # Part of table_to_blockmanager
-    pattern = r'^__index_level_\d+__$'
-    if raw_name == logical_name and re.match(pattern, raw_name) is not None:
+    if raw_name == logical_name and _is_generated_index_name(raw_name):
         return None
     else:
         return logical_name
 
 
+def _is_generated_index_name(name):
+    pattern = r'^__index_level_\d+__$'
+    return re.match(pattern, name) is not None
+
+
 _pandas_logical_type_map = {
     'date': 'datetime64[D]',
     'unicode': np.unicode_,
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index fe602bb..a149bcc 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -257,7 +257,9 @@ class ParquetFile(object):
                 index_columns = []
 
             if indices is not None and index_columns:
-                indices += map(self.reader.column_name_idx, index_columns)
+                indices += [self.reader.column_name_idx(descr['field_name'])
+                            for descr in index_columns
+                            if descr['kind'] == 'serialized']
 
         return indices
 
@@ -1200,8 +1202,8 @@ def _mkdir_if_not_exists(fs, path):
             assert fs.exists(path)
 
 
-def write_to_dataset(table, root_path, partition_cols=None,
-                     filesystem=None, preserve_index=True, **kwargs):
+def write_to_dataset(table, root_path, partition_cols=None, filesystem=None,
+                     **kwargs):
     """
     Wrapper around parquet.write_table for writing a Table to
     Parquet format by partitions.
@@ -1232,8 +1234,6 @@ def write_to_dataset(table, root_path, partition_cols=None,
     partition_cols : list,
         Column names by which to partition the dataset
         Columns are partitioned in the order they are given
-    preserve_index : bool,
-        Parameter for instantiating Table; preserve pandas index or not.
     **kwargs : dict, kwargs for write_table function.
     """
     fs, root_path = _get_filesystem_and_path(filesystem, root_path)
@@ -1241,7 +1241,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
     _mkdir_if_not_exists(fs, root_path)
 
     if partition_cols is not None and len(partition_cols) > 0:
-        df = table.to_pandas()
+        df = table.to_pandas(ignore_metadata=True)
         partition_keys = [df[col] for col in partition_cols]
         data_df = df.drop(partition_cols, axis='columns')
         data_cols = df.columns.drop(partition_cols)
@@ -1249,18 +1249,11 @@ def write_to_dataset(table, root_path, partition_cols=None,
             raise ValueError('No data left to save outside partition columns')
 
         subschema = table.schema
-        # ARROW-4538: Remove index column from subschema in write_to_dataframe
-        metadata = subschema.metadata
-        has_pandas_metadata = (metadata is not None and b'pandas' in metadata)
-        index_columns = []
-        if has_pandas_metadata:
-            pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
-            index_columns = pandas_metadata['index_columns']
+
         # ARROW-2891: Ensure the output_schema is preserved when writing a
         # partitioned dataset
         for col in table.schema.names:
-            if (col.startswith('__index_level_') or col in partition_cols or
-                    col in index_columns):
+            if col in partition_cols:
                 subschema = subschema.remove(subschema.get_field_index(col))
 
         for keys, subgroup in data_df.groupby(partition_keys):
@@ -1269,10 +1262,8 @@ def write_to_dataset(table, root_path, partition_cols=None,
             subdir = '/'.join(
                 ['{colname}={value}'.format(colname=name, value=val)
                  for name, val in zip(partition_cols, keys)])
-            subtable = pa.Table.from_pandas(subgroup,
-                                            preserve_index=preserve_index,
-                                            schema=subschema,
-                                            safe=False)
+            subtable = pa.Table.from_pandas(subgroup, preserve_index=False,
+                                            schema=subschema, safe=False)
             prefix = '/'.join([root_path, subdir])
             _mkdir_if_not_exists(fs, prefix)
             outfile = guid() + '.parquet'
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index fe5b305..16e2a76 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -155,6 +155,33 @@ class TestConvertMetadata(object):
         df.columns.names = ['a']
         _check_pandas_roundtrip(df, preserve_index=True)
 
+    def test_range_index_shortcut(self):
+        # ARROW-1639
+        index_name = 'foo'
+        df = pd.DataFrame({'a': [1, 2, 3, 4]},
+                          index=pd.RangeIndex(0, 8, step=2, name=index_name))
+
+        df2 = pd.DataFrame({'a': [4, 5, 6, 7]},
+                           index=pd.RangeIndex(0, 4))
+
+        table = pa.Table.from_pandas(df)
+        table_no_index_name = pa.Table.from_pandas(df2)
+
+        # The RangeIndex is tracked in the metadata only
+        assert len(table.schema) == 1
+
+        result = table.to_pandas()
+        tm.assert_frame_equal(result, df)
+        assert isinstance(result.index, pd.RangeIndex)
+        assert result.index._step == 2
+        assert result.index.name == index_name
+
+        result2 = table_no_index_name.to_pandas()
+        tm.assert_frame_equal(result2, df2)
+        assert isinstance(result2.index, pd.RangeIndex)
+        assert result2.index._step == 1
+        assert result2.index.name is None
+
     def test_multiindex_columns(self):
         columns = pd.MultiIndex.from_arrays([
             ['one', 'two'], ['X', 'Y']
@@ -203,9 +230,7 @@ class TestConvertMetadata(object):
             columns=['a', None, '__index_level_0__'],
         )
         t = pa.Table.from_pandas(df, preserve_index=True)
-        raw_metadata = t.schema.metadata
-
-        js = json.loads(raw_metadata[b'pandas'].decode('utf8'))
+        js = t.schema.pandas_metadata
 
         col1, col2, col3, idx0, foo = js['columns']
 
@@ -218,11 +243,14 @@ class TestConvertMetadata(object):
         assert col3['name'] == '__index_level_0__'
         assert col3['name'] == col3['field_name']
 
-        idx0_name, foo_name = js['index_columns']
+        idx0_descr, foo_descr = js['index_columns']
+        assert idx0_descr['kind'] == 'serialized'
+        idx0_name = idx0_descr['field_name']
         assert idx0_name == '__index_level_0__'
         assert idx0['field_name'] == idx0_name
         assert idx0['name'] is None
 
+        foo_name = foo_descr['field_name']
         assert foo_name == 'foo'
         assert foo['field_name'] == foo_name
         assert foo['name'] == foo_name
@@ -233,8 +261,7 @@ class TestConvertMetadata(object):
             columns=pd.Index(list('def'), dtype='category')
         )
         t = pa.Table.from_pandas(df, preserve_index=True)
-        raw_metadata = t.schema.metadata
-        js = json.loads(raw_metadata[b'pandas'].decode('utf8'))
+        js = t.schema.pandas_metadata
 
         column_indexes, = js['column_indexes']
         assert column_indexes['name'] is None
@@ -251,8 +278,7 @@ class TestConvertMetadata(object):
             columns=pd.Index(list('def'), name='stringz')
         )
         t = pa.Table.from_pandas(df, preserve_index=True)
-        raw_metadata = t.schema.metadata
-        js = json.loads(raw_metadata[b'pandas'].decode('utf8'))
+        js = t.schema.pandas_metadata
 
         column_indexes, = js['column_indexes']
         assert column_indexes['name'] == 'stringz'
@@ -278,8 +304,7 @@ class TestConvertMetadata(object):
             )
         )
         t = pa.Table.from_pandas(df, preserve_index=True)
-        raw_metadata = t.schema.metadata
-        js = json.loads(raw_metadata[b'pandas'].decode('utf8'))
+        js = t.schema.pandas_metadata
 
         column_indexes, = js['column_indexes']
         assert column_indexes['name'] is None
@@ -369,10 +394,8 @@ class TestConvertMetadata(object):
     def test_metadata_with_mixed_types(self):
         df = pd.DataFrame({'data': [b'some_bytes', u'some_unicode']})
         table = pa.Table.from_pandas(df)
-        metadata = table.schema.metadata
-        assert b'mixed' not in metadata[b'pandas']
-
-        js = json.loads(metadata[b'pandas'].decode('utf8'))
+        js = table.schema.pandas_metadata
+        assert 'mixed' not in js
         data_column = js['columns'][0]
         assert data_column['pandas_type'] == 'bytes'
         assert data_column['numpy_type'] == 'object'
@@ -392,10 +415,8 @@ class TestConvertMetadata(object):
         df = pd.DataFrame({'data': [[1], [2, 3, 4], [5] * 7]})
         schema = pa.schema([pa.field('data', type=pa.list_(pa.int64()))])
         table = pa.Table.from_pandas(df, schema=schema)
-        metadata = table.schema.metadata
-        assert b'mixed' not in metadata[b'pandas']
-
-        js = json.loads(metadata[b'pandas'].decode('utf8'))
+        js = table.schema.pandas_metadata
+        assert 'mixed' not in js
         data_column = js['columns'][0]
         assert data_column['pandas_type'] == 'list[int64]'
         assert data_column['numpy_type'] == 'object'
@@ -408,10 +429,8 @@ class TestConvertMetadata(object):
             ]
         })
         table = pa.Table.from_pandas(expected)
-        metadata = table.schema.metadata
-        assert b'mixed' not in metadata[b'pandas']
-
-        js = json.loads(metadata[b'pandas'].decode('utf8'))
+        js = table.schema.pandas_metadata
+        assert 'mixed' not in js
         data_column = js['columns'][0]
         assert data_column['pandas_type'] == 'decimal'
         assert data_column['numpy_type'] == 'object'
@@ -419,19 +438,6 @@ class TestConvertMetadata(object):
 
     def test_table_column_subset_metadata(self):
         # ARROW-1883
-        df = pd.DataFrame({
-            'a': [1, 2, 3],
-            'b': pd.date_range("2017-01-01", periods=3, tz='Europe/Brussels')})
-        table = pa.Table.from_pandas(df)
-
-        table_subset = table.remove_column(1)
-        result = table_subset.to_pandas()
-        tm.assert_frame_equal(result, df[['a']])
-
-        table_subset2 = table_subset.remove_column(1)
-        result = table_subset2.to_pandas()
-        tm.assert_frame_equal(result, df[['a']])
-
         # non-default index
         for index in [
                 pd.Index(['a', 'b', 'c'], name='index'),
@@ -467,7 +473,7 @@ class TestConvertMetadata(object):
         # type of empty lists
         df = tbl.to_pandas()
         tbl2 = pa.Table.from_pandas(df, preserve_index=True)
-        md2 = json.loads(tbl2.schema.metadata[b'pandas'].decode('utf8'))
+        md2 = tbl2.schema.pandas_metadata
 
         # Second roundtrip
         df2 = tbl2.to_pandas()
@@ -489,13 +495,6 @@ class TestConvertMetadata(object):
                 'metadata': None,
                 'numpy_type': 'object',
                 'pandas_type': 'list[empty]',
-            },
-            {
-                'name': None,
-                'field_name': '__index_level_0__',
-                'metadata': None,
-                'numpy_type': 'int64',
-                'pandas_type': 'int64',
             }
         ]
 
@@ -2529,3 +2528,176 @@ def test_table_from_pandas_columns_and_schema_are_mutually_exclusive():
 
     with pytest.raises(ValueError):
         pa.Table.from_pandas(df, schema=schema, columns=columns)
+
+
+# ----------------------------------------------------------------------
+# Legacy metadata compatibility tests
+
+
+def test_range_index_pre_0_12():
+    # Forward compatibility for metadata created from pandas.RangeIndex
+    # prior to pyarrow 0.13.0
+    a_values = [u'foo', u'bar', None, u'baz']
+    b_values = [u'a', u'a', u'b', u'b']
+    a_arrow = pa.array(a_values, type='utf8')
+    b_arrow = pa.array(b_values, type='utf8')
+
+    rng_index_arrow = pa.array([0, 2, 4, 6], type='int64')
+
+    gen_name_0 = '__index_level_0__'
+    gen_name_1 = '__index_level_1__'
+
+    # Case 1: named RangeIndex
+    e1 = pd.DataFrame({
+        'a': a_values
+    }, index=pd.RangeIndex(0, 8, step=2, name='qux'))
+    t1 = pa.Table.from_arrays([a_arrow, rng_index_arrow],
+                              names=['a', 'qux'])
+    t1 = t1.replace_schema_metadata({
+        b'pandas': json.dumps(
+            {'index_columns': ['qux'],
+             'column_indexes': [{'name': None,
+                                 'field_name': None,
+                                 'pandas_type': 'unicode',
+                                 'numpy_type': 'object',
+                                 'metadata': {'encoding': 'UTF-8'}}],
+             'columns': [{'name': 'a',
+                          'field_name': 'a',
+                          'pandas_type': 'unicode',
+                          'numpy_type': 'object',
+                          'metadata': None},
+                         {'name': 'qux',
+                          'field_name': 'qux',
+                          'pandas_type': 'int64',
+                          'numpy_type': 'int64',
+                          'metadata': None}],
+             'pandas_version': '0.23.4'}
+        )})
+    r1 = t1.to_pandas()
+    tm.assert_frame_equal(r1, e1)
+
+    # Case 2: named RangeIndex, but conflicts with an actual column
+    e2 = pd.DataFrame({
+        'qux': a_values
+    }, index=pd.RangeIndex(0, 8, step=2, name='qux'))
+    t2 = pa.Table.from_arrays([a_arrow, rng_index_arrow],
+                              names=['qux', gen_name_0])
+    t2 = t2.replace_schema_metadata({
+        b'pandas': json.dumps(
+            {'index_columns': [gen_name_0],
+             'column_indexes': [{'name': None,
+                                 'field_name': None,
+                                 'pandas_type': 'unicode',
+                                 'numpy_type': 'object',
+                                 'metadata': {'encoding': 'UTF-8'}}],
+             'columns': [{'name': 'a',
+                          'field_name': 'a',
+                          'pandas_type': 'unicode',
+                          'numpy_type': 'object',
+                          'metadata': None},
+                         {'name': 'qux',
+                          'field_name': gen_name_0,
+                          'pandas_type': 'int64',
+                          'numpy_type': 'int64',
+                          'metadata': None}],
+             'pandas_version': '0.23.4'}
+        )})
+    r2 = t2.to_pandas()
+    tm.assert_frame_equal(r2, e2)
+
+    # Case 3: unnamed RangeIndex
+    e3 = pd.DataFrame({
+        'a': a_values
+    }, index=pd.RangeIndex(0, 8, step=2, name=None))
+    t3 = pa.Table.from_arrays([a_arrow, rng_index_arrow],
+                              names=['a', gen_name_0])
+    t3 = t3.replace_schema_metadata({
+        b'pandas': json.dumps(
+            {'index_columns': [gen_name_0],
+             'column_indexes': [{'name': None,
+                                 'field_name': None,
+                                 'pandas_type': 'unicode',
+                                 'numpy_type': 'object',
+                                 'metadata': {'encoding': 'UTF-8'}}],
+             'columns': [{'name': 'a',
+                          'field_name': 'a',
+                          'pandas_type': 'unicode',
+                          'numpy_type': 'object',
+                          'metadata': None},
+                         {'name': None,
+                          'field_name': gen_name_0,
+                          'pandas_type': 'int64',
+                          'numpy_type': 'int64',
+                          'metadata': None}],
+             'pandas_version': '0.23.4'}
+        )})
+    r3 = t3.to_pandas()
+    tm.assert_frame_equal(r3, e3)
+
+    # Case 4: MultiIndex with named RangeIndex
+    e4 = pd.DataFrame({
+        'a': a_values
+    }, index=[pd.RangeIndex(0, 8, step=2, name='qux'), b_values])
+    t4 = pa.Table.from_arrays([a_arrow, rng_index_arrow, b_arrow],
+                              names=['a', 'qux', gen_name_1])
+    t4 = t4.replace_schema_metadata({
+        b'pandas': json.dumps(
+            {'index_columns': ['qux', gen_name_1],
+             'column_indexes': [{'name': None,
+                                 'field_name': None,
+                                 'pandas_type': 'unicode',
+                                 'numpy_type': 'object',
+                                 'metadata': {'encoding': 'UTF-8'}}],
+             'columns': [{'name': 'a',
+                          'field_name': 'a',
+                          'pandas_type': 'unicode',
+                          'numpy_type': 'object',
+                          'metadata': None},
+                         {'name': 'qux',
+                          'field_name': 'qux',
+                          'pandas_type': 'int64',
+                          'numpy_type': 'int64',
+                          'metadata': None},
+                         {'name': None,
+                          'field_name': gen_name_1,
+                          'pandas_type': 'unicode',
+                          'numpy_type': 'object',
+                          'metadata': None}],
+             'pandas_version': '0.23.4'}
+        )})
+    r4 = t4.to_pandas()
+    tm.assert_frame_equal(r4, e4)
+
+    # Case 5: MultiIndex with unnamed RangeIndex
+    e5 = pd.DataFrame({
+        'a': a_values
+    }, index=[pd.RangeIndex(0, 8, step=2, name=None), b_values])
+    t5 = pa.Table.from_arrays([a_arrow, rng_index_arrow, b_arrow],
+                              names=['a', gen_name_0, gen_name_1])
+    t5 = t5.replace_schema_metadata({
+        b'pandas': json.dumps(
+            {'index_columns': [gen_name_0, gen_name_1],
+             'column_indexes': [{'name': None,
+                                 'field_name': None,
+                                 'pandas_type': 'unicode',
+                                 'numpy_type': 'object',
+                                 'metadata': {'encoding': 'UTF-8'}}],
+             'columns': [{'name': 'a',
+                          'field_name': 'a',
+                          'pandas_type': 'unicode',
+                          'numpy_type': 'object',
+                          'metadata': None},
+                         {'name': None,
+                          'field_name': gen_name_0,
+                          'pandas_type': 'int64',
+                          'numpy_type': 'int64',
+                          'metadata': None},
+                         {'name': None,
+                          'field_name': gen_name_1,
+                          'pandas_type': 'unicode',
+                          'numpy_type': 'object',
+                          'metadata': None}],
+             'pandas_version': '0.23.4'}
+        )})
+    r5 = t5.to_pandas()
+    tm.assert_frame_equal(r5, e5)
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 67a91b9..7c0bb2f 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -173,7 +173,7 @@ def test_file_read_pandas(file_fixture):
     reader = pa.ipc.open_file(file_contents)
     result = reader.read_pandas()
 
-    expected = pd.concat(frames)
+    expected = pd.concat(frames).reset_index(drop=True)
     assert_frame_equal(result, expected)
 
 
@@ -314,7 +314,7 @@ def test_stream_read_pandas(stream_fixture):
     reader = pa.ipc.open_stream(file_contents)
     result = reader.read_pandas()
 
-    expected = pd.concat(frames)
+    expected = pd.concat(frames).reset_index(drop=True)
     assert_frame_equal(result, expected)
 
 
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 77b9ead..ccc01f4 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -153,12 +153,12 @@ def test_pandas_parquet_2_0_rountrip(tempdir, chunk_size):
 
     filename = tempdir / 'pandas_rountrip.parquet'
     arrow_table = pa.Table.from_pandas(df)
-    assert b'pandas' in arrow_table.schema.metadata
+    assert arrow_table.schema.pandas_metadata is not None
 
     _write_table(arrow_table, filename, version="2.0",
                  coerce_timestamps='ms', chunk_size=chunk_size)
     table_read = pq.read_pandas(filename)
-    assert b'pandas' in table_read.schema.metadata
+    assert table_read.schema.pandas_metadata is not None
 
     assert arrow_table.schema.metadata == table_read.schema.metadata
 
@@ -280,7 +280,10 @@ def test_pandas_parquet_custom_metadata(tempdir):
     assert b'pandas' in metadata
 
     js = json.loads(metadata[b'pandas'].decode('utf8'))
-    assert js['index_columns'] == ['__index_level_0__']
+    assert js['index_columns'] == [{'kind': 'range',
+                                    'name': None,
+                                    'start': 0, 'stop': 10000,
+                                    'step': 1}]
 
 
 def test_pandas_parquet_column_multiindex(tempdir):
@@ -292,7 +295,7 @@ def test_pandas_parquet_column_multiindex(tempdir):
 
     filename = tempdir / 'pandas_rountrip.parquet'
     arrow_table = pa.Table.from_pandas(df)
-    assert b'pandas' in arrow_table.schema.metadata
+    assert arrow_table.schema.pandas_metadata is not None
 
     _write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms')
 
@@ -306,7 +309,7 @@ def test_pandas_parquet_2_0_rountrip_read_pandas_no_index_written(tempdir):
 
     filename = tempdir / 'pandas_rountrip.parquet'
     arrow_table = pa.Table.from_pandas(df, preserve_index=False)
-    js = json.loads(arrow_table.schema.metadata[b'pandas'].decode('utf8'))
+    js = arrow_table.schema.pandas_metadata
     assert not js['index_columns']
     # ARROW-2170
     # While index_columns should be empty, columns needs to be filled still.
@@ -315,7 +318,7 @@ def test_pandas_parquet_2_0_rountrip_read_pandas_no_index_written(tempdir):
     _write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms')
     table_read = pq.read_pandas(filename)
 
-    js = json.loads(table_read.schema.metadata[b'pandas'].decode('utf8'))
+    js = table_read.schema.pandas_metadata
     assert not js['index_columns']
 
     assert arrow_table.schema.metadata == table_read.schema.metadata
@@ -561,6 +564,7 @@ def make_sample_file(table_or_df):
 def test_parquet_metadata_api():
     df = alltypes_sample(size=10000)
     df = df.reindex(columns=sorted(df.columns))
+    df.index = np.random.randint(0, 1000000, size=len(df))
 
     fileh = make_sample_file(df)
     ncols = len(df.columns)
@@ -1909,13 +1913,10 @@ def _test_write_to_dataset_with_partitions(base_path,
                               'nan': [pd.np.nan] * 10,
                               'date': np.arange('2017-01-01', '2017-01-11',
                                                 dtype='datetime64[D]')})
-    # ARROW-4538
-    output_df.index.name = index_name
-
     cols = output_df.columns.tolist()
     partition_by = ['group1', 'group2']
     output_table = pa.Table.from_pandas(output_df, schema=schema, safe=False,
-                                        preserve_index=True)
+                                        preserve_index=False)
     pq.write_to_dataset(output_table, base_path, partition_by,
                         filesystem=filesystem)
 
@@ -1936,10 +1937,6 @@ def _test_write_to_dataset_with_partitions(base_path,
     dataset_cols = set(dataset.schema.to_arrow_schema().names)
     assert dataset_cols == set(output_table.schema.names)
 
-    # ARROW-4538
-    if index_name is not None:
-        assert index_name in dataset_cols
-
     input_table = dataset.read()
     input_df = input_table.to_pandas()
 
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index afdfd42..073a7c1 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -493,7 +493,7 @@ def test_recordbatchlist_to_pandas():
 
     table = pa.Table.from_batches([batch1, batch2])
     result = table.to_pandas()
-    data = pd.concat([data1, data2])
+    data = pd.concat([data1, data2]).reset_index(drop=True)
     assert_frame_equal(data, result)
 
 
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 1de4fac..db71292 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -629,6 +629,19 @@ cdef class Schema:
         return hash((tuple(self), self.metadata))
 
     @property
+    def pandas_metadata(self):
+        """
+        Return deserialized-from-JSON pandas metadata field (if it exists)
+        """
+        metadata = self.metadata
+        key = b'pandas'
+        if metadata is None or key not in metadata:
+            return None
+
+        import json
+        return json.loads(metadata[key].decode('utf8'))
+
+    @property
     def names(self):
         """
         The schema's field names.

Reply via email to