milenkovicm commented on PR #1752: URL: https://github.com/apache/datafusion-ballista/pull/1752#issuecomment-4582832561
# Q72 (AQE) ~ 65 sec <img width="1484" height="348" alt="image" src="https://github.com/user-attachments/assets/f9dcac80-fc46-408b-b6f8-f3546cb604e0" /> ``` ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Job sO09M9B physical plan: ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- AdaptiveDatafusionExec: is_final=true, plan_id=2, stage_id=15, stage_resolved=true SortPreservingMergeExec: [total_cnt@5 DESC, i_item_desc@0 ASC NULLS LAST, w_warehouse_name@1 ASC NULLS LAST, d_week_seq@2 ASC NULLS LAST], fetch=100 ExchangeExec: partitioning=None, plan_id=19, stage_id=14, stage_resolved=true SortExec: TopK(fetch=100), expr=[total_cnt@5 DESC, i_item_desc@0 ASC NULLS LAST, w_warehouse_name@1 ASC NULLS LAST, d_week_seq@2 ASC NULLS LAST], preserve_partitioning=[true] ProjectionExec: expr=[i_item_desc@0 as i_item_desc, w_warehouse_name@1 as w_warehouse_name, d_week_seq@2 as d_week_seq, sum(CASE WHEN promotion.p_promo_sk IS NULL THEN Int64(1) ELSE Int64(0) END)@3 as no_promo, sum(CASE WHEN promotion.p_promo_sk IS NOT NULL THEN Int64(1) ELSE Int64(0) END)@4 as promo, count(Int64(1))@5 as total_cnt] AggregateExec: mode=FinalPartitioned, gby=[i_item_desc@0 as i_item_desc, w_warehouse_name@1 as w_warehouse_name, d_week_seq@2 as d_week_seq], aggr=[sum(CASE WHEN promotion.p_promo_sk IS NULL THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN promotion.p_promo_sk IS NOT NULL THEN Int64(1) ELSE Int64(0) END), count(Int64(1))] ExchangeExec: partitioning=Hash([i_item_desc@0, w_warehouse_name@1, d_week_seq@2], 128), plan_id=18, stage_id=13, stage_resolved=true AggregateExec: mode=Partial, gby=[i_item_desc@1 as i_item_desc, w_warehouse_name@0 as w_warehouse_name, 
d_week_seq@2 as d_week_seq], aggr=[sum(CASE WHEN promotion.p_promo_sk IS NULL THEN Int64(1) ELSE Int64(0) END), sum(CASE WHEN promotion.p_promo_sk IS NOT NULL THEN Int64(1) ELSE Int64(0) END), count(Int64(1))] HashJoinExec: mode=CollectLeft, join_type=Left, on=[(cs_item_sk@0, cr_item_sk@0), (cs_order_number@1, cr_order_number@1)], projection=[w_warehouse_name@2, i_item_desc@3, d_week_seq@4, p_promo_sk@5] ExchangeExec: partitioning=None, plan_id=17, stage_id=11, stage_resolved=true, broadcast=true ProjectionExec: expr=[cs_item_sk@1 as cs_item_sk, cs_order_number@2 as cs_order_number, w_warehouse_name@3 as w_warehouse_name, i_item_desc@4 as i_item_desc, d_week_seq@5 as d_week_seq, p_promo_sk@0 as p_promo_sk] HashJoinExec: mode=CollectLeft, join_type=Right, on=[(p_promo_sk@0, cs_promo_sk@1)], projection=[p_promo_sk@0, cs_item_sk@1, cs_order_number@3, w_warehouse_name@4, i_item_desc@5, d_week_seq@6] DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/10g_parquet/promotion.parquet]]}, projection=[p_promo_sk], file_type=parquet HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(cs_ship_date_sk@0, d_date_sk@0)], filter=d_date@1 > d_date@0 + IntervalMonthDayNano { months: 0, days: 5, nanoseconds: 0 }, projection=[cs_item_sk@1, cs_promo_sk@2, cs_order_number@3, w_warehouse_name@4, i_item_desc@5, d_week_seq@7] HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_date_sk@0, inv_date_sk@4), (d_week_seq@1, d_week_seq@8)], projection=[cs_ship_date_sk@2, cs_item_sk@3, cs_promo_sk@4, cs_order_number@5, w_warehouse_name@7, i_item_desc@8, d_date@9, d_week_seq@10] DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/10g_parquet/date_dim.parquet]]}, projection=[d_date_sk, d_week_seq], file_type=parquet, predicate=DynamicFilter [ empty ] ExchangeExec: partitioning=None, plan_id=14, stage_id=10, stage_resolved=true, broadcast=true ProjectionExec: expr=[cs_ship_date_sk@2 as cs_ship_date_sk, cs_item_sk@3 as cs_item_sk, cs_promo_sk@4 as cs_promo_sk, 
cs_order_number@5 as cs_order_number, inv_date_sk@6 as inv_date_sk, w_warehouse_name@7 as w_warehouse_name, i_item_desc@8 as i_item_desc, d_date@0 as d_date, d_week_seq@1 as d_week_seq] HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d_date_sk@0, cs_sold_date_sk@0)], projection=[d_date@1, d_week_seq@2, cs_ship_date_sk@4, cs_item_sk@5, cs_promo_sk@6, cs_order_number@7, inv_date_sk@8, w_warehouse_name@9, i_item_desc@10] FilterExec: d_year@3 = 1999, projection=[d_date_sk@0, d_date@1, d_week_seq@2] DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/10g_parquet/date_dim.parquet]]}, projection=[d_date_sk, d_date, d_week_seq, d_year], file_type=parquet, predicate=d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999 AND d_year@6 = 1999, pruning_predicate=d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 199 9 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 
1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1 AND d_year_null_count@2 != row_count@3 AND d_year_min@0 <= 1999 AND 1999 <= d_year_max@1, required_guarantees=[d_year in (1999)] HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(hd_demo_sk@0, cs_bill_hdemo_sk@2)], projection=[cs_sold_date_sk@1, cs_ship_date_sk@2, cs_item_sk@4, cs_promo_sk@5, cs_order_number@6, inv_date_sk@7, w_warehouse_name@8, i_item_desc@9] ExchangeExec: partitioning=None, plan_id=13, stage_id=9, stage_resolved=true, broadcast=true FilterExec: hd_buy_potential@1 = 501-1000, projection=[hd_demo_sk@0] DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/10g_parquet/household_demographics.parquet]]}, projection=[hd_demo_sk, hd_buy_potential], file_type=parquet, predicate=hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000 AND hd_buy_potential@2 = 501-1000, pruning_predicate=hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 
<= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1 AND hd_buy_potential_null_count@2 != row_count@3 AND hd_buy_potential_min@0 <= 501-1000 AND 501-1000 <= hd_buy_potential_max@1, required_guara ntees=[hd_buy_potential in (501-1000)] ExchangeExec: 
partitioning=Hash([cs_bill_hdemo_sk@2], 128), plan_id=11, stage_id=8, stage_resolved=true HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(cd_demo_sk@0, cs_bill_cdemo_sk@2)], projection=[cs_sold_date_sk@1, cs_ship_date_sk@2, cs_bill_hdemo_sk@4, cs_item_sk@5, cs_promo_sk@6, cs_order_number@7, inv_date_sk@8, w_warehouse_name@9, i_item_desc@10] ExchangeExec: partitioning=None, plan_id=10, stage_id=7, stage_resolved=true, broadcast=true FilterExec: cd_marital_status@1 = S, projection=[cd_demo_sk@0] DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/10g_parquet/customer_demographics.parquet]]}, projection=[cd_demo_sk, cd_marital_status], file_type=parquet, predicate=cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S AND cd_marital_status@2 = S, pruning_predicate=cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND 
cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1 AND cd_marital_status_null_count@2 != row_count@3 AND cd_marital_status_min@0 <= S AND S <= cd_marital_status_max@1, required_guarantees=[cd_marital_status in (S)] ExchangeExec: partitioning=Hash([cs_bill_cdemo_sk@2], 128), plan_id=8, stage_id=6, stage_resolved=true ProjectionExec: expr=[cs_sold_date_sk@0 as cs_sold_date_sk, cs_ship_date_sk@1 as cs_ship_date_sk, cs_bill_cdemo_sk@2 as cs_bill_cdemo_sk, cs_bill_hdemo_sk@3 as cs_bill_hdemo_sk, cs_item_sk@4 as cs_item_sk, cs_promo_sk@5 as cs_promo_sk, cs_order_number@6 as cs_order_number, inv_date_sk@7 as inv_date_sk, w_warehouse_name@8 as w_warehouse_name, i_item_desc@10 as i_item_desc] SortMergeJoinExec: join_type=Inner, on=[(cs_item_sk@4, i_item_sk@0)] ExchangeExec: partitioning=Hash([cs_item_sk@4], 128), plan_id=6, stage_id=4, stage_resolved=true ProjectionExec: expr=[cs_sold_date_sk@1 as cs_sold_date_sk, cs_ship_date_sk@2 as cs_ship_date_sk, cs_bill_cdemo_sk@3 as cs_bill_cdemo_sk, 
cs_bill_hdemo_sk@4 as cs_bill_hdemo_sk, cs_item_sk@5 as cs_item_sk, cs_promo_sk@6 as cs_promo_sk, cs_order_number@7 as cs_order_number, inv_date_sk@8 as inv_date_sk, w_warehouse_name@0 as w_warehouse_name] HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(w_warehouse_sk@0, inv_warehouse_sk@8)], projection=[w_warehouse_name@1, cs_sold_date_sk@2, cs_ship_date_sk@3, cs_bill_cdemo_sk@4, cs_bill_hdemo_sk@5, cs_item_sk@6, cs_promo_sk@7, cs_order_number@8, inv_date_sk@9] ExchangeExec: partitioning=None, plan_id=5, stage_id=3, stage_resolved=true, broadcast=true DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/10g_parquet/warehouse.parquet]]}, projection=[w_warehouse_sk, w_warehouse_name], file_type=parquet ExchangeExec: partitioning=Hash([inv_warehouse_sk@8], 128), plan_id=3, stage_id=2, stage_resolved=true ProjectionExec: expr=[cs_sold_date_sk@0 as cs_sold_date_sk, cs_ship_date_sk@1 as cs_ship_date_sk, cs_bill_cdemo_sk@2 as cs_bill_cdemo_sk, cs_bill_hdemo_sk@3 as cs_bill_hdemo_sk, cs_item_sk@4 as cs_item_sk, cs_promo_sk@5 as cs_promo_sk, cs_order_number@6 as cs_order_number, inv_date_sk@8 as inv_date_sk, inv_warehouse_sk@10 as inv_warehouse_sk] SortMergeJoinExec: join_type=Inner, on=[(cs_item_sk@4, inv_item_sk@1)], filter=inv_quantity_on_hand@1 < cs_quantity@0 SortExec: expr=[cs_item_sk@4 ASC], preserve_partitioning=[true] ExchangeExec: partitioning=Hash([cs_item_sk@4], 128), plan_id=0, stage_id=0, stage_resolved=true DataSourceExec: file_groups={128 groups: [[Users/marko/TMP/tpcds_data/10g_parquet/catalog_sales.parquet:0..7953492], [Users/marko/TMP/tpcds_data/10g_parquet/catalog_sales.parquet:7953492..15906984], [Users/marko/TMP/tpcds_data/10g_parquet/catalog_sales.parquet:15906984..23860476], [Users/marko/TMP/tpcds_data/10g_parquet/catalog_sales.parquet:23860476..31813968], [Users/marko/TMP/tpcds_data/10g_parquet/catalog_sales.parquet:31813968..39767460], ...]}, projection=[cs_sold_date_sk, cs_ship_date_sk, cs_bill_cdemo_sk, cs_bill_hdemo_sk, 
cs_item_sk, cs_promo_sk, cs_order_number, cs_quantity], file_type=parquet SortExec: expr=[inv_item_sk@1 ASC], preserve_partitioning=[true] ExchangeExec: partitioning=Hash([inv_item_sk@1], 128), plan_id=1, stage_id=1, stage_resolved=true DataSourceExec: file_groups={128 groups: [[Users/marko/TMP/tpcds_data/10g_parquet/inventory.parquet:0..3954771], [Users/marko/TMP/tpcds_data/10g_parquet/inventory.parquet:3954771..7909542], [Users/marko/TMP/tpcds_data/10g_parquet/inventory.parquet:7909542..11864313], [Users/marko/TMP/tpcds_data/10g_parquet/inventory.parquet:11864313..15819084], [Users/marko/TMP/tpcds_data/10g_parquet/inventory.parquet:15819084..19773855], ...]}, projection=[inv_date_sk, inv_item_sk, inv_warehouse_sk, inv_quantity_on_hand], file_type=parquet SortExec: expr=[i_item_sk@0 ASC], preserve_partitioning=[true] ExchangeExec: partitioning=Hash([i_item_sk@0], 128), plan_id=7, stage_id=5, stage_resolved=true DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/10g_parquet/item.parquet]]}, projection=[i_item_sk, i_item_desc], file_type=parquet DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpcds_data/10g_parquet/date_dim.parquet]]}, projection=[d_date_sk, d_date], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] ExchangeExec: partitioning=Hash([cr_item_sk@0, cr_order_number@1], 128), plan_id=16, stage_id=12, stage_resolved=true DataSourceExec: file_groups={128 groups: [[Users/marko/TMP/tpcds_data/10g_parquet/catalog_returns.parquet:0..781584], [Users/marko/TMP/tpcds_data/10g_parquet/catalog_returns.parquet:781584..1563168], [Users/marko/TMP/tpcds_data/10g_parquet/catalog_returns.parquet:1563168..2344752], [Users/marko/TMP/tpcds_data/10g_parquet/catalog_returns.parquet:2344752..3126336], [Users/marko/TMP/tpcds_data/10g_parquet/catalog_returns.parquet:3126336..3907920], ...]}, projection=[cr_item_sk, 
cr_order_number], file_type=parquet ``` SET datafusion.optimizer.hash_join_single_partition_threshold=7485760 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
