shner-elmo opened a new issue, #7781:
URL: https://github.com/apache/arrow-datafusion/issues/7781

   ### Describe the bug
   
   So I noticed that when executing the query `SELECT DISTINCT my_column FROM dataset LIMIT 10` (through the Python connector), it was taking a long time and consuming most of my RAM.
   I then looked at the query plan, and it turns out the query is actually executed as a `GROUP BY my_column`, which forces a full scan. What makes it even worse is that all 10 returned values are present in the **first** Parquet file of the dataset (`pyarrow.Dataset.files[0]`), so the scan could have stopped right after the first file.
   
   
   ### To Reproduce
   
   ```py
   import datafusion
   import time
   
   start = time.perf_counter()
   
   ctx = datafusion.SessionContext()
   # 'dataset' is the pyarrow.dataset.Dataset defined under "Additional context" below
   ctx.register_dataset('dataset', dataset)
   out = ctx.sql("SELECT DISTINCT my_column FROM dataset LIMIT 10")
   
   print('datafusion', time.perf_counter() - start)
   
   print(repr(out.logical_plan()))
   print('-' * 10)
   print(repr(out.optimized_logical_plan()))
   print('-' * 10)
   print(repr(out.execution_plan()))
   print('-' * 10)
   print(repr(out.explain()))
   print('-' * 10)
   ```
   ```
   datafusion 0.0003965760000141927
   Limit: skip=0, fetch=10
     Distinct:
       Projection: dataset.canonical_url
         TableScan: dataset
   ----------
   Limit: skip=0, fetch=10
     Aggregate: groupBy=[[dataset.canonical_url]], aggr=[[]]
       TableScan: dataset projection=[canonical_url]
   ----------
   GlobalLimitExec: skip=0, fetch=10
     CoalescePartitionsExec
       LocalLimitExec: fetch=10
         AggregateExec: mode=FinalPartitioned, gby=[canonical_url@0 as canonical_url], aggr=[]
           CoalesceBatchesExec: target_batch_size=8192
             RepartitionExec: partitioning=Hash([canonical_url@0], 8), input_partitions=568
               AggregateExec: mode=Partial, gby=[canonical_url@0 as canonical_url], aggr=[]
                 DatasetExec: number_of_fragments=568, projection=[canonical_url]
   
   ----------
   DataFrame()
   
   +---------------+---------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                        |
   +---------------+---------------------------------------------------------------------------------------------+
   | logical_plan  | Limit: skip=0, fetch=10                                                                     |
   |               |   Aggregate: groupBy=[[dataset.canonical_url]], aggr=[[]]                                   |
   |               |     TableScan: dataset projection=[canonical_url]                                           |
   | physical_plan | GlobalLimitExec: skip=0, fetch=10                                                           |
   |               |   CoalescePartitionsExec                                                                    |
   |               |     LocalLimitExec: fetch=10                                                                |
   |               |       AggregateExec: mode=FinalPartitioned, gby=[canonical_url@0 as canonical_url], aggr=[] |
   |               |         CoalesceBatchesExec: target_batch_size=8192                                         |
   |               |           RepartitionExec: partitioning=Hash([canonical_url@0], 8), input_partitions=568    |
   |               |             AggregateExec: mode=Partial, gby=[canonical_url@0 as canonical_url], aggr=[]    |
   |               |               DatasetExec: number_of_fragments=568, projection=[canonical_url]              |
   |               |                                                                                             |
   +---------------+---------------------------------------------------------------------------------------------+
   None
   ----------
   ```
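
   For what it's worth, here is roughly how I checked the claim above that all 10 returned values already appear in the first Parquet file (just a sketch; `my_column` stands in for the real column name, and `dataset` / `out` are the objects from the reproduction above):

   ```py
   import pyarrow.parquet as pq

   # Distinct values of the column in the *first* file of the dataset only
   first_file_values = set(
       pq.read_table(dataset.files[0], columns=['my_column'])['my_column'].to_pylist()
   )

   # Values actually returned by the DISTINCT ... LIMIT 10 query
   returned_values = {
       value
       for batch in out.collect()              # list of pyarrow.RecordBatch
       for value in batch.column(0).to_pylist()  # single projected column
   }

   print(returned_values <= first_file_values)  # prints True for my data
   ```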
   
   ### Expected behavior
   
   It should stop scanning as soon as the first 10 unique values have been found.
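
   To illustrate the behaviour I would expect, here is a manual early-stopping scan done with pyarrow directly (just a sketch of the semantics, not a suggestion for how DataFusion should implement it; `my_column` is a placeholder again). On my data this returns right after the first fragment:

   ```py
   import pyarrow.dataset as ds

   def distinct_with_limit(dataset: ds.Dataset, column: str, limit: int) -> set:
       """Collect distinct values, stopping as soon as `limit` of them are found."""
       seen = set()
       for fragment in dataset.get_fragments():
           for batch in fragment.to_batches(columns=[column]):
               seen.update(batch.column(0).to_pylist())
               if len(seen) >= limit:
                   # a whole batch is added at a time, so trim back down to `limit`
                   return set(list(seen)[:limit])
       return seen

   print(distinct_with_limit(dataset, 'my_column', 10))
   ```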
   
   ### Additional context
   
   For context, I'm querying a Parquet dataset, and the files are stored 
locally:
   
   ```py
   import pyarrow.dataset as ds
   dataset = ds.dataset(...)
   ```
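
   (The `...` is just a local directory of Parquet files, along these lines, with the path made up; it contains the 568 fragments shown in the plan above:)

   ```py
   import pyarrow.dataset as ds

   # hypothetical path; the real directory holds the 568 Parquet files
   dataset = ds.dataset('/path/to/local/parquet/dir', format='parquet')
   ```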

