eeroel opened a new issue, #39906:
URL: https://github.com/apache/arrow/issues/39906

   ### Describe the enhancement requested
   
   Hello,
   
   My use case is querying a Delta Lake table with PyArrow with the help of https://github.com/delta-io/delta-rs. The library creates a dataset whose fragments carry partition expressions corresponding to the guarantees derived from per-file statistics. I noticed that when the dataset consists of a few thousand files, (Py)Arrow spends a relatively long time pre-filtering the fragments. This is especially noticeable when filtering a field on multiple alternative values with a chained OR expression, since the time scales linearly with the number of alternatives. This leads to situations where it is sometimes actually faster to only post-filter the data (using an .isin() pyarrow Expression instead of a chained OR). Here's an example script that builds 8100 fragments with simple two-field partition expressions (no parquet files are actually written) and filters a string field on 10 alternative values; it takes ~600 ms on an M1 Max MacBook Pro.
   
   ```
   import pyarrow.dataset as ds
   import pyarrow
   import time
   from functools import reduce
   
   import pyarrow.fs as pa_fs
   from pyarrow.dataset import (
       FileSystemDataset,
       ParquetFileFormat,
   )
   
   format = ParquetFileFormat()
   
   def generate_partitions():
       # 9 values of "foo" x 900 values of "bar" -> 8100 fragments, each with
       # a simple two-field partition expression (range guarantees).
       for foo in range(1, 10):
           for bar in range(100, 1000):
               pexp = ds.field("foo") >= foo * 10
               pexp = (
                   pexp
                   & (ds.field("bar") >= "value" + str(bar * 10))
                   & (ds.field("bar") <= "value" + str((bar + 1) * 10))
               )
               fn = f"{foo}_{bar}.parquet"
               yield fn, pexp
   
   schema = pyarrow.schema([
       ('bar', pyarrow.string())
   ])
   
   vals = [f"foo{k}" for k in range(10)]
   
   fs = pa_fs.LocalFileSystem()
   
   # The referenced parquet files are never actually created; only the partition
   # expressions matter here, since the filter below prunes every fragment.
   fragments = [
       format.make_fragment(
           file,
           filesystem=fs,
           partition_expression=part_expression,
       )
       for file, part_expression in generate_partitions()
   ]
   
   dataset = FileSystemDataset(fragments, schema, format, fs)
   
   print(f"Fragment count: {len(fragments)}")
   
   # Chained OR over the 10 alternative values of "bar"
   condition = reduce(
       lambda x, y: x | y,
       [ds.field("bar") == x for x in vals],
       ds.scalar(False),
   )
   
   t=time.time()
   dataset.to_table(filter=condition)
   print(f"Time spent: {time.time()-t}")
   ```
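   For reference, here is a small sketch (my own addition for measurement, reusing dataset, vals and condition from the script above) that isolates the pre-filtering step: Dataset.get_fragments(filter=...) evaluates the filter only against the fragments' partition expressions and should not open any files, and the sketch also compares the chained OR against an equivalent .isin() expression.
   
   ```
   # Time only the fragment pruning step, for the chained-OR filter versus an
   # equivalent .isin() filter. Assumes dataset, vals and condition from the
   # script above are already defined; get_fragments() should not read files.
   isin_condition = ds.field("bar").isin(vals)
   
   for name, filt in [("chained OR", condition), ("isin", isin_condition)]:
       t = time.time()
       kept = len(list(dataset.get_fragments(filter=filt)))
       print(f"{name}: {kept} fragments kept, {time.time() - t:.3f} s")
   ```
   
   (Whether the .isin() expression prunes any fragments here depends on how it gets simplified against the range guarantees; either way the call should not touch the files.)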
   
   So my question is: is this level of performance expected for this operation, or is there potentially something to optimize here? Naively, I would expect that simply filtering the expressions, roughly (10 filters) * (8100 expressions) = ~80k comparisons, should be 1-2 orders of magnitude faster, but I haven't figured out how to profile the performance, so I'm not sure exactly where in the logic the time is spent.
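   To make the linear scaling mentioned above concrete, here is a rough sketch (again my own addition, reusing dataset, vals, reduce and time from the script) that times just the pruning step while varying the number of OR-ed alternatives:
   
   ```
   # Rough scaling check: time fragment pruning as a function of the number of
   # OR-ed alternatives; only the partition expressions are consulted.
   for n_alt in (1, 2, 5, 10):
       filt = reduce(
           lambda x, y: x | y,
           [ds.field("bar") == v for v in vals[:n_alt]],
           ds.scalar(False),
       )
       t = time.time()
       list(dataset.get_fragments(filter=filt))
       print(f"{n_alt} alternatives: {time.time() - t:.3f} s")
   ```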
   
   ### Component(s)
   
   C++, Python

