Lance Dacey created ARROW-12364:
-----------------------------------

             Summary: [Python] [Dataset] Add metadata_collector option to 
ds.write_dataset()
                 Key: ARROW-12364
                 URL: https://issues.apache.org/jira/browse/ARROW-12364
             Project: Apache Arrow
          Issue Type: Wish
          Components: Parquet, Python
    Affects Versions: 3.0.0
         Environment: Ubuntu 18.04
            Reporter: Lance Dacey


The legacy pq.write_to_dataset() has an option to save metadata to a list when 
writing partitioned data.
{code:python}
    collector = []
    pq.write_to_dataset(
        table=table,
        root_path=output_path,
        use_legacy_dataset=True,
        metadata_collector=collector,
    )
    fragments = []
    for piece in collector:
        fragments.append(
            filesystem.sep.join([output_path, piece.row_group(0).column(0).file_path])
        )
{code}
This allows me to save a list of the specific parquet files which were created 
when writing the partitions to storage. I use this when scheduling tasks with 
Airflow.

Task A downloads data and partitions it --> Task B reads the file fragments 
which were just saved and transforms them --> Task C creates a list of dataset 
filters from the transformed file fragments, reads each filter into a table, 
processes the data further (normally dropping duplicates or selecting a subset 
of the columns), and saves it for visualization
{code:python}
fragments = [
    'dev/date_id=20180111/transform-split-20210301013200-68.parquet',
    'dev/date_id=20180114/transform-split-20210301013200-69.parquet',
    'dev/date_id=20180128/transform-split-20210301013200-57.parquet',
]
{code}
I can use this list downstream to do two things:
 1) I can read the list of fragments directly as a new dataset and transform 
the data
{code:python}
ds.dataset(fragments)
{code}
2) I can generate filters from the fragment paths which were saved using 
ds._get_partition_keys(). This allows me to query the dataset and retrieve all 
fragments within the partition. For example, if I partition by date and I 
process data every 30 minutes, I might have 48 individual file fragments within 
a single partition, so I need to query the *entire* partition instead of 
reading a single fragment.
{code:python}
def consolidate_filters(fragments):
    """Retrieve the partition expressions from a list of dataset fragments
    to build a list of unique filters."""
    filters = []
    for frag in fragments:
        partitions = ds._get_partition_keys(frag.partition_expression)
        predicate = [(k, "==", v) for k, v in partitions.items()]
        if predicate not in filters:
            filters.append(predicate)
    return filters


filter_expression = pq._filters_to_expression(
    filters=consolidate_filters(fragments=fragments)
)
{code}
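To make this concrete, here is a self-contained sketch of the whole flow: the tiny table, column names, and values are made up for illustration, and a temporary directory stands in for blob storage.
{code:python}
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Illustrative setup: write a tiny hive-partitioned dataset
output_path = tempfile.mkdtemp()
table = pa.table({"date_id": [20180111, 20180111, 20180114], "value": [1, 1, 3]})
ds.write_dataset(
    table,
    output_path,
    format="parquet",
    partitioning=ds.partitioning(pa.schema([("date_id", pa.int64())]), flavor="hive"),
)

dataset = ds.dataset(output_path, partitioning="hive")
fragments = list(dataset.get_fragments())

# One filter per unique partition, as in consolidate_filters() above
filters = []
for frag in fragments:
    partitions = ds._get_partition_keys(frag.partition_expression)
    predicate = [(k, "==", v) for k, v in partitions.items()]
    if predicate not in filters:
        filters.append(predicate)

# Query the *entire* matched partitions, then post-process (e.g. deduplicate)
filter_expression = pq._filters_to_expression(filters=filters)
result = dataset.to_table(filter=filter_expression)
{code}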
My current problem is that when I use ds.write_dataset(), I do not have a 
convenient method for generating a list of the file fragments I just saved. My 
only choice is to use basename_template and fs.glob() to find the files which 
match the basename_template pattern. This is much slower and wastes listing 
operations on blob storage. [Related stackoverflow question with the 
basis of the approach I am using now 
|https://stackoverflow.com/questions/66252660/pyarrow-identify-the-fragments-written-or-filters-used-when-writing-a-parquet/66266585#66266585]
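For reference, a minimal sketch of that glob-based workaround (the run token, schema, and paths are illustrative; a local temporary directory stands in for blob storage):
{code:python}
import glob
import os
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds

output_path = tempfile.mkdtemp()  # stand-in for the blob storage container
run_id = "20210301013200"  # illustrative unique token for this write

table = pa.table({"date_id": [20180111, 20180111, 20180114], "value": [1, 2, 3]})

# Embed the unique token in basename_template so this write's files can be
# distinguished from files written by earlier runs
ds.write_dataset(
    table,
    output_path,
    format="parquet",
    partitioning=ds.partitioning(pa.schema([("date_id", pa.int64())]), flavor="hive"),
    basename_template=f"transform-split-{run_id}-{{i}}.parquet",
)

# Workaround: list the storage afterwards to recover the fragment paths
# (with a metadata_collector equivalent, this listing would be unnecessary)
fragments = sorted(
    glob.glob(
        os.path.join(output_path, "**", f"transform-split-{run_id}-*.parquet"),
        recursive=True,
    )
)
{code}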



--
This message was sent by Atlassian Jira
(v8.3.4#803005)