[
https://issues.apache.org/jira/browse/SPARK-8102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14577106#comment-14577106
]
Hao Ren edited comment on SPARK-8102 at 6/8/15 1:11 PM:
--------------------------------------------------------
Here are the physical plan for the two queries:
Query 1:
{code}
Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
ShuffledHashJoin [refCategoryID#3,regionCode#9], [category#17,region#18],
BuildRight
Exchange (HashPartitioning [refCategoryID#3,regionCode#9], 12)
Project [regionName#10,categoryName#0,refCategoryID#3,regionCode#9]
CartesianProduct
Project [categoryName#0,refCategoryID#3]
PhysicalRDD
[categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3],
MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
Project [regionName#10,regionCode#9]
PhysicalRDD
[cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11],
MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
Exchange (HashPartitioning [category#17,region#18], 12)
Project [timestamp_sec#13L AS
period#20L,category#17,region#18,action#15,list_id#16L]
PhysicalRDD
[syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19],
MapPartitionsRDD[16] at map at SQLContext.scala:394
{code}
Query 2:
{code}
Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
ShuffledHashJoin [region#18], [regionCode#9], BuildRight
Exchange (HashPartitioning [region#18], 12)
Project [categoryName#0,list_id#16L,period#20L,action#15,region#18]
ShuffledHashJoin [refCategoryID#3], [category#17], BuildRight
Exchange (HashPartitioning [refCategoryID#3], 12)
Project [categoryName#0,refCategoryID#3]
PhysicalRDD
[categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3],
MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
Exchange (HashPartitioning [category#17], 12)
Project [timestamp_sec#13L AS
period#20L,category#17,region#18,action#15,list_id#16L]
PhysicalRDD
[syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19],
MapPartitionsRDD[16] at map at SQLContext.scala:394
Exchange (HashPartitioning [regionCode#9], 12)
Project [regionName#10,regionCode#9]
PhysicalRDD
[cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11],
MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
{code}
And they are different.
We find `CartesianProduct` in Query 1 's physical plan. that might be why it is
slower than the second one. As Spark is based on hive 0.13.+ in which implicit
join notation, like the 2 queries above, is supported. It would be better that
Spark can choose the best execution plan. Users have no idea why performance is
improved by just permuting two tables.
was (Author: invkrh):
Here are the physical plan for the two queries:
Query 1:
{code}
Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
ShuffledHashJoin [refCategoryID#3,regionCode#9], [category#17,region#18],
BuildRight
Exchange (HashPartitioning [refCategoryID#3,regionCode#9], 12)
Project [regionName#10,categoryName#0,refCategoryID#3,regionCode#9]
CartesianProduct
Project [categoryName#0,refCategoryID#3]
PhysicalRDD
[categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3],
MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
Project [regionName#10,regionCode#9]
PhysicalRDD
[cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11],
MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
Exchange (HashPartitioning [category#17,region#18], 12)
Project [timestamp_sec#13L AS
period#20L,category#17,region#18,action#15,list_id#16L]
PhysicalRDD
[syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19],
MapPartitionsRDD[16] at map at SQLContext.scala:394
{code}
Query 2:
{code}
Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
ShuffledHashJoin [region#18], [regionCode#9], BuildRight
Exchange (HashPartitioning [region#18], 12)
Project [categoryName#0,list_id#16L,period#20L,action#15,region#18]
ShuffledHashJoin [refCategoryID#3], [category#17], BuildRight
Exchange (HashPartitioning [refCategoryID#3], 12)
Project [categoryName#0,refCategoryID#3]
PhysicalRDD
[categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3],
MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
Exchange (HashPartitioning [category#17], 12)
Project [timestamp_sec#13L AS
period#20L,category#17,region#18,action#15,list_id#16L]
PhysicalRDD
[syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19],
MapPartitionsRDD[16] at map at SQLContext.scala:394
Exchange (HashPartitioning [regionCode#9], 12)
Project [regionName#10,regionCode#9]
PhysicalRDD
[cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11],
MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
{code}
And they are different.
We find `CartesianProduct` in Query 1 's physical plan. that's why it is slower
than the second one. As Spark is based on hive 0.13.+ in which implicit join
notation, like the 2 queries above, is supported. It would be better that Spark
can choose the best execution plan. Users have no idea why performance is
improved by just permuting two tables.
> Big performance difference when joining 3 tables in different order
> -------------------------------------------------------------------
>
> Key: SPARK-8102
> URL: https://issues.apache.org/jira/browse/SPARK-8102
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 1.3.1
> Environment: spark in local mode
> Reporter: Hao Ren
>
> Given 3 tables loaded from CSV files:
> ( tables name => size)
> *click_meter_site_grouped* =>10 687 455 bytes
> *t_zipcode* => 2 738 954 bytes
> *t_category* => 2 182 bytes
> When joining the 3 tables, I notice a large performance difference if they
> are joined in different order.
> Here are the SQL queries to compare:
> {code}
> -- snippet 1
> SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
> FROM t_category c, t_zipcode z, click_meter_site_grouped g
> WHERE c.refCategoryID = g.category AND z.regionCode = g.region
> {code}
> {code}
> -- snippet 2
> SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
> FROM t_category c, click_meter_site_grouped g, t_zipcode z
> WHERE c.refCategoryID = g.category AND z.regionCode = g.region
> {code}
> As you see, the largest table *click_meter_site_grouped* is the last table in
> FROM clause in the first snippet, and it is in the middle of table list in
> second one.
> Snippet 2 runs three times faster than Snippet 1.
> (8 seconds VS 24 seconds)
> As the data is just sampled from a large data set, if we test it on the
> original data set, it will normally result in a performance issue.
> After checking the log, we found something strange In snippet 1's log:
> 15/06/04 15:32:03 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:04 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:04 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:05 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:05 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:05 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:05 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:06 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:06 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:06 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:07 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:07 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:07 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:07 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:08 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:08 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:08 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:09 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:09 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:09 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:09 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:10 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:10 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:10 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:11 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:11 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:11 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:11 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:12 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:12 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:12 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:13 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:13 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:13 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:13 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:14 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:14 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:14 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:15 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:15 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:15 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:16 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:16 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:16 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:16 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:17 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:17 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:17 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:18 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:18 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:18 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:18 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:19 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:19 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:19 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:20 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:20 INFO HadoopRDD: Input split:
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> It seems that *t_zipcode* is loaded 56 times !!! And, for snippet 2,
> everything is fine, all the three tables are loaded only once.
> Knowing that SparkSQL's join can automatically broadcast table in join when
> its size is below *autoBroadcastJoinThreshold*. Not sure if the over-load is
> caused by auto broadcast.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]