[ 
https://issues.apache.org/jira/browse/ARROW-12364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17367199#comment-17367199
 ] 

Lance Dacey commented on ARROW-12364:
-------------------------------------

Hi @jorisvandenbossche, you asked me to create a separate issue for the 
metadata collector for ds.write_dataset. Just wanted to make sure that you had 
a chance to take a look.

I had to switch back to the legacy dataset writer for most projects. Using 
fs.glob() can be very slow on very large datasets with many thousands of files, 
and my workflow often depends on knowing which files were written during a 
previous Airflow task.

> [Python] [Dataset] Add metadata_collector option to ds.write_dataset()
> ----------------------------------------------------------------------
>
>                 Key: ARROW-12364
>                 URL: https://issues.apache.org/jira/browse/ARROW-12364
>             Project: Apache Arrow
>          Issue Type: Wish
>          Components: Parquet, Python
>    Affects Versions: 3.0.0
>         Environment: Ubuntu 18.04
>            Reporter: Lance Dacey
>            Priority: Major
>              Labels: dataset, parquet, python
>
> The legacy pq.write_to_dataset() has an option to save metadata to a list 
> when writing partitioned data.
> {code:python}
>     collector = []
>     pq.write_to_dataset(
>         table=table,
>         root_path=output_path,
>         use_legacy_dataset=True,
>         metadata_collector=collector,
>     )
>     fragments = []
>     for piece in collector:
>         file_path = piece.row_group(0).column(0).file_path
>         fragments.append(filesystem.sep.join([output_path, file_path]))
> {code}
> This allows me to save a list of the specific parquet files which were 
> created when writing the partitions to storage. I use this when scheduling 
> tasks with Airflow.
> Task A downloads data and partitions it --> Task B reads the file fragments 
> which were just saved and transforms them --> Task C creates a list of dataset 
> filters from the file fragments I transformed, reads each filter into a 
> table, processes the data further (normally dropping duplicates or 
> selecting a subset of the columns), and saves it for visualization.
> {code:python}
> fragments = [
>     'dev/date_id=20180111/transform-split-20210301013200-68.parquet',
>     'dev/date_id=20180114/transform-split-20210301013200-69.parquet',
>     'dev/date_id=20180128/transform-split-20210301013200-57.parquet',
> ]
> {code}
> I can use this list downstream to do two things:
>  1) I can read the list of fragments directly as a new dataset and transform 
> the data
> {code:python}
> ds.dataset(fragments)
> {code}
> 2) I can generate filters from the fragment paths which were saved using 
> ds._get_partition_keys(). This allows me to query the dataset and retrieve 
> all fragments within the partition. For example, if I partition by date and I 
> process data every 30 minutes I might have 48 individual file fragments 
> within a single partition. I need to know to query the *entire* partition 
> instead of reading a single fragment.
> {code:python}
> def consolidate_filters(fragments):
>     """Build a list of unique filters from the fragments' partition expressions."""
>     filters = []
>     for frag in fragments:
>         partitions = ds._get_partition_keys(frag.partition_expression)
>         # Avoid shadowing the built-in filter() with a local name.
>         partition_filter = [(k, "==", v) for k, v in partitions.items()]
>         if partition_filter not in filters:
>             filters.append(partition_filter)
>     return filters
>
> filter_expression = pq._filters_to_expression(
>     filters=consolidate_filters(fragments=fragments)
> )
> {code}
> My current problem is that when I use ds.write_dataset(), I do not have a 
> convenient method for generating a list of the file fragments I just saved. 
> My only choice is to use basename_template and fs.glob() to find the files 
> that match the basename_template pattern. This is much slower and wastes 
> file-listing calls on blob storage. [Related stackoverflow question with 
> the basis of the approach I am using now 
> |https://stackoverflow.com/questions/66252660/pyarrow-identify-the-fragments-written-or-filters-used-when-writing-a-parquet/66266585#66266585]
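
For illustration, the filter consolidation described in the quoted issue can be approximated without any pyarrow internals. This is only a minimal sketch, assuming hive-style `key=value` partition directories as in the example paths. The helper name `partition_filters` is hypothetical, and the third path in the sample list is made up to show de-duplication. Values parsed from the path remain strings, so they may need casting to match the partition column type before being used as dataset filters.

```python
from pathlib import PurePosixPath


def partition_filters(paths):
    """Build unique per-partition filters from hive-style fragment paths.

    Assumption: every partition directory is named key=value.
    Values are kept as strings exactly as they appear in the path.
    """
    filters = []
    for path in paths:
        # Only directory components can carry partition keys, so drop the file name.
        parts = PurePosixPath(path).parent.parts
        keys = [tuple(part.split("=", 1)) for part in parts if "=" in part]
        conjunction = [(key, "==", value) for key, value in keys]
        # Keep one filter per partition, however many fragments it holds.
        if conjunction and conjunction not in filters:
            filters.append(conjunction)
    return filters


fragments = [
    "dev/date_id=20180111/transform-split-20210301013200-68.parquet",
    "dev/date_id=20180114/transform-split-20210301013200-69.parquet",
    # Hypothetical second fragment in an already-seen partition.
    "dev/date_id=20180111/transform-split-20210301013200-70.parquet",
]
print(partition_filters(fragments))
```

Two fragments in the same partition collapse into a single filter, which matches the goal of querying the entire partition rather than a single file.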



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
