nuno-faria commented on code in PR #17201: URL: https://github.com/apache/datafusion/pull/17201#discussion_r2284559288
########## datafusion/sqllogictest/test_files/push_down_filter.slt: ########## @@ -286,5 +286,37 @@ explain select a from t where CAST(a AS string) = '0123'; physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123 +# Test dynamic filter pushdown with swapped join inputs (issue #17196) +# Create tables with different sizes to force join input swapping +statement ok +copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet'; + +statement ok +copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet'; + +statement ok +create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet'; + +statement ok +create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet'; + +# Test that dynamic filter is applied to the correct table after join input swapping +# The small_table should be the build side, large_table should be the probe side with dynamic filter +query TT +explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50; +---- +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet +04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[] Review Comment: I see, so the filter is always applied from the build side to the probe side, independently of the query. Maybe I misunderstood but I though that was the original issue (https://github.com/apache/datafusion/issues/17196), that the filter was always applied to the same side, even when it made sense to do the opposite. For example: ```sql copy (select i as k from generate_series(1, 1000000) t(i)) to 't1.parquet'; copy (select i as k, i as v from generate_series(1, 10000000) t(i)) to 't2.parquet'; create external table t1 stored as parquet location 't1.parquet'; create external table t2 stored as parquet location 't2.parquet'; explain analyze select * from t1 join t2 on t1.k = t2.k where t2.v| plan_type | plan || Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=71.205µs] | | | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 14 AND k@0 <= 999997], metrics=[output_rows=1, elapsed_compute=740.259412ms, build_input_batches=120, build_input_rows=1000000, input_batches=12, input_rows=11713, output_batches=12, build_mem_used=34742816, build_time=703.6063ms, join_time=7.0188ms] | | | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1000000, elapsed_compute=15.225303ms] | | | RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=1.6347235s, repartition_time=105.8625ms, send_time=8.7999ms] | | | RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=135.86ms, repartition_time=1ns, send_time=1.4348ms] | | | DataSourceExec: file_groups={1 group: [[t1.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=1000000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=1310405, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=120.801µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=232.8µs, time_elapsed_processing=134.8576ms, time_elapsed_scanning_total=145.4359ms, time_elapsed_scanning_until_data=17.3163ms] | | | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=11713, elapsed_compute=244.4µs] | | | RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=55.1884ms, repartition_time=1.360111ms, send_time=123.432µs] | | | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=11713, elapsed_compute=107.3µs] | | | FilterExec: v@1 >= 1000000, metrics=[output_rows=11713, elapsed_compute=681.411µs] | | | DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 1000000 AND DynamicFilterPhysicalExpr [ k@0 >= 14 AND k@0 <= 999997 ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 1000000 AND k_null_count@4 != row_count@2 AND k_max@3 >= 14 AND k_null_count@4 != row_count@2 AND k_min@5 <= 999997, required_guarantees=[] | | | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=379500, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=180.612µs, metadata_load_time=7.630712ms, page_index_eval_time=317.512µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.603512ms, time_elapsed_opening=16.959ms, time_elapsed_processing=53.3003ms, time_elapsed_scanning_total=39.3134ms, time_elapsed_scanning_until_data=36.7037ms] | | | |``` In this example `t2` is filtered by `v` and by a dynamic filter on `k`, while in theory it would be faster to apply a dynamic filter from `t2` to `t1` (maybe we would need some heuristic to determine which would be the best approach?). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org