nuno-faria commented on PR #17197: URL: https://github.com/apache/datafusion/pull/17197#issuecomment-3193592981
@adriangb I tested again and still see the issues. I'm testing with `datafusion-cli` in debug mode, at commit 481f7f9382dd83d7d5416cdfb1bf52d26d7cec40.

```shell
❯ git log -1
commit 481f7f9382dd83d7d5416cdfb1bf52d26d7cec40 (HEAD -> fix-hash-join-partitioned)
Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com>
Date:   Fri Aug 15 13:44:49 2025 -0500

    remove field, run test multiple times

❯ cargo run
DataFusion CLI v49.0.0
> copy (select i as k from generate_series(1, 10000000) as t(i)) to 't1.parquet';
+----------+
| count    |
+----------+
| 10000000 |
+----------+
1 row(s) fetched.
Elapsed 4.200 seconds.

> copy (select i as k, i as v from generate_series(1, 10000000) as t(i)) to 't2.parquet';
+----------+
| count    |
+----------+
| 10000000 |
+----------+
1 row(s) fetched.
Elapsed 4.345 seconds.

> create external table t1 stored as parquet location 't1.parquet';
0 row(s) fetched.
Elapsed 0.005 seconds.

> create external table t2 stored as parquet location 't2.parquet';
0 row(s) fetched.
Elapsed 0.006 seconds.
```

- Here are a few runs showing the first issue:

```sql
> explain analyze select * from t1 join t2 on t1.k = t2.k where v
| plan_type         | plan |
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=1, elapsed_compute=6.711µs] |
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=2.824202ms] |
|                   |     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 1 AND k@0 <= 1], metrics=[output_rows=1, elapsed_compute=1.043652101s, build_input_batches=1, build_input_rows=1, input_batches=709, input_rows=5805696, output_batches=709, build_mem_used=89, build_time=839.3µs, join_time=1.0427621s] |
|                   |       CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=63.8µs] |
|                   |         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=43.9µs] |
|                   |           FilterExec: v@1 = 1, metrics=[output_rows=1, elapsed_compute=569.911µs] |
|                   |             DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 = 1, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 1 AND 1 <= v_max@1, required_guarantees=[v in (1)] |
|                   | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=412894, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=252.512µs, metadata_load_time=3.801512ms, page_index_eval_time=329.512µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.223112ms, time_elapsed_opening=11.4854ms, time_elapsed_processing=46.3598ms, time_elapsed_scanning_total=35.9475ms, time_elapsed_scanning_until_data=32.8674ms] |
|                   |       DataSourceExec: file_groups={12 groups: [[t1.parquet:0..1066678], [t1.parquet:1066678..2133356], [t1.parquet:2133356..3200034], [t1.parquet:3200034..4266712], [t1.parquet:4266712..5333390], ...]}, projection=[k], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ], pruning_predicate=k_null_count@1 != row_count@2 AND k_max@0 >= 1 AND k_null_count@1 != row_count@2 AND k_min@3 <= 1, required_guarantees=[] |
| here ->           | , metrics=[output_rows=5805696, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=7485222, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=1048576, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=3, bloom_filter_eval_time=3.62µs, metadata_load_time=3.269812ms, page_index_eval_time=218.712µs, row_pushdown_eval_time=24ns, statistics_eval_time=443.22µs, time_elapsed_opening=6.0144ms, time_elapsed_processing=814.7442ms, time_elapsed_scanning_total=2.0536666s, time_elapsed_scanning_until_data=119.6913ms] |
row(s) fetched.
Elapsed 0.434 seconds.

> explain analyze select * from t1 join t2 on t1.k = t2.k where v
| plan_type         | plan |
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=1, elapsed_compute=8.611µs] |
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=3.987404ms] |
|                   |     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 1 AND k@0 <= 1], metrics=[output_rows=1, elapsed_compute=1.458327901s, build_input_batches=1, build_input_rows=1, input_batches=899, input_rows=7360512, output_batches=899, build_mem_used=89, build_time=974µs, join_time=1.4572807s] |
|                   |       CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=52.6µs] |
|                   |         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=29.1µs] |
|                   |           FilterExec: v@1 = 1, metrics=[output_rows=1, elapsed_compute=586.111µs] |
|                   |             DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 = 1, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 1 AND 1 <= v_max@1, required_guarantees=[v in (1)] |
|                   | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=412894, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=130.412µs, metadata_load_time=4.664712ms, page_index_eval_time=198.612µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.181212ms, time_elapsed_opening=12.7488ms, time_elapsed_processing=46.6822ms, time_elapsed_scanning_total=35.0437ms, time_elapsed_scanning_until_data=31.9659ms] |
|                   |       DataSourceExec: file_groups={12 groups: [[t1.parquet:0..1066678], [t1.parquet:1066678..2133356], [t1.parquet:2133356..3200034], [t1.parquet:3200034..4266712], [t1.parquet:4266712..5333390], ...]}, projection=[k], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ], pruning_predicate=k_null_count@1 != row_count@2 AND k_max@0 >= 1 AND k_null_count@1 != row_count@2 AND k_min@3 <= 1, required_guarantees=[] |
| here ->           | , metrics=[output_rows=7360512, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=9474601, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=2076672, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=1, bloom_filter_eval_time=1.023µs, metadata_load_time=2.509312ms, page_index_eval_time=578.312µs, row_pushdown_eval_time=24ns, statistics_eval_time=164.423µs, time_elapsed_opening=5.3009ms, time_elapsed_processing=1.1821071s, time_elapsed_scanning_total=2.937489s, time_elapsed_scanning_until_data=188.8519ms] |
row(s) fetched.
Elapsed 0.496 seconds.

> explain analyze select * from t1 join t2 on t1.k = t2.k where v
| plan_type         | plan |
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=1, elapsed_compute=6.711µs] |
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=6.169003ms] |
|                   |     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 1 AND k@0 <= 1], metrics=[output_rows=1, elapsed_compute=2.217662901s, build_input_batches=1, build_input_rows=1, input_batches=1221, input_rows=10000000, output_batches=1221, build_mem_used=89, build_time=711.7µs, join_time=2.2169098s] |
|                   |       CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=51.6µs] |
|                   |         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=26.6µs] |
|                   |           FilterExec: v@1 = 1, metrics=[output_rows=1, elapsed_compute=585.411µs] |
|                   |             DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 = 1, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 1 AND 1 <= v_max@1, required_guarantees=[v in (1)] |
|                   | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=412894, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=129.712µs, metadata_load_time=4.275612ms, page_index_eval_time=206.112µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.155312ms, time_elapsed_opening=13.5415ms, time_elapsed_processing=47.6242ms, time_elapsed_scanning_total=35.1671ms, time_elapsed_scanning_until_data=32.096ms] |
|                   |       DataSourceExec: file_groups={12 groups: [[t1.parquet:0..1066678], [t1.parquet:1066678..2133356], [t1.parquet:2133356..3200034], [t1.parquet:3200034..4266712], [t1.parquet:4266712..5333390], ...]}, projection=[k], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ], pruning_predicate=k_null_count@1 != row_count@2 AND k_max@0 >= 1 AND k_null_count@1 != row_count@2 AND k_min@3 <= 1, required_guarantees=[] |
| here ->           | , metrics=[output_rows=10000000, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=12781310, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=1.123µs, metadata_load_time=2.642712ms, page_index_eval_time=4.312µs, row_pushdown_eval_time=24ns, statistics_eval_time=131.323µs, time_elapsed_opening=4.6513ms, time_elapsed_processing=1.8171676s, time_elapsed_scanning_total=4.5157559s, time_elapsed_scanning_until_data=260.5798ms] |
row(s) fetched.
Elapsed 0.549 seconds.
```

On main it always returns 20480 rows for `t1`:

```sql
| plan_type         | plan |
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=1, elapsed_compute=6.611µs] |
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=31.7µs] |
|                   |     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 1 AND k@0 <= 1], metrics=[output_rows=1, elapsed_compute=4.247301ms, build_input_batches=1, build_input_rows=1, input_batches=3, input_rows=20480, output_batches=3, build_mem_used=89, build_time=673.7µs, join_time=3.555ms] |
|                   |       CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=66.4µs] |
|                   |         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=28.7µs] |
|                   |           FilterExec: v@1 = 1, metrics=[output_rows=1, elapsed_compute=601.011µs] |
|                   |             DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 = 1, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 1 AND 1 <= v_max@1, required_guarantees=[v in (1)] |
|                   | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=412894, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=91.312µs, metadata_load_time=5.554412ms, page_index_eval_time=173.912µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.118912ms, time_elapsed_opening=15.7229ms, time_elapsed_processing=49.715ms, time_elapsed_scanning_total=35.0119ms, time_elapsed_scanning_until_data=31.9417ms] |
|                   |       DataSourceExec: file_groups={12 groups: [[t1.parquet:0..1066678], [t1.parquet:1066678..2133356], [t1.parquet:2133356..3200034], [t1.parquet:3200034..4266712], [t1.parquet:4266712..5333390], ...]}, projection=[k], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 1 ], pruning_predicate=k_null_count@1 != row_count@2 AND k_max@0 >= 1 AND k_null_count@1 != row_count@2 AND k_min@3 <= 1, required_guarantees=[] |
| here ->           | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=206447, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=134.212µs, metadata_load_time=5.683512ms, page_index_eval_time=172.312µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.148112ms, time_elapsed_opening=12.28ms, time_elapsed_processing=29.6494ms, time_elapsed_scanning_total=21.961ms, time_elapsed_scanning_until_data=16.4331ms] |
```

And there is the second issue, where `DynamicFilterPhysicalExpr` is `true`:

```sql
> explain analyze select * from t1 join t2 on t1.k = t2.k where v = 1 or v
| plan_type         | plan |
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=2, elapsed_compute=11.31µs] |
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=7.552604ms] |
|                   |     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(k@0, k@0)], metrics=[output_rows=2, elapsed_compute=431.141012ms, build_input_batches=2, build_input_rows=2, input_batches=1175, input_rows=10000000, output_batches=1368, build_mem_used=908, build_time=11.9206ms, join_time=408.0587ms] |
|                   |       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=50.702µs] |
|                   |         RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=51.8786ms, repartition_time=75.211µs, send_time=53.142µs] |
|                   |           CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=28.3µs] |
|                   |             FilterExec: v@1 = 1 OR v@1 = 2, metrics=[output_rows=2, elapsed_compute=1.082711ms] |
|                   |               DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 = 1 OR v@1 = 2, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 1 AND 1 <= v_max@1 OR v_null_count@2 != row_count@3 AND v_min@0 <= 2 AND 2 <= v_max@1, required_guarantees=[v in (1, 2)] |
|                   | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=412894, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=167.712µs, metadata_load_time=7.794612ms, page_index_eval_time=234.612µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.390112ms, time_elapsed_opening=15.7157ms, time_elapsed_processing=49.7691ms, time_elapsed_scanning_total=35.6062ms, time_elapsed_scanning_until_data=32.055ms] |
|                   |       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=10000000, elapsed_compute=88.506601ms] |
|                   |         RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=1.961983s, repartition_time=1.462319702s, send_time=193.006324ms] |
|                   |           DataSourceExec: file_groups={12 groups: [[t1.parquet:0..1066678], [t1.parquet:1066678..2133356], [t1.parquet:2133356..3200034], [t1.parquet:3200034..4266712], [t1.parquet:4266712..5333390], ...]}, projection=[k], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ], metrics=[output_rows=10000000, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=12781310, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=24ns, metadata_load_time=2.152412ms, page_index_eval_time=3.512µs, row_pushdown_eval_time=24ns, statistics_eval_time=24ns, time_elapsed_opening=3.8613ms, time_elapsed_processing=1.9387351s, time_elapsed_scanning_total=3.9634373s, time_elapsed_scanning_until_data=251.6241ms] |
row(s) fetched.
Elapsed 0.501 seconds.
```