[
https://issues.apache.org/jira/browse/SPARK-45198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Anton Uvarov updated SPARK-45198:
---------------------------------
Description:
We have two Parquet tables: load_test_full_warehouse.gen_document_type and
load_test_full_warehouse.generation_document_part.
A left join between these two tables produces a surprising result. When the
large table load_test_full_warehouse.generation_document_part is on the left
side of the join, the optimizer uses a broadcast hash join.
However, when the small reference table is on the left, the optimizer executes
the query with a sort-merge join, even though it would seem that a small table
on the left of a left join should trigger a broadcast join.
Attempts to use hints and to collect statistics did not change the plan. The
following queries were used:
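One detail worth noting (an assumption about the planner's constraints, not confirmed against Spark source here): in a broadcast hash join the build side is hashed and shipped to every task, and for a LEFT OUTER join the build side must be the right-hand table, because every left row has to be streamed so that unmatched rows can be emitted with NULLs. A minimal plain-Python sketch of that mechanic (illustrative names, not Spark internals):

```python
def broadcast_left_outer_join(left_rows, right_rows):
    """Toy LEFT OUTER broadcast hash join.

    left_rows / right_rows: iterables of (key, value) pairs.
    The RIGHT side is the build side: it is hashed as if broadcast,
    while the left side is streamed row by row.
    """
    # Build a hash table on the (broadcast) right side.
    build = {}
    for k, v in right_rows:
        build.setdefault(k, []).append(v)
    # Stream the left side; a missing key yields a NULL-padded row,
    # which is why the outer (left) side cannot be the build side.
    out = []
    for k, v in left_rows:
        for rv in build.get(k, [None]):  # [None] pads unmatched left rows
            out.append((k, v, rv))
    return out
```

Under this constraint, only a query whose *right* side is small is eligible for a broadcast hash join, regardless of hints.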
spark.sql(f"""create table iceberg_warehouse.t1 using iceberg
as SELECT /*+ BROADCAST(doc_tp) */
doc.DOCUMENT_DATE
, doc_tp.NAME as DOCUMENT_TYPE
, COUNT(*) as DOC_QTY
FROM load_test_full_warehouse.generation_document_part doc
LEFT JOIN load_test_full_warehouse.gen_document_type doc_tp ON
doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")
== Physical Plan ==
AtomicCreateTableAsSelect (25)
+- AdaptiveSparkPlan (24)
   +- == Final Plan ==
      * HashAggregate (15)
      +- AQEShuffleRead (14)
         +- ShuffleQueryStage (13), Statistics(sizeInBytes=16.7 MiB, rowCount=3.12E+5)
            +- Exchange (12)
               +- * HashAggregate (11)
                  +- * Project (10)
                     +- * BroadcastHashJoin LeftOuter BuildRight (9)
                        :- * Project (3)
                        :  +- * ColumnarToRow (2)
                        :     +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (1)
                        +- BroadcastQueryStage (8), Statistics(sizeInBytes=1031.8 KiB, rowCount=1.00E+3)
                           +- BroadcastExchange (7)
                              +- * Filter (6)
                                 +- * ColumnarToRow (5)
                                    +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (4)
   +- == Initial Plan ==
      HashAggregate (23)
      +- Exchange (22)
         +- HashAggregate (21)
            +- Project (20)
               +- BroadcastHashJoin LeftOuter BuildRight (19)
                  :- Project (16)
                  :  +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (1)
                  +- BroadcastExchange (18)
                     +- Filter (17)
                        +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (4)
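The plan above picks BroadcastHashJoin with BuildRight, which is consistent with a size-based rule: broadcast only when the candidate build side fits under a threshold (Spark's spark.sql.autoBroadcastJoinThreshold, 10 MiB by default), and for LeftOuter the build side can only be the right-hand table. A hypothetical sketch of that decision (function and names are illustrative, not Spark source):

```python
AUTO_BROADCAST_THRESHOLD = 10 * 1024 * 1024  # Spark's default: 10 MiB

def pick_join_strategy(right_size_bytes, join_type="LeftOuter",
                       threshold=AUTO_BROADCAST_THRESHOLD):
    """Sketch of a size-based join-strategy choice.

    For a LeftOuter join only BuildRight is legal, so only the size of
    the right-hand (build) side matters for broadcast eligibility.
    """
    if join_type == "LeftOuter" and right_size_bytes <= threshold:
        return "BroadcastHashJoin BuildRight"
    return "SortMergeJoin"

# Query 1: right side is gen_document_type (~1031.8 KiB) -> broadcast.
# Query 2: right side is generation_document_part (~234.7 GiB) -> sort-merge.
```

Under this rule the two observed plans follow directly from which table ends up on the right of the join, not from which table is smaller overall.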
spark.sql(f"""create table iceberg_warehouse.t2 using iceberg
as SELECT /*+ BROADCAST(doc_tp) */
doc.DOCUMENT_DATE
, doc_tp.NAME as DOCUMENT_TYPE
, COUNT(*) as DOC_QTY
FROM load_test_full_warehouse.gen_document_type doc_tp
LEFT JOIN load_test_full_warehouse.generation_document_part doc
ON doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")
== Physical Plan ==
AtomicCreateTableAsSelect (34)
+- AdaptiveSparkPlan (33)
   +- == Final Plan ==
      * HashAggregate (21)
      +- AQEShuffleRead (20)
         +- ShuffleQueryStage (19), Statistics(sizeInBytes=1695.3 KiB, rowCount=3.10E+4)
            +- Exchange (18)
               +- * HashAggregate (17)
                  +- * Project (16)
                     +- * SortMergeJoin LeftOuter (15)
                        :- * Sort (6)
                        :  +- AQEShuffleRead (5)
                        :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=46.9 KiB, rowCount=1.00E+3)
                        :        +- Exchange (3)
                        :           +- * ColumnarToRow (2)
                        :              +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (1)
                        +- * Sort (14)
                           +- AQEShuffleRead (13)
                              +- ShuffleQueryStage (12), Statistics(sizeInBytes=234.7 GiB, rowCount=1.05E+10)
                                 +- Exchange (11)
                                    +- * Project (10)
                                       +- * Filter (9)
                                          +- * ColumnarToRow (8)
                                             +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (7)
   +- == Initial Plan ==
      HashAggregate (32)
      +- Exchange (31)
         +- HashAggregate (30)
            +- Project (29)
               +- SortMergeJoin LeftOuter (28)
                  :- Sort (23)
                  :  +- Exchange (22)
                  :     +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (1)
                  +- Sort (27)
                     +- Exchange (26)
                        +- Project (25)
                           +- Filter (24)
                              +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (7)
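For reference, the SortMergeJoin that this plan falls back to works by shuffling both sides on the join key, sorting each side, then merging with advancing cursors. A minimal plain-Python sketch of that mechanic (illustrative, not Spark's implementation):

```python
def sort_merge_left_outer(left_rows, right_rows):
    """Toy LEFT OUTER sort-merge join.

    left_rows / right_rows: lists of (key, value) pairs.
    Both sides are sorted by key, then merged with a single
    monotonically advancing cursor on the right side.
    """
    left_rows = sorted(left_rows)
    right_rows = sorted(right_rows)
    out, j = [], 0
    for k, lv in left_rows:
        # Advance the right cursor past keys smaller than the current left key.
        while j < len(right_rows) and right_rows[j][0] < k:
            j += 1
        # Emit one output row per matching right row.
        matched = False
        i = j
        while i < len(right_rows) and right_rows[i][0] == k:
            out.append((k, lv, right_rows[i][1]))
            matched = True
            i += 1
        if not matched:
            out.append((k, lv, None))  # preserve unmatched left rows
    return out
```

The cost that dominates here is not the merge itself but the shuffle and sort of the 234.7 GiB side, which is exactly what a broadcast join would have avoided had the build side been small enough.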
> problem using broadcast join with parquet/iceberg tables
> --------------------------------------------------------
>
> Key: SPARK-45198
> URL: https://issues.apache.org/jira/browse/SPARK-45198
> Project: Spark
> Issue Type: Bug
> Components: Build
> Affects Versions: 3.4.1
> Reporter: Anton Uvarov
> Priority: Trivial
> Original Estimate: 612h
> Remaining Estimate: 612h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)