This is an automated email from the ASF dual-hosted git repository.
fsaintjacques pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ac3bfe4 ARROW-8244: [Python] Fix parquet.write_to_dataset to set file path in metadata_collector
ac3bfe4 is described below
commit ac3bfe47821cb8368f657860f115e88077eaf64d
Author: Joris Van den Bossche <[email protected]>
AuthorDate: Thu Apr 2 15:15:28 2020 -0400
ARROW-8244: [Python] Fix parquet.write_to_dataset to set file path in metadata_collector
This explores a potential fix for ARROW-8244. It seems rather
straightforward to set the file path in `write_to_dataset` (`write_table` does
not do this, because there the user passes a full path, so no relative path is
known).
cc @rjzamora does this look like the correct logic?
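For illustration (not part of this patch), a minimal sketch of the intended
usage: collect per-file metadata while writing, then merge it into a
dataset-level `_metadata` file. The dataset path 'my_dataset' and the example
table are made up, and `FileMetaData.append_row_groups` /
`FileMetaData.write_metadata_file` are assumed to be available:

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'year': [2019, 2020, 2020], 'value': [1.5, 2.5, 3.5]})

    # Collect the per-file metadata while writing the partitioned dataset;
    # with this fix, each entry carries its path relative to root_path.
    collector = []
    pq.write_to_dataset(table, root_path='my_dataset',
                        partition_cols=['year'],
                        metadata_collector=collector)

    # Merge the pieces and write a dataset-level _metadata file.
    merged = collector[0]
    for md in collector[1:]:
        merged.append_row_groups(md)
    merged.write_metadata_file('my_dataset/_metadata')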
Closes #6797 from jorisvandenbossche/ARROW-8244
Authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: François Saint-Jacques <[email protected]>
---
python/pyarrow/parquet.py | 31 ++++++++++++++++++++-----------
python/pyarrow/tests/test_parquet.py | 35 ++++++++++++++++++++++++++++++++++-
2 files changed, 54 insertions(+), 12 deletions(-)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 45dea9d..fbd76f1 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -422,8 +422,8 @@ schema : arrow Schema
**options : dict
If options contains a key `metadata_collector` then the
corresponding value is assumed to be a list (or any object with
- `.append` method) that will be filled with file metadata instances
- of dataset pieces.
+ `.append` method) that will be filled with the file metadata instance
+ of the written file.
""".format(_parquet_writer_arg_docs)
def __init__(self, where, schema, filesystem=None,
@@ -1429,15 +1429,18 @@ def write_to_dataset(table, root_path, partition_cols=None,
and allow you to override the partition filename. If nothing is
passed, the filename will consist of a uuid.
**kwargs : dict,
- kwargs for write_table function. Using `metadata_collector` in
- kwargs allows one to collect the file metadata instances of
- dataset pieces. See docstring for `write_table` or
- `ParquetWriter` for more information.
+ Additional kwargs for write_table function. See docstring for
+ `write_table` or `ParquetWriter` for more information.
+ Using `metadata_collector` in kwargs allows one to collect the
+ file metadata instances of dataset pieces. The file paths in the
+ ColumnChunkMetaData will be set relative to `root_path`.
"""
fs, root_path = _get_filesystem_and_path(filesystem, root_path)
_mkdir_if_not_exists(fs, root_path)
+ metadata_collector = kwargs.pop('metadata_collector', None)
+
if partition_cols is not None and len(partition_cols) > 0:
df = table.to_pandas(ignore_metadata=True)
partition_keys = [df[col] for col in partition_cols]
@@ -1462,15 +1465,18 @@ def write_to_dataset(table, root_path, partition_cols=None,
for name, val in zip(partition_cols, keys)])
subtable = pa.Table.from_pandas(subgroup, preserve_index=False,
schema=subschema, safe=False)
- prefix = '/'.join([root_path, subdir])
- _mkdir_if_not_exists(fs, prefix)
+ _mkdir_if_not_exists(fs, '/'.join([root_path, subdir]))
if partition_filename_cb:
outfile = partition_filename_cb(keys)
else:
outfile = guid() + '.parquet'
- full_path = '/'.join([prefix, outfile])
+ relative_path = '/'.join([subdir, outfile])
+ full_path = '/'.join([root_path, relative_path])
with fs.open(full_path, 'wb') as f:
- write_table(subtable, f, **kwargs)
+ write_table(subtable, f, metadata_collector=metadata_collector,
+ **kwargs)
+ if metadata_collector is not None:
+ metadata_collector[-1].set_file_path(relative_path)
else:
if partition_filename_cb:
outfile = partition_filename_cb(None)
@@ -1478,7 +1484,10 @@ def write_to_dataset(table, root_path, partition_cols=None,
outfile = guid() + '.parquet'
full_path = '/'.join([root_path, outfile])
with fs.open(full_path, 'wb') as f:
- write_table(table, f, **kwargs)
+ write_table(table, f, metadata_collector=metadata_collector,
+ **kwargs)
+ if metadata_collector is not None:
+ metadata_collector[-1].set_file_path(outfile)
def write_metadata(schema, where, version='1.0',
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index bcb3528..7548e0b 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -3233,7 +3233,7 @@ def test_direct_read_dictionary_subfield():
@pytest.mark.pandas
-def test_dataset_metadata(tempdir):
+def test_write_to_dataset_metadata(tempdir):
path = tempdir / "ARROW-1983-dataset"
# create and write a test dataset
@@ -3253,6 +3253,7 @@ def test_dataset_metadata(tempdir):
dataset = pq.ParquetDataset(path)
metadata_list2 = [p.get_metadata() for p in dataset.pieces]
+ collected_paths = []
# compare metadata list content:
assert len(metadata_list) == len(metadata_list2)
for md, md2 in zip(metadata_list, metadata_list2):
@@ -3261,8 +3262,40 @@ def test_dataset_metadata(tempdir):
# serialized_size is initialized in the reader:
assert d.pop('serialized_size') == 0
assert d2.pop('serialized_size') > 0
+ # file_path is different (not set for in-file metadata)
+ assert d["row_groups"][0]["columns"][0]["file_path"] != ""
+ assert d2["row_groups"][0]["columns"][0]["file_path"] == ""
+ # collect file paths to check afterwards, ignore here
+ collected_paths.append(d["row_groups"][0]["columns"][0]["file_path"])
+ d["row_groups"][0]["columns"][0]["file_path"] = ""
assert d == d2
+ # ARROW-8244 - check the file paths in the collected metadata
+ n_root = len(path.parts)
+ file_paths = ["/".join(p.parts[n_root:]) for p in path.rglob("*.parquet")]
+ assert sorted(collected_paths) == sorted(file_paths)
+
+ # writing to single file (not partitioned)
+ metadata_list = []
+ pq.write_to_dataset(pa.table({'a': [1, 2, 3]}), root_path=str(path),
+ metadata_collector=metadata_list)
+
+ # compare metadata content
+ file_paths = list(path.glob("*.parquet"))
+ assert len(file_paths) == 1
+ file_path = file_paths[0]
+ file_metadata = pq.read_metadata(file_path)
+ d1 = metadata_list[0].to_dict()
+ d2 = file_metadata.to_dict()
+ # serialized_size is initialized in the reader:
+ assert d1.pop('serialized_size') == 0
+ assert d2.pop('serialized_size') > 0
+ # file_path is different (not set for in-file metadata)
+ assert d1["row_groups"][0]["columns"][0]["file_path"] == file_path.name
+ assert d2["row_groups"][0]["columns"][0]["file_path"] == ""
+ d1["row_groups"][0]["columns"][0]["file_path"] = ""
+ assert d1 == d2
+
def test_parquet_file_too_small(tempdir):
path = str(tempdir / "test.parquet")