Repository: spark
Updated Branches:
  refs/heads/master 6540c2f8f -> ccd07b736


[SPARK-25665][SQL][TEST] Refactor ObjectHashAggregateExecBenchmark to use main method

## What changes were proposed in this pull request?

Refactor `ObjectHashAggregateExecBenchmark` to run through a main method (by extending `BenchmarkBase`) instead of `ignore`d test cases, so that results can be regenerated and written to `benchmarks/ObjectHashAggregateExecBenchmark-results.txt` rather than kept in code comments.
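
For context, the `BenchmarkBase` main-method pattern the class now follows is sketched below. This is a minimal, hypothetical example: the `ExampleBenchmark` object and its workload are illustrative only, and it assumes nothing beyond what the diff further down shows of `BenchmarkBase` (`runBenchmarkSuite`, `runBenchmark`, `output`):

```scala
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}

// Hypothetical benchmark following the same structure as the refactored class.
object ExampleBenchmark extends BenchmarkBase {
  override def runBenchmarkSuite(): Unit = {
    runBenchmark("example group") {
      // `output` is supplied by BenchmarkBase: with SPARK_GENERATE_BENCHMARK_FILES=1
      // it points at benchmarks/<ClassName>-results.txt, otherwise results go to stdout.
      val benchmark = new Benchmark("example case", valuesPerIteration = 1024, output = output)
      benchmark.addCase("sum a range") { _ =>
        (1L to 1024L).sum // the workload being measured
      }
      benchmark.run()
    }
  }
}
```

Because `main` is inherited from `BenchmarkBase`, such an object can be launched directly via `spark-submit --class`, as shown below.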

## How was this patch tested?

Manually tested:
```
bin/spark-submit --class org.apache.spark.sql.execution.benchmark.ObjectHashAggregateExecBenchmark \
  --jars sql/catalyst/target/spark-catalyst_2.11-3.0.0-SNAPSHOT-tests.jar,core/target/spark-core_2.11-3.0.0-SNAPSHOT-tests.jar,sql/hive/target/spark-hive_2.11-3.0.0-SNAPSHOT.jar \
  --packages org.spark-project.hive:hive-exec:1.2.1.spark2 \
  sql/hive/target/spark-hive_2.11-3.0.0-SNAPSHOT-tests.jar
```
Generated results with:
```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "hive/test:runMain org.apache.spark.sql.execution.benchmark.ObjectHashAggregateExecBenchmark"
```
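
The patch also replaces the previous `sparkSession.conf.set(...)` calls with `withSQLConf` from `SQLHelper`, which applies each setting only for the duration of the block and restores the prior value afterwards, so benchmark cases cannot leak configuration into one another. The pattern, as it appears in the diff below:

```scala
// Config changes are scoped to the block; previous values are restored afterwards.
withSQLConf(
  SQLConf.USE_OBJECT_HASH_AGG.key -> "true",
  SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "2") {
  sql("SELECT percentile_approx(id, 0.5) FROM t").collect()
}
```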

Closes #22804 from peter-toth/SPARK-25665.

Lead-authored-by: Peter Toth <peter.t...@gmail.com>
Co-authored-by: Dongjoon Hyun <dongj...@apache.org>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccd07b73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccd07b73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccd07b73

Branch: refs/heads/master
Commit: ccd07b736640c87ac6980a1c7c2d706ef3bab1bf
Parents: 6540c2f
Author: Peter Toth <peter.t...@gmail.com>
Authored: Thu Oct 25 12:42:31 2018 -0700
Committer: Dongjoon Hyun <dongj...@apache.org>
Committed: Thu Oct 25 12:42:31 2018 -0700

----------------------------------------------------------------------
 ...ObjectHashAggregateExecBenchmark-results.txt |  45 ++++
 .../ObjectHashAggregateExecBenchmark.scala      | 218 +++++++++----------
 2 files changed, 152 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ccd07b73/sql/hive/benchmarks/ObjectHashAggregateExecBenchmark-results.txt
----------------------------------------------------------------------
diff --git a/sql/hive/benchmarks/ObjectHashAggregateExecBenchmark-results.txt b/sql/hive/benchmarks/ObjectHashAggregateExecBenchmark-results.txt
new file mode 100644
index 0000000..f3044da
--- /dev/null
+++ b/sql/hive/benchmarks/ObjectHashAggregateExecBenchmark-results.txt
@@ -0,0 +1,45 @@
+================================================================================================
+Hive UDAF vs Spark AF
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+hive udaf vs spark af:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+hive udaf w/o group by                        6370 / 6400          0.0       97193.6       1.0X
+spark af w/o group by                           54 /   63          1.2         820.8     118.4X
+hive udaf w/ group by                         4492 / 4507          0.0       68539.5       1.4X
+spark af w/ group by w/o fallback               58 /   64          1.1         881.7     110.2X
+spark af w/ group by w/ fallback               136 /  142          0.5        2075.0      46.8X
+
+
+================================================================================================
+ObjectHashAggregateExec vs SortAggregateExec - typed_count
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+sort agg w/ group by                        41500 / 41630          2.5         395.8       1.0X
+object agg w/ group by w/o fallback         10075 / 10122         10.4          96.1       4.1X
+object agg w/ group by w/ fallback          28131 / 28205          3.7         268.3       1.5X
+sort agg w/o group by                         6182 / 6221         17.0          59.0       6.7X
+object agg w/o group by w/o fallback          5435 / 5468         19.3          51.8       7.6X
+
+
+================================================================================================
+ObjectHashAggregateExec vs SortAggregateExec - percentile_approx
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+sort agg w/ group by                           970 / 1025          2.2         462.5       1.0X
+object agg w/ group by w/o fallback            772 /  798          2.7         368.1       1.3X
+object agg w/ group by w/ fallback            1013 / 1044          2.1         483.1       1.0X
+sort agg w/o group by                          751 /  781          2.8         358.0       1.3X
+object agg w/o group by w/o fallback           772 /  814          2.7         368.0       1.3X
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/ccd07b73/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala
index 3b33785..50ee096 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala
@@ -21,207 +21,189 @@ import scala.concurrent.duration._
 
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox
 
-import org.apache.spark.benchmark.Benchmark
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
-import org.apache.spark.sql.hive.HiveSessionCatalog
+import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.hive.execution.TestingTypedCount
-import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.LongType
 
-class ObjectHashAggregateExecBenchmark extends BenchmarkWithCodegen with TestHiveSingleton {
-  ignore("Hive UDAF vs Spark AF") {
-    val N = 2 << 15
+/**
+ * Benchmark to measure hash based aggregation.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class <this class>
+ *        --jars <spark catalyst test jar>,<spark core test jar>,<spark hive jar>
+ *        --packages org.spark-project.hive:hive-exec:1.2.1.spark2
+ *        <spark hive test jar>
+ *   2. build/sbt "hive/test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "hive/test:runMain <this class>"
+ *      Results will be written to "benchmarks/ObjectHashAggregateExecBenchmark-results.txt".
+ * }}}
+ */
+object ObjectHashAggregateExecBenchmark extends BenchmarkBase with SQLHelper {
+
+  private val spark: SparkSession = TestHive.sparkSession
+  private val sql = spark.sql _
+  import spark.implicits._
 
+  private def hiveUDAFvsSparkAF(N: Int): Unit = {
     val benchmark = new Benchmark(
       name = "hive udaf vs spark af",
       valuesPerIteration = N,
       minNumIters = 5,
       warmupTime = 5.seconds,
       minTime = 10.seconds,
-      outputPerIteration = true
+      outputPerIteration = true,
+      output = output
     )
 
-    registerHiveFunction("hive_percentile_approx", classOf[GenericUDAFPercentileApprox])
+    sql(
+      s"CREATE TEMPORARY FUNCTION hive_percentile_approx AS '" +
+        s"${classOf[GenericUDAFPercentileApprox].getName}'"
+    )
 
-    sparkSession.range(N).createOrReplaceTempView("t")
+    spark.range(N).createOrReplaceTempView("t")
 
     benchmark.addCase("hive udaf w/o group by") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
-      sparkSession.sql("SELECT hive_percentile_approx(id, 0.5) FROM t").collect()
+      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") {
+        sql("SELECT hive_percentile_approx(id, 0.5) FROM t").collect()
+      }
     }
 
     benchmark.addCase("spark af w/o group by") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
-      sparkSession.sql("SELECT percentile_approx(id, 0.5) FROM t").collect()
+      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") {
+        sql("SELECT percentile_approx(id, 0.5) FROM t").collect()
+      }
     }
 
     benchmark.addCase("hive udaf w/ group by") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
-      sparkSession.sql(
-        s"SELECT hive_percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N 
/ 4} AS BIGINT)"
-      ).collect()
+      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") {
+        sql(
+          s"SELECT hive_percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / 
${N / 4} AS BIGINT)"
+        ).collect()
+      }
     }
 
     benchmark.addCase("spark af w/ group by w/o fallback") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
-      sparkSession.sql(
-        s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} 
AS BIGINT)"
-      ).collect()
+      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") {
+        sql(s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N 
/ 4} AS BIGINT)")
+          .collect()
+      }
     }
 
     benchmark.addCase("spark af w/ group by w/ fallback") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
-      sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2")
-      sparkSession.sql(
-        s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N / 4} 
AS BIGINT)"
-      ).collect()
+      withSQLConf(
+        SQLConf.USE_OBJECT_HASH_AGG.key -> "true",
+        SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "2") {
+        sql(s"SELECT percentile_approx(id, 0.5) FROM t GROUP BY CAST(id / ${N 
/ 4} AS BIGINT)")
+          .collect()
+      }
     }
 
     benchmark.run()
-
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
-    hive udaf vs spark af:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    hive udaf w/o group by                        5326 / 5408          0.0       81264.2       1.0X
-    spark af w/o group by                           93 /  111          0.7        1415.6      57.4X
-    hive udaf w/ group by                         3804 / 3946          0.0       58050.1       1.4X
-    spark af w/ group by w/o fallback               71 /   90          0.9        1085.7      74.8X
-    spark af w/ group by w/ fallback                98 /  111          0.7        1501.6      54.1X
-     */
   }
 
-  ignore("ObjectHashAggregateExec vs SortAggregateExec - typed_count") {
-    val N: Long = 1024 * 1024 * 100
-
+  private def objectHashAggregateExecVsSortAggregateExecUsingTypedCount(N: Int): Unit = {
     val benchmark = new Benchmark(
       name = "object agg v.s. sort agg",
       valuesPerIteration = N,
       minNumIters = 1,
       warmupTime = 10.seconds,
       minTime = 45.seconds,
-      outputPerIteration = true
+      outputPerIteration = true,
+      output = output
     )
 
-    import sparkSession.implicits._
-
     def typed_count(column: Column): Column =
       Column(TestingTypedCount(column.expr).toAggregateExpression())
 
-    val df = sparkSession.range(N)
+    val df = spark.range(N)
 
     benchmark.addCase("sort agg w/ group by") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
-      df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect()
+      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") {
+        df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect()
+      }
     }
 
     benchmark.addCase("object agg w/ group by w/o fallback") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
-      df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect()
+      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") {
+        df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect()
+      }
     }
 
     benchmark.addCase("object agg w/ group by w/ fallback") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
-      sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2")
-      df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect()
+      withSQLConf(
+        SQLConf.USE_OBJECT_HASH_AGG.key -> "true",
+        SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "2") {
+        df.groupBy($"id" < (N / 2)).agg(typed_count($"id")).collect()
+      }
     }
 
     benchmark.addCase("sort agg w/o group by") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
-      df.select(typed_count($"id")).collect()
+      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") {
+        df.select(typed_count($"id")).collect()
+      }
     }
 
     benchmark.addCase("object agg w/o group by w/o fallback") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
-      df.select(typed_count($"id")).collect()
+      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") {
+        df.select(typed_count($"id")).collect()
+      }
     }
 
     benchmark.run()
-
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
-    object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    sort agg w/ group by                        31251 / 31908          3.4         298.0       1.0X
-    object agg w/ group by w/o fallback           6903 / 7141         15.2          65.8       4.5X
-    object agg w/ group by w/ fallback          20945 / 21613          5.0         199.7       1.5X
-    sort agg w/o group by                         4734 / 5463         22.1          45.2       6.6X
-    object agg w/o group by w/o fallback          4310 / 4529         24.3          41.1       7.3X
-     */
   }
 
-  ignore("ObjectHashAggregateExec vs SortAggregateExec - percentile_approx") {
-    val N = 2 << 20
-
+  private def objectHashAggregateExecVsSortAggregateExecUsingPercentileApprox(N: Int): Unit = {
     val benchmark = new Benchmark(
       name = "object agg v.s. sort agg",
       valuesPerIteration = N,
       minNumIters = 5,
       warmupTime = 15.seconds,
       minTime = 45.seconds,
-      outputPerIteration = true
+      outputPerIteration = true,
+      output = output
     )
 
-    import sparkSession.implicits._
-
-    val df = sparkSession.range(N).coalesce(1)
+    val df = spark.range(N).coalesce(1)
 
     benchmark.addCase("sort agg w/ group by") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
-      df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect()
+      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") {
+        df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect()
+      }
     }
 
     benchmark.addCase("object agg w/ group by w/o fallback") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
-      df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect()
+      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") {
+        df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect()
+      }
     }
 
     benchmark.addCase("object agg w/ group by w/ fallback") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
-      sparkSession.conf.set(SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key, "2")
-      df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect()
+      withSQLConf(
+        SQLConf.USE_OBJECT_HASH_AGG.key -> "true",
+        SQLConf.OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD.key -> "2") {
+        df.groupBy($"id" / (N / 4) cast LongType).agg(percentile_approx($"id", 0.5)).collect()
+      }
     }
 
     benchmark.addCase("sort agg w/o group by") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "false")
-      df.select(percentile_approx($"id", 0.5)).collect()
+      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") {
+        df.select(percentile_approx($"id", 0.5)).collect()
+      }
     }
 
     benchmark.addCase("object agg w/o group by w/o fallback") { _ =>
-      sparkSession.conf.set(SQLConf.USE_OBJECT_HASH_AGG.key, "true")
-      df.select(percentile_approx($"id", 0.5)).collect()
+      withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "true") {
+        df.select(percentile_approx($"id", 0.5)).collect()
+      }
     }
 
     benchmark.run()
-
-    /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
-    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-
-    object agg v.s. sort agg:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    sort agg w/ group by                          3418 / 3530          0.6        1630.0       1.0X
-    object agg w/ group by w/o fallback           3210 / 3314          0.7        1530.7       1.1X
-    object agg w/ group by w/ fallback            3419 / 3511          0.6        1630.1       1.0X
-    sort agg w/o group by                         4336 / 4499          0.5        2067.3       0.8X
-    object agg w/o group by w/o fallback          4271 / 4372          0.5        2036.7       0.8X
-     */
-  }
-
-  private def registerHiveFunction(functionName: String, clazz: Class[_]): Unit = {
-    val sessionCatalog = sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
-    val functionIdentifier = FunctionIdentifier(functionName, database = None)
-    val func = CatalogFunction(functionIdentifier, clazz.getName, resources = Nil)
-    sessionCatalog.registerFunction(func, overrideIfExists = false)
   }
 
   private def percentile_approx(
@@ -229,4 +211,18 @@ class ObjectHashAggregateExecBenchmark extends BenchmarkWithCodegen with TestHiv
     val approxPercentile = new ApproximatePercentile(column.expr, Literal(percentage))
     Column(approxPercentile.toAggregateExpression(isDistinct))
   }
+
+  override def runBenchmarkSuite(): Unit = {
+    runBenchmark("Hive UDAF vs Spark AF") {
+      hiveUDAFvsSparkAF(2 << 15)
+    }
+
+    runBenchmark("ObjectHashAggregateExec vs SortAggregateExec - typed_count") 
{
+      objectHashAggregateExecVsSortAggregateExecUsingTypedCount(1024 * 1024 * 
100)
+    }
+
+    runBenchmark("ObjectHashAggregateExec vs SortAggregateExec - 
percentile_approx") {
+      objectHashAggregateExecVsSortAggregateExecUsingPercentileApprox(2 << 20)
+    }
+  }
 }

