Repository: spark
Updated Branches:
  refs/heads/master 4e141a416 -> e965fb55a


[SPARK-25664][SQL][TEST] Refactor JoinBenchmark to use main method

## What changes were proposed in this pull request?

Refactor `JoinBenchmark` to use main method.
1. use `spark-submit`:
```console
bin/spark-submit --class  
org.apache.spark.sql.execution.benchmark.JoinBenchmark --jars 
./core/target/spark-core_2.11-3.0.0-SNAPSHOT-tests.jar 
./sql/catalyst/target/spark-sql_2.11-3.0.0-SNAPSHOT-tests.jar
```

2. Generate benchmark result:
```console
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain 
org.apache.spark.sql.execution.benchmark.JoinBenchmark"
```

## How was this patch tested?

manual tests

Closes #22661 from wangyum/SPARK-25664.

Lead-authored-by: Yuming Wang <[email protected]>
Co-authored-by: Yuming Wang <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e965fb55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e965fb55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e965fb55

Branch: refs/heads/master
Commit: e965fb55acf714bdd639d13d73f75a7d7b43efca
Parents: 4e141a4
Author: Yuming Wang <[email protected]>
Authored: Fri Oct 12 16:08:12 2018 -0700
Committer: Dongjoon Hyun <[email protected]>
Committed: Fri Oct 12 16:08:12 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/benchmark/Benchmark.scala  |   3 +-
 sql/core/benchmarks/JoinBenchmark-results.txt   |  75 +++++++
 .../sql/execution/benchmark/JoinBenchmark.scala | 216 +++++++------------
 3 files changed, 152 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e965fb55/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala 
b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
index 7a36b5f..bb389cd 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
@@ -200,11 +200,12 @@ private[spark] object Benchmark {
   def getProcessorName(): String = {
     val cpu = if (SystemUtils.IS_OS_MAC_OSX) {
       Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", 
"machdep.cpu.brand_string"))
+        .stripLineEnd
     } else if (SystemUtils.IS_OS_LINUX) {
       Try {
         val grepPath = Utils.executeAndGetOutput(Seq("which", 
"grep")).stripLineEnd
         Utils.executeAndGetOutput(Seq(grepPath, "-m", "1", "model name", 
"/proc/cpuinfo"))
-        .stripLineEnd.replaceFirst("model name[\\s*]:[\\s*]", "")
+          .stripLineEnd.replaceFirst("model name[\\s*]:[\\s*]", "")
       }.getOrElse("Unknown processor")
     } else {
       System.getenv("PROCESSOR_IDENTIFIER")

http://git-wip-us.apache.org/repos/asf/spark/blob/e965fb55/sql/core/benchmarks/JoinBenchmark-results.txt
----------------------------------------------------------------------
diff --git a/sql/core/benchmarks/JoinBenchmark-results.txt 
b/sql/core/benchmarks/JoinBenchmark-results.txt
new file mode 100644
index 0000000..8ceb5e7
--- /dev/null
+++ b/sql/core/benchmarks/JoinBenchmark-results.txt
@@ -0,0 +1,75 @@
+================================================================================================
+Join Benchmark
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Join w long:                             Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Join w long wholestage off                    4464 / 4483          4.7         
212.9       1.0X
+Join w long wholestage on                      289 /  339         72.6         
 13.8      15.5X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Join w long duplicated:                  Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Join w long duplicated wholestage off         5662 / 5678          3.7         
270.0       1.0X
+Join w long duplicated wholestage on           332 /  345         63.1         
 15.8      17.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Join w 2 ints:                           Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Join w 2 ints wholestage off              173174 / 173183          0.1        
8257.6       1.0X
+Join w 2 ints wholestage on               166350 / 198362          0.1        
7932.2       1.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Join w 2 longs:                          Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Join w 2 longs wholestage off                 7055 / 7214          3.0         
336.4       1.0X
+Join w 2 longs wholestage on                  1869 / 1985         11.2         
 89.1       3.8X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Join w 2 longs duplicated:               Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Join w 2 longs duplicated wholestage off    19256 / 20283          1.1         
918.2       1.0X
+Join w 2 longs duplicated wholestage on       2467 / 2544          8.5         
117.7       7.8X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+outer join w long:                       Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+outer join w long wholestage off              3756 / 3761          5.6         
179.1       1.0X
+outer join w long wholestage on                218 /  250         96.2         
 10.4      17.2X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+semi join w long:                        Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+semi join w long wholestage off               2393 / 2416          8.8         
114.1       1.0X
+semi join w long wholestage on                 214 /  218         97.9         
 10.2      11.2X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+sort merge join:                              Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+sort merge join wholestage off                     2318 / 2392          0.9    
    1105.3       1.0X
+sort merge join wholestage on                      1669 / 1811          1.3    
     795.9       1.4X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+sort merge join with duplicates:         Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+sort merge join with duplicates wholestage off      2966 / 2976          0.7   
     1414.5       1.0X
+sort merge join with duplicates wholestage on      2413 / 2641          0.9    
    1150.5       1.2X
+
+OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+shuffle hash join:                       Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+shuffle hash join wholestage off              1475 / 1479          2.8         
351.7       1.0X
+shuffle hash join wholestage on               1209 / 1238          3.5         
288.3       1.2X
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/e965fb55/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
index 37744dc..7bad4cb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
@@ -19,229 +19,163 @@ package org.apache.spark.sql.execution.benchmark
 
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.IntegerType
 
 /**
- * Benchmark to measure performance for aggregate primitives.
- * To run this:
- *  build/sbt "sql/test-only *benchmark.JoinBenchmark"
- *
- * Benchmarks in this file are skipped in normal builds.
+ * Benchmark to measure performance for joins.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar> 
<spark sql test jar>
+ *   2. build/sbt "sql/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this 
class>"
+ *      Results will be written to "benchmarks/JoinBenchmark-results.txt".
+ * }}}
  */
-class JoinBenchmark extends BenchmarkWithCodegen {
+object JoinBenchmark extends SqlBasedBenchmark {
 
-  ignore("broadcast hash join, long key") {
+  def broadcastHashJoinLongKey(): Unit = {
     val N = 20 << 20
     val M = 1 << 16
 
-    val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id 
as string) as v"))
-    runBenchmark("Join w long", N) {
-      val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
+    val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as 
string) as v"))
+    codegenBenchmark("Join w long", N) {
+      val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
       
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
       df.count()
     }
-
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    Join w long:                        Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-    
-------------------------------------------------------------------------------------------
-    Join w long codegen=false                3002 / 3262          7.0         
143.2       1.0X
-    Join w long codegen=true                  321 /  371         65.3          
15.3       9.3X
-    */
   }
 
-  ignore("broadcast hash join, long key with duplicates") {
+  def broadcastHashJoinLongKeyWithDuplicates(): Unit = {
     val N = 20 << 20
     val M = 1 << 16
-
-    val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id 
as string) as v"))
-    runBenchmark("Join w long duplicated", N) {
-      val dim = broadcast(sparkSession.range(M).selectExpr("cast(id/10 as 
long) as k"))
-      val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
+    val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k"))
+    codegenBenchmark("Join w long duplicated", N) {
+      val df = spark.range(N).join(dim, (col("id") % M) === col("k"))
       
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
       df.count()
     }
-
-    /*
-     *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-     *Join w long duplicated:             Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-     
*-------------------------------------------------------------------------------------------
-     *Join w long duplicated codegen=false      3446 / 3478          6.1       
  164.3       1.0X
-     *Join w long duplicated codegen=true       322 /  351         65.2        
  15.3      10.7X
-     */
   }
 
-  ignore("broadcast hash join, two int key") {
+  def broadcastHashJoinTwoIntKey(): Unit = {
     val N = 20 << 20
     val M = 1 << 16
-    val dim2 = broadcast(sparkSession.range(M)
+    val dim2 = broadcast(spark.range(M)
       .selectExpr("cast(id as int) as k1", "cast(id as int) as k2", "cast(id 
as string) as v"))
 
-    runBenchmark("Join w 2 ints", N) {
-      val df = sparkSession.range(N).join(dim2,
+    codegenBenchmark("Join w 2 ints", N) {
+      val df = spark.range(N).join(dim2,
         (col("id") % M).cast(IntegerType) === col("k1")
           && (col("id") % M).cast(IntegerType) === col("k2"))
       
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
       df.count()
     }
-
-    /*
-     *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-     *Join w 2 ints:                      Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-     
*-------------------------------------------------------------------------------------------
-     *Join w 2 ints codegen=false              4426 / 4501          4.7        
 211.1       1.0X
-     *Join w 2 ints codegen=true                791 /  818         26.5        
  37.7       5.6X
-     */
   }
 
-  ignore("broadcast hash join, two long key") {
+  def broadcastHashJoinTwoLongKey(): Unit = {
     val N = 20 << 20
     val M = 1 << 16
-    val dim3 = broadcast(sparkSession.range(M)
+    val dim3 = broadcast(spark.range(M)
       .selectExpr("id as k1", "id as k2", "cast(id as string) as v"))
 
-    runBenchmark("Join w 2 longs", N) {
-      val df = sparkSession.range(N).join(dim3,
+    codegenBenchmark("Join w 2 longs", N) {
+      val df = spark.range(N).join(dim3,
         (col("id") % M) === col("k1") && (col("id") % M) === col("k2"))
       
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
       df.count()
     }
-
-    /*
-     *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-     *Join w 2 longs:                     Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-     
*-------------------------------------------------------------------------------------------
-     *Join w 2 longs codegen=false             5905 / 6123          3.6        
 281.6       1.0X
-     *Join w 2 longs codegen=true              2230 / 2529          9.4        
 106.3       2.6X
-     */
   }
 
-  ignore("broadcast hash join, two long key with duplicates") {
+  def broadcastHashJoinTwoLongKeyWithDuplicates(): Unit = {
     val N = 20 << 20
     val M = 1 << 16
-    val dim4 = broadcast(sparkSession.range(M)
+    val dim4 = broadcast(spark.range(M)
       .selectExpr("cast(id/10 as long) as k1", "cast(id/10 as long) as k2"))
 
-    runBenchmark("Join w 2 longs duplicated", N) {
-      val df = sparkSession.range(N).join(dim4,
+    codegenBenchmark("Join w 2 longs duplicated", N) {
+      val df = spark.range(N).join(dim4,
         (col("id") bitwiseAND M) === col("k1") && (col("id") bitwiseAND M) === 
col("k2"))
       
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
       df.count()
     }
-
-    /*
-     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-     *Join w 2 longs duplicated:          Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-     
*-------------------------------------------------------------------------------------------
-     *Join w 2 longs duplicated codegen=false      6420 / 6587          3.3    
     306.1       1.0X
-     *Join w 2 longs duplicated codegen=true      2080 / 2139         10.1     
     99.2       3.1X
-     */
   }
 
-  ignore("broadcast hash join, outer join long key") {
+  def broadcastHashJoinOuterJoinLongKey(): Unit = {
     val N = 20 << 20
     val M = 1 << 16
-    val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id 
as string) as v"))
-    runBenchmark("outer join w long", N) {
-      val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), 
"left")
+    val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as 
string) as v"))
+    codegenBenchmark("outer join w long", N) {
+      val df = spark.range(N).join(dim, (col("id") % M) === col("k"), "left")
       
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
       df.count()
     }
-
-    /*
-     *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-     *outer join w long:                  Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-     
*-------------------------------------------------------------------------------------------
-     *outer join w long codegen=false          3055 / 3189          6.9        
 145.7       1.0X
-     *outer join w long codegen=true            261 /  276         80.5        
  12.4      11.7X
-     */
   }
 
-  ignore("broadcast hash join, semi join long key") {
+  def broadcastHashJoinSemiJoinLongKey(): Unit = {
     val N = 20 << 20
     val M = 1 << 16
-    val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id 
as string) as v"))
-    runBenchmark("semi join w long", N) {
-      val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), 
"leftsemi")
+    val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as 
string) as v"))
+    codegenBenchmark("semi join w long", N) {
+      val df = spark.range(N).join(dim, (col("id") % M) === col("k"), 
"leftsemi")
       
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
       df.count()
     }
-
-    /*
-     *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
-     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-     *semi join w long:                   Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-     
*-------------------------------------------------------------------------------------------
-     *semi join w long codegen=false           1912 / 1990         11.0        
  91.2       1.0X
-     *semi join w long codegen=true             237 /  244         88.3        
  11.3       8.1X
-     */
   }
 
-  ignore("sort merge join") {
+  def sortMergeJoin(): Unit = {
     val N = 2 << 20
-    runBenchmark("merge join", N) {
-      val df1 = sparkSession.range(N).selectExpr(s"id * 2 as k1")
-      val df2 = sparkSession.range(N).selectExpr(s"id * 3 as k2")
+    codegenBenchmark("sort merge join", N) {
+      val df1 = spark.range(N).selectExpr(s"id * 2 as k1")
+      val df2 = spark.range(N).selectExpr(s"id * 3 as k2")
       val df = df1.join(df2, col("k1") === col("k2"))
       
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
       df.count()
     }
-
-    /*
-     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-     *merge join:                         Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-     
*-------------------------------------------------------------------------------------------
-     *merge join codegen=false                 1588 / 1880          1.3        
 757.1       1.0X
-     *merge join codegen=true                  1477 / 1531          1.4        
 704.2       1.1X
-     */
   }
 
-  ignore("sort merge join with duplicates") {
+  def sortMergeJoinWithDuplicates(): Unit = {
     val N = 2 << 20
-    runBenchmark("sort merge join", N) {
-      val df1 = sparkSession.range(N)
+    codegenBenchmark("sort merge join with duplicates", N) {
+      val df1 = spark.range(N)
         .selectExpr(s"(id * 15485863) % ${N*10} as k1")
-      val df2 = sparkSession.range(N)
+      val df2 = spark.range(N)
         .selectExpr(s"(id * 15485867) % ${N*10} as k2")
       val df = df1.join(df2, col("k1") === col("k2"))
       
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
       df.count()
     }
-
-    /*
-     *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-     *sort merge join:                    Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-     
*-------------------------------------------------------------------------------------------
-     *sort merge join codegen=false            3626 / 3667          0.6        
1728.9       1.0X
-     *sort merge join codegen=true             3405 / 3438          0.6        
1623.8       1.1X
-     */
   }
 
-  ignore("shuffle hash join") {
-    val N = 4 << 20
-    sparkSession.conf.set("spark.sql.shuffle.partitions", "2")
-    sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", "10000000")
-    sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
-    runBenchmark("shuffle hash join", N) {
-      val df1 = sparkSession.range(N).selectExpr(s"id as k1")
-      val df2 = sparkSession.range(N / 3).selectExpr(s"id * 3 as k2")
-      val df = df1.join(df2, col("k1") === col("k2"))
-      
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
-      df.count()
+  def shuffleHashJoin(): Unit = {
+    val N: Long = 4 << 20
+    withSQLConf(
+      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000",
+      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+      codegenBenchmark("shuffle hash join", N) {
+        val df1 = spark.range(N).selectExpr(s"id as k1")
+        val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2")
+        val df = df1.join(df2, col("k1") === col("k2"))
+        
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
+        df.count()
+      }
     }
+  }
 
-    /*
-     *Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
-     *Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
-     *shuffle hash join:                  Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
-     
*-------------------------------------------------------------------------------------------
-     *shuffle hash join codegen=false          2005 / 2010          2.1        
 478.0       1.0X
-     *shuffle hash join codegen=true           1773 / 1792          2.4        
 422.7       1.1X
-     */
+  override def runBenchmarkSuite(): Unit = {
+    runBenchmark("Join Benchmark") {
+      broadcastHashJoinLongKey()
+      broadcastHashJoinLongKeyWithDuplicates()
+      broadcastHashJoinTwoIntKey()
+      broadcastHashJoinTwoLongKey()
+      broadcastHashJoinTwoLongKeyWithDuplicates()
+      broadcastHashJoinOuterJoinLongKey()
+      broadcastHashJoinSemiJoinLongKey()
+      sortMergeJoin()
+      sortMergeJoinWithDuplicates()
+      shuffleHashJoin()
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to