This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5994094  ARROW-2209: [Python] Partition columns are not correctly 
loaded in schema of ParquetDataset
5994094 is described below

commit 5994094e2a963ba22abd657121935e2ddbfa8660
Author: Uwe L. Korn <[email protected]>
AuthorDate: Fri Mar 2 15:32:37 2018 +0100

    ARROW-2209: [Python] Partition columns are not correctly loaded in schema 
of ParquetDataset
    
    Author: Uwe L. Korn <[email protected]>
    
    Closes #1656 from xhochy/ARROW-2209 and squashes the following commits:
    
    7771eb0 <Uwe L. Korn> flake8
    c68c6a5 <Uwe L. Korn> Change field API on schema to be aligned with Python 
lists
    609df73 <Uwe L. Korn> ARROW-2209:  Partition columns are not correctly 
loaded in schema of ParquetDataset
---
 python/pyarrow/includes/libarrow.pxd |  4 +++
 python/pyarrow/parquet.py            | 42 +++++++++++++++++---------
 python/pyarrow/tests/test_parquet.py | 13 ++++++--
 python/pyarrow/types.pxi             | 58 ++++++++++++++++++++++++++++++++++++
 4 files changed, 100 insertions(+), 17 deletions(-)

diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index 233f2cb..d95f016 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -263,6 +263,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         int num_fields()
         c_string ToString()
 
+        CStatus AddField(int i, const shared_ptr[CField]& field,
+                         shared_ptr[CSchema]* out)
+        CStatus RemoveField(int i, shared_ptr[CSchema]* out)
+
         # Removed const in Cython so don't have to cast to get code to generate
         shared_ptr[CSchema] AddMetadata(
             const shared_ptr[CKeyValueMetadata]& metadata)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index f46ce94..42c558b 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -581,6 +581,10 @@ class ParquetManifest(object):
 
         self._visit_level(0, self.dirpath, [])
 
+        if self.common_metadata_path is None:
+            # _common_metadata is a subset of _metadata
+            self.common_metadata_path = self.metadata_path
+
     def _visit_level(self, level, base_path, part_keys):
         fs = self.filesystem
 
@@ -695,10 +699,10 @@ class ParquetDataset(object):
         self.paths = path_or_paths
 
         (self.pieces, self.partitions,
-         self.metadata_path) = _make_manifest(path_or_paths, self.fs)
+         self.common_metadata_path) = _make_manifest(path_or_paths, self.fs)
 
-        if self.metadata_path is not None:
-            with self.fs.open(self.metadata_path) as f:
+        if self.common_metadata_path is not None:
+            with self.fs.open(self.common_metadata_path) as f:
                 self.common_metadata = ParquetFile(f).metadata
         else:
             self.common_metadata = None
@@ -718,21 +722,31 @@ class ParquetDataset(object):
         open_file = self._get_open_file_func()
 
         if self.metadata is None and self.schema is None:
-            if self.metadata_path is not None:
-                self.schema = open_file(self.metadata_path).schema
+            if self.common_metadata_path is not None:
+                self.schema = open_file(self.common_metadata_path).schema
             else:
                 self.schema = self.pieces[0].get_metadata(open_file).schema
         elif self.schema is None:
             self.schema = self.metadata.schema
 
-        # Verify schemas are all equal
+        # Verify schemas are all compatible
+        dataset_schema = self.schema.to_arrow_schema()
+        # Exclude the partition columns from the schema, they are provided
+        # by the path, not the DatasetPiece
+        if self.partitions is not None:
+            for partition_name in self.partitions.partition_names:
+                if dataset_schema.get_field_index(partition_name) != -1:
+                    field_idx = dataset_schema.get_field_index(partition_name)
+                    dataset_schema = dataset_schema.remove(field_idx)
+
         for piece in self.pieces:
             file_metadata = piece.get_metadata(open_file)
-            if not self.schema.equals(file_metadata.schema):
-                raise ValueError('Schema in {0!s} was different. '
-                                 '{1!s} vs {2!s}'
-                                 .format(piece, file_metadata.schema,
-                                         self.schema))
+            file_schema = file_metadata.schema.to_arrow_schema()
+            if not dataset_schema.equals(file_schema):
+                raise ValueError('Schema in {0!s} was different. \n'
+                                 '{1!s}\n\nvs\n\n{2!s}'
+                                 .format(piece, file_schema,
+                                         dataset_schema))
 
     def read(self, columns=None, nthreads=1, use_pandas_metadata=False):
         """
@@ -831,7 +845,7 @@ def _ensure_filesystem(fs):
 
 def _make_manifest(path_or_paths, fs, pathsep='/'):
     partitions = None
-    metadata_path = None
+    common_metadata_path = None
 
     if len(path_or_paths) == 1:
         # Dask passes a directory as a list of length 1
@@ -840,7 +854,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
     if is_string(path_or_paths) and fs.isdir(path_or_paths):
         manifest = ParquetManifest(path_or_paths, filesystem=fs,
                                    pathsep=fs.pathsep)
-        metadata_path = manifest.metadata_path
+        common_metadata_path = manifest.common_metadata_path
         pieces = manifest.pieces
         partitions = manifest.partitions
     else:
@@ -859,7 +873,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
             piece = ParquetDatasetPiece(path)
             pieces.append(piece)
 
-    return pieces, partitions, metadata_path
+    return pieces, partitions, common_metadata_path
 
 
 _read_table_docstring = """
diff --git a/python/pyarrow/tests/test_parquet.py 
b/python/pyarrow/tests/test_parquet.py
index 187971f..cec01c8 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1113,12 +1113,12 @@ def _test_read_common_metadata_files(fs, base_path):
     with fs.open(data_path, 'wb') as f:
         _write_table(table, f)
 
-    metadata_path = pjoin(base_path, '_metadata')
+    metadata_path = pjoin(base_path, '_common_metadata')
     with fs.open(metadata_path, 'wb') as f:
         pq.write_metadata(table.schema, f)
 
     dataset = pq.ParquetDataset(base_path, filesystem=fs)
-    assert dataset.metadata_path == metadata_path
+    assert dataset.common_metadata_path == metadata_path
 
     with fs.open(data_path) as f:
         common_schema = pq.read_metadata(f).schema
@@ -1431,7 +1431,14 @@ def _test_write_to_dataset_with_partitions(base_path, 
filesystem=None):
     output_table = pa.Table.from_pandas(output_df)
     pq.write_to_dataset(output_table, base_path, partition_by,
                         filesystem=filesystem)
-    input_table = pq.ParquetDataset(base_path, filesystem=filesystem).read()
+    pq.write_metadata(output_table.schema,
+                      os.path.join(base_path, '_common_metadata'))
+    dataset = pq.ParquetDataset(base_path, filesystem=filesystem)
+    # ARROW-2209: Ensure the dataset schema also includes the partition columns
+    dataset_cols = set(dataset.schema.to_arrow_schema().names)
+    assert dataset_cols == set(output_table.schema.names)
+
+    input_table = dataset.read()
     input_df = input_table.to_pandas()
 
     # Read data back in and compare with original DataFrame
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 849a0e0..5f96290 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -529,6 +529,64 @@ cdef class Schema:
     def get_field_index(self, name):
         return self.schema.GetFieldIndex(tobytes(name))
 
+    def append(self, Field field):
+        """
+        Append a field at the end of the schema.
+
+        Parameters
+        ----------
+
+        field: Field
+
+        Returns
+        -------
+        schema: Schema
+        """
+        return self.insert(self.schema.num_fields(), field)
+
+    def insert(self, int i, Field field):
+        """
+        Add a field at position i to the schema.
+
+        Parameters
+        ----------
+        i: int
+        field: Field
+
+        Returns
+        -------
+        schema: Schema
+        """
+        cdef:
+            shared_ptr[CSchema] new_schema
+            shared_ptr[CField] c_field
+
+        c_field = field.sp_field
+
+        with nogil:
+            check_status(self.schema.AddField(i, c_field, &new_schema))
+
+        return pyarrow_wrap_schema(new_schema)
+
+    def remove(self, int i):
+        """
+        Remove the field at index i from the schema.
+
+        Parameters
+        ----------
+        i: int
+
+        Returns
+        -------
+        schema: Schema
+        """
+        cdef shared_ptr[CSchema] new_schema
+
+        with nogil:
+            check_status(self.schema.RemoveField(i, &new_schema))
+
+        return pyarrow_wrap_schema(new_schema)
+
     def add_metadata(self, dict metadata):
         """
         Add metadata as dict of string keys and values to Schema

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to