[ 
https://issues.apache.org/jira/browse/ARROW-15716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Wujciak-Jens updated ARROW-15716:
---------------------------------------
    Component/s: Python

> [Dataset][Python] Parse a list of fragment paths to gather filters
> ------------------------------------------------------------------
>
>                 Key: ARROW-15716
>                 URL: https://issues.apache.org/jira/browse/ARROW-15716
>             Project: Apache Arrow
>          Issue Type: Wish
>          Components: Python
>    Affects Versions: 7.0.0
>            Reporter: Lance Dacey
>            Priority: Minor
>
> Is it possible for partitioning.parse() to be updated to parse a list of 
> paths instead of just a single path? 
> I am passing the .paths from file_visitor to downstream tasks to process 
> recently saved data, but this breaks if I overwrite data with 
> delete_matching in order to consolidate small files, since the original 
> paths no longer exist. 
> Here is the output of my current approach, which uses filters instead of 
> reading the paths directly:
> {code:none}
> # Fragments saved during write_dataset 
> ['dev/dataset/fragments/date_id=20210813/data-0.parquet', 
> 'dev/dataset/fragments/date_id=20210114/data-2.parquet', 
> 'dev/dataset/fragments/date_id=20210114/data-1.parquet', 
> 'dev/dataset/fragments/date_id=20210114/data-0.parquet']
> # Run partitioning.parse() on each fragment 
> [<pyarrow.compute.Expression (date_id == 20210813)>, 
> <pyarrow.compute.Expression (date_id == 20210114)>, 
> <pyarrow.compute.Expression (date_id == 20210114)>, 
> <pyarrow.compute.Expression (date_id == 20210114)>]
> # Format those expressions into a list of tuples
> [('date_id', 'in', [20210114, 20210813])]
> # Convert to an expression which is used as a filter in .to_table()
> is_in(date_id, {value_set=int64:[
>   20210114,
>   20210813
> ], skip_nulls=false})
> {code}
> And here is how I am creating the filter from a list of .paths (perhaps there 
> is a better way?):
> {code:python}
> # Parse each fragment path into a partition expression
> partitioning = ds.HivePartitioning(partition_schema)
> expressions = [partitioning.parse(file) for file in paths]
>
> values = []
> filters = []
> for expression in expressions:
>     partitions = ds._get_partition_keys(expression)
>     if len(partitions) > 1:
>         # Multiple partition keys: AND them together as one conjunction
>         element = [(k, "==", v) for k, v in partitions.items()]
>         if element not in filters:
>             filters.append(element)
>     else:
>         # Single partition key: collect distinct values into one "in" clause
>         for k, v in partitions.items():
>             if v not in values:
>                 values.append(v)
>         filters = [(k, "in", sorted(values))]
>
> filt_exp = pa.parquet._filters_to_expression(filters)
> dataset.to_table(filter=filt_exp)
> {code}
> My hope would be to do something like filt_exp = partitioning.parse(paths) 
> which would return a dataset expression.
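For illustration, the dedup step above can be sketched without pyarrow at all, by splitting hive-style `key=value` segments out of the paths directly. This is only a dependency-free sketch of the logic, not the requested API: the helper name `paths_to_filters` is hypothetical, and the values come out as strings rather than the int64 values that partitioning.parse() would infer from the schema.

```python
from collections import defaultdict
from urllib.parse import unquote

def paths_to_filters(paths):
    """Collect hive-style partition segments (key=value) from fragment
    paths into deduplicated ("in"-style) filter tuples."""
    values = defaultdict(set)
    for path in paths:
        for segment in path.split("/"):
            if "=" in segment:
                key, _, value = segment.partition("=")
                values[key].add(unquote(value))
    return [(key, "in", sorted(vals)) for key, vals in values.items()]

paths = [
    "dev/dataset/fragments/date_id=20210813/data-0.parquet",
    "dev/dataset/fragments/date_id=20210114/data-2.parquet",
    "dev/dataset/fragments/date_id=20210114/data-1.parquet",
]
print(paths_to_filters(paths))
# [('date_id', 'in', ['20210114', '20210813'])]
```

The resulting tuples can then be handed to pa.parquet._filters_to_expression() as in the snippet above; a proper partitioning.parse(paths) would additionally apply the schema's types.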



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
