[ https://issues.apache.org/jira/browse/SPARK-13900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196753#comment-15196753 ]

Ashok kumar Rajendran commented on SPARK-13900:
-----------------------------------------------

The explain plan for the query with the OR condition is below.

Explain execution ====================
16/03/15 21:00:22 INFO datasources.DataSourceStrategy: Selected 24 partitions 
out of 24, pruned 0.0% partitions.
16/03/15 21:00:22 INFO storage.MemoryStore: Block broadcast_8 stored as values 
in memory (estimated size 73.3 KB, free 511.5 KB)
16/03/15 21:00:22 INFO storage.MemoryStore: Block broadcast_8_piece0 stored as 
bytes in memory (estimated size 4.9 KB, free 516.5 KB)
16/03/15 21:00:22 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in 
memory on 10.88.12.80:50492 (size: 4.9 KB, free: 7.0 GB)
16/03/15 21:00:22 INFO spark.SparkContext: Created broadcast 8 from explain at 
JavaSparkSQL.java:682
16/03/15 21:00:23 INFO storage.MemoryStore: Block broadcast_9 stored as values 
in memory (estimated size 73.3 KB, free 589.8 KB)
16/03/15 21:00:23 INFO storage.MemoryStore: Block broadcast_9_piece0 stored as 
bytes in memory (estimated size 4.9 KB, free 594.7 KB)
16/03/15 21:00:23 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in 
memory on 10.88.12.80:50492 (size: 4.9 KB, free: 7.0 GB)
16/03/15 21:00:23 INFO spark.SparkContext: Created broadcast 9 from explain at 
JavaSparkSQL.java:682
== Parsed Logical Plan ==
'Project 
[unresolvedalias('TableA_Dimension1),unresolvedalias('TableA_Dimension2),unresolvedalias('TableA_Dimension3),unresolvedalias('TableA_timestamp_millis),unresolvedalias('TableA_field40),unresolvedalias('TableB_dimension1
 AS TableB_dimension1#248),unresolvedalias('TableB_dimension2 AS 
inv_ua#249),unresolvedalias('TableB_dimension3 AS 
TableB_dimension3#250),unresolvedalias('TableB_timestamp_mills AS 
TableB_timestamp_mills#251)]
+- 'Filter (((((('TableA_Dimension1 = 'TableB_dimension1) || 
('TableA_Dimension3 = 'TableB_dimension3)) || ('TableA_Dimension2 = 
'TableB_dimension2)) && ('TableA_timestamp_millis >= 'TableB_timestamp_mills)) 
&& ('TableA_timestamp_millis <= ('TableB_timestamp_mills + 3600000))) && 
('TableA_partition_hour_bucket >= 'TableB_partition_hour_bucket))
   +- 'Join Inner, None
      :- 'UnresolvedRelation `TableA`, None
      +- 'UnresolvedRelation `TableB`, None

== Analyzed Logical Plan ==
TableA_Dimension1: string, TableA_Dimension2: string, TableA_Dimension3: 
string, TableA_timestamp_millis: string, TableA_field40: string, 
TableB_dimension1: string, inv_ua: string, TableB_dimension3: string, 
TableB_timestamp_mills: string
Project 
[TableA_Dimension1#74,TableA_Dimension2#94,TableA_Dimension3#68,TableA_timestamp_millis#38,TableA_field40#40,TableB_dimension1#162
 AS TableB_dimension1#248,TableB_dimension2#155 AS 
inv_ua#249,TableB_dimension3#156 AS 
TableB_dimension3#250,TableB_timestamp_mills#157 AS TableB_timestamp_mills#251]
+- Filter ((((((TableA_Dimension1#74 = TableB_dimension1#162) || 
(TableA_Dimension3#68 = TableB_dimension3#156)) || (TableA_Dimension2#94 = 
TableB_dimension2#155)) && (TableA_timestamp_millis#38 >= 
TableB_timestamp_mills#157)) && (cast(TableA_timestamp_millis#38 as double) <= 
(cast(TableB_timestamp_mills#157 as double) + cast(3600000 as double)))) && 
(TableA_partition_hour_bucket#153 >= TableB_partition_hour_bucket#159))
   +- Join Inner, None
      :- Subquery TableA
      :  +- 
Relation[TableA_field0#0,TableA_field1#1,TableA_field2#2,TableA_field3#3,TableA_field4#4,TableA_field5#5,TableA_field6#6,TableA_field7#7,TableA_field8#8,TableA_field9#9,TableA_field10#10,TableA_field11#11,TableA_field12#12,TableA_field13#13,TableA_field14#14,TableA_field15#15,....,TableA_field150#150]
 ParquetRelation
      +- Subquery TableB
         +- 
Relation[timeBucket#154,TableB_dimension2#155,TableB_dimension3#156,TableB_timestamp_mills#157,endTime#158,TableB_partition_hour_bucket#159,endTimeBucket#160,count#161L,TableB_dimension1#162]
 ParquetRelation

== Optimized Logical Plan ==
Project 
[TableA_Dimension1#74,TableA_Dimension2#94,TableA_Dimension3#68,TableA_timestamp_millis#38,TableA_field40#40,TableB_dimension1#162
 AS TableB_dimension1#248,TableB_dimension2#155 AS 
inv_ua#249,TableB_dimension3#156 AS 
TableB_dimension3#250,TableB_timestamp_mills#157 AS TableB_timestamp_mills#251]
+- Join Inner, Some(((((((TableA_Dimension1#74 = TableB_dimension1#162) || 
(TableA_Dimension3#68 = TableB_dimension3#156)) || (TableA_Dimension2#94 = 
TableB_dimension2#155)) && (TableA_timestamp_millis#38 >= 
TableB_timestamp_mills#157)) && (cast(TableA_timestamp_millis#38 as double) <= 
(cast(TableB_timestamp_mills#157 as double) + 3600000.0))) && 
(TableA_partition_hour_bucket#153 >= TableB_partition_hour_bucket#159)))
   :- Project 
[TableA_timestamp_millis#38,TableA_Dimension1#74,TableA_field40#40,TableA_Dimension3#68,TableA_partition_hour_bucket#153,TableA_Dimension2#94]
   :  +- 
Relation[TableA_field0#0,TableA_field1#1,TableA_field2#2,TableA_field3#3,TableA_field4#4,TableA_field5#5,TableA_field6#6,TableA_field7#7,TableA_field8#8,TableA_field9#9,TableA_field10#10,TableA_field11#11,TableA_field12#12,TableA_field13#13,TableA_field14#14,TableA_field15#15,....,TableA_field150#150]
 ParquetRelation
   +- Project 
[TableB_dimension3#156,TableB_partition_hour_bucket#159,TableB_dimension1#162,TableB_timestamp_mills#157,TableB_dimension2#155]
      +- 
Relation[timeBucket#154,TableB_dimension2#155,TableB_dimension3#156,TableB_timestamp_mills#157,endTime#158,TableB_partition_hour_bucket#159,endTimeBucket#160,count#161L,TableB_dimension1#162]
 ParquetRelation

== Physical Plan ==
Project 
[TableA_Dimension1#74,TableA_Dimension2#94,TableA_Dimension3#68,TableA_timestamp_millis#38,TableA_field40#40,TableB_dimension1#162
 AS TableB_dimension1#248,TableB_dimension2#155 AS 
inv_ua#249,TableB_dimension3#156 AS 
TableB_dimension3#250,TableB_timestamp_mills#157 AS TableB_timestamp_mills#251]
+- BroadcastNestedLoopJoin BuildRight, Inner, Some(((((((TableA_Dimension1#74 = 
TableB_dimension1#162) || (TableA_Dimension3#68 = TableB_dimension3#156)) || 
(TableA_Dimension2#94 = TableB_dimension2#155)) && (TableA_timestamp_millis#38 
>= TableB_timestamp_mills#157)) && (cast(TableA_timestamp_millis#38 as double) 
<= (cast(TableB_timestamp_mills#157 as double) + 3600000.0))) && 
(TableA_partition_hour_bucket#153 >= TableB_partition_hour_bucket#159)))
   :- Scan 
ParquetRelation[TableA_timestamp_millis#38,TableA_Dimension1#74,TableA_field40#40,TableA_Dimension3#68,TableA_partition_hour_bucket#153,TableA_Dimension2#94]
 InputPaths: hdfs://myhdfs:8888/backfill/akraj/spark/output/TableA-parq
   +- Scan 
ParquetRelation[TableB_dimension3#156,TableB_partition_hour_bucket#159,TableB_dimension1#162,TableB_timestamp_mills#157,TableB_dimension2#155]
 InputPaths: hdfs://myhdfs:8888/backfill/akraj/spark/input/TableB-parq
Explain execution ====================
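The plan above shows why the OR query is slow: a hash join needs a single equality key to bucket both sides, and a disjunction of equalities has no such key, so the planner falls back to BroadcastNestedLoopJoin and evaluates the full predicate on every row pair. A minimal sketch of the difference in plain Python (hypothetical toy rows, not Spark code):

```python
# Why an OR of equalities defeats a hash join: a hash join needs one
# equality key to bucket both sides, but (A.d1 = B.d1 OR A.d2 = B.d2)
# has no single key, so the planner must compare every row pair --
# exactly what BroadcastNestedLoopJoin does.
# Hypothetical toy rows (d1, d2) standing in for TableA / TableB.

table_a = [("x", "p"), ("y", "q"), ("z", "p")]
table_b = [("y", "p")]

def nested_loop_or_join(a_rows, b_rows):
    """O(|A| * |B|): the OR predicate is evaluated on every pair."""
    return [(a, b) for a in a_rows for b in b_rows
            if a[0] == b[0] or a[1] == b[1]]

def hash_join_on(a_rows, b_rows, col):
    """O(|A| + |B|): bucket B on one equality column, probe with A."""
    buckets = {}
    for b in b_rows:
        buckets.setdefault(b[col], []).append(b)
    return [(a, b) for a in a_rows for b in buckets.get(a[col], [])]
```

Each per-dimension hash join touches every row once, which is why the UNION ALL rewrite described in the issue runs in minutes per dimension while the nested loop over billions of rows does not finish in reasonable time.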


> Spark SQL queries with OR conditions are not optimized properly
> ---------------------------------------------------------------
>
>                 Key: SPARK-13900
>                 URL: https://issues.apache.org/jira/browse/SPARK-13900
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Ashok kumar Rajendran
>
> I have a large table with a few billion rows and a very small table with 4 
> dimension values. All the data is stored in Parquet format. I would like to 
> get rows that match any of these dimensions. For example,
> Select field1, field2 from A, B where A.dimension1 = B.dimension1 OR 
> A.dimension2 = B.dimension2 OR A.dimension3 = B.dimension3 OR A.dimension4 = 
> B.dimension4.
> The query plan treats this as a BroadcastNestedLoopJoin and executes for a 
> very long time.
> If I execute this as UNION ALL queries instead, each dimension takes around 
> 1.5 minutes. Each query internally does a BroadcastHashJoin.
> Select field1, field2 from A, B where A.dimension1 = B.dimension1
> UNION ALL
> Select field1, field2 from A, B where A.dimension2 = B.dimension2
> UNION ALL
> Select field1, field2 from A, B where  A.dimension3 = B.dimension3
> UNION ALL
> Select field1, field2 from A, B where  A.dimension4 = B.dimension4.
> This is obviously not an optimal solution, as it scans the same table 
> multiple times, but it performs much better than the OR condition. 
> It seems the SQL optimizer is not handling this case properly, which causes 
> a huge performance impact on this type of OR query.
> Please correct me if I missed anything here. 
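One caveat on the UNION ALL workaround, sketched below with hypothetical rows: a row pair that matches on more than one dimension is emitted once per matching dimension, so UNION ALL can return duplicates that the single OR join would not (a UNION, or a later distinct, restores the OR semantics at extra cost).

```python
# UNION ALL of per-dimension equi-joins vs. a single OR join:
# a pair matching on BOTH dimensions appears twice in the UNION ALL
# result but only once in the OR join. Hypothetical rows (d1, d2).

a_rows = [("x", "p")]
b_rows = [("x", "p")]          # matches a_rows[0] on d1 AND d2

def or_join(a_side, b_side):
    return [(a, b) for a in a_side for b in b_side
            if a[0] == b[0] or a[1] == b[1]]

def union_all_of_equijoins(a_side, b_side):
    out = []
    for col in (0, 1):         # one equi-join per dimension
        out += [(a, b) for a in a_side for b in b_side if a[col] == b[col]]
    return out
```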



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
