[
https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chao Sun updated SPARK-40703:
-----------------------------
Component/s: SQL
(was: Spark Core)
> Performance regression for joins in Spark 3.3 vs Spark 3.2
> ----------------------------------------------------------
>
> Key: SPARK-40703
> URL: https://issues.apache.org/jira/browse/SPARK-40703
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Bryan Keller
> Priority: Major
> Attachments: spark32-plan.txt, spark33-plan.txt
>
>
> When running the TPC-DS benchmarks using a DSv2 datasource in Spark 3.3, a
> performance regression vs Spark 3.2 was discovered. More specifically, it
> appears as if {_}EnsureRequirements.ensureDistributionAndOrdering{_}() no
> longer enforces a minimum number of partitions for a join distribution in
> some cases. This impacts DSv2 datasources, because if a scan has only a
> single read partition {_}DataSourceV2ScanExecBase.outputPartitioning{_}()
> returns a _SinglePartition_ instance. The _SinglePartition_ creates a
> {_}SinglePartitionShuffleSpec{_}, and
> {_}SinglePartitionShuffleSpec.canCreatePartitioning{_}() returns true.
> Because {_}canCreatePartitioning{_}() returns true in this case,
> {_}EnsureRequirements.ensureDistributionAndOrdering{_}() won't enforce
> minimum parallelism and also will favor the single partition when considering
> the best distribution candidate. Ultimately this results in a single
> partition being selected for the join distribution, even if the other side of
> the join is a large table with many partitions. This can seriously impact
> performance of the join.
> Spark 3.2 enforces minimum parallelism differently in
> {_}ensureDistributionAndOrdering{_}() and thus does not suffer from this
> issue. It will shuffle both sides of the join to enforce parallelism.
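
The selection behavior described above can be sketched as a small toy model. This is only an illustration of the report's reasoning, not Spark's actual EnsureRequirements code: `JoinChild`, `described_join_parallelism`, the `enforce_min_parallelism` flag, and the 400-partition figure for catalog_sales are all hypothetical. The sketch also assumes the large side cannot offer a reusable partitioning, and that 200 is the default shuffle parallelism, as in the attached 3.2 plan.

```python
from dataclasses import dataclass
from typing import Sequence

# Default of spark.sql.shuffle.partitions; matches the parallelism of 200
# reported for the Spark 3.2 plan.
DEFAULT_SHUFFLE_PARTITIONS = 200


@dataclass(frozen=True)
class JoinChild:
    """Hypothetical stand-in for one side of the join (not a Spark class)."""
    name: str
    num_partitions: int
    can_create_partitioning: bool  # models ShuffleSpec.canCreatePartitioning()


def described_join_parallelism(children: Sequence[JoinChild],
                               enforce_min_parallelism: bool) -> int:
    """Toy model of the candidate selection described in the report.

    enforce_min_parallelism=False models the reported Spark 3.3 behavior;
    True models the reported Spark 3.2 outcome.
    """
    # Children whose existing partitioning could be reused for the join.
    reusable = [c for c in children if c.can_create_partitioning]
    if enforce_min_parallelism:
        # Reuse a child's partitioning only if it is at least as parallel
        # as the default shuffle parallelism.
        reusable = [c for c in reusable
                    if c.num_partitions >= DEFAULT_SHUFFLE_PARTITIONS]
    if reusable:
        return max(c.num_partitions for c in reusable)
    # No acceptable candidate: shuffle both sides to the default parallelism.
    return DEFAULT_SHUFFLE_PARTITIONS


# Assumed inputs: item is read into one partition; the partition count for
# catalog_sales is made up for illustration.
item = JoinChild("item", num_partitions=1, can_create_partitioning=True)
catalog_sales = JoinChild("catalog_sales", num_partitions=400,
                          can_create_partitioning=False)

print(described_join_parallelism([catalog_sales, item], enforce_min_parallelism=False))  # 1
print(described_join_parallelism([catalog_sales, item], enforce_min_parallelism=True))   # 200
```

In this model the single-partition side is the only candidate, so without the minimum-parallelism check it wins and the join runs with one partition.
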
> In the TPC-DS benchmark, affected queries include 14a and 14b. The issue can
> also be demonstrated using a simple query, for example:
> {{select ics.i_item_sk from catalog_sales cs join item ics on cs.cs_item_sk =
> ics.i_item_sk}}
> ...where _item_ is a small table that is read into one partition, and
> _catalog_sales_ is a large table. These tables are part of TPC-DS, but you
> can create your own. To demonstrate the issue, you may need to turn off
> broadcast joins. That is not required for the issue to occur, however: it
> also happens when running TPC-DS with the broadcast setting at its default.
> Attached are the plans for this query in Spark 3.2 and in Spark 3.3. They
> show that in Spark 3.2 a join parallelism of 200 is reached by inserting
> an exchange after the item table scan, whereas in Spark 3.3 no such exchange
> is inserted and the join parallelism is 1.
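
A minimal PySpark sketch of the reproduction steps described above, for anyone who wants to compare plans locally. The helper name `explain_join` and its `disable_broadcast` parameter are hypothetical. It assumes an existing SparkSession is passed in and that `catalog_sales` and `item` already resolve through that session's catalog (for example via a DSv2 datasource).

```python
# The query from the report, unchanged apart from line wrapping.
QUERY = (
    "select ics.i_item_sk from catalog_sales cs "
    "join item ics on cs.cs_item_sk = ics.i_item_sk"
)


def explain_join(spark, disable_broadcast=True):
    """Run the report's query on an existing SparkSession and print its plan.

    Assumes the tables `catalog_sales` and `item` are already registered.
    """
    if disable_broadcast:
        # A threshold of -1 disables broadcast joins, so the small item
        # table is not simply broadcast to the other side.
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    df = spark.sql(QUERY)
    # Prints the physical plan; check whether an Exchange appears after
    # the item table scan.
    df.explain()
    return df
```

In a pyspark shell this could be called as `explain_join(spark)` once on Spark 3.2 and once on Spark 3.3, and the printed plans compared for the exchange after the item scan.
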