Apache Spark reassigned SPARK-40703:
------------------------------------

    Assignee: (was: Apache Spark)

> Performance regression for joins in Spark 3.3 vs Spark 3.2
> ----------------------------------------------------------
>
>                 Key: SPARK-40703
>                 URL: https://issues.apache.org/jira/browse/SPARK-40703
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Bryan Keller
>            Priority: Major
>         Attachments: spark32-plan.txt, spark33-plan.txt, test.py
>
> When running the TPC-DS benchmarks with a DSv2 datasource in Spark 3.3, a performance regression versus Spark 3.2 was discovered. Specifically, it appears that EnsureRequirements.ensureDistributionAndOrdering() no longer enforces a minimum number of partitions for a join distribution in some cases. This affects DSv2 datasources because, when a scan has only a single read partition, DataSourceV2ScanExecBase.outputPartitioning() returns a SinglePartition instance. SinglePartition creates a SinglePartitionShuffleSpec, and SinglePartitionShuffleSpec.canCreatePartitioning() returns true. Because canCreatePartitioning() returns true in this case, EnsureRequirements.ensureDistributionAndOrdering() does not enforce minimum parallelism and also favors the single partition when choosing the best distribution candidate. Ultimately, a single partition is selected for the join distribution even when the other side of the join is a large table with many partitions, which can seriously degrade join performance.
>
> Spark 3.2 enforces minimum parallelism differently in ensureDistributionAndOrdering() and thus does not suffer from this issue: it shuffles both sides of the join to enforce parallelism.
>
> In the TPC-DS benchmark, affected queries include 14a and 14b.
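The decision path described above can be sketched as a toy model. This is plain Python, not Spark source: `ShuffleSpec`, `can_create_partitioning`, and `choose_join_partitions` are simplified, hypothetical stand-ins for the logic in `EnsureRequirements`, under the assumption stated in the report that a side whose spec can create a partitioning is favored and no minimum parallelism is applied.

```python
# Toy model (NOT Spark source) of the Spark 3.3 partitioning decision
# described in this issue. All names here are simplified assumptions.
from dataclasses import dataclass


@dataclass
class ShuffleSpec:
    """Stand-in for a child operator's shuffle spec."""
    num_partitions: int
    can_create_partitioning: bool  # SinglePartitionShuffleSpec reports True


def choose_join_partitions(left: ShuffleSpec, right: ShuffleSpec,
                           default_parallelism: int = 200) -> int:
    """Sketch of the 3.3 behavior: if a side's spec can create a
    partitioning, the other side is shuffled to match it, with no
    minimum parallelism enforced."""
    for side in (left, right):
        if side.can_create_partitioning:
            return side.num_partitions
    # Neither side qualifies: shuffle both to the default parallelism.
    return default_parallelism


# A single-partition DSv2 scan reports SinglePartition, whose spec says
# it *can* create a partitioning, so the join collapses to 1 partition
# even though the other side is large.
item = ShuffleSpec(num_partitions=1, can_create_partitioning=True)
catalog_sales = ShuffleSpec(num_partitions=500, can_create_partitioning=False)
print(choose_join_partitions(item, catalog_sales))  # -> 1
```

In this model, the large `catalog_sales` side is redistributed down to the single partition of the small side, which matches the regression described above.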
> The issue can also be demonstrated with a simple query, for example:
>
>     select ics.i_item_sk from catalog_sales cs join item ics on cs.cs_item_sk = ics.i_item_sk
>
> ...where item is a small table that is read into one partition and catalog_sales is a large table. These tables are part of TPC-DS, but you can create your own. To demonstrate the issue with this query you may need to turn off broadcast joins; disabling broadcast is not required for the issue to occur in general, as it happens when running TPC-DS with the broadcast setting at its default.
>
> Attached are the plans for this query in Spark 3.2 and Spark 3.3. They show that in Spark 3.2 the join parallelism of 200 is reached by inserting an exchange after the item table scan, while in Spark 3.3 no such exchange is inserted and the join parallelism is 1.
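The difference between the two attached plans can be sketched the same way. This is an illustrative model of the Spark 3.2 behavior described in the report, not the actual implementation; `shuffle_partitions` is a stand-in for `spark.sql.shuffle.partitions` (default 200).

```python
# Hypothetical sketch of the Spark 3.2 behavior described above: when the
# two sides' partitionings are not compatible, both sides are shuffled to
# spark.sql.shuffle.partitions, so an exchange is inserted after the
# single-partition item scan and minimum parallelism is preserved.

def spark32_join_parallelism(left_parts: int, right_parts: int,
                             shuffle_partitions: int = 200) -> int:
    if left_parts == right_parts and left_parts > 1:
        # Partitionings already compatible: no exchange needed.
        return left_parts
    # Otherwise shuffle both sides, enforcing minimum parallelism.
    return shuffle_partitions


print(spark32_join_parallelism(1, 500))  # -> 200: exchange after item scan
```

Under this sketch, the one-partition item scan never becomes the join's output partitioning, matching the Spark 3.2 plan in spark32-plan.txt.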