[ 
https://issues.apache.org/jira/browse/SPARK-8102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14577106#comment-14577106
 ] 

Hao Ren edited comment on SPARK-8102 at 6/8/15 1:43 PM:
--------------------------------------------------------

Here are the physical plans for the two queries:

Query 1:
{code}
Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
 ShuffledHashJoin [refCategoryID#3,regionCode#9], [category#17,region#18], BuildRight
  Exchange (HashPartitioning [refCategoryID#3,regionCode#9], 12)
   Project [regionName#10,categoryName#0,refCategoryID#3,regionCode#9]
    CartesianProduct
     Project [categoryName#0,refCategoryID#3]
      PhysicalRDD [categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3], MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
     Project [regionName#10,regionCode#9]
      PhysicalRDD [cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11], MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
  Exchange (HashPartitioning [category#17,region#18], 12)
   Project [timestamp_sec#13L AS period#20L,category#17,region#18,action#15,list_id#16L]
    PhysicalRDD [syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19], MapPartitionsRDD[16] at map at SQLContext.scala:394
{code}

Query 2:
{code}
Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
 ShuffledHashJoin [region#18], [regionCode#9], BuildRight
  Exchange (HashPartitioning [region#18], 12)
   Project [categoryName#0,list_id#16L,period#20L,action#15,region#18]
    ShuffledHashJoin [refCategoryID#3], [category#17], BuildRight
     Exchange (HashPartitioning [refCategoryID#3], 12)
      Project [categoryName#0,refCategoryID#3]
       PhysicalRDD [categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3], MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
     Exchange (HashPartitioning [category#17], 12)
      Project [timestamp_sec#13L AS period#20L,category#17,region#18,action#15,list_id#16L]
       PhysicalRDD [syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19], MapPartitionsRDD[16] at map at SQLContext.scala:394
  Exchange (HashPartitioning [regionCode#9], 12)
   Project [regionName#10,regionCode#9]
    PhysicalRDD [cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11], MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
{code}

The two plans differ.

Query 1's physical plan contains a `CartesianProduct`, which might be why it is 
slower than the second one. Spark is based on Hive 0.13+, which supports the 
implicit join notation used in the two queries above, so it would be better if 
Spark could choose the best execution plan itself. Users have no way of knowing 
why performance improves just by permuting two tables.
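
To illustrate why the order of tables in the FROM clause could matter here, the sketch below models a planner that joins tables strictly left to right and checks whether each step has an equality predicate linking the new table to the ones already joined. This is a simplified model, not Spark's actual planner; the function name `cartesian_steps` and the alias pairs are hypothetical and only mirror the WHERE clauses of the queries in this thread.

```python
# Simplified model (an assumption, not Spark's planner): tables are joined
# left to right in FROM order. A step with no predicate linking the new table
# to the tables already joined degenerates into a Cartesian product.

def cartesian_steps(from_order, predicates):
    """Return the tables whose join step has no linking predicate."""
    joined = {from_order[0]}
    cartesian = []
    for table in from_order[1:]:
        linked = any(
            (a == table and b in joined) or (b == table and a in joined)
            for a, b in predicates
        )
        if not linked:
            cartesian.append(table)
        joined.add(table)
    return cartesian

# Aliases from the queries:
# c = t_category, z = t_zipcode, g = click_meter_site_grouped
# WHERE c.refCategoryID = g.category AND z.regionCode = g.region
predicates = [("c", "g"), ("z", "g")]

query1 = cartesian_steps(["c", "z", "g"], predicates)  # FROM c, z, g
query2 = cartesian_steps(["c", "g", "z"], predicates)  # FROM c, g, z
query3 = cartesian_steps(["z", "g", "c"], predicates)  # FROM z, g, c

print(query1, query2, query3)
```

Under this model only the `c, z, g` order leaves a step (adding `z`) without a linking predicate, which is consistent with `CartesianProduct` appearing only in Query 1's plan.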

Out of curiosity, I tested a third query whose physical plan is similar to the 
second one. It contains no `CartesianProduct`, yet it takes much longer (277 s):

Query 3:
{code}
SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
FROM t_zipcode z, click_meter_site_grouped g, t_category c
WHERE c.refCategoryID = g.category AND z.regionCode = g.region
{code}
{code}
Project [period#20L,categoryName#0,regionName#10,action#15,list_id#16L]
 ShuffledHashJoin [category#17], [refCategoryID#3], BuildRight
  Exchange (HashPartitioning [category#17], 12)
   Project [regionName#10,list_id#16L,period#20L,category#17,action#15]
    ShuffledHashJoin [regionCode#9], [region#18], BuildRight
     Exchange (HashPartitioning [regionCode#9], 12)
      Project [regionName#10,regionCode#9]
       PhysicalRDD [cityName#4,countryCode#5,countryName#6,dptCode#7,dptName#8,regionCode#9,regionName#10,zipCode#11], MapPartitionsRDD[11] at mapPartitions at SQLContext.scala:439
     Exchange (HashPartitioning [region#18], 12)
      Project [timestamp_sec#13L AS period#20L,category#17,region#18,action#15,list_id#16L]
       PhysicalRDD [syslog#12,timestamp_sec#13L,timestamp_usec#14,action#15,list_id#16L,category#17,region#18,expiration_time#19], MapPartitionsRDD[16] at map at SQLContext.scala:394
  Exchange (HashPartitioning [refCategoryID#3], 12)
   Project [categoryName#0,refCategoryID#3]
    PhysicalRDD [categoryName#0,familyName#1,parentRefCategoryID#2,refCategoryID#3], MapPartitionsRDD[5] at mapPartitions at SQLContext.scala:439
{code}



> Big performance difference when joining 3 tables in different order
> -------------------------------------------------------------------
>
>                 Key: SPARK-8102
>                 URL: https://issues.apache.org/jira/browse/SPARK-8102
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.3.1
>         Environment: spark in local mode
>            Reporter: Hao Ren
>
> Given 3 tables loaded from CSV files 
> (table name => size):
> *click_meter_site_grouped* => 10 687 455 bytes
> *t_zipcode* => 2 738 954 bytes
> *t_category* => 2 182 bytes
> When joining the 3 tables, I notice a large performance difference depending 
> on the order in which they are joined.
> Here are the SQL queries to compare:
> {code}
> -- snippet 1
> SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
> FROM t_category c, t_zipcode z, click_meter_site_grouped g
> WHERE c.refCategoryID = g.category AND z.regionCode = g.region
> {code}
> {code}
> -- snippet 2
> SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
> FROM t_category c, click_meter_site_grouped g, t_zipcode z
> WHERE c.refCategoryID = g.category AND z.regionCode = g.region
> {code}
> As you can see, the largest table, *click_meter_site_grouped*, is the last 
> table in the FROM clause in the first snippet and is in the middle of the 
> table list in the second one.
> Snippet 2 runs three times faster than Snippet 1.
> (8 seconds VS 24 seconds)
> The data here is only a sample of a larger data set, so running the same 
> query on the original data set would likely cause a real performance issue.
> After checking the log, we found something strange in snippet 1's log:
> 15/06/04 15:32:03 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:04 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:04 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:05 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:05 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:05 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:05 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:06 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:06 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:06 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:07 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:07 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:07 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:07 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:08 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:08 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:08 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:09 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:09 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:09 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:09 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:10 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:10 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:10 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:11 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:11 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:11 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:11 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:12 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:12 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:12 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:13 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:13 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:13 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:13 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:14 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:14 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:14 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:15 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:15 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:15 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:16 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:16 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:16 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:16 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:17 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:17 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:17 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:18 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:18 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:18 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:18 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:19 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:19 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:19 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:20 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> 15/06/04 15:32:20 INFO HadoopRDD: Input split: 
> file:/home/invkrh/workspace/java/data_spark_etl/data-sample/bconf/zipcodes_4.txt:0+2738954
> It seems that *t_zipcode* is loaded 56 times! For snippet 2, everything is 
> fine: all three tables are loaded only once.
> SparkSQL can automatically broadcast a table in a join when its size is 
> below *autoBroadcastJoinThreshold*, but I am not sure whether the repeated 
> loading is caused by auto broadcast.
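
As a rough check of the broadcast hypothesis, the file sizes listed in the description can be compared against the threshold. The sketch below assumes a threshold of 10 MB (10485760 bytes), which is believed to be the default for `spark.sql.autoBroadcastJoinThreshold` but should be verified for the version in use. It also assumes that the planner's size estimate equals the on-disk file size, which may not hold for tables without statistics.

```python
# Assumed default for spark.sql.autoBroadcastJoinThreshold (10 MB).
# This value is an assumption; check the documentation for your Spark version.
ASSUMED_THRESHOLD_BYTES = 10 * 1024 * 1024

# File sizes in bytes, copied from the issue description.
table_sizes = {
    "click_meter_site_grouped": 10687455,
    "t_zipcode": 2738954,
    "t_category": 2182,
}

# Tables small enough to be broadcast candidates under the assumed threshold.
below_threshold = sorted(
    name for name, size in table_sizes.items() if size < ASSUMED_THRESHOLD_BYTES
)

print(below_threshold)
```

Under those assumptions only the two smaller tables would be broadcast candidates. The physical plans posted in the comment show `ShuffledHashJoin` for every join, so a broadcast join does not appear to have been chosen in these runs.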


