[
https://issues.apache.org/jira/browse/SPARK-25071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ayush Anubhava updated SPARK-25071:
-----------------------------------
Description:
*BuildSide is not coming as expected.*
Pre-requisites:
*CBO is set as true & spark.sql.cbo.joinReorder.enabled= true.*
*import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec*
*Steps:*
*Scenario 1:*
spark.sql("CREATE TABLE small3 (c1 bigint) TBLPROPERTIES ('numRows'='2',
'rawDataSize'='600','totalSize'='80000000000')")
spark.sql("CREATE TABLE big3 (c1 bigint) TBLPROPERTIES ('numRows'='2',
'rawDataSize'='60000000', 'totalSize'='800')")
val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 =
t2.c1)").queryExecution.executedPlan
val buildSide =
plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
println(buildSide)
*Result 1:*
scala> val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 =
t2.c1)").queryExecution.executedPlan
plan: org.apache.spark.sql.execution.SparkPlan =
*(2) BroadcastHashJoin [c1#0L|#0L], [c1#1L|#1L], Inner, BuildRight
:- *(2) Filter isnotnull(c1#0L)
: +- HiveTableScan [c1#0L|#0L], HiveTableRelation `default`.`small3`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#0L|#0L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *(1) Filter isnotnull(c1#1L)
+- HiveTableScan [c1#1L|#1L], HiveTableRelation `default`.`big3`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#1L|#1L]
scala> val buildSide =
plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
buildSide: org.apache.spark.sql.execution.joins.BuildSide = BuildRight
scala> println(buildSide)
*BuildRight*
*Scenario 2:*
spark.sql("CREATE TABLE small4 (c1 bigint) TBLPROPERTIES ('numRows'='2',
'rawDataSize'='600','totalSize'='80')")
spark.sql("CREATE TABLE big4 (c1 bigint) TBLPROPERTIES ('numRows'='2',
'rawDataSize'='60000000', 'totalSize'='800')")
val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 =
t2.c1)").queryExecution.executedPlan
val buildSide =
plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
println(buildSide)
*Result 2:*
scala> val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 =
t2.c1)").queryExecution.executedPlan
plan: org.apache.spark.sql.execution.SparkPlan =
*(2) BroadcastHashJoin [c1#4L|#4L], [c1#5L|#5L], Inner, BuildRight
:- *(2) Filter isnotnull(c1#4L)
: +- HiveTableScan [c1#4L|#4L], HiveTableRelation `default`.`small4`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#4L|#4L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *(1) Filter isnotnull(c1#5L)
+- HiveTableScan [c1#5L|#5L], HiveTableRelation `default`.`big4`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#5L|#5L]
scala> val buildSide =
plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
buildSide: org.apache.spark.sql.execution.joins.BuildSide = *BuildRight*
was:
*BuildSide is not coming as expected.*
Pre-requisites:
*CBO is set as true & spark.sql.cbo.joinReorder.enabled= true.*
*import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec*
*Steps:*
*Scenario 1:*
spark.sql("CREATE TABLE small3 (c1 bigint) TBLPROPERTIES ('numRows'='2',
'rawDataSize'='600','totalSize'='80000000000')")
spark.sql("CREATE TABLE big3 (c1 bigint) TBLPROPERTIES ('numRows'='2',
'rawDataSize'='60000000', 'totalSize'='800')")
val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 =
t2.c1)").queryExecution.executedPlan
val buildSide = plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
println(buildSide)
*Result 1:*
scala> val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 =
t2.c1)").queryExecution.executedPlan
plan: org.apache.spark.sql.execution.SparkPlan =
*(2) BroadcastHashJoin [c1#0L], [c1#1L], Inner, BuildRight
:- *(2) Filter isnotnull(c1#0L)
: +- HiveTableScan [c1#0L], HiveTableRelation `default`.`small3`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#0L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *(1) Filter isnotnull(c1#1L)
+- HiveTableScan [c1#1L], HiveTableRelation `default`.`big3`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#1L]
scala> val buildSide =
plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
buildSide: org.apache.spark.sql.execution.joins.BuildSide = BuildRight
scala> println(buildSide)
*BuildRight*
*Scenario 2:*
spark.sql("CREATE TABLE small4 (c1 bigint) TBLPROPERTIES ('numRows'='2',
'rawDataSize'='600','totalSize'='80')")
spark.sql("CREATE TABLE big4 (c1 bigint) TBLPROPERTIES ('numRows'='2',
'rawDataSize'='60000000', 'totalSize'='800')")
val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 =
t2.c1)").queryExecution.executedPlan
val buildSide = plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
println(buildSide)
*Result 2:*
scala> val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 =
t2.c1)").queryExecution.executedPlan
plan: org.apache.spark.sql.execution.SparkPlan =
*(2) BroadcastHashJoin [c1#4L], [c1#5L], Inner, BuildRight
:- *(2) Filter isnotnull(c1#4L)
: +- HiveTableScan [c1#4L], HiveTableRelation `default`.`small4`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#4L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *(1) Filter isnotnull(c1#5L)
+- HiveTableScan [c1#5L], HiveTableRelation `default`.`big4`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#5L]
scala> val buildSide =
plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
buildSide: org.apache.spark.sql.execution.joins.BuildSide = *BuildRight*
> BuildSide is coming not as expected with join queries
> -----------------------------------------------------
>
> Key: SPARK-25071
> URL: https://issues.apache.org/jira/browse/SPARK-25071
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.1
> Environment: Spark 2.3.1
> Hadoop 2.7.3
> Reporter: Ayush Anubhava
> Priority: Major
> Attachments: SPARK-25071_IMG2.PNG
>
>
> *BuildSide is not coming as expected.*
> Pre-requisites:
> *CBO is set as true & spark.sql.cbo.joinReorder.enabled= true.*
> *import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec*
> *Steps:*
> *Scenario 1:*
> spark.sql("CREATE TABLE small3 (c1 bigint) TBLPROPERTIES ('numRows'='2',
> 'rawDataSize'='600','totalSize'='80000000000')")
> spark.sql("CREATE TABLE big3 (c1 bigint) TBLPROPERTIES ('numRows'='2',
> 'rawDataSize'='60000000', 'totalSize'='800')")
> val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 =
> t2.c1)").queryExecution.executedPlan
> val buildSide =
> plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
> println(buildSide)
>
> *Result 1:*
> scala> val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 =
> t2.c1)").queryExecution.executedPlan
> plan: org.apache.spark.sql.execution.SparkPlan =
> *(2) BroadcastHashJoin [c1#0L|#0L], [c1#1L|#1L], Inner, BuildRight
> :- *(2) Filter isnotnull(c1#0L)
> : +- HiveTableScan [c1#0L|#0L], HiveTableRelation `default`.`small3`,
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#0L|#0L]
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint,
> false]))
> +- *(1) Filter isnotnull(c1#1L)
> +- HiveTableScan [c1#1L|#1L], HiveTableRelation `default`.`big3`,
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#1L|#1L]
> scala> val buildSide =
> plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
> buildSide: org.apache.spark.sql.execution.joins.BuildSide = BuildRight
> scala> println(buildSide)
> *BuildRight*
>
> *Scenario 2:*
> spark.sql("CREATE TABLE small4 (c1 bigint) TBLPROPERTIES ('numRows'='2',
> 'rawDataSize'='600','totalSize'='80')")
> spark.sql("CREATE TABLE big4 (c1 bigint) TBLPROPERTIES ('numRows'='2',
> 'rawDataSize'='60000000', 'totalSize'='800')")
> val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 =
> t2.c1)").queryExecution.executedPlan
> val buildSide =
> plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
> println(buildSide)
> *Result 2:*
> scala> val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 =
> t2.c1)").queryExecution.executedPlan
> plan: org.apache.spark.sql.execution.SparkPlan =
> *(2) BroadcastHashJoin [c1#4L|#4L], [c1#5L|#5L], Inner, BuildRight
> :- *(2) Filter isnotnull(c1#4L)
> : +- HiveTableScan [c1#4L|#4L], HiveTableRelation `default`.`small4`,
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#4L|#4L]
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint,
> false]))
> +- *(1) Filter isnotnull(c1#5L)
> +- HiveTableScan [c1#5L|#5L], HiveTableRelation `default`.`big4`,
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#5L|#5L]
> scala> val buildSide =
> plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
> buildSide: org.apache.spark.sql.execution.joins.BuildSide = *BuildRight*
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]