adriangb commented on code in PR #17201:
URL: https://github.com/apache/datafusion/pull/17201#discussion_r2280525475
########## datafusion/sqllogictest/test_files/push_down_filter.slt ##########

```diff
@@ -286,5 +286,37 @@ explain select a from t where CAST(a AS string) = '0123';
 physical_plan
 DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123
+# Test dynamic filter pushdown with swapped join inputs (issue #17196)
+# Create tables with different sizes to force join input swapping
+statement ok
+copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
+
+statement ok
+create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
+
+statement ok
+create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
+
+# Test that dynamic filter is applied to the correct table after join input swapping
+# The small_table should be the build side, large_table should be the probe side with dynamic filter
+query TT
+explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
+----
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet
+04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
```

Review Comment:

The tree looks like this:

```sql
copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
```

```
CoalesceBatchesExec (target_batch_size: 8192)
└── HashJoinExec (on: k = k)
    ├── DataSourceExec (files: 1, format: parquet)
    └── CoalesceBatchesExec (target_batch_size: 8192)
        └── FilterExec (predicate: v >= 50)
            └── RepartitionExec (partition_count: 1 -> 12, RoundRobinBatch(12))
                └── DataSourceExec (files: 1, format: parquet, predicate: v >= 50 AND true)
```

This makes sense: you always want the small table to be the build side and the large table to be the probe side.

If I change the query to:

```sql
explain select * from large_table join small_table on small_table.k = large_table.k where large_table.v >= 50;
```

then we get a different plan, but the join inputs are swapped around so that the large table is still the probe side:

```
ProjectionExec (k: k, v: v)
└── CoalesceBatchesExec (target_batch_size: 8192)
    └── HashJoinExec (on: k = k)
        ├── DataSourceExec (files: 1, format: parquet)
        └── CoalesceBatchesExec (target_batch_size: 8192)
            └── FilterExec (predicate: v >= 50)
                └── RepartitionExec (partition_count: 1 -> 12, RoundRobinBatch(12))
                    └── DataSourceExec (files: 1, format: parquet, predicate: v >= 50 AND true)
```

Running the swapped query with metrics shows the dynamic filter picking up the build-side key bounds:
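The build/probe reasoning above can be sketched outside of DataFusion. This is a minimal, illustrative hash join (all names here are mine, not DataFusion APIs) showing why the 100-row table should be the side that gets materialized into a hash table while the 1000-row table is only streamed:

```python
# Illustrative sketch of a hash join's build and probe phases.
# The build side is fully materialized; the probe side is streamed,
# so the optimizer wants the smaller input on the build side.

def hash_join(build_rows, probe_rows, build_key, probe_key):
    # Build phase: materialize a hash table over the (small) build input.
    table = {}
    for row in build_rows:
        table.setdefault(row[build_key], []).append(row)
    # Probe phase: stream the (large) probe input; nothing is materialized.
    out = []
    for row in probe_rows:
        for match in table.get(row[probe_key], []):
            out.append({**match, **row})
    return out

small = [{"k": i} for i in range(1, 101)]            # 100 rows, build side
large = [{"k": i, "v": i} for i in range(1, 1001)]   # 1000 rows, probe side
joined = hash_join(small, (r for r in large if r["v"] >= 50), "k", "k")
# 51 matches: build keys 1..100 intersected with probe keys 50..1000
```

The hash table holds only 100 entries instead of 1000, which is why the planner swaps the inputs when the query lists the large table first.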
```
Plan with Metrics

ProjectionExec: expr=[k@1 as k, v@2 as v, k@0 as k], metrics=[output_rows=51, elapsed_compute=3.428µs]
  CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=51, elapsed_compute=4.584µs]
    HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 1 AND k@0 <= 100], metrics=[output_rows=51, elapsed_compute=1.336377ms, build_input_batches=1, build_input_rows=100, input_batches=1, input_rows=951, output_batches=1, build_mem_used=3032, build_time=983.25µs, join_time=347.668µs]
      DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=100, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=285, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=41.043µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=102.5µs, time_elapsed_processing=742.668µs, time_elapsed_scanning_total=731.75µs, time_elapsed_scanning_until_data=702.584µs]
      CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=951, elapsed_compute=22.585µs]
        FilterExec: v@1 >= 50, metrics=[output_rows=951, elapsed_compute=152.886µs]
          RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=2.19975ms, repartition_time=1ns, send_time=5.47µs]
            DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 100 ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50 AND k_null_count@4 != row_count@2 AND k_max@3 >= 1 AND k_null_count@4 != row_count@2 AND k_min@5 <= 100, required_guarantees=[], metrics=[output_rows=1000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=5888, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=1000, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=226.251µs, metadata_load_time=204.001µs, page_index_eval_time=143.126µs, row_pushdown_eval_time=2ns, statistics_eval_time=109.96µs, time_elapsed_opening=1.627167ms, time_elapsed_processing=2.125167ms, time_elapsed_scanning_total=581.5µs, time_elapsed_scanning_until_data=546.208µs]
```
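The `DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 100 ]` in the probe-side scan comes from the key range of the collected build side. A hedged sketch of that pruning idea follows; the per-row-group statistics are made up for illustration, and none of these names are DataFusion APIs:

```python
# Sketch: once the build side is collected, the min/max of its join keys
# become a predicate on the probe-side scan, checked against per-row-group
# (min, max) statistics such as those Parquet stores.

build_keys = range(1, 101)                       # small_table.k, the build side
k_min, k_max = min(build_keys), max(build_keys)  # dynamic filter: 1 <= k <= 100

# Hypothetical (min, max) statistics for four row groups of large_table.k.
row_groups = [(1, 250), (251, 500), (501, 750), (751, 1000)]

# A row group can be skipped when its key range cannot overlap [k_min, k_max].
kept = [(lo, hi) for (lo, hi) in row_groups if hi >= k_min and lo <= k_max]
# Only the first row group can contain matching keys.
```

In the metrics above the large table fits in a single row group, so nothing is actually pruned (`row_groups_pruned_statistics=0`), but the pushed-down bounds are visible in both the `predicate` and the `pruning_predicate`.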
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org