Repository: spark
Updated Branches:
refs/heads/master 32d6d9d72 -> 235d28333
[MINOR][SQL][TEST] Test shuffle hash join while is not expected
## What changes were proposed in this pull request?
ignore("shuffle hash join") is meant to benchmark shuffle hash join, i.e. to exercise
_case class ShuffledHashJoinExec_.
But when you change 'ignore' -> 'test', the operator actually exercised is _case class BroadcastHashJoinExec_.
Before this change, that happens because canBroadcast is true.
Information printed in _canBroadcast(plan: LogicalPlan)_:
```
canBroadcast plan.stats.sizeInBytes:6710880
canBroadcast conf.autoBroadcastJoinThreshold:10000000
```
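After this change, plan.stats.sizeInBytes is 11184808, which is above conf.autoBroadcastJoinThreshold (10000000), so the join is no longer planned as a broadcast join.
Information printed in _canBuildLocalHashMap(plan: LogicalPlan)_
and _muchSmaller(a: LogicalPlan, b: LogicalPlan)_: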
```
canBuildLocalHashMap plan.stats.sizeInBytes:11184808
canBuildLocalHashMap conf.autoBroadcastJoinThreshold:10000000
canBuildLocalHashMap conf.numShufflePartitions:2
```
```
muchSmaller a.stats.sizeInBytes * 3:33554424
muchSmaller b.stats.sizeInBytes:33554432
```
## How was this patch tested?
existing test case.
Author: caoxuewen <[email protected]>
Closes #19069 from heary-cao/shuffle_hash_join.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/235d2833
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/235d2833
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/235d2833
Branch: refs/heads/master
Commit: 235d28333c63719008ee755138db5c964237f526
Parents: 32d6d9d
Author: caoxuewen <[email protected]>
Authored: Wed Aug 30 10:10:24 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Wed Aug 30 10:10:24 2017 -0700
----------------------------------------------------------------------
.../sql/execution/benchmark/JoinBenchmark.scala | 56 +++++++++++++-------
1 file changed, 37 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/235d2833/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
index 46db41a..5a25d72 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.benchmark
+import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
@@ -35,7 +36,9 @@ class JoinBenchmark extends BenchmarkBase {
val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id
as string) as v"))
runBenchmark("Join w long", N) {
- sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count()
+ val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
+
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+ df.count()
}
/*
@@ -55,7 +58,9 @@ class JoinBenchmark extends BenchmarkBase {
val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id
as string) as v"))
runBenchmark("Join w long duplicated", N) {
val dim = broadcast(sparkSession.range(M).selectExpr("cast(id/10 as
long) as k"))
- sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count()
+ val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"))
+
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+ df.count()
}
/*
@@ -75,9 +80,11 @@ class JoinBenchmark extends BenchmarkBase {
.selectExpr("cast(id as int) as k1", "cast(id as int) as k2", "cast(id
as string) as v"))
runBenchmark("Join w 2 ints", N) {
- sparkSession.range(N).join(dim2,
+ val df = sparkSession.range(N).join(dim2,
(col("id") % M).cast(IntegerType) === col("k1")
- && (col("id") % M).cast(IntegerType) === col("k2")).count()
+ && (col("id") % M).cast(IntegerType) === col("k2"))
+
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+ df.count()
}
/*
@@ -97,9 +104,10 @@ class JoinBenchmark extends BenchmarkBase {
.selectExpr("id as k1", "id as k2", "cast(id as string) as v"))
runBenchmark("Join w 2 longs", N) {
- sparkSession.range(N).join(dim3,
+ val df = sparkSession.range(N).join(dim3,
(col("id") % M) === col("k1") && (col("id") % M) === col("k2"))
- .count()
+
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+ df.count()
}
/*
@@ -119,9 +127,10 @@ class JoinBenchmark extends BenchmarkBase {
.selectExpr("cast(id/10 as long) as k1", "cast(id/10 as long) as k2"))
runBenchmark("Join w 2 longs duplicated", N) {
- sparkSession.range(N).join(dim4,
+ val df = sparkSession.range(N).join(dim4,
(col("id") bitwiseAND M) === col("k1") && (col("id") bitwiseAND M) ===
col("k2"))
- .count()
+
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+ df.count()
}
/*
@@ -138,7 +147,9 @@ class JoinBenchmark extends BenchmarkBase {
val M = 1 << 16
val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id
as string) as v"))
runBenchmark("outer join w long", N) {
- sparkSession.range(N).join(dim, (col("id") % M) === col("k"),
"left").count()
+ val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"),
"left")
+
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+ df.count()
}
/*
@@ -156,7 +167,9 @@ class JoinBenchmark extends BenchmarkBase {
val M = 1 << 16
val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id
as string) as v"))
runBenchmark("semi join w long", N) {
- sparkSession.range(N).join(dim, (col("id") % M) === col("k"),
"leftsemi").count()
+ val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"),
"leftsemi")
+
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+ df.count()
}
/*
@@ -174,7 +187,9 @@ class JoinBenchmark extends BenchmarkBase {
runBenchmark("merge join", N) {
val df1 = sparkSession.range(N).selectExpr(s"id * 2 as k1")
val df2 = sparkSession.range(N).selectExpr(s"id * 3 as k2")
- df1.join(df2, col("k1") === col("k2")).count()
+ val df = df1.join(df2, col("k1") === col("k2"))
+
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
+ df.count()
}
/*
@@ -193,7 +208,9 @@ class JoinBenchmark extends BenchmarkBase {
.selectExpr(s"(id * 15485863) % ${N*10} as k1")
val df2 = sparkSession.range(N)
.selectExpr(s"(id * 15485867) % ${N*10} as k2")
- df1.join(df2, col("k1") === col("k2")).count()
+ val df = df1.join(df2, col("k1") === col("k2"))
+
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
+ df.count()
}
/*
@@ -212,18 +229,19 @@ class JoinBenchmark extends BenchmarkBase {
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
runBenchmark("shuffle hash join", N) {
val df1 = sparkSession.range(N).selectExpr(s"id as k1")
- val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")
- df1.join(df2, col("k1") === col("k2")).count()
+ val df2 = sparkSession.range(N / 3).selectExpr(s"id * 3 as k2")
+ val df = df1.join(df2, col("k1") === col("k2"))
+
assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
+ df.count()
}
/*
- *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
- *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+ *Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
+ *Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
*shuffle hash join: Best/Avg Time(ms) Rate(M/s) Per
Row(ns) Relative
*-------------------------------------------------------------------------------------------
- *shuffle hash join codegen=false 1101 / 1391 3.8
262.6 1.0X
- *shuffle hash join codegen=true 528 / 578 7.9
125.8 2.1X
+ *shuffle hash join codegen=false 2005 / 2010 2.1
478.0 1.0X
+ *shuffle hash join codegen=true 1773 / 1792 2.4
422.7 1.1X
*/
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]