Repository: spark
Updated Branches:
  refs/heads/master 32d6d9d72 -> 235d28333


[MINOR][SQL][TEST] Test shuffle hash join while is not expected

## What changes were proposed in this pull request?

ignore("shuffle hash join") is intended to exercise the shuffle hash join, i.e. _case class 
ShuffledHashJoinExec_.
But when you change 'ignore' to 'test', the join that actually runs is _case class BroadcastHashJoinExec_.

Before the change, this happens because canBroadcast is true.
Information printed in _canBroadcast(plan: LogicalPlan)_:
```
canBroadcast plan.stats.sizeInBytes:6710880
canBroadcast conf.autoBroadcastJoinThreshold:10000000
```

After the change, plan.stats.sizeInBytes is 11184808.
Information printed in _canBuildLocalHashMap(plan: LogicalPlan)_
and _muchSmaller(a: LogicalPlan, b: LogicalPlan)_:

```
canBuildLocalHashMap plan.stats.sizeInBytes:11184808
canBuildLocalHashMap conf.autoBroadcastJoinThreshold:10000000
canBuildLocalHashMap conf.numShufflePartitions:2
```
```
muchSmaller a.stats.sizeInBytes * 3:33554424
muchSmaller b.stats.sizeInBytes:33554432
```
## How was this patch tested?

existing test case.

Author: caoxuewen <[email protected]>

Closes #19069 from heary-cao/shuffle_hash_join.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/235d2833
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/235d2833
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/235d2833

Branch: refs/heads/master
Commit: 235d28333c63719008ee755138db5c964237f526
Parents: 32d6d9d
Author: caoxuewen <[email protected]>
Authored: Wed Aug 30 10:10:24 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Wed Aug 30 10:10:24 2017 -0700

----------------------------------------------------------------------
 .../sql/execution/benchmark/JoinBenchmark.scala | 56 +++++++++++++-------
 1 file changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/235d2833/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
index 46db41a..5a25d72 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.benchmark
 
+import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.IntegerType
 
@@ -35,7 +36,9 @@ class JoinBenchmark extends BenchmarkBase {
 
     val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id 
as string) as v"))
     runBenchmark("Join w long", N) {
-      sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count()
+      val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
+      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      df.count()
     }
 
     /*
@@ -55,7 +58,9 @@ class JoinBenchmark extends BenchmarkBase {
     val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id 
as string) as v"))
     runBenchmark("Join w long duplicated", N) {
       val dim = broadcast(sparkSession.range(M).selectExpr("cast(id/10 as 
long) as k"))
-      sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count()
+      val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
+      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      df.count()
     }
 
     /*
@@ -75,9 +80,11 @@ class JoinBenchmark extends BenchmarkBase {
       .selectExpr("cast(id as int) as k1", "cast(id as int) as k2", "cast(id 
as string) as v"))
 
     runBenchmark("Join w 2 ints", N) {
-      sparkSession.range(N).join(dim2,
+      val df = sparkSession.range(N).join(dim2,
         (col("id") % M).cast(IntegerType) === col("k1")
-          && (col("id") % M).cast(IntegerType) === col("k2")).count()
+          && (col("id") % M).cast(IntegerType) === col("k2"))
+      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      df.count()
     }
 
     /*
@@ -97,9 +104,10 @@ class JoinBenchmark extends BenchmarkBase {
       .selectExpr("id as k1", "id as k2", "cast(id as string) as v"))
 
     runBenchmark("Join w 2 longs", N) {
-      sparkSession.range(N).join(dim3,
+      val df = sparkSession.range(N).join(dim3,
         (col("id") % M) === col("k1") && (col("id") % M) === col("k2"))
-        .count()
+      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      df.count()
     }
 
     /*
@@ -119,9 +127,10 @@ class JoinBenchmark extends BenchmarkBase {
       .selectExpr("cast(id/10 as long) as k1", "cast(id/10 as long) as k2"))
 
     runBenchmark("Join w 2 longs duplicated", N) {
-      sparkSession.range(N).join(dim4,
+      val df = sparkSession.range(N).join(dim4,
         (col("id") bitwiseAND M) === col("k1") && (col("id") bitwiseAND M) === 
col("k2"))
-        .count()
+      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      df.count()
     }
 
     /*
@@ -138,7 +147,9 @@ class JoinBenchmark extends BenchmarkBase {
     val M = 1 << 16
     val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id 
as string) as v"))
     runBenchmark("outer join w long", N) {
-      sparkSession.range(N).join(dim, (col("id") % M) === col("k"), 
"left").count()
+      val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), 
"left")
+      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      df.count()
     }
 
     /*
@@ -156,7 +167,9 @@ class JoinBenchmark extends BenchmarkBase {
     val M = 1 << 16
     val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id 
as string) as v"))
     runBenchmark("semi join w long", N) {
-      sparkSession.range(N).join(dim, (col("id") % M) === col("k"), 
"leftsemi").count()
+      val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), 
"leftsemi")
+      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+      df.count()
     }
 
     /*
@@ -174,7 +187,9 @@ class JoinBenchmark extends BenchmarkBase {
     runBenchmark("merge join", N) {
       val df1 = sparkSession.range(N).selectExpr(s"id * 2 as k1")
       val df2 = sparkSession.range(N).selectExpr(s"id * 3 as k2")
-      df1.join(df2, col("k1") === col("k2")).count()
+      val df = df1.join(df2, col("k1") === col("k2"))
+      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
+      df.count()
     }
 
     /*
@@ -193,7 +208,9 @@ class JoinBenchmark extends BenchmarkBase {
         .selectExpr(s"(id * 15485863) % ${N*10} as k1")
       val df2 = sparkSession.range(N)
         .selectExpr(s"(id * 15485867) % ${N*10} as k2")
-      df1.join(df2, col("k1") === col("k2")).count()
+      val df = df1.join(df2, col("k1") === col("k2"))
+      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
+      df.count()
     }
 
     /*
@@ -212,18 +229,19 @@ class JoinBenchmark extends BenchmarkBase {
     sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
     runBenchmark("shuffle hash join", N) {
       val df1 = sparkSession.range(N).selectExpr(s"id as k1")
-      val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")
-      df1.join(df2, col("k1") === col("k2")).count()
+      val df2 = sparkSession.range(N / 3).selectExpr(s"id * 3 as k2")
+      val df = df1.join(df2, col("k1") === col("k2"))
+      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
+      df.count()
     }
 
     /*
-     *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+     *Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
+     *Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
      *shuffle hash join:                  Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
      
*-------------------------------------------------------------------------------------------
-     *shuffle hash join codegen=false          1101 / 1391          3.8        
 262.6       1.0X
-     *shuffle hash join codegen=true            528 /  578          7.9        
 125.8       2.1X
+     *shuffle hash join codegen=false          2005 / 2010          2.1        
 478.0       1.0X
+     *shuffle hash join codegen=true           1773 / 1792          2.4        
 422.7       1.1X
      */
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to