[ https://issues.apache.org/jira/browse/SPARK-25071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577385#comment-16577385 ]
Yuming Wang commented on SPARK-25071: ------------------------------------- I think the behavior is correct. CBO size estimates are based on the row count. {code:scala} def getOutputSize( attributes: Seq[Attribute], outputRowCount: BigInt, attrStats: AttributeMap[ColumnStat] = AttributeMap(Nil)): BigInt = { // Output size can't be zero, or sizeInBytes of BinaryNode will also be zero // (simple computation of statistics returns product of children). if (outputRowCount > 0) outputRowCount * getSizePerRow(attributes, attrStats) else 1 } {code} {code:scala} def getSizePerRow( attributes: Seq[Attribute], attrStats: AttributeMap[ColumnStat] = AttributeMap(Nil)): BigInt = { // We assign a generic overhead for a Row object, the actual overhead is different for different // Row format. 8 + attributes.map { attr => if (attrStats.get(attr).map(_.avgLen.isDefined).getOrElse(false)) { attr.dataType match { case StringType => // UTF8String: base + offset + numBytes attrStats(attr).avgLen.get + 8 + 4 case _ => attrStats(attr).avgLen.get } } else { attr.dataType.defaultSize } }.sum } {code} So for Scenario 2, each side is estimated as 2 rows x (8 bytes of row overhead + 8 bytes for the bigint column): right.stats.sizeInBytes=32 and left.stats.sizeInBytes=32. Because the two sizes are equal, the `<=` comparison in smallerSide below selects BuildRight: {code:scala} private def broadcastSide( canBuildLeft: Boolean, canBuildRight: Boolean, left: LogicalPlan, right: LogicalPlan): BuildSide = { def smallerSide = if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft if (canBuildRight && canBuildLeft) { // Broadcast smaller side base on its estimated physical size // if both sides have broadcast hint smallerSide } else if (canBuildRight) { BuildRight } else if (canBuildLeft) { BuildLeft } else { // for the last default broadcast nested loop join smallerSide } } {code} You can verify it with either of the following: {code:scala} spark.sql("CREATE TABLE small4 (c1 bigint) TBLPROPERTIES ('numRows'='2', 'rawDataSize'='600','totalSize'='80')") spark.sql("CREATE TABLE big4 (c1 string) TBLPROPERTIES ('numRows'='2', 'rawDataSize'='60000000', 'totalSize'='800')") val plan = spark.sql("select * from small4 t1 join big4 
t2 on (t1.c1 = t2.c1)").queryExecution.executedPlan val buildSide = plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide println(buildSide) or spark.sql("CREATE TABLE small4 (c1 bigint) TBLPROPERTIES ('numRows'='2', 'rawDataSize'='600','totalSize'='80')") spark.sql("CREATE TABLE big4 (c1 bigint, c2 bigint) TBLPROPERTIES ('numRows'='2', 'rawDataSize'='60000000', 'totalSize'='800')") val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 = t2.c1)").queryExecution.executedPlan val buildSide = plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide println(buildSide) {code} > BuildSide is coming not as expected with join queries > ----------------------------------------------------- > > Key: SPARK-25071 > URL: https://issues.apache.org/jira/browse/SPARK-25071 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.3.1 > Environment: Spark 2.3.1 > Hadoop 2.7.3 > Reporter: Ayush Anubhava > Priority: Major > > *BuildSide is not coming as expected.* > Pre-requisites: > *CBO is set as true & spark.sql.cbo.joinReorder.enabled= true.* > *import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec* > *Steps:* > *Scenario 1:* > spark.sql("CREATE TABLE small3 (c1 bigint) TBLPROPERTIES ('numRows'='2', > 'rawDataSize'='600','totalSize'='80000000000')") > spark.sql("CREATE TABLE big3 (c1 bigint) TBLPROPERTIES ('numRows'='2', > 'rawDataSize'='60000000', 'totalSize'='800')") > val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 = > t2.c1)").queryExecution.executedPlan > val buildSide = > plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide > println(buildSide) > > *Result 1:* > scala> val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 = > t2.c1)").queryExecution.executedPlan > plan: org.apache.spark.sql.execution.SparkPlan = > *(2) BroadcastHashJoin [c1#0L|#0L], [c1#1L|#1L], Inner, BuildRight > :- *(2) Filter isnotnull(c1#0L) > : +- HiveTableScan [c1#0L|#0L], HiveTableRelation 
`default`.`small3`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#0L|#0L] > +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, > false])) > +- *(1) Filter isnotnull(c1#1L) > +- HiveTableScan [c1#1L|#1L], HiveTableRelation `default`.`big3`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#1L|#1L] > scala> val buildSide = > plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide > buildSide: org.apache.spark.sql.execution.joins.BuildSide = BuildRight > scala> println(buildSide) > *BuildRight* > > *Scenario 2:* > spark.sql("CREATE TABLE small4 (c1 bigint) TBLPROPERTIES ('numRows'='2', > 'rawDataSize'='600','totalSize'='80')") > spark.sql("CREATE TABLE big4 (c1 bigint) TBLPROPERTIES ('numRows'='2', > 'rawDataSize'='60000000', 'totalSize'='800')") > val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 = > t2.c1)").queryExecution.executedPlan > val buildSide = > plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide > println(buildSide) > *Result 2:* > scala> val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 = > t2.c1)").queryExecution.executedPlan > plan: org.apache.spark.sql.execution.SparkPlan = > *(2) BroadcastHashJoin [c1#4L|#4L], [c1#5L|#5L], Inner, BuildRight > :- *(2) Filter isnotnull(c1#4L) > : +- HiveTableScan [c1#4L|#4L], HiveTableRelation `default`.`small4`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#4L|#4L] > +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, > false])) > +- *(1) Filter isnotnull(c1#5L) > +- HiveTableScan [c1#5L|#5L], HiveTableRelation `default`.`big4`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#5L|#5L] > scala> val buildSide = > plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide > buildSide: org.apache.spark.sql.execution.joins.BuildSide = *BuildRight* > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To 
unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org