tobixdev commented on issue #17488: URL: https://github.com/apache/datafusion/issues/17488#issuecomment-3274351360
You were right, the reproducer wasn't quite there. I have now improved it and I think the issue is observable. I consulted our query plan and tried to construct the physical plan directly, as close to it as possible, without copying further code. My takeaway is that the implementation in DF 50 is more sensitive to a large left (build) side of the join. In the join we are having issues with, the left side has 1224 rows while the right side has a single row. DF 49 performs equally well on both workloads; DF 50 has problems with the larger left side. The planner will (I think) hide that problem, because it switches the join sides when possible. However, our system isn't quite there yet, so we currently end up with suboptimal joins.

Here are the two files, exactly as I ran them. To switch between the larger left/right side, simply change the `LEFT_SIZE` and `RIGHT_SIZE` constants. I ran both with `cargo run --release` on an AMD Ryzen™ 9 9900X.

[repro-datafusion-49.zip](https://github.com/user-attachments/files/22252932/repro-datafusion-49.zip)
[repro-datafusion-50.zip](https://github.com/user-attachments/files/22253039/repro-datafusion-50.zip)

On a side note: I think that if we add statistics to our physical data source nodes, the DF optimizer should be able to swap the sides (see the sketches after the benchmark output below). Still, maybe we can improve the situation further if you have any ideas.

DataFusion 49:

```
NestedLoopJoin (left_size=1224, right_size=1):
Elapsed Time: 61.477 ms
Count: 13
Plan:
ProjectionExec: expr=[product@2 as product, product_label@3 as product_label, sim_property2@5 as sim_property2], metrics=[output_rows=13, elapsed_compute=1.443µs]
  NestedLoopJoinExec: join_type=Inner, filter=compare(join_proj_push_down_1@0, join_proj_push_down_3@0) AND compare(join_proj_push_down_2@1, join_proj_push_down_4@1), metrics=[output_rows=13, elapsed_compute=2ns, build_input_batches=1, build_input_rows=1224, input_batches=1, input_rows=1, output_batches=1, build_mem_used=84544, build_time=19.747µs, join_time=61.445763ms]
    DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]
    DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]

NestedLoopJoin (left_size=1, right_size=1224):
Elapsed Time: 61.431 ms
Count: 13
Plan:
ProjectionExec: expr=[product@2 as product, product_label@3 as product_label, sim_property2@5 as sim_property2], metrics=[output_rows=13, elapsed_compute=1.092µs]
  NestedLoopJoinExec: join_type=Inner, filter=compare(join_proj_push_down_1@0, join_proj_push_down_3@0) AND compare(join_proj_push_down_2@1, join_proj_push_down_4@1), metrics=[output_rows=13, elapsed_compute=2ns, build_input_batches=1, build_input_rows=1, input_batches=1, input_rows=1224, output_batches=1, build_mem_used=31472, build_time=28.734µs, join_time=61.393723ms]
    DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]
    DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]
```

DataFusion 50:

```
NestedLoopJoin (left_size=1224, right_size=1):
Elapsed Time: 1821.913 ms
Count: 1224
Plan:
ProjectionExec: expr=[product@2 as product, product_label@3 as product_label, sim_property2@5 as sim_property2], metrics=[output_rows=1224, elapsed_compute=6.081µs]
  NestedLoopJoinExec: join_type=Inner, filter=compare(join_proj_push_down_1@0, join_proj_push_down_3@0) AND compare(join_proj_push_down_2@1, join_proj_push_down_4@1), metrics=[output_rows=1224, elapsed_compute=2ns, build_input_batches=1, build_input_rows=1224, input_batches=1, input_rows=1, output_batches=1, build_mem_used=83824, build_time=25.828µs, join_time=1.821812225s]
    DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]
    DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]

NestedLoopJoin (left_size=1, right_size=1224):
Elapsed Time: 60.65 ms
Count: 13
Plan:
ProjectionExec: expr=[product@2 as product, product_label@3 as product_label, sim_property2@5 as sim_property2], metrics=[output_rows=13, elapsed_compute=1.032µs]
  NestedLoopJoinExec: join_type=Inner, filter=compare(join_proj_push_down_1@0, join_proj_push_down_3@0) AND compare(join_proj_push_down_2@1, join_proj_push_down_4@1), metrics=[output_rows=13, elapsed_compute=2ns, build_input_batches=1, build_input_rows=1, input_batches=1, input_rows=1224, output_batches=1, build_mem_used=30656, build_time=26.45µs, join_time=60.609296ms]
    DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]
    DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]
```
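Regarding the side note, here is a rough sketch of what I have in mind for our source nodes (a hypothetical helper, not our actual code): report at least the row count and leave the rest unknown, so the optimizer has something to compare when deciding which side to build. Depending on the DataFusion version, this value would be returned from the node's `statistics` / `partition_statistics` implementation.

```rust
use datafusion::arrow::datatypes::Schema;
use datafusion::common::stats::Precision;
use datafusion::common::Statistics;

// Hypothetical helper for a custom physical data source node: report an
// exact row count and keep all other statistics unknown. With this in place
// the physical optimizer can tell the 1224-row input from the 1-row input.
fn source_statistics(schema: &Schema, row_count: usize) -> Statistics {
    let mut stats = Statistics::new_unknown(schema);
    stats.num_rows = Precision::Exact(row_count);
    stats
}
```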
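And for completeness, this is roughly how I would verify the swap once statistics are available. It is a standalone sketch with made-up single-column tables backed by `MemTable` (which already reports exact row counts), not our actual sources. If join-side swapping applies to nested-loop joins here, the `EXPLAIN` output should show the single-row input on the build side:

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));

    // 1224-row "large" input and 1-row "small" input, mirroring the repro sizes.
    let large = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from_iter_values(0i64..1224))],
    )?;
    let small = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from_iter_values(0i64..1))],
    )?;
    ctx.register_table(
        "large_t",
        Arc::new(MemTable::try_new(schema.clone(), vec![vec![large]])?),
    )?;
    ctx.register_table(
        "small_t",
        Arc::new(MemTable::try_new(schema, vec![vec![small]])?),
    )?;

    // Non-equi predicate so the planner picks a nested-loop join; the printed
    // physical plan shows which input ends up on the build side.
    ctx.sql("EXPLAIN SELECT * FROM large_t l JOIN small_t s ON l.v > s.v")
        .await?
        .show()
        .await?;
    Ok(())
}
```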
