nuno-faria commented on code in PR #17201:
URL: https://github.com/apache/datafusion/pull/17201#discussion_r2284559288


##########
datafusion/sqllogictest/test_files/push_down_filter.slt:
##########
@@ -286,5 +286,37 @@ explain select a from t where CAST(a AS string) = '0123';
 physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
 
 
+# Test dynamic filter pushdown with swapped join inputs (issue #17196)
+# Create tables with different sizes to force join input swapping
+statement ok
+copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
+
+statement ok
+create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
+
+# Test that dynamic filter is applied to the correct table after join input swapping
+# The small_table should be the build side, large_table should be the probe side with dynamic filter
+query TT
+explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet
+04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]

Review Comment:
   I see, so the filter is always applied from the build side to the probe
   side, independently of the query. Maybe I misunderstood, but I thought that
   was the original issue (https://github.com/apache/datafusion/issues/17196):
   that the filter was always applied to the same side, even when it made
   sense to do the opposite.
   
   For example:
   ```sql
   copy (select i as k from generate_series(1, 1000000) t(i)) to 't1.parquet';
   copy (select i as k, i as v from generate_series(1, 10000000) t(i)) to 't2.parquet';
   create external table t1 stored as parquet location 't1.parquet';
   create external table t2 stored as parquet location 't2.parquet';
   
   explain analyze select *
   from t1
   join t2 on t1.k = t2.k
   where t2.v >= 1000000;
   
   
   plan_type: Plan with Metrics

   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=71.205µs]
     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 14 AND k@0 <= 999997], metrics=[output_rows=1, elapsed_compute=740.259412ms, build_input_batches=120, build_input_rows=1000000, input_batches=12, input_rows=11713, output_batches=12, build_mem_used=34742816, build_time=703.6063ms, join_time=7.0188ms]
       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1000000, elapsed_compute=15.225303ms]
         RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=1.6347235s, repartition_time=105.8625ms, send_time=8.7999ms]
           RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=135.86ms, repartition_time=1ns, send_time=1.4348ms]
             DataSourceExec: file_groups={1 group: [[t1.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=1000000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=1310405, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=120.801µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=232.8µs, time_elapsed_processing=134.8576ms, time_elapsed_scanning_total=145.4359ms, time_elapsed_scanning_until_data=17.3163ms]
       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=11713, elapsed_compute=244.4µs]
         RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=55.1884ms, repartition_time=1.360111ms, send_time=123.432µs]
           CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=11713, elapsed_compute=107.3µs]
             FilterExec: v@1 >= 1000000, metrics=[output_rows=11713, elapsed_compute=681.411µs]
               DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 1000000 AND DynamicFilterPhysicalExpr [ k@0 >= 14 AND k@0 <= 999997 ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 1000000 AND k_null_count@4 != row_count@2 AND k_max@3 >= 14 AND k_null_count@4 != row_count@2 AND k_min@5 <= 999997, required_guarantees=[], metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=379500, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=180.612µs, metadata_load_time=7.630712ms, page_index_eval_time=317.512µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.603512ms, time_elapsed_opening=16.959ms, time_elapsed_processing=53.3003ms, time_elapsed_scanning_total=39.3134ms, time_elapsed_scanning_until_data=36.7037ms]
   ```
   
   In this example, `t2` is filtered by `v` and by a dynamic filter on `k`,
   while in theory it would be faster to apply a dynamic filter from `t2` to
   `t1` (maybe we would need some heuristic to determine which approach is
   best?).
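
To make the heuristic idea concrete, here is a minimal sketch of one possible rule: build the dynamic filter from whichever input is estimated to be smaller after its own filters, and apply it to the other input. Everything in it is an assumption for illustration. `SideEstimate`, `choose_filter_source`, and the row estimates are hypothetical and are not DataFusion types, APIs, or measured values.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SideEstimate:
    """Hypothetical per-input estimate; not a DataFusion type."""
    name: str
    estimated_rows: int  # rows expected to reach the join after local filters


def choose_filter_source(left: SideEstimate, right: SideEstimate) -> SideEstimate:
    """Pick the side whose join keys should drive the dynamic filter.

    Assumed rule: the side with fewer estimated rows is the more selective
    one, so its keys are collected and pushed to the other side's scan.
    Ties go to the left input.
    """
    return left if left.estimated_rows <= right.estimated_rows else right


# Made-up estimates, chosen only to exercise the rule.
left = SideEstimate("left_input", 1_000_000)
right = SideEstimate("right_input", 10_000)

source = choose_filter_source(left, right)
target = left if source is right else right
print(f"build dynamic filter from {source.name}, apply it to {target.name}")
```

A real planner would also have to decide what to do when the estimates are missing or inexact, which this sketch ignores.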



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

