mhilton opened a new issue, #12633:
URL: https://github.com/apache/datafusion/issues/12633

   ### Describe the bug
   
   `NestedLoopJoinExec` (really `NestedLoopJoinStream`) produces one output batch for each probe-side input batch. However, every build-side row can match every row of a probe-side batch, so a single output batch can contain up to (build-side rows × probe-batch rows) rows. This can lead to some very large output batches being produced.
   
   As a result, some queries are being terminated unnecessarily because of high resource usage.
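To make that worst case concrete, here is a minimal Rust sketch. It assumes every build-side row passes the join filter for every row of the probe batch. The function `worst_case_output_batch_rows` is a hypothetical helper, not part of DataFusion. The inputs are taken from the `NestedLoopJoinExec` metrics in the reproduction: 100000 build-side rows, and 100000 probe-side rows spread over 10000 batches.

```rust
/// Hypothetical helper: the worst-case number of rows in the single output
/// batch emitted for one probe-side input batch, when every build-side row
/// matches every row of that probe batch.
fn worst_case_output_batch_rows(build_rows: u64, probe_batch_rows: u64) -> u64 {
    build_rows * probe_batch_rows
}

fn main() {
    // Figures from the EXPLAIN ANALYZE metrics in this issue.
    let build_rows = 100_000; // build_input_rows
    let probe_batch_rows = 100_000 / 10_000; // input_rows / input_batches = 10
    let rows = worst_case_output_batch_rows(build_rows, probe_batch_rows);
    println!("{rows}"); // 1000000
}
```

Even with 10-row probe batches, one output batch can reach a million rows, far above the default `batch_size` of 8192.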
   
   ### To Reproduce
   
   Using datafusion-cli:
   
   ```
   > SHOW datafusion.execution.batch_size;
   +---------------------------------+-------+
   | name                            | value |
   +---------------------------------+-------+
   | datafusion.execution.batch_size | 8192  |
   +---------------------------------+-------+
   1 row(s) fetched. 
   Elapsed 0.010 seconds.
   
   > CREATE TABLE test AS VALUES (0), (1), (2), (3), (4), (5), (6), (7), (8), (9);
   0 row(s) fetched. 
   Elapsed 0.035 seconds.
   
   > EXPLAIN ANALYZE WITH test_t AS (SELECT concat(t1.column1, t2.column1, t3.column1, t4.column1, t5.column1) AS v FROM test t1, test t2, test t3, test t4, test t5) SELECT * FROM test_t tt1 FULL OUTER JOIN test_t tt2 ON tt1.v<>tt2.v;
   +-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type         | plan |
   +-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | Plan with Metrics | NestedLoopJoinExec: join_type=Full, filter=v@0 != v@1, metrics=[output_rows=9999900000, build_input_batches=10000, build_input_rows=100000, input_batches=10000, input_rows=100000, output_batches=10001, build_mem_used=2492500, build_time=35.770239ms, join_time=309.795829686s] |
   |                   |   CoalescePartitionsExec, metrics=[output_rows=100000, elapsed_compute=4.001292ms] |
   |                   |     ProjectionExec: expr=[concat(CAST(column1@1 AS Utf8), CAST(column1@2 AS Utf8), CAST(column1@3 AS Utf8), CAST(column1@4 AS Utf8), CAST(column1@0 AS Utf8)) as v], metrics=[output_rows=100000, elapsed_compute=54.673797ms] |
   |                   |       CrossJoinExec, metrics=[output_rows=100000, build_input_batches=1, build_input_rows=10, input_batches=1000, input_rows=10000, output_batches=10000, build_mem_used=224, build_time=31.793µs, join_time=9.555719ms] |
   |                   |         MemoryExec: partitions=1, partition_sizes=[1], metrics=[] |
   |                   |         RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, metrics=[fetch_time=1.579937ms, repartition_time=1ns, send_time=8.415125ms] |
   |                   |           ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@3 as column1, column1@0 as column1], metrics=[output_rows=10000, elapsed_compute=375.839µs] |
   |                   |             CrossJoinExec, metrics=[output_rows=10000, build_input_batches=1, build_input_rows=10, input_batches=100, input_rows=1000, output_batches=1000, build_mem_used=224, build_time=4.541µs, join_time=567.211µs] |
   |                   |               MemoryExec: partitions=1, partition_sizes=[1], metrics=[] |
   |                   |               ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@0 as column1], metrics=[output_rows=1000, elapsed_compute=35.377µs] |
   |                   |                 CrossJoinExec, metrics=[output_rows=1000, build_input_batches=1, build_input_rows=10, input_batches=10, input_rows=100, output_batches=100, build_mem_used=224, build_time=1.917µs, join_time=52.879µs] |
   |                   |                   MemoryExec: partitions=1, partition_sizes=[1], metrics=[] |
   |                   |                   CrossJoinExec, metrics=[output_rows=100, build_input_batches=1, build_input_rows=10, input_batches=1, input_rows=10, output_batches=10, build_mem_used=224, build_time=2.417µs, join_time=11.377µs] |
   |                   |                     MemoryExec: partitions=1, partition_sizes=[1], metrics=[] |
   |                   |                     MemoryExec: partitions=1, partition_sizes=[1], metrics=[] |
   |                   |   ProjectionExec: expr=[concat(CAST(column1@1 AS Utf8), CAST(column1@2 AS Utf8), CAST(column1@3 AS Utf8), CAST(column1@4 AS Utf8), CAST(column1@0 AS Utf8)) as v], metrics=[output_rows=100000, elapsed_compute=524.070156ms] |
   |                   |     CrossJoinExec, metrics=[output_rows=100000, build_input_batches=1, build_input_rows=10, input_batches=1000, input_rows=10000, output_batches=10000, build_mem_used=224, build_time=7.084µs, join_time=73.915678ms] |
   |                   |       MemoryExec: partitions=1, partition_sizes=[1], metrics=[] |
   |                   |       RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, metrics=[fetch_time=2.26374ms, repartition_time=1ns, send_time=28.111445436s] |
   |                   |         ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@3 as column1, column1@0 as column1], metrics=[output_rows=10000, elapsed_compute=477.113µs] |
   |                   |           CrossJoinExec, metrics=[output_rows=10000, build_input_batches=1, build_input_rows=10, input_batches=100, input_rows=1000, output_batches=1000, build_mem_used=224, build_time=2.75µs, join_time=1.069152ms] |
   |                   |             MemoryExec: partitions=1, partition_sizes=[1], metrics=[] |
   |                   |             ProjectionExec: expr=[column1@1 as column1, column1@2 as column1, column1@0 as column1], metrics=[output_rows=1000, elapsed_compute=94.708µs] |
   |                   |               CrossJoinExec, metrics=[output_rows=1000, build_input_batches=1, build_input_rows=10, input_batches=10, input_rows=100, output_batches=100, build_mem_used=224, build_time=708ns, join_time=165.327µs] |
   |                   |                 MemoryExec: partitions=1, partition_sizes=[1], metrics=[] |
   |                   |                 CrossJoinExec, metrics=[output_rows=100, build_input_batches=1, build_input_rows=10, input_batches=1, input_rows=10, output_batches=10, build_mem_used=224, build_time=583ns, join_time=9.25µs] |
   |                   |                   MemoryExec: partitions=1, partition_sizes=[1], metrics=[] |
   |                   |                   MemoryExec: partitions=1, partition_sizes=[1], metrics=[] |
   |                   | |
   +-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   1 row(s) fetched. 
   Elapsed 34.928 seconds.
   ```
   
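   In this (obviously contrived) example, the `NestedLoopJoinExec` produces 9999900000 `output_rows` in just 10001 `output_batches`, a mean batch size of about 999890.01 rows. Each of the 10000 probe-side input batches holds only 10 rows on average, but each one is joined against all 100000 build-side rows. The mean batch is therefore roughly 122 times the configured batch size of 8192.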
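The row count and mean batch size above can be rechecked with a short Rust sketch. It assumes, as the query implies, that each side of the join holds 10^5 distinct five-character `v` values. Under that assumption, `tt1.v <> tt2.v` matches every pair except a row paired with its own value, and the FULL OUTER JOIN adds no null-padded rows. The helper names are hypothetical.

```rust
/// Rows produced by `tt1 FULL OUTER JOIN tt2 ON tt1.v <> tt2.v` when both
/// sides hold the same `n` distinct values (n >= 2): each row matches the
/// other n - 1 rows, so no row is left unmatched and no nulls are padded in.
fn expected_output_rows(n: u64) -> u64 {
    n * (n - 1)
}

/// Mean rows per output batch, derived from the EXPLAIN ANALYZE metrics.
fn mean_batch_rows(output_rows: u64, output_batches: u64) -> f64 {
    output_rows as f64 / output_batches as f64
}

fn main() {
    // Each side is a 5-way cross join of the 10-row `test` table,
    // giving 10^5 distinct concatenated strings.
    let n = 10u64.pow(5);
    let rows = expected_output_rows(n);
    assert_eq!(rows, 9_999_900_000); // matches output_rows in the plan

    let mean = mean_batch_rows(rows, 10_001); // output_batches in the plan
    let batch_size = 8192.0; // datafusion.execution.batch_size
    println!("mean rows per batch: {mean:.2}"); // 999890.01
    println!("ratio to batch_size: {:.1}", mean / batch_size); // 122.1
}
```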
   
   ### Expected behavior
   
   `NestedLoopJoinExec` should produce output batches whose length is much closer to the configured batch size (`datafusion.execution.batch_size`), reducing the memory used by output record batches.
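One possible shape for that behaviour, sketched under assumptions: instead of materializing every match for a probe batch in one go, the stream could keep a cursor over the (build row, probe row) pairs and hand out at most `batch_size` matches per call. `BoundedPairs` and its fields are hypothetical names, not DataFusion's API. A real implementation would work on Arrow arrays and index buffers rather than `Vec<(usize, usize)>`, and a FULL join would also need to emit unmatched rows, which this sketch omits.

```rust
/// Hypothetical cursor over the (build row, probe row) pairs of one probe
/// batch. Not DataFusion's API: it only illustrates emitting matches in
/// chunks of at most `batch_size` instead of all at once.
struct BoundedPairs<F: Fn(usize, usize) -> bool> {
    build_rows: usize,
    probe_rows: usize,
    pos: usize,        // next linear position in build_rows * probe_rows
    batch_size: usize, // e.g. datafusion.execution.batch_size
    filter: F,         // stands in for the join filter (e.g. `v@0 != v@1`)
}

impl<F: Fn(usize, usize) -> bool> Iterator for BoundedPairs<F> {
    type Item = Vec<(usize, usize)>;

    fn next(&mut self) -> Option<Self::Item> {
        let total = self.build_rows * self.probe_rows;
        let mut chunk = Vec::with_capacity(self.batch_size);
        // Resume where the previous call stopped; stop once the chunk is full.
        while self.pos < total && chunk.len() < self.batch_size {
            let build = self.pos / self.probe_rows;
            let probe = self.pos % self.probe_rows;
            self.pos += 1;
            if (self.filter)(build, probe) {
                chunk.push((build, probe));
            }
        }
        if chunk.is_empty() {
            None
        } else {
            Some(chunk)
        }
    }
}

fn main() {
    // 1000 build rows x 10 probe rows with a `build != probe` filter gives
    // 9990 matches, emitted in chunks no larger than 8192.
    let pairs = BoundedPairs {
        build_rows: 1000,
        probe_rows: 10,
        pos: 0,
        batch_size: 8192,
        filter: |b: usize, p: usize| b != p,
    };
    let sizes: Vec<usize> = pairs.map(|chunk| chunk.len()).collect();
    println!("{sizes:?}"); // [8192, 1798]
}
```

Keeping the cursor in the stream's state means peak memory per output batch is bounded by `batch_size`, at the cost of more (smaller) batches per probe-side input batch.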
   
   ### Additional context
   
   _No response_

