This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 5994094 ARROW-2209: [Python] Partition columns are not correctly loaded in schema of ParquetDataset
5994094 is described below
commit 5994094e2a963ba22abd657121935e2ddbfa8660
Author: Uwe L. Korn <[email protected]>
AuthorDate: Fri Mar 2 15:32:37 2018 +0100
ARROW-2209: [Python] Partition columns are not correctly loaded in schema of ParquetDataset
Author: Uwe L. Korn <[email protected]>
Closes #1656 from xhochy/ARROW-2209 and squashes the following commits:
7771eb0 <Uwe L. Korn> flake8
c68c6a5 <Uwe L. Korn> Change field API on schema to be aligned with Python lists
609df73 <Uwe L. Korn> ARROW-2209: Partition columns are not correctly loaded in schema of ParquetDataset
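
For context, a sketch of the behavior this change addresses (not part of the
patch; the DataFrame contents and the 'year' partition column are illustrative
assumptions). Files in a partitioned dataset do not store the partition columns
themselves, so the old strict per-file schema check rejected a dataset whose
_common_metadata file carried the full schema, and the partition columns were
missing from the dataset schema:

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    # Illustrative data; 'year' becomes a partition directory, not a column
    # stored inside the individual Parquet files.
    df = pd.DataFrame({'year': [2017, 2018], 'value': [1.0, 2.0]})
    table = pa.Table.from_pandas(df, preserve_index=False)
    pq.write_to_dataset(table, 'example_dataset', partition_cols=['year'])
    pq.write_metadata(table.schema, 'example_dataset/_common_metadata')

    # Previously this raised "Schema in ... was different"; now validation
    # compares file schemas with the partition columns stripped out, and
    # the partition columns remain visible in the dataset schema.
    dataset = pq.ParquetDataset('example_dataset')
    assert 'year' in dataset.schema.to_arrow_schema().names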
---
python/pyarrow/includes/libarrow.pxd | 4 +++
python/pyarrow/parquet.py | 42 +++++++++++++++++---------
python/pyarrow/tests/test_parquet.py | 13 ++++++--
python/pyarrow/types.pxi | 58 ++++++++++++++++++++++++++++++++++++
4 files changed, 100 insertions(+), 17 deletions(-)
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 233f2cb..d95f016 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -263,6 +263,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
int num_fields()
c_string ToString()
+ CStatus AddField(int i, const shared_ptr[CField]& field,
+ shared_ptr[CSchema]* out)
+ CStatus RemoveField(int i, shared_ptr[CSchema]* out)
+
# Removed const in Cython so don't have to cast to get code to generate
shared_ptr[CSchema] AddMetadata(
const shared_ptr[CKeyValueMetadata]& metadata)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index f46ce94..42c558b 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -581,6 +581,10 @@ class ParquetManifest(object):
self._visit_level(0, self.dirpath, [])
+ if self.common_metadata_path is None:
+ # _common_metadata is a subset of _metadata
+ self.common_metadata_path = self.metadata_path
+
def _visit_level(self, level, base_path, part_keys):
fs = self.filesystem
@@ -695,10 +699,10 @@ class ParquetDataset(object):
self.paths = path_or_paths
(self.pieces, self.partitions,
- self.metadata_path) = _make_manifest(path_or_paths, self.fs)
+ self.common_metadata_path) = _make_manifest(path_or_paths, self.fs)
- if self.metadata_path is not None:
- with self.fs.open(self.metadata_path) as f:
+ if self.common_metadata_path is not None:
+ with self.fs.open(self.common_metadata_path) as f:
self.common_metadata = ParquetFile(f).metadata
else:
self.common_metadata = None
@@ -718,21 +722,31 @@ class ParquetDataset(object):
open_file = self._get_open_file_func()
if self.metadata is None and self.schema is None:
- if self.metadata_path is not None:
- self.schema = open_file(self.metadata_path).schema
+ if self.common_metadata_path is not None:
+ self.schema = open_file(self.common_metadata_path).schema
else:
self.schema = self.pieces[0].get_metadata(open_file).schema
elif self.schema is None:
self.schema = self.metadata.schema
- # Verify schemas are all equal
+ # Verify schemas are all compatible
+ dataset_schema = self.schema.to_arrow_schema()
+ # Exclude the partition columns from the schema; they are provided
+ # by the path, not by the ParquetDatasetPiece
+ if self.partitions is not None:
+ for partition_name in self.partitions.partition_names:
+ if dataset_schema.get_field_index(partition_name) != -1:
+ field_idx = dataset_schema.get_field_index(partition_name)
+ dataset_schema = dataset_schema.remove(field_idx)
+
for piece in self.pieces:
file_metadata = piece.get_metadata(open_file)
- if not self.schema.equals(file_metadata.schema):
- raise ValueError('Schema in {0!s} was different. '
- '{1!s} vs {2!s}'
- .format(piece, file_metadata.schema,
- self.schema))
+ file_schema = file_metadata.schema.to_arrow_schema()
+ if not dataset_schema.equals(file_schema):
+ raise ValueError('Schema in {0!s} was different. \n'
+ '{1!s}\n\nvs\n\n{2!s}'
+ .format(piece, file_schema,
+ dataset_schema))
def read(self, columns=None, nthreads=1, use_pandas_metadata=False):
"""
@@ -831,7 +845,7 @@ def _ensure_filesystem(fs):
def _make_manifest(path_or_paths, fs, pathsep='/'):
partitions = None
- metadata_path = None
+ common_metadata_path = None
if len(path_or_paths) == 1:
# Dask passes a directory as a list of length 1
@@ -840,7 +854,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
if is_string(path_or_paths) and fs.isdir(path_or_paths):
manifest = ParquetManifest(path_or_paths, filesystem=fs,
pathsep=fs.pathsep)
- metadata_path = manifest.metadata_path
+ common_metadata_path = manifest.common_metadata_path
pieces = manifest.pieces
partitions = manifest.partitions
else:
@@ -859,7 +873,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
piece = ParquetDatasetPiece(path)
pieces.append(piece)
- return pieces, partitions, metadata_path
+ return pieces, partitions, common_metadata_path
_read_table_docstring = """
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 187971f..cec01c8 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1113,12 +1113,12 @@ def _test_read_common_metadata_files(fs, base_path):
with fs.open(data_path, 'wb') as f:
_write_table(table, f)
- metadata_path = pjoin(base_path, '_metadata')
+ metadata_path = pjoin(base_path, '_common_metadata')
with fs.open(metadata_path, 'wb') as f:
pq.write_metadata(table.schema, f)
dataset = pq.ParquetDataset(base_path, filesystem=fs)
- assert dataset.metadata_path == metadata_path
+ assert dataset.common_metadata_path == metadata_path
with fs.open(data_path) as f:
common_schema = pq.read_metadata(f).schema
@@ -1431,7 +1431,14 @@ def _test_write_to_dataset_with_partitions(base_path,
filesystem=None):
output_table = pa.Table.from_pandas(output_df)
pq.write_to_dataset(output_table, base_path, partition_by,
filesystem=filesystem)
- input_table = pq.ParquetDataset(base_path, filesystem=filesystem).read()
+ pq.write_metadata(output_table.schema,
+ os.path.join(base_path, '_common_metadata'))
+ dataset = pq.ParquetDataset(base_path, filesystem=filesystem)
+ # ARROW-2209: Ensure the dataset schema also includes the partition columns
+ dataset_cols = set(dataset.schema.to_arrow_schema().names)
+ assert dataset_cols == set(output_table.schema.names)
+
+ input_table = dataset.read()
input_df = input_table.to_pandas()
# Read data back in and compare with original DataFrame
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 849a0e0..5f96290 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -529,6 +529,64 @@ cdef class Schema:
def get_field_index(self, name):
return self.schema.GetFieldIndex(tobytes(name))
+ def append(self, Field field):
+ """
+ Append a field at the end of the schema.
+
+ Parameters
+ ----------
+
+ field: Field
+
+ Returns
+ -------
+ schema: Schema
+ """
+ return self.insert(self.schema.num_fields(), field)
+
+ def insert(self, int i, Field field):
+ """
+ Add a field at position i to the schema.
+
+ Parameters
+ ----------
+ i: int
+ field: Field
+
+ Returns
+ -------
+ schema: Schema
+ """
+ cdef:
+ shared_ptr[CSchema] new_schema
+ shared_ptr[CField] c_field
+
+ c_field = field.sp_field
+
+ with nogil:
+ check_status(self.schema.AddField(i, c_field, &new_schema))
+
+ return pyarrow_wrap_schema(new_schema)
+
+ def remove(self, int i):
+ """
+ Remove the field at index i from the schema.
+
+ Parameters
+ ----------
+ i: int
+
+ Returns
+ -------
+ schema: Schema
+ """
+ cdef shared_ptr[CSchema] new_schema
+
+ with nogil:
+ check_status(self.schema.RemoveField(i, &new_schema))
+
+ return pyarrow_wrap_schema(new_schema)
+
def add_metadata(self, dict metadata):
"""
Add metadata as dict of string keys and values to Schema
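
The Schema.append, Schema.insert and Schema.remove methods added above do not
mutate in place: each wraps the C++ AddField/RemoveField calls and returns a
new Schema. A short usage sketch (not part of the patch):

    import pyarrow as pa

    schema = pa.schema([pa.field('a', pa.int64()),
                        pa.field('b', pa.string())])

    # The names follow Python's list API, but each call returns a fresh
    # Schema and leaves the original untouched.
    s1 = schema.append(pa.field('c', pa.float64()))  # a, b, c
    s2 = s1.insert(0, pa.field('id', pa.int32()))    # id, a, b, c
    s3 = s2.remove(2)                                # id, a, c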
--
To stop receiving notification emails like this one, please contact
[email protected].