adriangb commented on code in PR #17201:
URL: https://github.com/apache/datafusion/pull/17201#discussion_r2283200960
##########
datafusion/sqllogictest/test_files/push_down_filter.slt:
##########
@@ -286,5 +286,37 @@ explain select a from t where CAST(a AS string) = '0123';
 physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
+# Test dynamic filter pushdown with swapped join inputs (issue #17196)
+# Create tables with different sizes to force join input swapping
+statement ok
+copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
+
+statement ok
+create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
+
+# Test that dynamic filter is applied to the correct table after join input swapping
+# The small_table should be the build side, large_table should be the probe side with dynamic filter
+query TT
+explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet
+04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]

Review Comment:
The dynamic filter is always applied to the probe side. In the examples above, the join sides appear to swap, probably because `v = 50` is highly selective: after filtering, the large table is effectively ~1 row, so it becomes the better choice for the build side. You can see that, even with dynamic filters disabled, changing the query this way flips the build and probe sides:

```sql
-- Same data as in the test above
copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
-- Disable dynamic filters so they cannot influence the plan
set datafusion.optimizer.enable_dynamic_filter_pushdown = false;
create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
-- 951 rows survive `v >= 50`: small_table stays on the build side
explain analyze select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
-- 1 row survives `v = 50`: large_table moves to the build side
explain analyze select * from small_table join large_table on small_table.k = large_table.k where large_table.v = 50;
```

```
DataFusion CLI v49.0.1
+-------+
| count |
+-------+
| 100   |
+-------+
1 row(s) fetched.
Elapsed 0.027 seconds.

+-------+
| count |
+-------+
| 1000  |
+-------+
1 row(s) fetched.
Elapsed 0.006 seconds.

0 row(s) fetched.
Elapsed 0.001 seconds.

0 row(s) fetched.
Elapsed 0.002 seconds.

0 row(s) fetched.
Elapsed 0.002 seconds.

+-------------------+------------------------------------------------------------
| plan_type         | plan
+-------------------+------------------------------------------------------------
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=51, elapsed_compute=5.546µs]
|                   |   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], metrics=[output_rows=51, elapsed_compute=1.976173ms, build_input_batches=1, build_input_rows=100, input_batches=1, input_rows=951, output_batches=1, build_mem_used=3032, build_time=1.452836ms, join_time=520.127µs]
|                   |     DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=100, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=285, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=56.793µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=136.292µs, time_elapsed_processing=1.070917ms, time_elapsed_scanning_total=978.833µs, time_elapsed_scanning_until_data=940.417µs]
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=951, elapsed_compute=24.711µs]
|                   |       FilterExec: v@1 >= 50, metrics=[output_rows=951, elapsed_compute=263.386µs]
|                   |         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=2.629708ms, repartition_time=1ns, send_time=8.553µs]
|                   |           DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50, pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
|                   | , metrics=[output_rows=1000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=5888, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=1000, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=292.459µs, metadata_load_time=146.834µs, page_index_eval_time=123.543µs, row_pushdown_eval_time=2ns, statistics_eval_time=135.417µs, time_elapsed_opening=1.7685ms, time_elapsed_processing=2.520792ms, time_elapsed_scanning_total=854.999µs, time_elapsed_scanning_until_data=810.208µs]
|                   |
+-------------------+------------------------------------------------------------
1 row(s) fetched.
Elapsed 0.013 seconds.

+-------------------+------------------------------------------------------------
| plan_type         | plan
+-------------------+------------------------------------------------------------
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=1, elapsed_compute=3.75µs]
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=6.459µs]
|                   |     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], metrics=[output_rows=1, elapsed_compute=194.128µs, build_input_batches=1, build_input_rows=1, input_batches=1, input_rows=100, output_batches=1, build_mem_used=89, build_time=95.668µs, join_time=54.624µs]
|                   |       CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=32.583µs]
|                   |         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=14.793µs]
|                   |           FilterExec: v@1 = 50, metrics=[output_rows=1, elapsed_compute=111.469µs]
|                   |             RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=1.516833ms, repartition_time=1ns, send_time=5.594µs]
|                   |               DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 = 50, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 50 AND 50 <= v_max@1, required_guarantees=[v in (50)]
|                   | , metrics=[output_rows=1000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=5888, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=1000, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=146.792µs, metadata_load_time=189.126µs, page_index_eval_time=66.084µs, row_pushdown_eval_time=2ns, statistics_eval_time=67.584µs, time_elapsed_opening=850.75µs, time_elapsed_processing=1.408834ms, time_elapsed_scanning_total=641.791µs, time_elapsed_scanning_until_data=616.5µs]
|                   |       DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=100, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=285, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=45.376µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=92.333µs, time_elapsed_processing=260.126µs, time_elapsed_scanning_total=290.167µs, time_elapsed_scanning_until_data=214.041µs]
|                   |
+-------------------+------------------------------------------------------------
1 row(s) fetched.
Elapsed 0.008 seconds.
```
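The side flip described in the comment can be reproduced with a toy model. The Rust sketch below is hypothetical and greatly simplified; it is not DataFusion's implementation. It assumes that the input with fewer rows becomes the build side (exact row counts stand in for the optimizer's statistics) and that the dynamic filter is a `[min, max]` bound on the build-side join keys. `Input`, `choose_sides`, `dynamic_filter_bounds`, `probe_rows_kept` and `simulate` are illustrative names only.

```rust
// Hypothetical, simplified model of the behaviour described above; NOT
// DataFusion's implementation. Assumptions: the input with fewer rows (exact
// counts standing in for optimizer statistics) becomes the build side, and the
// dynamic filter is a [min, max] bound on the build-side join keys.

/// One join input: a display name and its join-key values after any static
/// filter (e.g. `large_table.v >= 50`) has been applied.
struct Input {
    name: &'static str,
    keys: Vec<i64>,
}

/// Returns (build, probe): the smaller input is collected into the hash table.
fn choose_sides<'a>(left: &'a Input, right: &'a Input) -> (&'a Input, &'a Input) {
    if left.keys.len() <= right.keys.len() {
        (left, right)
    } else {
        (right, left)
    }
}

/// Min/max bounds of the build-side keys, or `None` if the build side is empty.
fn dynamic_filter_bounds(build: &Input) -> Option<(i64, i64)> {
    let min = *build.keys.iter().min()?;
    let max = *build.keys.iter().max()?;
    Some((min, max))
}

/// Counts the probe-side keys that fall inside the bounds.
fn probe_rows_kept(bounds: (i64, i64), probe: &Input) -> usize {
    probe
        .keys
        .iter()
        .filter(|k| (bounds.0..=bounds.1).contains(*k))
        .count()
}

/// Simulates `small_table JOIN large_table ON k` with a predicate on
/// `large_table.v` (large_table: k = v = 1..=1000; small_table: k = 1..=100).
/// Returns (build side, probe side, probe rows kept by the dynamic filter).
fn simulate(large_pred: impl Fn(i64) -> bool) -> (&'static str, &'static str, usize) {
    let small = Input { name: "small_table", keys: (1..=100).collect() };
    let large = Input {
        name: "large_table",
        keys: (1..=1000).filter(|v| large_pred(*v)).collect(),
    };
    let (build, probe) = choose_sides(&small, &large);
    // The filter is always pushed into the probe side, whichever table that is.
    let kept = dynamic_filter_bounds(build).map_or(0, |b| probe_rows_kept(b, probe));
    (build.name, probe.name, kept)
}

fn main() {
    // `where large_table.v >= 50`: 951 rows remain, so small_table builds.
    println!("{:?}", simulate(|v| v >= 50));
    // `where large_table.v = 50`: 1 row remains, so the sides flip.
    println!("{:?}", simulate(|v| v == 50));
}
```

Under these assumptions the model prints `("small_table", "large_table", 51)` for `v >= 50` and `("large_table", "small_table", 1)` for `v = 50`. The filter always lands on whichever input is the probe side, and because `k` is unique and contiguous in both tables, the kept counts coincide with the `HashJoinExec` `output_rows` (51 and 1) in the two `EXPLAIN ANALYZE` runs above.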