[ https://issues.apache.org/jira/browse/ARROW-12364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated ARROW-12364:
-----------------------------------
    Labels: dataset parquet pull-request-available python  (was: dataset 
parquet python)

> [Python] [Dataset] Add metadata_collector option to ds.write_dataset()
> ----------------------------------------------------------------------
>
>                 Key: ARROW-12364
>                 URL: https://issues.apache.org/jira/browse/ARROW-12364
>             Project: Apache Arrow
>          Issue Type: Wish
>          Components: Parquet, Python
>    Affects Versions: 3.0.0
>         Environment: Ubuntu 18.04
>            Reporter: Lance Dacey
>            Assignee: Weston Pace
>            Priority: Major
>              Labels: dataset, parquet, pull-request-available, python
>             Fix For: 5.0.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The legacy pq.write_to_dataset() has an option to save metadata to a list 
> when writing partitioned data.
> {code:python}
>     collector = []
>     pq.write_to_dataset(
>         table=table,
>         root_path=output_path,
>         use_legacy_dataset=True,
>         metadata_collector=collector,
>     )
>     fragments = []
>     for piece in collector:
>         fragments.append(
>             filesystem.sep.join([output_path, piece.row_group(0).column(0).file_path])
>         )
> {code}
> This allows me to save a list of the specific parquet files which were 
> created when writing the partitions to storage. I use this when scheduling 
> tasks with Airflow.
> Task A downloads data and partitions it --> Task B reads the file fragments 
> which were just saved and transforms them --> Task C creates a list of dataset 
> filters from the transformed file fragments, reads each filter into a 
> table, processes the data further (normally dropping duplicates or 
> selecting a subset of the columns), and saves it for visualization.
> {code:python}
> fragments = [
>     'dev/date_id=20180111/transform-split-20210301013200-68.parquet',
>     'dev/date_id=20180114/transform-split-20210301013200-69.parquet',
>     'dev/date_id=20180128/transform-split-20210301013200-57.parquet',
> ]
> {code}
> I can use this list downstream to do two things:
>  1) I can read the list of fragments directly as a new dataset and transform 
> the data
> {code:python}
> ds.dataset(fragments)
> {code}
> 2) I can use ds._get_partition_keys() to generate filters from the fragments 
> which were saved. This allows me to query the dataset and retrieve all 
> fragments within each partition. For example, if I partition by date and I 
> process data every 30 minutes, I might have 48 individual file fragments 
> within a single partition. I need to query the *entire* partition instead of 
> reading a single fragment.
> {code:python}
> def consolidate_filters(fragments):
>     """Retrieve the partition expressions from a list of dataset fragments
>     and build a list of unique filters."""
>     filters = []
>     for frag in fragments:
>         partitions = ds._get_partition_keys(frag.partition_expression)
>         partition_filter = [(k, "==", v) for k, v in partitions.items()]
>         if partition_filter not in filters:
>             filters.append(partition_filter)
>     return filters
>
> # "fragments" here are dataset fragments, e.g. from ds.dataset(paths).get_fragments()
> filter_expression = pq._filters_to_expression(
>     filters=consolidate_filters(fragments=fragments)
> )
> {code}
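> For example (a minimal sketch; "output_path" and "fs" stand in for my actual 
> storage location and filesystem object, which are not shown here), the 
> consolidated expression can then be used to read back every fragment in the 
> affected partitions:
> {code:python}
> # Hedged sketch: read the *entire* partitions touched by this run, not just
> # the fragments that were written. "output_path" and "fs" are placeholders.
> dataset = ds.dataset(output_path, filesystem=fs, partitioning="hive")
> table = dataset.to_table(filter=filter_expression)
> {code}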
> My current problem is that when I use ds.write_dataset(), I do not have a 
> convenient method for generating a list of the file fragments I just saved. 
> My only choice is to use basename_template and fs.glob() to find the files 
> matching the basename_template pattern (sketched below). This is much slower 
> and wastes file-listing calls against blob storage. [Related Stack Overflow 
> question with the basis of the approach I am using 
> now|https://stackoverflow.com/questions/66252660/pyarrow-identify-the-fragments-written-or-filters-used-when-writing-a-parquet/66266585#66266585]
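> For reference, a rough sketch of that workaround (assuming an fsspec-style 
> filesystem such as adlfs, plus hypothetical "table", "output_path", "fs" and 
> "run_id" objects, where run_id makes the basenames unique per run):
> {code:python}
> import pyarrow as pa
> import pyarrow.dataset as ds
>
> # "fs", "table", "output_path" and "run_id" are placeholders for illustration.
> ds.write_dataset(
>     data=table,
>     base_dir=output_path,
>     basename_template=f"transform-split-{run_id}-{{i}}.parquet",
>     format="parquet",
>     partitioning=ds.partitioning(pa.schema([("date_id", pa.int32())]), flavor="hive"),
>     filesystem=fs,
> )
>
> # The slow step this issue wants to avoid: globbing blob storage just to
> # recover the paths that were written.
> fragments = fs.glob(f"{output_path}/*/transform-split-{run_id}-*.parquet")
> {code}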



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
