This is an automated email from the ASF dual-hosted git repository.

fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ac3bfe4  ARROW-8244: [Python] Fix parquet.write_to_dataset to set file path in metadata_collector
ac3bfe4 is described below

commit ac3bfe47821cb8368f657860f115e88077eaf64d
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Thu Apr 2 15:15:28 2020 -0400

    ARROW-8244: [Python] Fix parquet.write_to_dataset to set file path in metadata_collector
    
    This explores a potential fix for ARROW-8244. It seems rather
    straightforward to set the file path in `write_to_dataset` (`write_table`
    does not do this because there the user passes a full path, so no
    relative path is known).
    
    cc @rjzamora does this look like the correct logic?
    
    Closes #6797 from jorisvandenbossche/ARROW-8244
    
    Authored-by: Joris Van den Bossche <[email protected]>
    Signed-off-by: François Saint-Jacques <[email protected]>
---
 python/pyarrow/parquet.py            | 31 ++++++++++++++++++++-----------
 python/pyarrow/tests/test_parquet.py | 35 ++++++++++++++++++++++++++++++++++-
 2 files changed, 54 insertions(+), 12 deletions(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 45dea9d..fbd76f1 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -422,8 +422,8 @@ schema : arrow Schema
 **options : dict
     If options contains a key `metadata_collector` then the
     corresponding value is assumed to be a list (or any object with
-    `.append` method) that will be filled with file metadata instances
-    of dataset pieces.
+    `.append` method) that will be filled with the file metadata instance
+    of the written file.
 """.format(_parquet_writer_arg_docs)
 
     def __init__(self, where, schema, filesystem=None,
@@ -1429,15 +1429,18 @@ def write_to_dataset(table, root_path, 
partition_cols=None,
         and allow you to override the partition filename. If nothing is
         passed, the filename will consist of a uuid.
     **kwargs : dict,
-        kwargs for write_table function. Using `metadata_collector` in
-        kwargs allows one to collect the file metadata instances of
-        dataset pieces. See docstring for `write_table` or
-        `ParquetWriter` for more information.
+        Additional kwargs for write_table function. See docstring for
+        `write_table` or `ParquetWriter` for more information.
+        Using `metadata_collector` in kwargs allows one to collect the
+        file metadata instances of dataset pieces. The file paths in the
+        ColumnChunkMetaData will be set relative to `root_path`.
     """
     fs, root_path = _get_filesystem_and_path(filesystem, root_path)
 
     _mkdir_if_not_exists(fs, root_path)
 
+    metadata_collector = kwargs.pop('metadata_collector', None)
+
     if partition_cols is not None and len(partition_cols) > 0:
         df = table.to_pandas(ignore_metadata=True)
         partition_keys = [df[col] for col in partition_cols]
@@ -1462,15 +1465,18 @@ def write_to_dataset(table, root_path, 
partition_cols=None,
                  for name, val in zip(partition_cols, keys)])
             subtable = pa.Table.from_pandas(subgroup, preserve_index=False,
                                             schema=subschema, safe=False)
-            prefix = '/'.join([root_path, subdir])
-            _mkdir_if_not_exists(fs, prefix)
+            _mkdir_if_not_exists(fs, '/'.join([root_path, subdir]))
             if partition_filename_cb:
                 outfile = partition_filename_cb(keys)
             else:
                 outfile = guid() + '.parquet'
-            full_path = '/'.join([prefix, outfile])
+            relative_path = '/'.join([subdir, outfile])
+            full_path = '/'.join([root_path, relative_path])
             with fs.open(full_path, 'wb') as f:
-                write_table(subtable, f, **kwargs)
+                write_table(subtable, f, metadata_collector=metadata_collector,
+                            **kwargs)
+            if metadata_collector is not None:
+                metadata_collector[-1].set_file_path(relative_path)
     else:
         if partition_filename_cb:
             outfile = partition_filename_cb(None)
@@ -1478,7 +1484,10 @@ def write_to_dataset(table, root_path, 
partition_cols=None,
             outfile = guid() + '.parquet'
         full_path = '/'.join([root_path, outfile])
         with fs.open(full_path, 'wb') as f:
-            write_table(table, f, **kwargs)
+            write_table(table, f, metadata_collector=metadata_collector,
+                        **kwargs)
+        if metadata_collector is not None:
+            metadata_collector[-1].set_file_path(outfile)
 
 
 def write_metadata(schema, where, version='1.0',
diff --git a/python/pyarrow/tests/test_parquet.py 
b/python/pyarrow/tests/test_parquet.py
index bcb3528..7548e0b 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -3233,7 +3233,7 @@ def test_direct_read_dictionary_subfield():
 
 
 @pytest.mark.pandas
-def test_dataset_metadata(tempdir):
+def test_write_to_dataset_metadata(tempdir):
     path = tempdir / "ARROW-1983-dataset"
 
     # create and write a test dataset
@@ -3253,6 +3253,7 @@ def test_dataset_metadata(tempdir):
     dataset = pq.ParquetDataset(path)
     metadata_list2 = [p.get_metadata() for p in dataset.pieces]
 
+    collected_paths = []
     # compare metadata list content:
     assert len(metadata_list) == len(metadata_list2)
     for md, md2 in zip(metadata_list, metadata_list2):
@@ -3261,8 +3262,40 @@ def test_dataset_metadata(tempdir):
         # serialized_size is initialized in the reader:
         assert d.pop('serialized_size') == 0
         assert d2.pop('serialized_size') > 0
+        # file_path is different (not set for in-file metadata)
+        assert d["row_groups"][0]["columns"][0]["file_path"] != ""
+        assert d2["row_groups"][0]["columns"][0]["file_path"] == ""
+        # collect file paths to check afterwards, ignore here
+        collected_paths.append(d["row_groups"][0]["columns"][0]["file_path"])
+        d["row_groups"][0]["columns"][0]["file_path"] = ""
         assert d == d2
 
+    # ARROW-8244 - check the file paths in the collected metadata
+    n_root = len(path.parts)
+    file_paths = ["/".join(p.parts[n_root:]) for p in path.rglob("*.parquet")]
+    assert sorted(collected_paths) == sorted(file_paths)
+
+    # writing to single file (not partitioned)
+    metadata_list = []
+    pq.write_to_dataset(pa.table({'a': [1, 2, 3]}), root_path=str(path),
+                        metadata_collector=metadata_list)
+
+    # compare metadata content
+    file_paths = list(path.glob("*.parquet"))
+    assert len(file_paths) == 1
+    file_path = file_paths[0]
+    file_metadata = pq.read_metadata(file_path)
+    d1 = metadata_list[0].to_dict()
+    d2 = file_metadata.to_dict()
+    # serialized_size is initialized in the reader:
+    assert d1.pop('serialized_size') == 0
+    assert d2.pop('serialized_size') > 0
+    # file_path is different (not set for in-file metadata)
+    assert d1["row_groups"][0]["columns"][0]["file_path"] == file_path.name
+    assert d2["row_groups"][0]["columns"][0]["file_path"] == ""
+    d1["row_groups"][0]["columns"][0]["file_path"] = ""
+    assert d1 == d2
+
 
 def test_parquet_file_too_small(tempdir):
     path = str(tempdir / "test.parquet")

Reply via email to