Ayush Anubhava created SPARK-25403:
--------------------------------------
Summary: Broadcast join is changing to sort merge join after spark-beeline session restarts
Key: SPARK-25403
URL: https://issues.apache.org/jira/browse/SPARK-25403
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.3.1
Environment: Spark 2.3.1
Hadoop 2.7.2
Reporter: Ayush Anubhava
*Issue 1*: {color:#ff0000}Broadcast join changes to sort merge join after the spark-beeline session restarts{color}.
*Precondition:* The JDBC/Thrift server is running continuously.
*Steps:*
{code:java}
0: jdbc:hive2://10.18.18.214:23040/default> use x1;
+---------+--+
| Result |
+---------+--+
+---------+--+
0: jdbc:hive2://10.18.18.214:23040/default> create table cv (a int, b string) stored as parquet;
+---------+--+
| Result |
+---------+--+
+---------+--+
0: jdbc:hive2://10.18.18.214:23040/default> create table c (a int, b string) stored as parquet;
+---------+--+
| Result |
+---------+--+
+---------+--+
0: jdbc:hive2://10.18.18.214:23040/default> insert into table c values (1,'a');
+---------+--+
| Result |
+---------+--+
+---------+--+
0: jdbc:hive2://10.18.18.214:23040/default> insert into table cv values (1,'a');
+---------+--+
| Result |
+---------+--+
+---------+--+
0: jdbc:hive2://10.18.18.214:23040/default> select * from c , cv where c.a = cv.
+----+----+----+----+--+
| a | b | a | b |
+----+----+----+----+--+
| 1 | a | 1 | a |
+----+----+----+----+--+
{code}
*Before restarting the session (spark-beeline):*
*{color:#d04437}explain select * from c , cv where c.a = cv.a;{color}*
|== Physical Plan ==
*(2) {color:#d04437}BroadcastHashJoin{color} [a#3284], [a#3286], Inner, BuildRight
:- *(2) Project [a#3284, b#3285]
:  +- *(2) Filter isnotnull(a#3284)
:     +- *(2) FileScan parquet x1.c[a#3284,b#3285] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/c], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string>
+- {color:#d04437}BroadcastExchange{color} HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
   +- *(1) Project [a#3286, b#3287]
      +- *(1) Filter isnotnull(a#3286)
         +- *(1) FileScan parquet x1.cv[a#3286,b#3287] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/cv], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string>|
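In the plan above, BuildRight means cv is the side being broadcast. Spark's planner only picks a broadcast hash join when the estimated size of that side is at or below spark.sql.autoBroadcastJoinThreshold (10 MB by default). As a diagnostic sketch, not a confirmed root cause, the following statements could be run in the same session to record the threshold and whatever catalog statistics Spark currently holds for cv:

{code:sql}
use x1;
-- Current broadcast threshold in bytes (Spark's default is 10485760, i.e. 10 MB)
SET spark.sql.autoBroadcastJoinThreshold;
-- Catalog metadata for the broadcast side; check the Statistics row, if one is shown
DESCRIBE EXTENDED cv;
{code}

Running the same statements again after the session restart and comparing the output would show whether the threshold or the size estimate for cv differs between the two sessions.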
*After restarting the session (spark-beeline):*
*{color:#d04437}explain select * from c , cv where c.a = cv.a;{color}*
|== Physical Plan ==
*(5) {color:#d04437}SortMergeJoin{color} [a#3312], [a#3314], Inner
:- *(2) Sort [a#3312 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(a#3312, 200)
:     +- *(1) Project [a#3312, b#3313]
:        +- *(1) Filter isnotnull(a#3312)
:           +- *(1) FileScan parquet x1.c[a#3312,b#3313] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/c], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string>
+- *(4) Sort [a#3314 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(a#3314, 200)
      +- *(3) Project [a#3314, b#3315]
         +- *(3) Filter isnotnull(a#3314)
            +- *(3) FileScan parquet x1.cv[a#3314,b#3315] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/cv], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string>|
+_*Note: The JDBC server keeps running while the session restarts, i.e., the application is not restarted and the driver remains the same.*_+
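Because the driver is unchanged, one hypothesis (unverified) is that the new session plans the join with a different size estimate for cv than the old session did. A hedged way to test this in the restarted session is to recompute the table statistics and re-check the plan. The BROADCAST join hint (supported since Spark 2.2) can also confirm that a broadcast join is still possible for this query, as a possible workaround:

{code:sql}
use x1;
-- Hypothesis check: recompute table-level statistics for both tables
ANALYZE TABLE c COMPUTE STATISTICS;
ANALYZE TABLE cv COMPUTE STATISTICS;
-- If a stale or missing size estimate is the cause (unconfirmed),
-- this plan would be expected to show BroadcastHashJoin again
explain select * from c , cv where c.a = cv.a;
-- Possible workaround: ask the planner to broadcast cv via a join hint
explain select /*+ BROADCAST(cv) */ * from c , cv where c.a = cv.a;
{code}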