[
https://issues.apache.org/jira/browse/ARROW-12364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated ARROW-12364:
-----------------------------------
Labels: dataset parquet pull-request-available python  (was: dataset parquet python)
> [Python] [Dataset] Add metadata_collector option to ds.write_dataset()
> ----------------------------------------------------------------------
>
> Key: ARROW-12364
> URL: https://issues.apache.org/jira/browse/ARROW-12364
> Project: Apache Arrow
> Issue Type: Wish
> Components: Parquet, Python
> Affects Versions: 3.0.0
> Environment: Ubuntu 18.04
> Reporter: Lance Dacey
> Assignee: Weston Pace
> Priority: Major
> Labels: dataset, parquet, pull-request-available, python
> Fix For: 5.0.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The legacy pq.write_to_dataset() has an option to save metadata to a list
> when writing partitioned data.
> {code:python}
> collector = []
> pq.write_to_dataset(
>     table=table,
>     root_path=output_path,
>     use_legacy_dataset=True,
>     metadata_collector=collector,
> )
>
> # each collected item is a FileMetaData object; the file_path of its first
> # row group's column chunk points at the partition file that was just written
> fragments = []
> for piece in collector:
>     fragments.append(
>         filesystem.sep.join([output_path, piece.row_group(0).column(0).file_path])
>     )
> {code}
> This allows me to save a list of the specific parquet files which were
> created when writing the partitions to storage. I use this when scheduling
> tasks with Airflow.
> Task A downloads the data and partitions it --> Task B reads the file fragments
> which were just saved and transforms them --> Task C creates a list of dataset
> filters from the transformed fragments, reads each filter into a table,
> processes the data further (normally dropping duplicates or selecting a subset
> of the columns), and saves the result for visualization.
> {code:python}
> fragments = [
>     'dev/date_id=20180111/transform-split-20210301013200-68.parquet',
>     'dev/date_id=20180114/transform-split-20210301013200-69.parquet',
>     'dev/date_id=20180128/transform-split-20210301013200-57.parquet',
> ]
> {code}
> I can use this list downstream to do two things:
> 1) I can read the list of fragments directly as a new dataset and transform
> the data
> {code:python}
> ds.dataset(fragments)
> {code}
> 2) I can generate filters from the saved fragments using
> ds._get_partition_keys(). This allows me to query the dataset and retrieve
> all fragments within a partition. For example, if I partition by date and
> process data every 30 minutes, I might have 48 individual file fragments
> within a single partition, so I need to query the *entire* partition instead
> of reading a single fragment (see the usage sketch after the snippet below).
> {code:python}
> def consolidate_filters(fragments):
>     """Retrieve the partition expressions from a list of dataset fragments
>     to build a list of unique filters."""
>     filters = []
>     for frag in fragments:
>         partitions = ds._get_partition_keys(frag.partition_expression)
>         filter = [(k, "==", v) for k, v in partitions.items()]
>         if filter not in filters:
>             filters.append(filter)
>     return filters
>
> filter_expression = pq._filters_to_expression(
>     filters=consolidate_filters(fragments=fragments)
> )
> {code}
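> For context, here is a minimal sketch of how I use the resulting
> filter_expression downstream. It assumes a hive-partitioned dataset rooted at
> output_path and the same filesystem object as above; the names are from my
> example, not a fixed API.
> {code:python}
> import pyarrow.dataset as ds
>
> dataset = ds.dataset(output_path, partitioning="hive", filesystem=filesystem)
>
> # every file in the touched partitions, not just the fragments written above
> table = dataset.to_table(filter=filter_expression)
>
> # the same expression can also list the matching files for later tasks
> matching_files = [frag.path for frag in dataset.get_fragments(filter=filter_expression)]
> {code}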
> My current problem is that when I use ds.write_dataset(), I do not have a
> convenient way to collect the list of file fragments that were just saved. My
> only option is to pass a unique basename_template and then use fs.glob() to
> find the files matching that pattern. This is much slower and wastes list
> operations on blob storage. [Related Stack Overflow question with the basis of
> the approach I am using now|https://stackoverflow.com/questions/66252660/pyarrow-identify-the-fragments-written-or-filters-used-when-writing-a-parquet/66266585#66266585]
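> For reference, a rough sketch of the workaround I use today. It assumes an
> fsspec-style filesystem with a glob() method, and the basename_template and
> partitioning values are only illustrative.
> {code:python}
> import pyarrow.dataset as ds
>
> basename = "transform-split-20210301013200-{i}.parquet"
>
> ds.write_dataset(
>     data=table,
>     base_dir=output_path,
>     basename_template=basename,
>     format="parquet",
>     partitioning=partitioning,
>     filesystem=filesystem,
> )
>
> # no metadata_collector equivalent, so list the files back out of blob storage
> fragments = filesystem.glob(f"{output_path}/**/{basename.format(i='*')}")
> {code}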
--
This message was sent by Atlassian Jira
(v8.3.4#803005)