[
https://issues.apache.org/jira/browse/SPARK-45198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Anton Uvarov updated SPARK-45198:
---------------------------------
Attachment: T2-Details-for-Query.png
T1-Details-for-Query.png
> problem using broadcast join with parquet/iceberg tables
> --------------------------------------------------------
>
> Key: SPARK-45198
> URL: https://issues.apache.org/jira/browse/SPARK-45198
> Project: Spark
> Issue Type: Bug
> Components: Build
> Affects Versions: 3.4.1
> Reporter: Anton Uvarov
> Priority: Trivial
> Attachments: T1-Details-for-Query.png, T2-Details-for-Query.png
>
> Original Estimate: 612h
> Remaining Estimate: 612h
>
> We have two Parquet tables: load_test_full_warehouse.gen_document_type and
> load_test_full_warehouse.generation_document_part.
> Left-joining these two tables gives a surprising result. When the large table
> load_test_full_warehouse.generation_document_part is on the left side of the
> join, the optimizer chooses a broadcast join.
> However, when the small reference table is on the left, the optimizer executes
> the query with a sort-merge join, even though a left join with a small table
> on the left would seem like a natural candidate for a broadcast join.
> Neither broadcast hints nor collecting statistics changed the plan. The
> following queries were used:
> spark.sql(f"""create table iceberg_warehouse.t1 using iceberg
> as SELECT /*+ BROADCAST(doc_tp) */
> doc.DOCUMENT_DATE
> , doc_tp.NAME as DOCUMENT_TYPE
> , COUNT(*) as DOC_QTY
> FROM load_test_full_warehouse.generation_document_part doc
> LEFT JOIN load_test_full_warehouse.gen_document_type doc_tp ON
> doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
> GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")
> == Physical Plan ==
> AtomicCreateTableAsSelect (25)
> +- AdaptiveSparkPlan (24)
>    +- == Final Plan ==
>       * HashAggregate (15)
>       +- AQEShuffleRead (14)
>          +- ShuffleQueryStage (13), Statistics(sizeInBytes=16.7 MiB, rowCount=3.12E+5)
>             +- Exchange (12)
>                +- * HashAggregate (11)
>                   +- * Project (10)
>                      +- * BroadcastHashJoin LeftOuter BuildRight (9)
>                         :- * Project (3)
>                         :  +- * ColumnarToRow (2)
>                         :     +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (1)
>                         +- BroadcastQueryStage (8), Statistics(sizeInBytes=1031.8 KiB, rowCount=1.00E+3)
>                            +- BroadcastExchange (7)
>                               +- * Filter (6)
>                                  +- * ColumnarToRow (5)
>                                     +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (4)
>    +- == Initial Plan ==
>       HashAggregate (23)
>       +- Exchange (22)
>          +- HashAggregate (21)
>             +- Project (20)
>                +- BroadcastHashJoin LeftOuter BuildRight (19)
>                   :- Project (16)
>                   :  +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (1)
>                   +- BroadcastExchange (18)
>                      +- Filter (17)
>                         +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (4)
>
> spark.sql(f"""create table iceberg_warehouse.t2 using iceberg
> as SELECT /*+ BROADCAST(doc_tp) */
> doc.DOCUMENT_DATE
> , doc_tp.NAME as DOCUMENT_TYPE
> , COUNT(*) as DOC_QTY
> FROM load_test_full_warehouse.gen_document_type doc_tp
> LEFT JOIN load_test_full_warehouse.generation_document_part doc
> ON doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
> GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")
> == Physical Plan ==
> AtomicCreateTableAsSelect (34)
> +- AdaptiveSparkPlan (33)
>    +- == Final Plan ==
>       * HashAggregate (21)
>       +- AQEShuffleRead (20)
>          +- ShuffleQueryStage (19), Statistics(sizeInBytes=1695.3 KiB, rowCount=3.10E+4)
>             +- Exchange (18)
>                +- * HashAggregate (17)
>                   +- * Project (16)
>                      +- * SortMergeJoin LeftOuter (15)
>                         :- * Sort (6)
>                         :  +- AQEShuffleRead (5)
>                         :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=46.9 KiB, rowCount=1.00E+3)
>                         :        +- Exchange (3)
>                         :           +- * ColumnarToRow (2)
>                         :              +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (1)
>                         +- * Sort (14)
>                            +- AQEShuffleRead (13)
>                               +- ShuffleQueryStage (12), Statistics(sizeInBytes=234.7 GiB, rowCount=1.05E+10)
>                                  +- Exchange (11)
>                                     +- * Project (10)
>                                        +- * Filter (9)
>                                           +- * ColumnarToRow (8)
>                                              +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (7)
>    +- == Initial Plan ==
>       HashAggregate (32)
>       +- Exchange (31)
>          +- HashAggregate (30)
>             +- Project (29)
>                +- SortMergeJoin LeftOuter (28)
>                   :- Sort (23)
>                   :  +- Exchange (22)
>                   :     +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (1)
>                   +- Sort (27)
>                      +- Exchange (26)
>                         +- Project (25)
>                            +- Filter (24)
>                               +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (7)
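> A minimal sketch of the behavior, under stated assumptions: this is not the
> reporter's setup — it uses a local session and made-up `range()` tables as
> stand-ins for the two Parquet tables. It illustrates one likely explanation
> for the plans above: for a LEFT OUTER join, BroadcastHashJoin can only build
> (broadcast) the right side, so a broadcast hint on the left/preserved table
> cannot be honored and the planner falls back to a sort-merge join.
>
> {code:python}
> import io
> import contextlib
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import broadcast
>
> spark = (
>     SparkSession.builder
>     .master("local[2]")
>     .appName("left-join-broadcast-sketch")
>     .getOrCreate()
> )
> # Disable size-based auto-broadcast so only the hints decide the build side.
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>
> small = spark.range(100).withColumnRenamed("id", "type_id")     # stand-in for gen_document_type
> large = spark.range(10_000).withColumnRenamed("id", "type_id")  # stand-in for generation_document_part
>
> def physical_plan(df):
>     """Capture df.explain() output as a string."""
>     buf = io.StringIO()
>     with contextlib.redirect_stdout(buf):
>         df.explain()
>     return buf.getvalue()
>
> # Large table on the left, broadcast hint on the RIGHT side of the left join:
> # BuildRight is legal for LeftOuter, so a BroadcastHashJoin is planned.
> plan_right_hint = physical_plan(large.join(broadcast(small), "type_id", "left"))
>
> # Small table on the left carrying the broadcast hint: the hint targets the
> # preserved (left) side of a LEFT OUTER join, which BroadcastHashJoin cannot
> # build, so the hint is ignored and a SortMergeJoin is planned instead.
> plan_left_hint = physical_plan(broadcast(small).join(large, "type_id", "left"))
> {code}
>
> With the SQL hint `/*+ BROADCAST(doc_tp) */`, the same rule applies: in the
> second query doc_tp is the left side of the left join, so the hint cannot
> produce a broadcast hash join.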
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]