adriangb commented on code in PR #17201:
URL: https://github.com/apache/datafusion/pull/17201#discussion_r2280525475


##########
datafusion/sqllogictest/test_files/push_down_filter.slt:
##########
@@ -286,5 +286,37 @@ explain select a from t where CAST(a AS string) = '0123';
 physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
 
 
+# Test dynamic filter pushdown with swapped join inputs (issue #17196)
+# Create tables with different sizes to force join input swapping
+statement ok
+copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
+
+statement ok
+create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
+
+# Test that dynamic filter is applied to the correct table after join input swapping
+# The small_table should be the build side, large_table should be the probe side with dynamic filter
+query TT
+explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet
+04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]

Review Comment:
   The tree looks like this:
   
   ```sql
   copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
   copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
   create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
   create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
   explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
   ```
   
   ```
   +---------------+------------------------------------------------------------+
   | plan_type     | plan                                                       |
   +---------------+------------------------------------------------------------+
   | physical_plan | ┌───────────────────────────┐                              |
   |               | │    CoalesceBatchesExec    │                              |
   |               | │    --------------------   │                              |
   |               | │     target_batch_size:    │                              |
   |               | │            8192           │                              |
   |               | └─────────────┬─────────────┘                              |
   |               | ┌─────────────┴─────────────┐                              |
   |               | │        HashJoinExec       │                              |
   |               | │    --------------------   ├──────────────┐               |
   |               | │        on: (k = k)        │              │               |
   |               | └─────────────┬─────────────┘              │               |
   |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
   |               | │       DataSourceExec      ││    CoalesceBatchesExec    │ |
   |               | │    --------------------   ││    --------------------   │ |
   |               | │          files: 1         ││     target_batch_size:    │ |
   |               | │      format: parquet      ││            8192           │ |
   |               | └───────────────────────────┘└─────────────┬─────────────┘ |
   |               |                              ┌─────────────┴─────────────┐ |
   |               |                              │         FilterExec        │ |
   |               |                              │    --------------------   │ |
   |               |                              │     predicate: v >= 50    │ |
   |               |                              └─────────────┬─────────────┘ |
   |               |                              ┌─────────────┴─────────────┐ |
   |               |                              │      RepartitionExec      │ |
   |               |                              │    --------------------   │ |
   |               |                              │ partition_count(in->out): │ |
   |               |                              │          1 -> 12          │ |
   |               |                              │                           │ |
   |               |                              │    partitioning_scheme:   │ |
   |               |                              │    RoundRobinBatch(12)    │ |
   |               |                              └─────────────┬─────────────┘ |
   |               |                              ┌─────────────┴─────────────┐ |
   |               |                              │       DataSourceExec      │ |
   |               |                              │    --------------------   │ |
   |               |                              │          files: 1         │ |
   |               |                              │      format: parquet      │ |
   |               |                              │                           │ |
   |               |                              │         predicate:        │ |
   |               |                              │      v >= 50 AND true     │ |
   |               |                              └───────────────────────────┘ |
   |               |                                                            |
   +---------------+------------------------------------------------------------+
   ```
   
   This makes sense: you always want the small table to be the build side and the large table to be the probe side.
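   
   To make the build/probe asymmetry concrete, here is a minimal Python sketch of a hash join. It is not DataFusion's implementation: the `hash_join` helper and the in-memory rows are hypothetical stand-ins for the two parquet tables above. The only point is that the hash table grows with whichever input sits on the build side, while the join result is the same either way.

```python
from collections import defaultdict


def hash_join(build_rows, probe_rows, key):
    """Inner equi-join on `key`: hash the build side once, then stream the probe side.

    Hypothetical helper for illustration only; returns the joined rows and the
    number of distinct keys held in the in-memory hash table.
    """
    table = defaultdict(list)
    for row in build_rows:  # build phase: memory grows with the build side
        table[row[key]].append(row)
    joined = []
    for row in probe_rows:  # probe phase: one hash lookup per probe row
        for match in table.get(row[key], []):
            joined.append({**match, **row})
    return joined, len(table)


# Same shapes as the test tables: k in 1..100, and (k, v) with v = k in 1..1000.
small = [{"k": i} for i in range(1, 101)]
large = [{"k": i, "v": i} for i in range(1, 1001)]
large_filtered = [r for r in large if r["v"] >= 50]  # the static `v >= 50` filter

rows, build_entries = hash_join(small, large_filtered, "k")
rows_swapped, build_entries_swapped = hash_join(large_filtered, small, "k")

print(len(rows), build_entries)                  # small table as build side
print(len(rows_swapped), build_entries_swapped)  # large table as build side
```

   With the small table on the build side the hash table holds 100 keys; with the inputs the other way round it holds every row that survives `v >= 50`.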
   
   If I change the query to:
   
   ```sql
   explain select * from large_table join small_table on small_table.k = large_table.k where large_table.v >= 50;
   ```
   
   Then we produce a different plan, but the join inputs are swapped so that the large table remains the probe side:
   
   ```
   +---------------+------------------------------------------------------------+
   | plan_type     | plan                                                       |
   +---------------+------------------------------------------------------------+
   | physical_plan | ┌───────────────────────────┐                              |
   |               | │       ProjectionExec      │                              |
   |               | │    --------------------   │                              |
   |               | │            k: k           │                              |
   |               | │            v: v           │                              |
   |               | └─────────────┬─────────────┘                              |
   |               | ┌─────────────┴─────────────┐                              |
   |               | │    CoalesceBatchesExec    │                              |
   |               | │    --------------------   │                              |
   |               | │     target_batch_size:    │                              |
   |               | │            8192           │                              |
   |               | └─────────────┬─────────────┘                              |
   |               | ┌─────────────┴─────────────┐                              |
   |               | │        HashJoinExec       │                              |
   |               | │    --------------------   ├──────────────┐               |
   |               | │        on: (k = k)        │              │               |
   |               | └─────────────┬─────────────┘              │               |
   |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
   |               | │       DataSourceExec      ││    CoalesceBatchesExec    │ |
   |               | │    --------------------   ││    --------------------   │ |
   |               | │          files: 1         ││     target_batch_size:    │ |
   |               | │      format: parquet      ││            8192           │ |
   |               | └───────────────────────────┘└─────────────┬─────────────┘ |
   |               |                              ┌─────────────┴─────────────┐ |
   |               |                              │         FilterExec        │ |
   |               |                              │    --------------------   │ |
   |               |                              │     predicate: v >= 50    │ |
   |               |                              └─────────────┬─────────────┘ |
   |               |                              ┌─────────────┴─────────────┐ |
   |               |                              │      RepartitionExec      │ |
   |               |                              │    --------------------   │ |
   |               |                              │ partition_count(in->out): │ |
   |               |                              │          1 -> 12          │ |
   |               |                              │                           │ |
   |               |                              │    partitioning_scheme:   │ |
   |               |                              │    RoundRobinBatch(12)    │ |
   |               |                              └─────────────┬─────────────┘ |
   |               |                              ┌─────────────┴─────────────┐ |
   |               |                              │       DataSourceExec      │ |
   |               |                              │    --------------------   │ |
   |               |                              │          files: 1         │ |
   |               |                              │      format: parquet      │ |
   |               |                              │                           │ |
   |               |                              │         predicate:        │ |
   |               |                              │      v >= 50 AND true     │ |
   |               |                              └───────────────────────────┘ |
   |               |                                                            |
   +---------------+------------------------------------------------------------+
   ```
   
   ```
   plan_type: Plan with Metrics

   ProjectionExec: expr=[k@1 as k, v@2 as v, k@0 as k], metrics=[output_rows=51, elapsed_compute=3.428µs]
     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=51, elapsed_compute=4.584µs]
       HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 1 AND k@0 <= 100], metrics=[output_rows=51, elapsed_compute=1.336377ms, build_input_batches=1, build_input_rows=100, input_batches=1, input_rows=951, output_batches=1, build_mem_used=3032, build_time=983.25µs, join_time=347.668µs]
         DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=100, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=285, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=41.043µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=102.5µs, time_elapsed_processing=742.668µs, time_elapsed_scanning_total=731.75µs, time_elapsed_scanning_until_data=702.584µs]
         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=951, elapsed_compute=22.585µs]
           FilterExec: v@1 >= 50, metrics=[output_rows=951, elapsed_compute=152.886µs]
             RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=2.19975ms, repartition_time=1ns, send_time=5.47µs]
               DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 100 ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50 AND k_null_count@4 != row_count@2 AND k_max@3 >= 1 AND k_null_count@4 != row_count@2 AND k_min@5 <= 100, required_guarantees=[], metrics=[output_rows=1000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=5888, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=1000, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=226.251µs, metadata_load_time=204.001µs, page_index_eval_time=143.126µs, row_pushdown_eval_time=2ns, statistics_eval_time=109.96µs, time_elapsed_opening=1.627167ms, time_elapsed_processing=2.125167ms, time_elapsed_scanning_total=581.5µs, time_elapsed_scanning_until_data=546.208µs]
   ```
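   
   As a rough illustration of what the `DynamicFilterPhysicalExpr` in the plan above expresses, here is a small Python sketch. The helpers are hypothetical and this is not DataFusion code. It assumes the bounds are simply the min and max of the build-side join keys, which is consistent with the `k@0 >= 1 AND k@0 <= 100` shown in the metrics but is not confirmed here as the exact mechanism.

```python
def build_side_bounds(build_keys):
    """Return (min, max) of the build-side join keys (assumed source of the bounds)."""
    return min(build_keys), max(build_keys)


def scan_probe(rows, static_pred, dynamic_pred):
    """Keep probe rows that pass both the static and the dynamic predicate."""
    return [r for r in rows if static_pred(r) and dynamic_pred(r)]


def static_pred(row):
    # The filter written in the query: v >= 50
    return row[1] >= 50


def placeholder(row):
    # Before the build side has been read the dynamic filter is just `true`
    return True


small_keys = list(range(1, 101))             # small_table.k
large = [(k, k) for k in range(1, 1001)]     # large_table (k, v) with v = k

before = scan_probe(large, static_pred, placeholder)

lo, hi = build_side_bounds(small_keys)


def dynamic_pred(row):
    # After the build side is collected: k >= lo AND k <= hi
    return lo <= row[0] <= hi


after = scan_probe(large, static_pred, dynamic_pred)

print(lo, hi, len(before), len(after))
```

   In the metrics above the large-table scan still reports `output_rows=1000` and `pushdown_rows_pruned=0`, and the join sees `input_rows=951`, so in that run the bounds only reached the pruning predicate. The sketch shows what the predicate selects, not what the scan filtered at row level.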
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

