[ https://issues.apache.org/jira/browse/SPARK-36568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-36568:
------------------------------------

    Assignee: Apache Spark

> Missed broadcast join in V2 plan
> --------------------------------
>
>                 Key: SPARK-36568
>                 URL: https://issues.apache.org/jira/browse/SPARK-36568
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Bruce Robbins
>            Assignee: Apache Spark
>            Priority: Major
>
> There are some joins that use broadcast hash join with DataSourceV1 but sort
> merge join with DataSourceV2, even though the two joins are loading the same
> files [1].
> Example:
> Create data:
> {noformat}
> import scala.util.Random
> val rand = new Random(245665L)
> val df = spark.range(1, 20000).map { x =>
>   (x,
>    rand.alphanumeric.take(20).mkString,
>    rand.alphanumeric.take(20).mkString,
>    rand.alphanumeric.take(20).mkString
>   )
> }.toDF("key", "col1", "col2", "col3")
> df.write.mode("overwrite").format("parquet").save("/tmp/tbl")
> df.write.mode("overwrite").format("parquet").save("/tmp/lookup")
> {noformat}
> Run this code:
> {noformat}
> bin/spark-shell --conf spark.sql.autoBroadcastJoinThreshold=400000
> spark.read.format("parquet").load("/tmp/tbl").createOrReplaceTempView("tbl")
> spark.read.format("parquet").load("/tmp/lookup").createOrReplaceTempView("lookup")
> sql("""select t.key, t.col1, t.col2, t.col3
> from tbl t
> join lookup l
> on t.key = l.key""").explain
> {noformat}
> For V2, do the same, except set {{spark.sql.sources.useV1SourceList=""}}.
> For V1, the result is:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [key#0L, col1#1, col2#2, col3#3]
>    +- BroadcastHashJoin [key#0L], [key#8L], Inner, BuildRight, false
>       :- Filter isnotnull(key#0L)
>       :  +- FileScan parquet [key#0L,col1#1,col2#2,col3#3] Batched: true, DataFilters: [isnotnull(key#0L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,col1:string,col2:string,col3:string>
>       +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#32]
>          +- Filter isnotnull(key#8L)
>             +- FileScan parquet [key#8L] Batched: true, DataFilters: [isnotnull(key#8L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>
> {noformat}
> For V2, the result is:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [key#0L, col1#1, col2#2, col3#3]
>    +- SortMergeJoin [key#0L], [key#8L], Inner
>       :- Sort [key#0L ASC NULLS FIRST], false, 0
>       :  +- Exchange hashpartitioning(key#0L, 200), ENSURE_REQUIREMENTS, [id=#33]
>       :     +- Filter isnotnull(key#0L)
>       :        +- BatchScan[key#0L, col1#1, col2#2, col3#3] ParquetScan DataFilters: [isnotnull(key#0L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,col1:string,col2:string,col3:string>, PushedFilters: [IsNotNull(key)] RuntimeFilters: []
>       +- Sort [key#8L ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(key#8L, 200), ENSURE_REQUIREMENTS, [id=#34]
>             +- Filter isnotnull(key#8L)
>                +- BatchScan[key#8L] ParquetScan DataFilters: [isnotnull(key#8L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)] RuntimeFilters: []
> {noformat}
> The initial plan with V1 uses broadcast hash join, but the initial plan with
> V2 uses sort merge join.
> The V1 logical plan contains a projection over the relation for {{lookup}},
> which restricts the output columns to just {{key}}. As a result,
> {{SizeInBytesOnlyStatsPlanVisitor#visitUnaryNode}}, when visiting the project
> node, reduces sizeInBytes based on the pruning:
> {noformat}
> Project [key#0L, col1#1, col2#2, col3#3]
> +- Join Inner, (key#0L = key#8L)
>    :- Filter isnotnull(key#0L)
>    :  +- Relation [key#0L,col1#1,col2#2,col3#3] parquet
>    +- Project [key#8L]
>       +- Filter isnotnull(key#8L)
>          +- Relation [key#8L,col1#9,col2#10,col3#11] parquet
> {noformat}
> The V2 logical plan does not contain this projection:
> {noformat}
> +- Join Inner, (key#0L = key#8L)
>    :- Filter isnotnull(key#0L)
>    :  +- RelationV2[key#0L, col1#1, col2#2, col3#3] parquet file:/tmp/tbl
>    +- Filter isnotnull(key#8L)
>       +- RelationV2[key#8L] parquet file:/tmp/lookup
> {noformat}
> [1] With my example, AQE converts the join to a broadcast hash join at run
> time for the V2 case. However, if AQE was disabled, it would obviously remain
> a sort merge join.
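
To make the size argument in the description concrete, here is a minimal, Spark-free sketch of how a size-only estimate for a projection could shrink the {{lookup}} side below the broadcast threshold. It is an illustration under stated assumptions, not Spark's actual code: the object name, the per-row overhead (8 bytes), the default column widths (8 bytes for a long, 20 for a string) and the 1,400,000-byte scan size are all hypothetical. Only the 400000 threshold and the column layout come from the report.

```scala
// Simplified model of a size-only estimate for a projection (assumptions only).
object BroadcastSizeSketch {
  // Assumed defaults in bytes; hypothetical values chosen for illustration.
  val RowOverhead = 8L
  val LongSize = 8L
  val StringSize = 20L

  /** Estimated bytes per row: fixed overhead plus the width of each column. */
  def sizePerRow(columnSizes: Seq[Long]): Long = RowOverhead + columnSizes.sum

  /** Scale the child's size by the ratio of output row width to child row width. */
  def projectedSize(childSize: BigInt, childCols: Seq[Long], outputCols: Seq[Long]): BigInt = {
    val scaled = childSize * BigInt(sizePerRow(outputCols)) / BigInt(sizePerRow(childCols))
    if (scaled == BigInt(0)) BigInt(1) else scaled // never report a zero size
  }

  /** A join side qualifies for broadcast when its estimate fits the threshold. */
  def canBroadcast(sizeInBytes: BigInt, threshold: Long): Boolean =
    sizeInBytes >= BigInt(0) && sizeInBytes <= BigInt(threshold)

  def main(args: Array[String]): Unit = {
    val fullRow = Seq(LongSize, StringSize, StringSize, StringSize) // key, col1, col2, col3
    val keyOnly = Seq(LongSize)                                     // key
    val scanSize = BigInt(1400000) // hypothetical on-disk size of /tmp/lookup
    val threshold = 400000L        // spark.sql.autoBroadcastJoinThreshold from the report

    // With a Project [key] above the relation, the estimate is scaled down.
    val pruned = projectedSize(scanSize, fullRow, keyOnly)
    println(s"with projection:    $pruned bytes, broadcast = ${canBroadcast(pruned, threshold)}")

    // Without the projection, the full scan size is compared to the threshold.
    println(s"without projection: $scanSize bytes, broadcast = ${canBroadcast(scanSize, threshold)}")
  }
}
```

Under these assumed numbers the pruned estimate falls under the threshold while the unpruned one does not, which matches the V1 versus V2 difference described above. The real file sizes and Spark's internal defaults may differ.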