[ 
https://issues.apache.org/jira/browse/ARROW-2209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383641#comment-16383641
 ] 

ASF GitHub Bot commented on ARROW-2209:
---------------------------------------

xhochy closed pull request #1656: ARROW-2209: [Python] Partition columns are 
not correctly loaded in schema of ParquetDataset
URL: https://github.com/apache/arrow/pull/1656
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd
index 8da126aaf..2804d6c38 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -263,6 +263,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         int num_fields()
         c_string ToString()
 
+        CStatus AddField(int i, const shared_ptr[CField]& field,
+                         shared_ptr[CSchema]* out)
+        CStatus RemoveField(int i, shared_ptr[CSchema]* out)
+
         # Removed const in Cython so don't have to cast to get code to generate
         shared_ptr[CSchema] AddMetadata(
             const shared_ptr[CKeyValueMetadata]& metadata)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 485459e0b..e33dd1b07 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -581,6 +581,10 @@ def __init__(self, dirpath, filesystem=None, pathsep='/',
 
         self._visit_level(0, self.dirpath, [])
 
+        if self.common_metadata_path is None:
+            # _common_metadata is a subset of _metadata
+            self.common_metadata_path = self.metadata_path
+
     def _visit_level(self, level, base_path, part_keys):
         fs = self.filesystem
 
@@ -695,10 +699,10 @@ def __init__(self, path_or_paths, filesystem=None, 
schema=None,
         self.paths = path_or_paths
 
         (self.pieces, self.partitions,
-         self.metadata_path) = _make_manifest(path_or_paths, self.fs)
+         self.common_metadata_path) = _make_manifest(path_or_paths, self.fs)
 
-        if self.metadata_path is not None:
-            with self.fs.open(self.metadata_path) as f:
+        if self.common_metadata_path is not None:
+            with self.fs.open(self.common_metadata_path) as f:
                 self.common_metadata = ParquetFile(f).metadata
         else:
             self.common_metadata = None
@@ -718,21 +722,31 @@ def validate_schemas(self):
         open_file = self._get_open_file_func()
 
         if self.metadata is None and self.schema is None:
-            if self.metadata_path is not None:
-                self.schema = open_file(self.metadata_path).schema
+            if self.common_metadata_path is not None:
+                self.schema = open_file(self.common_metadata_path).schema
             else:
                 self.schema = self.pieces[0].get_metadata(open_file).schema
         elif self.schema is None:
             self.schema = self.metadata.schema
 
-        # Verify schemas are all equal
+        # Verify schemas are all compatible
+        dataset_schema = self.schema.to_arrow_schema()
+        # Exclude the partition columns from the schema, they are provided
+        # by the path, not the DatasetPiece
+        if self.partitions is not None:
+            for partition_name in self.partitions.partition_names:
+                if dataset_schema.get_field_index(partition_name) != -1:
+                    field_idx = dataset_schema.get_field_index(partition_name)
+                    dataset_schema = dataset_schema.remove(field_idx)
+
         for piece in self.pieces:
             file_metadata = piece.get_metadata(open_file)
-            if not self.schema.equals(file_metadata.schema):
-                raise ValueError('Schema in {0!s} was different. '
-                                 '{1!s} vs {2!s}'
-                                 .format(piece, file_metadata.schema,
-                                         self.schema))
+            file_schema = file_metadata.schema.to_arrow_schema()
+            if not dataset_schema.equals(file_schema):
+                raise ValueError('Schema in {0!s} was different. \n'
+                                 '{1!s}\n\nvs\n\n{2!s}'
+                                 .format(piece, file_schema,
+                                         dataset_schema))
 
     def read(self, columns=None, nthreads=1, use_pandas_metadata=False):
         """
@@ -831,7 +845,7 @@ def _ensure_filesystem(fs):
 
 def _make_manifest(path_or_paths, fs, pathsep='/'):
     partitions = None
-    metadata_path = None
+    common_metadata_path = None
 
     if len(path_or_paths) == 1:
         # Dask passes a directory as a list of length 1
@@ -840,7 +854,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
     if is_string(path_or_paths) and fs.isdir(path_or_paths):
         manifest = ParquetManifest(path_or_paths, filesystem=fs,
                                    pathsep=fs.pathsep)
-        metadata_path = manifest.metadata_path
+        common_metadata_path = manifest.common_metadata_path
         pieces = manifest.pieces
         partitions = manifest.partitions
     else:
@@ -859,7 +873,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
             piece = ParquetDatasetPiece(path)
             pieces.append(piece)
 
-    return pieces, partitions, metadata_path
+    return pieces, partitions, common_metadata_path
 
 
 def read_table(source, columns=None, nthreads=1, metadata=None,
diff --git a/python/pyarrow/tests/test_parquet.py 
b/python/pyarrow/tests/test_parquet.py
index bd76feb2e..319f130f6 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -1099,12 +1099,12 @@ def _test_read_common_metadata_files(fs, base_path):
     with fs.open(data_path, 'wb') as f:
         _write_table(table, f)
 
-    metadata_path = pjoin(base_path, '_metadata')
+    metadata_path = pjoin(base_path, '_common_metadata')
     with fs.open(metadata_path, 'wb') as f:
         pq.write_metadata(table.schema, f)
 
     dataset = pq.ParquetDataset(base_path, filesystem=fs)
-    assert dataset.metadata_path == metadata_path
+    assert dataset.common_metadata_path == metadata_path
 
     with fs.open(data_path) as f:
         common_schema = pq.read_metadata(f).schema
@@ -1417,7 +1417,14 @@ def _test_write_to_dataset_with_partitions(base_path, 
filesystem=None):
     output_table = pa.Table.from_pandas(output_df)
     pq.write_to_dataset(output_table, base_path, partition_by,
                         filesystem=filesystem)
-    input_table = pq.ParquetDataset(base_path, filesystem=filesystem).read()
+    pq.write_metadata(output_table.schema,
+                      os.path.join(base_path, '_common_metadata'))
+    dataset = pq.ParquetDataset(base_path, filesystem=filesystem)
+    # ARROW-2209: Ensure the dataset schema also includes the partition columns
+    dataset_cols = set(dataset.schema.to_arrow_schema().names)
+    assert dataset_cols == set(output_table.schema.names)
+
+    input_table = dataset.read()
     input_df = input_table.to_pandas()
 
     # Read data back in and compare with original DataFrame
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 849a0e016..5f962901c 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -529,6 +529,64 @@ cdef class Schema:
     def get_field_index(self, name):
         return self.schema.GetFieldIndex(tobytes(name))
 
+    def append(self, Field field):
+        """
+        Append a field at the end of the schema.
+
+        Parameters
+        ----------
+
+        field: Field
+
+        Returns
+        -------
+        schema: Schema
+        """
+        return self.insert(self.schema.num_fields(), field)
+
+    def insert(self, int i, Field field):
+        """
+        Add a field at position i to the schema.
+
+        Parameters
+        ----------
+        i: int
+        field: Field
+
+        Returns
+        -------
+        schema: Schema
+        """
+        cdef:
+            shared_ptr[CSchema] new_schema
+            shared_ptr[CField] c_field
+
+        c_field = field.sp_field
+
+        with nogil:
+            check_status(self.schema.AddField(i, c_field, &new_schema))
+
+        return pyarrow_wrap_schema(new_schema)
+
+    def remove(self, int i):
+        """
+        Remove the field at index i from the schema.
+
+        Parameters
+        ----------
+        i: int
+
+        Returns
+        -------
+        schema: Schema
+        """
+        cdef shared_ptr[CSchema] new_schema
+
+        with nogil:
+            check_status(self.schema.RemoveField(i, &new_schema))
+
+        return pyarrow_wrap_schema(new_schema)
+
     def add_metadata(self, dict metadata):
         """
         Add metadata as dict of string keys and values to Schema


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> [Python] Partition columns are not correctly loaded in schema of 
> ParquetDataset
> -------------------------------------------------------------------------------
>
>                 Key: ARROW-2209
>                 URL: https://issues.apache.org/jira/browse/ARROW-2209
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>            Reporter: Uwe L. Korn
>            Assignee: Uwe L. Korn
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.9.0
>
>
> Currently the partition columns are not included in the schema of a 
> ParquetDataset. We correctly write them out in the {{_common_metadata}} file 
> but we fail to load this file correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to