Jonathan Kulzick created ARROW-2891:
---------------------------------------
Summary: Preserve schema in write_to_dataset
Key: ARROW-2891
URL: https://issues.apache.org/jira/browse/ARROW-2891
Project: Apache Arrow
Issue Type: Bug
Components: Python
Affects Versions: 0.9.0
Reporter: Jonathan Kulzick
When using `pyarrow.parquet.write_to_dataset` with `partition_cols` set, the
schema of the `table` passed into the function is not enforced when iterating
over the `subgroup` to create the `subtable`. See
[here](https://github.com/apache/arrow/blob/master/python/pyarrow/parquet.py#L1146).
Since pandas is used to generate the subtables, there is a risk that type
information is lost from the original `table.schema` due to the data types
supported by pandas and the internal type conversions pandas performs.
It would be ideal if a `subschema` were generated from `table.schema` and passed
to `Table` when instantiating the `subtable`, so that the user's original schema
is enforced.
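A rough, standalone sketch of the idea (illustrative only, not the actual
`parquet.py` code): build a `subschema` by dropping the partition columns from
`table.schema` and pass it to `Table.from_pandas` when the subtable is created,
so pandas' inferred dtypes never override the caller's types.
```
import pandas as pd
import pyarrow as pa

# Toy table with the same schema as the example below.
schema = pa.schema([pa.field('col1', pa.int32()),
                    pa.field('col2', pa.int32()),
                    pa.field('col3', pa.int32())])
df = pd.DataFrame({'col1': [1], 'col2': [10], 'col3': [100]})
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
partition_cols = ['col1']

# Proposed: derive the subschema from the original schema instead of
# letting pandas re-infer the types for each subgroup.
subschema = pa.schema([field for field in table.schema
                       if field.name not in partition_cols])

subgroup = table.to_pandas().drop(partition_cols, axis=1)
subtable = pa.Table.from_pandas(subgroup, schema=subschema,
                                preserve_index=False)
print(subtable.schema)  # col2 and col3 remain int32
```
The same derivation would also need to account for index columns when
`preserve_index=True`, but the core change is just threading a `schema=`
argument through to `Table.from_pandas`.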
Here is a simple example of where we are running into issues while trying to
preserve a valid schema. This use case is more likely to occur when working
with sparse data sets.
```
>>> from io import StringIO
>>> import pandas as pd
>>> import numpy as np
>>> import pyarrow as pa
>>> import pyarrow.parquet as pq
# in csv col2 has no NaNs and in csv_nan col2 only has NaNs
>>> csv = StringIO('"1","10","100"')
>>> csv_nan = StringIO('"2","","200"')
# read in col2 as a float since pandas does not support NaNs in ints
>>> pd_dtype = {'col1': np.int32, 'col2': np.float32, 'col3': np.int32}
>>> df = pd.read_csv(csv, header=None, names=['col1', 'col2', 'col3'],
...                  dtype=pd_dtype)
>>> df_nan = pd.read_csv(csv_nan, header=None, names=['col1', 'col2', 'col3'],
...                      dtype=pd_dtype)
# verify both dfs and their dtypes
>>> df
col1 col2 col3
0 1 10.0 100
>>> df.dtypes
col1 int32
col2 float32
col3 int32
dtype: object
>>> df_nan
col1 col2 col3
0 2 NaN 200
>>> df_nan.dtypes
col1 int32
col2 float32
col3 int32
dtype: object
# define col2 as an int32 since pyarrow does support nulls in ints
# we want to preserve the original schema we started with and not
# upcast just because we're using pandas to go from csv to pyarrow
>>> schema = pa.schema([pa.field('col1', type=pa.int32()),
...                     pa.field('col2', type=pa.int32()),
...                     pa.field('col3', type=pa.int32())])
# verify schema
>>> schema
col1: int32
col2: int32
col3: int32
# create tables
>>> table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
>>> table_nan = pa.Table.from_pandas(df_nan, schema=schema,
...                                  preserve_index=False)
# verify table schemas and metadata
# col2 has pandas_type int32 and numpy_type float32 in both tables
>>> table
pyarrow.Table
col1: int32
col2: int32
col3: int32
metadata
--------
{b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
b' "col1", "field_name": "col1", "pandas_type": "int32", "numpy_ty'
b'pe": "int32", "metadata": null}, {"name": "col2", "field_name": '
b'"col2", "pandas_type": "int32", "numpy_type": "float32", "metada'
b'ta": null}, {"name": "col3", "field_name": "col3", "pandas_type"'
b': "int32", "numpy_type": "int32", "metadata": null}], "pandas_ve'
b'rsion": "0.22.0"}'}
>>> table_nan
pyarrow.Table
col1: int32
col2: int32
col3: int32
metadata
--------
{b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
b' "col1", "field_name": "col1", "pandas_type": "int32", "numpy_ty'
b'pe": "int32", "metadata": null}, {"name": "col2", "field_name": '
b'"col2", "pandas_type": "int32", "numpy_type": "float32", "metada'
b'ta": null}, {"name": "col3", "field_name": "col3", "pandas_type"'
b': "int32", "numpy_type": "int32", "metadata": null}], "pandas_ve'
b'rsion": "0.22.0"}'}
# write both tables to local filesystem
>>> pq.write_to_dataset(table, '/Users/jkulzick/pyarrow_example',
...                     partition_cols=['col1'],
...                     preserve_index=False)
>>> pq.write_to_dataset(table_nan, '/Users/jkulzick/pyarrow_example',
...                     partition_cols=['col1'],
...                     preserve_index=False)
# read parquet files into a ParquetDataset to validate the schemas
# the metadata and schemas for both files are different from their original tables
# table now has pandas_type int32 and numpy_type int32 (was float32) for col2
# table_nan now has pandas_type float64 (was int32) and numpy_type float64 (was float32) for col2
>>> ds = pq.ParquetDataset('/Users/jkulzick/pyarrow_example')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File
"/Users/jkulzick/miniconda3/envs/bowerbird/lib/python3.6/site-packages/pyarrow/parquet.py",
line 745, in __init__
self.validate_schemas()
File
"/Users/jkulzick/miniconda3/envs/bowerbird/lib/python3.6/site-packages/pyarrow/parquet.py",
line 775, in validate_schemas
dataset_schema))
ValueError: Schema in partition[col1=1]
/Users/jkulzick/pyarrow_example/col1=2/b7b42ce9de6a46a786a5361c42d28731.parquet
was different.
col2: double
col3: int32
metadata
--------
{b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
b' "col2", "field_name": "col2", "pandas_type": "float64", "numpy_'
b'type": "float64", "metadata": null}, {"name": "col3", "field_nam'
b'e": "col3", "pandas_type": "int32", "numpy_type": "int32", "meta'
b'data": null}], "pandas_version": "0.22.0"}'}
vs
col2: int32
col3: int32
metadata
--------
{b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
b' "col2", "field_name": "col2", "pandas_type": "int32", "numpy_ty'
b'pe": "int32", "metadata": null}, {"name": "col3", "field_name": '
b'"col3", "pandas_type": "int32", "numpy_type": "int32", "metadata'
b'": null}], "pandas_version": "0.22.0"}'}
```
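For context, a standalone sketch (not part of the transcript above) of where the
drift most likely comes from: because `write_to_dataset` goes through pandas to
generate the subtables, an int32 column holding a null becomes float64 in the
intermediate DataFrame, and rebuilding the subtable without the original schema
then infers `double` for col2, which is exactly what the ValueError above shows.
Re-applying the original schema at that point keeps int32.
```
import pandas as pd
import pyarrow as pa

schema = pa.schema([pa.field('col2', pa.int32()),
                    pa.field('col3', pa.int32())])
table_nan = pa.Table.from_pandas(pd.DataFrame({'col2': [None], 'col3': [200]}),
                                 schema=schema, preserve_index=False)

subgroup = table_nan.to_pandas()    # col2 comes back as float64 (NaN)
inferred = pa.Table.from_pandas(subgroup, preserve_index=False)
print(inferred.schema)              # col2: double

enforced = pa.Table.from_pandas(subgroup, schema=schema, preserve_index=False)
print(enforced.schema)              # col2: int32, with a null value
```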