2010YOUY01 opened a new issue, #18295:
URL: https://github.com/apache/datafusion/issues/18295

   ### Is your feature request related to a problem or challenge?
   
   Related to https://github.com/apache/datafusion/issues/15628
   
   For queries like
   ```
   select *
   from t1
   join t2
   on expensive_and_selective_predicate(t1.v1, t2.v1)
   limit 10
   ```
   
   The query plan will look like:
   ```
   -- For simpler demonstration
   > set datafusion.execution.target_partitions = 1;
   0 row(s) fetched.
   Elapsed 0.000 seconds.
   
   > explain analyze select *
   from generate_series(100000) as t1(v1)
   join generate_series(100000) as t2(v1)
   on t1.v1 < t2.v1
   limit 10;
   
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type         | plan                                                   
                                                                                
                                                                                
                                                                 |
   
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | Plan with Metrics | GlobalLimitExec: skip=0, fetch=10, 
metrics=[output_rows=10, elapsed_compute=5.917µs]                               
                                                                                
                                                                                
     |
   |                   |   NestedLoopJoinExec: join_type=Inner, filter=v1@0 < 
v1@1, metrics=[output_rows=8192, elapsed_compute=582.002µs, 
build_input_batches=13, build_input_rows=100001, input_batches=1, 
input_rows=8192, output_batches=1, build_mem_used=853216, build_time=440.208µs, 
join_time=141.792µs] |
   |                   |     ProjectionExec: expr=[value@0 as v1], 
metrics=[output_rows=100001, elapsed_compute=6.415µs]                           
                                                                                
                                                                              |
   |                   |       LazyMemoryExec: partitions=1, 
batch_generators=[generate_series: start=0, end=100000, batch_size=8192], 
metrics=[output_rows=100001, elapsed_compute=223.997µs]                         
                                                                                
          |
   |                   |     ProjectionExec: expr=[value@0 as v1], 
metrics=[output_rows=8192, elapsed_compute=792ns]                               
                                                                                
                                                                              |
   |                   |       LazyMemoryExec: partitions=1, 
batch_generators=[generate_series: start=0, end=100000, batch_size=8192], 
metrics=[output_rows=8192, elapsed_compute=15.667µs]                            
                                                                                
          |
   |                   |                                                        
                                                                                
                                                                                
                                                                 |
   
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   1 row(s) fetched.
   Elapsed 0.004 seconds.
   ```
   
   ### Issue
   There is certain early termination applied to avoid joining till the end, 
however such early termination is done at batch level, instead of row level. 
It's possible to terminate sooner and speed up the above (1st one) workload.
   
   The global limit executor terminates the execution once the target limit is 
reached, and NLJ will accumulate `batch_size`(default 8192) rows inside before 
output, instead of stop once limit `10` is reached.
   
   ### Potential Optimization
   Ideally it can stop once `limit` is reached, and become 800x (8192 buffer 
size / 10 limit) faster in the best case.
   
   ### Describe the solution you'd like
   
   Push limit into nested loop join operator (potentially also other join 
types) , and it should terminate once the limit is reached.
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to