eeroel opened a new issue, #39906: URL: https://github.com/apache/arrow/issues/39906
### Describe the enhancement requested

Hello,

My use case is querying a Delta Lake table with PyArrow via https://github.com/delta-io/delta-rs. The library creates a dataset whose partition expressions encode the guarantees derived from per-file statistics. I noticed that when the dataset consists of a few thousand files, (Py)Arrow spends a relatively long time pre-filtering the fragments. This is especially noticeable when filtering a field against multiple alternative values in a chained OR expression, because the time scales linearly with the number of alternatives. This leads to situations where it is sometimes actually faster to only post-filter the data, using an `.isin()` `pyarrow.Expression` instead of a chained OR (a rough timing sketch is appended at the end of this issue).

Here's an example script that creates 8100 fragments with simple two-field partition expressions and filters on a string field using 10 alternative values; this takes ~600 ms on an M1 Max MacBook Pro.

```python
import pyarrow.dataset as ds
import pyarrow
import time
from functools import reduce
import pyarrow.fs as pa_fs
from pyarrow.dataset import (
    Expression,
    FileSystemDataset,
    ParquetFileFormat,
    ParquetFragmentScanOptions,
    ParquetReadOptions,
)

format = ParquetFileFormat()


def generate_partitions():
    # 9 * 900 = 8100 fragments, each with a guarantee on "foo" and a range guarantee on "bar"
    for foo in range(1, 10):
        for bar in range(100, 1000):
            pexp = ds.field("foo") >= foo * 10
            pexp = (
                pexp
                & (ds.field("bar") >= "value" + str(bar * 10))
                & (ds.field("bar") <= "value" + str((bar + 1) * 10))
            )
            fn = f"{foo}_{bar}.parquet"
            yield fn, pexp


schema = pyarrow.schema([("bar", pyarrow.string())])
vals = [f"foo{k}" for k in range(10)]
fs = pa_fs.LocalFileSystem()

fragments = [
    format.make_fragment(
        file,
        filesystem=fs,
        partition_expression=part_expression,
    )
    for file, part_expression in generate_partitions()
]

dataset = FileSystemDataset(fragments, schema, format, fs)
print(f"Fragment count: {len(fragments)}")

# Chained OR over 10 alternative values
condition = reduce(lambda x, y: x | y, [ds.field("bar") == x for x in vals], ds.scalar(False))

t = time.time()
dataset.to_table(filter=condition)
print(f"Time spent: {time.time() - t}")
```

So my question is: is this level of performance expected for this operation, or is there potentially something to optimize here? Naively, I would expect that filtering the expressions with (10 filters) * (8000 expressions) = 80k comparisons should be 1-2 orders of magnitude faster, but I haven't figured out how to profile the performance, so I'm not sure exactly where in the logic this time is spent.

### Component(s)

C++, Python
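Addendum: here is a minimal sketch of how the pruning cost can be timed in isolation and compared against the `.isin()` variant mentioned above. It reuses `dataset`, `vals`, and `condition` from the script, and the name `isin_condition` is introduced here for illustration. The assumption that `Dataset.get_fragments(filter=...)` exercises the same pre-filtering of fragments as `to_table` (without reading any files) is mine; treat the numbers as indicative only.

```python
# Addendum sketch (assumption): time only the fragment pre-filtering step,
# reusing `dataset`, `vals`, and `condition` from the script above. No files
# are read, since get_fragments() only checks the filter against each
# fragment's partition expression.
import time

import pyarrow.dataset as ds

# Chained OR: bar == "foo0" | bar == "foo1" | ... (the slow case reported above).
t = time.time()
pruned_or = list(dataset.get_fragments(filter=condition))
print(f"chained OR: {len(pruned_or)} fragments kept, {time.time() - t:.3f}s")

# Single isin() term over the same values (the post-filter-friendly variant).
isin_condition = ds.field("bar").isin(vals)
t = time.time()
pruned_isin = list(dataset.get_fragments(filter=isin_condition))
print(f"isin():     {len(pruned_isin)} fragments kept, {time.time() - t:.3f}s")
```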
