[https://issues.apache.org/jira/browse/SPARK-13900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196753#comment-15196753]
Ashok kumar Rajendran commented on SPARK-13900:
-----------------------------------------------
The explain plan for the query with the OR condition is below.
Explain execution ====================
16/03/15 21:00:22 INFO datasources.DataSourceStrategy: Selected 24 partitions
out of 24, pruned 0.0% partitions.
16/03/15 21:00:22 INFO storage.MemoryStore: Block broadcast_8 stored as values
in memory (estimated size 73.3 KB, free 511.5 KB)
16/03/15 21:00:22 INFO storage.MemoryStore: Block broadcast_8_piece0 stored as
bytes in memory (estimated size 4.9 KB, free 516.5 KB)
16/03/15 21:00:22 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in
memory on 10.88.12.80:50492 (size: 4.9 KB, free: 7.0 GB)
16/03/15 21:00:22 INFO spark.SparkContext: Created broadcast 8 from explain at
JavaSparkSQL.java:682
16/03/15 21:00:23 INFO storage.MemoryStore: Block broadcast_9 stored as values
in memory (estimated size 73.3 KB, free 589.8 KB)
16/03/15 21:00:23 INFO storage.MemoryStore: Block broadcast_9_piece0 stored as
bytes in memory (estimated size 4.9 KB, free 594.7 KB)
16/03/15 21:00:23 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in
memory on 10.88.12.80:50492 (size: 4.9 KB, free: 7.0 GB)
16/03/15 21:00:23 INFO spark.SparkContext: Created broadcast 9 from explain at
JavaSparkSQL.java:682
== Parsed Logical Plan ==
'Project
[unresolvedalias('TableA_Dimension1),unresolvedalias('TableA_Dimension2),unresolvedalias('TableA_Dimension3),unresolvedalias('TableA_timestamp_millis),unresolvedalias('TableA_field40),unresolvedalias('TableB_dimension1
AS TableB_dimension1#248),unresolvedalias('TableB_dimension2 AS
inv_ua#249),unresolvedalias('TableB_dimension3 AS
TableB_dimension3#250),unresolvedalias('TableB_timestamp_mills AS
TableB_timestamp_mills#251)]
+- 'Filter (((((('TableA_Dimension1 = 'TableB_dimension1) ||
('TableA_Dimension3 = 'TableB_dimension3)) || ('TableA_Dimension2 =
'TableB_dimension2)) && ('TableA_timestamp_millis >= 'TableB_timestamp_mills))
&& ('TableA_timestamp_millis <= ('TableB_timestamp_mills + 3600000))) &&
('TableA_partition_hour_bucket >= 'TableB_partition_hour_bucket))
+- 'Join Inner, None
:- 'UnresolvedRelation `TableA`, None
+- 'UnresolvedRelation `TableB`, None
== Analyzed Logical Plan ==
TableA_Dimension1: string, TableA_Dimension2: string, TableA_Dimension3:
string, TableA_timestamp_millis: string, TableA_field40: string,
TableB_dimension1: string, inv_ua: string, TableB_dimension3: string,
TableB_timestamp_mills: string
Project
[TableA_Dimension1#74,TableA_Dimension2#94,TableA_Dimension3#68,TableA_timestamp_millis#38,TableA_field40#40,TableB_dimension1#162
AS TableB_dimension1#248,TableB_dimension2#155 AS
inv_ua#249,TableB_dimension3#156 AS
TableB_dimension3#250,TableB_timestamp_mills#157 AS TableB_timestamp_mills#251]
+- Filter ((((((TableA_Dimension1#74 = TableB_dimension1#162) ||
(TableA_Dimension3#68 = TableB_dimension3#156)) || (TableA_Dimension2#94 =
TableB_dimension2#155)) && (TableA_timestamp_millis#38 >=
TableB_timestamp_mills#157)) && (cast(TableA_timestamp_millis#38 as double) <=
(cast(TableB_timestamp_mills#157 as double) + cast(3600000 as double)))) &&
(TableA_partition_hour_bucket#153 >= TableB_partition_hour_bucket#159))
+- Join Inner, None
:- Subquery TableA
: +-
Relation[TableA_field0#0,TableA_field1#1,TableA_field2#2,TableA_field3#3,TableA_field4#4,TableA_field5#5,TableA_field6#6,TableA_field7#7,TableA_field8#8,TableA_field9#9,TableA_field10#10,TableA_field11#11,TableA_field12#12,TableA_field13#13,TableA_field14#14,TableA_field15#15,....,TableA_field150#150]
ParquetRelation
+- Subquery TableB
+-
Relation[timeBucket#154,TableB_dimension2#155,TableB_dimension3#156,TableB_timestamp_mills#157,endTime#158,TableB_partition_hour_bucket#159,endTimeBucket#160,count#161L,TableB_dimension1#162]
ParquetRelation
== Optimized Logical Plan ==
Project
[TableA_Dimension1#74,TableA_Dimension2#94,TableA_Dimension3#68,TableA_timestamp_millis#38,TableA_field40#40,TableB_dimension1#162
AS TableB_dimension1#248,TableB_dimension2#155 AS
inv_ua#249,TableB_dimension3#156 AS
TableB_dimension3#250,TableB_timestamp_mills#157 AS TableB_timestamp_mills#251]
+- Join Inner, Some(((((((TableA_Dimension1#74 = TableB_dimension1#162) ||
(TableA_Dimension3#68 = TableB_dimension3#156)) || (TableA_Dimension2#94 =
TableB_dimension2#155)) && (TableA_timestamp_millis#38 >=
TableB_timestamp_mills#157)) && (cast(TableA_timestamp_millis#38 as double) <=
(cast(TableB_timestamp_mills#157 as double) + 3600000.0))) &&
(TableA_partition_hour_bucket#153 >= TableB_partition_hour_bucket#159)))
:- Project
[TableA_timestamp_millis#38,TableA_Dimension1#74,TableA_field40#40,TableA_Dimension3#68,TableA_partition_hour_bucket#153,TableA_Dimension2#94]
: +-
Relation[TableA_field0#0,TableA_field1#1,TableA_field2#2,TableA_field3#3,TableA_field4#4,TableA_field5#5,TableA_field6#6,TableA_field7#7,TableA_field8#8,TableA_field9#9,TableA_field10#10,TableA_field11#11,TableA_field12#12,TableA_field13#13,TableA_field14#14,TableA_field15#15,....,TableA_field150#150]
ParquetRelation
+- Project
[TableB_dimension3#156,TableB_partition_hour_bucket#159,TableB_dimension1#162,TableB_timestamp_mills#157,TableB_dimension2#155]
+-
Relation[timeBucket#154,TableB_dimension2#155,TableB_dimension3#156,TableB_timestamp_mills#157,endTime#158,TableB_partition_hour_bucket#159,endTimeBucket#160,count#161L,TableB_dimension1#162]
ParquetRelation
== Physical Plan ==
Project
[TableA_Dimension1#74,TableA_Dimension2#94,TableA_Dimension3#68,TableA_timestamp_millis#38,TableA_field40#40,TableB_dimension1#162
AS TableB_dimension1#248,TableB_dimension2#155 AS
inv_ua#249,TableB_dimension3#156 AS
TableB_dimension3#250,TableB_timestamp_mills#157 AS TableB_timestamp_mills#251]
+- BroadcastNestedLoopJoin BuildRight, Inner, Some(((((((TableA_Dimension1#74 =
TableB_dimension1#162) || (TableA_Dimension3#68 = TableB_dimension3#156)) ||
(TableA_Dimension2#94 = TableB_dimension2#155)) && (TableA_timestamp_millis#38
>= TableB_timestamp_mills#157)) && (cast(TableA_timestamp_millis#38 as double)
<= (cast(TableB_timestamp_mills#157 as double) + 3600000.0))) &&
(TableA_partition_hour_bucket#153 >= TableB_partition_hour_bucket#159)))
:- Scan
ParquetRelation[TableA_timestamp_millis#38,TableA_Dimension1#74,TableA_field40#40,TableA_Dimension3#68,TableA_partition_hour_bucket#153,TableA_Dimension2#94]
InputPaths: hdfs://myhdfs:8888/backfill/akraj/spark/output/TableA-parq
+- Scan
ParquetRelation[TableB_dimension3#156,TableB_partition_hour_bucket#159,TableB_dimension1#162,TableB_timestamp_mills#157,TableB_dimension2#155]
InputPaths: hdfs://myhdfs:8888/backfill/akraj/spark/input/TableB-parq
Explain execution ====================
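The plan above picks BroadcastNestedLoopJoin because the join predicate is an OR of equalities rather than a single equality, so no hash-join key can be extracted. A minimal, self-contained sketch (plain Python, not Spark code; all table and column names are hypothetical) of why that matters for cost: a nested-loop join compares every pair of rows, O(|A| * |B|), while each equi-join branch of the UNION ALL rewrite can build a hash table on the small side and probe it, O(|A| + |B|).

```python
# Illustrative cost sketch, NOT Spark internals. Assumes toy in-memory
# tables; 'dim1'/'dim2' are hypothetical dimension columns.

def nested_loop_join(a_rows, b_rows, predicate):
    """Compare every (a, b) pair -- roughly what BroadcastNestedLoopJoin does."""
    return [(a, b) for a in a_rows for b in b_rows if predicate(a, b)]

def hash_join(a_rows, b_rows, key):
    """Build a hash table on the small side, probe with the large side --
    roughly what BroadcastHashJoin does for one equality condition."""
    table = {}
    for b in b_rows:
        table.setdefault(key(b), []).append(b)
    return [(a, b) for a in a_rows for b in table.get(key(a), [])]

a = [{"dim1": i % 10, "dim2": i % 7} for i in range(1000)]  # "large" table
b = [{"dim1": 3, "dim2": 5}]                                # small lookup table

# The OR predicate forces the pairwise comparison:
slow = nested_loop_join(
    a, b, lambda x, y: x["dim1"] == y["dim1"] or x["dim2"] == y["dim2"])

# Each branch of the UNION ALL rewrite is an independent hash join
# (note: rows matching on both dimensions appear in both branches):
fast = hash_join(a, b, lambda r: r["dim1"]) + hash_join(a, b, lambda r: r["dim2"])
```

The two results differ only in duplicates: `slow` returns each qualifying row of A once, while `fast` returns a row once per matching branch, which mirrors the UNION ALL behaviour described in the issue below.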
> Spark SQL queries with OR condition is not optimized properly
> -------------------------------------------------------------
>
> Key: SPARK-13900
> URL: https://issues.apache.org/jira/browse/SPARK-13900
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0
> Reporter: Ashok kumar Rajendran
>
> I have a large table with a few billion rows and a very small table
> with 4 dimension values. All the data is stored in Parquet format. I would
> like to get rows that match any of these dimensions. For example,
> Select field1, field2 from A, B where A.dimension1 = B.dimension1 OR
> A.dimension2 = B.dimension2 OR A.dimension3 = B.dimension3 OR A.dimension4 =
> B.dimension4.
> The query plan executes this as a BroadcastNestedLoopJoin, which runs for a
> very long time.
> If I execute this as UNION ALL queries instead, each dimension takes around
> 1.5 minutes. Each query internally uses a BroadcastHashJoin.
> Select field1, field2 from A, B where A.dimension1 = B.dimension1
> UNION ALL
> Select field1, field2 from A, B where A.dimension2 = B.dimension2
> UNION ALL
> Select field1, field2 from A, B where A.dimension3 = B.dimension3
> UNION ALL
> Select field1, field2 from A, B where A.dimension4 = B.dimension4.
> This is obviously not an optimal solution, as it scans the same table
> multiple times, but it performs much better than the OR condition.
> It seems the SQL optimizer does not handle this type of OR query properly,
> which causes a huge performance impact.
> Please correct me if I am missing anything here.
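One caveat with the UNION ALL rewrite described above: it is not strictly equivalent to the OR join, because a row of A that matches B on more than one dimension is returned once per matching branch. A tiny sketch of this in plain Python (toy data; all row values are hypothetical), showing that deduplication restores the OR-join semantics:

```python
# Semantic caveat of the UNION ALL rewrite, shown on toy tuples.
# a_rows: (id, dim1, dim2); b_rows: (dim1, dim2). Names are illustrative.

a_rows = [("r1", "x", "p"), ("r2", "x", "q"), ("r3", "y", "q")]
b_rows = [("x", "q")]

def or_join(a, b):
    """One result per qualifying row of A, as the OR predicate produces."""
    return [ra for ra in a for rb in b if ra[1] == rb[0] or ra[2] == rb[1]]

def union_all(a, b):
    """One result per matching branch: r2 matches on both dimensions,
    so it appears twice."""
    branch1 = [ra for ra in a for rb in b if ra[1] == rb[0]]
    branch2 = [ra for ra in a for rb in b if ra[2] == rb[1]]
    return branch1 + branch2

print(len(or_join(a_rows, b_rows)))    # r1, r2, r3 -- each once
print(len(union_all(a_rows, b_rows)))  # r2 appears in both branches
```

So in SQL the faithful rewrite would be UNION (with its distinct semantics) rather than UNION ALL, unless duplicates are acceptable or the dimensions are known to be mutually exclusive.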