Repository: spark Updated Branches: refs/heads/master ff601cf71 -> d25f425c9
[SPARK-25499][TEST] Refactor BenchmarkBase and Benchmark ## What changes were proposed in this pull request? Currently there are two classes with the same name, `BenchmarkBase`: 1. `org.apache.spark.util.BenchmarkBase` 2. `org.apache.spark.sql.execution.benchmark.BenchmarkBase` This is very confusing. Moreover, the benchmark object `org.apache.spark.sql.execution.benchmark.FilterPushdownBenchmark` uses `org.apache.spark.util.BenchmarkBase`, even though another class named `BenchmarkBase` exists in its own package. Here I propose: 1. The class `org.apache.spark.util.BenchmarkBase` belongs in the test sources of the core module: move it to package `org.apache.spark.benchmark`. 2. Move `org.apache.spark.util.Benchmark` to the test sources of the core module, also in package `org.apache.spark.benchmark`. 3. Rename the class `org.apache.spark.sql.execution.benchmark.BenchmarkBase` to `BenchmarkWithCodegen`. ## How was this patch tested? Unit tests. Closes #22513 from gengliangwang/refactorBenchmarkBase. 
Authored-by: Gengliang Wang <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d25f425c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d25f425c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d25f425c Branch: refs/heads/master Commit: d25f425c9652a3611dd5fea8a37df4abb13e126e Parents: ff601cf Author: Gengliang Wang <[email protected]> Authored: Fri Sep 21 22:20:55 2018 +0800 Committer: Wenchen Fan <[email protected]> Committed: Fri Sep 21 22:20:55 2018 +0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/util/Benchmark.scala | 225 ------------------ .../org/apache/spark/util/BenchmarkBase.scala | 57 ----- .../org/apache/spark/benchmark/Benchmark.scala | 227 +++++++++++++++++++ .../apache/spark/benchmark/BenchmarkBase.scala | 57 +++++ .../apache/spark/serializer/KryoBenchmark.scala | 2 +- .../linalg/UDTSerializationBenchmark.scala | 2 +- .../org/apache/spark/sql/HashBenchmark.scala | 2 +- .../spark/sql/HashByteArrayBenchmark.scala | 2 +- .../spark/sql/UnsafeProjectionBenchmark.scala | 2 +- .../org/apache/spark/sql/DatasetBenchmark.scala | 2 +- ...ernalAppendOnlyUnsafeRowArrayBenchmark.scala | 2 +- .../benchmark/AggregateBenchmark.scala | 4 +- .../sql/execution/benchmark/BenchmarkBase.scala | 54 ----- .../benchmark/BenchmarkWideTable.scala | 5 +- .../benchmark/BenchmarkWithCodegen.scala | 54 +++++ .../benchmark/DataSourceReadBenchmark.scala | 3 +- .../benchmark/DataSourceWriteBenchmark.scala | 2 +- .../benchmark/FilterPushdownBenchmark.scala | 15 +- .../sql/execution/benchmark/JoinBenchmark.scala | 2 +- .../sql/execution/benchmark/MiscBenchmark.scala | 4 +- .../benchmark/PrimitiveArrayBenchmark.scala | 4 +- .../sql/execution/benchmark/SortBenchmark.scala | 4 +- .../benchmark/TPCDSQueryBenchmark.scala | 2 +- .../benchmark/UnsafeArrayDataBenchmark.scala | 4 +- 
.../benchmark/WideSchemaBenchmark.scala | 3 +- .../CompressionSchemeBenchmark.scala | 2 +- .../datasources/csv/CSVBenchmarks.scala | 3 +- .../datasources/json/JsonBenchmarks.scala | 3 +- .../vectorized/ColumnarBatchBenchmark.scala | 2 +- .../ObjectHashAggregateExecBenchmark.scala | 4 +- .../spark/sql/hive/orc/OrcReadBenchmark.scala | 4 +- 31 files changed, 383 insertions(+), 375 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/core/src/main/scala/org/apache/spark/util/Benchmark.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala deleted file mode 100644 index 7def44b..0000000 --- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.util - -import java.io.{OutputStream, PrintStream} - -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.duration._ -import scala.util.Try - -import org.apache.commons.io.output.TeeOutputStream -import org.apache.commons.lang3.SystemUtils - -/** - * Utility class to benchmark components. An example of how to use this is: - * val benchmark = new Benchmark("My Benchmark", valuesPerIteration) - * benchmark.addCase("V1")(<function>) - * benchmark.addCase("V2")(<function>) - * benchmark.run - * This will output the average time to run each function and the rate of each function. - * - * The benchmark function takes one argument that is the iteration that's being run. - * - * @param name name of this benchmark. - * @param valuesPerIteration number of values used in the test case, used to compute rows/s. - * @param minNumIters the min number of iterations that will be run per case, not counting warm-up. - * @param warmupTime amount of time to spend running dummy case iterations for JIT warm-up. - * @param minTime further iterations will be run for each case until this time is used up. - * @param outputPerIteration if true, the timing for each run will be printed to stdout. - * @param output optional output stream to write benchmark results to - */ -private[spark] class Benchmark( - name: String, - valuesPerIteration: Long, - minNumIters: Int = 2, - warmupTime: FiniteDuration = 2.seconds, - minTime: FiniteDuration = 2.seconds, - outputPerIteration: Boolean = false, - output: Option[OutputStream] = None) { - import Benchmark._ - val benchmarks = mutable.ArrayBuffer.empty[Benchmark.Case] - - val out = if (output.isDefined) { - new PrintStream(new TeeOutputStream(System.out, output.get)) - } else { - System.out - } - - /** - * Adds a case to run when run() is called. The given function will be run for several - * iterations to collect timing statistics. 
- * - * @param name of the benchmark case - * @param numIters if non-zero, forces exactly this many iterations to be run - */ - def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = { - addTimerCase(name, numIters) { timer => - timer.startTiming() - f(timer.iteration) - timer.stopTiming() - } - } - - /** - * Adds a case with manual timing control. When the function is run, timing does not start - * until timer.startTiming() is called within the given function. The corresponding - * timer.stopTiming() method must be called before the function returns. - * - * @param name of the benchmark case - * @param numIters if non-zero, forces exactly this many iterations to be run - */ - def addTimerCase(name: String, numIters: Int = 0)(f: Benchmark.Timer => Unit): Unit = { - benchmarks += Benchmark.Case(name, f, numIters) - } - - /** - * Runs the benchmark and outputs the results to stdout. This should be copied and added as - * a comment with the benchmark. Although the results vary from machine to machine, it should - * provide some baseline. - */ - def run(): Unit = { - require(benchmarks.nonEmpty) - // scalastyle:off - println("Running benchmark: " + name) - - val results = benchmarks.map { c => - println(" Running case: " + c.name) - measure(valuesPerIteration, c.numIters)(c.fn) - } - println - - val firstBest = results.head.bestMs - // The results are going to be processor specific so it is useful to include that. 
- out.println(Benchmark.getJVMOSInfo()) - out.println(Benchmark.getProcessorName()) - out.printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)", - "Per Row(ns)", "Relative") - out.println("-" * 96) - results.zip(benchmarks).foreach { case (result, benchmark) => - out.printf("%-40s %16s %12s %13s %10s\n", - benchmark.name, - "%5.0f / %4.0f" format (result.bestMs, result.avgMs), - "%10.1f" format result.bestRate, - "%6.1f" format (1000 / result.bestRate), - "%3.1fX" format (firstBest / result.bestMs)) - } - out.println - // scalastyle:on - } - - /** - * Runs a single function `f` for iters, returning the average time the function took and - * the rate of the function. - */ - def measure(num: Long, overrideNumIters: Int)(f: Timer => Unit): Result = { - System.gc() // ensures garbage from previous cases don't impact this one - val warmupDeadline = warmupTime.fromNow - while (!warmupDeadline.isOverdue) { - f(new Benchmark.Timer(-1)) - } - val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters - val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos - val runTimes = ArrayBuffer[Long]() - var i = 0 - while (i < minIters || runTimes.sum < minDuration) { - val timer = new Benchmark.Timer(i) - f(timer) - val runTime = timer.totalTime() - runTimes += runTime - - if (outputPerIteration) { - // scalastyle:off - println(s"Iteration $i took ${runTime / 1000} microseconds") - // scalastyle:on - } - i += 1 - } - // scalastyle:off - println(s" Stopped after $i iterations, ${runTimes.sum / 1000000} ms") - // scalastyle:on - val best = runTimes.min - val avg = runTimes.sum / runTimes.size - Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0) - } -} - -private[spark] object Benchmark { - - /** - * Object available to benchmark code to control timing e.g. to exclude set-up time. 
- * - * @param iteration specifies this is the nth iteration of running the benchmark case - */ - class Timer(val iteration: Int) { - private var accumulatedTime: Long = 0L - private var timeStart: Long = 0L - - def startTiming(): Unit = { - assert(timeStart == 0L, "Already started timing.") - timeStart = System.nanoTime - } - - def stopTiming(): Unit = { - assert(timeStart != 0L, "Have not started timing.") - accumulatedTime += System.nanoTime - timeStart - timeStart = 0L - } - - def totalTime(): Long = { - assert(timeStart == 0L, "Have not stopped timing.") - accumulatedTime - } - } - - case class Case(name: String, fn: Timer => Unit, numIters: Int) - case class Result(avgMs: Double, bestRate: Double, bestMs: Double) - - /** - * This should return a user helpful processor information. Getting at this depends on the OS. - * This should return something like "Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz" - */ - def getProcessorName(): String = { - val cpu = if (SystemUtils.IS_OS_MAC_OSX) { - Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string")) - } else if (SystemUtils.IS_OS_LINUX) { - Try { - val grepPath = Utils.executeAndGetOutput(Seq("which", "grep")).stripLineEnd - Utils.executeAndGetOutput(Seq(grepPath, "-m", "1", "model name", "/proc/cpuinfo")) - .stripLineEnd.replaceFirst("model name[\\s*]:[\\s*]", "") - }.getOrElse("Unknown processor") - } else { - System.getenv("PROCESSOR_IDENTIFIER") - } - cpu - } - - /** - * This should return a user helpful JVM & OS information. 
- * This should return something like - * "OpenJDK 64-Bit Server VM 1.8.0_65-b17 on Linux 4.1.13-100.fc21.x86_64" - */ - def getJVMOSInfo(): String = { - val vmName = System.getProperty("java.vm.name") - val runtimeVersion = System.getProperty("java.runtime.version") - val osName = System.getProperty("os.name") - val osVersion = System.getProperty("os.version") - s"${vmName} ${runtimeVersion} on ${osName} ${osVersion}" - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/core/src/main/scala/org/apache/spark/util/BenchmarkBase.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/BenchmarkBase.scala b/core/src/main/scala/org/apache/spark/util/BenchmarkBase.scala deleted file mode 100644 index c84032b..0000000 --- a/core/src/main/scala/org/apache/spark/util/BenchmarkBase.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import java.io.{File, FileOutputStream, OutputStream} - -/** - * A base class for generate benchmark results to a file. 
- */ -abstract class BenchmarkBase { - var output: Option[OutputStream] = None - - def benchmark(): Unit - - final def runBenchmark(benchmarkName: String)(func: => Any): Unit = { - val separator = "=" * 96 - val testHeader = (separator + '\n' + benchmarkName + '\n' + separator + '\n' + '\n').getBytes - output.foreach(_.write(testHeader)) - func - output.foreach(_.write('\n')) - } - - def main(args: Array[String]): Unit = { - val regenerateBenchmarkFiles: Boolean = System.getenv("SPARK_GENERATE_BENCHMARK_FILES") == "1" - if (regenerateBenchmarkFiles) { - val resultFileName = s"${this.getClass.getSimpleName.replace("$", "")}-results.txt" - val file = new File(s"benchmarks/$resultFileName") - if (!file.exists()) { - file.createNewFile() - } - output = Some(new FileOutputStream(file)) - } - - benchmark() - - output.foreach { o => - if (o != null) { - o.close() - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala new file mode 100644 index 0000000..7a36b5f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.benchmark + +import java.io.{OutputStream, PrintStream} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ +import scala.util.Try + +import org.apache.commons.io.output.TeeOutputStream +import org.apache.commons.lang3.SystemUtils + +import org.apache.spark.util.Utils + +/** + * Utility class to benchmark components. An example of how to use this is: + * val benchmark = new Benchmark("My Benchmark", valuesPerIteration) + * benchmark.addCase("V1")(<function>) + * benchmark.addCase("V2")(<function>) + * benchmark.run + * This will output the average time to run each function and the rate of each function. + * + * The benchmark function takes one argument that is the iteration that's being run. + * + * @param name name of this benchmark. + * @param valuesPerIteration number of values used in the test case, used to compute rows/s. + * @param minNumIters the min number of iterations that will be run per case, not counting warm-up. + * @param warmupTime amount of time to spend running dummy case iterations for JIT warm-up. + * @param minTime further iterations will be run for each case until this time is used up. + * @param outputPerIteration if true, the timing for each run will be printed to stdout. 
+ * @param output optional output stream to write benchmark results to + */ +private[spark] class Benchmark( + name: String, + valuesPerIteration: Long, + minNumIters: Int = 2, + warmupTime: FiniteDuration = 2.seconds, + minTime: FiniteDuration = 2.seconds, + outputPerIteration: Boolean = false, + output: Option[OutputStream] = None) { + import Benchmark._ + val benchmarks = mutable.ArrayBuffer.empty[Benchmark.Case] + + val out = if (output.isDefined) { + new PrintStream(new TeeOutputStream(System.out, output.get)) + } else { + System.out + } + + /** + * Adds a case to run when run() is called. The given function will be run for several + * iterations to collect timing statistics. + * + * @param name of the benchmark case + * @param numIters if non-zero, forces exactly this many iterations to be run + */ + def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = { + addTimerCase(name, numIters) { timer => + timer.startTiming() + f(timer.iteration) + timer.stopTiming() + } + } + + /** + * Adds a case with manual timing control. When the function is run, timing does not start + * until timer.startTiming() is called within the given function. The corresponding + * timer.stopTiming() method must be called before the function returns. + * + * @param name of the benchmark case + * @param numIters if non-zero, forces exactly this many iterations to be run + */ + def addTimerCase(name: String, numIters: Int = 0)(f: Benchmark.Timer => Unit): Unit = { + benchmarks += Benchmark.Case(name, f, numIters) + } + + /** + * Runs the benchmark and outputs the results to stdout. This should be copied and added as + * a comment with the benchmark. Although the results vary from machine to machine, it should + * provide some baseline. 
+ */ + def run(): Unit = { + require(benchmarks.nonEmpty) + // scalastyle:off + println("Running benchmark: " + name) + + val results = benchmarks.map { c => + println(" Running case: " + c.name) + measure(valuesPerIteration, c.numIters)(c.fn) + } + println + + val firstBest = results.head.bestMs + // The results are going to be processor specific so it is useful to include that. + out.println(Benchmark.getJVMOSInfo()) + out.println(Benchmark.getProcessorName()) + out.printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)", + "Per Row(ns)", "Relative") + out.println("-" * 96) + results.zip(benchmarks).foreach { case (result, benchmark) => + out.printf("%-40s %16s %12s %13s %10s\n", + benchmark.name, + "%5.0f / %4.0f" format (result.bestMs, result.avgMs), + "%10.1f" format result.bestRate, + "%6.1f" format (1000 / result.bestRate), + "%3.1fX" format (firstBest / result.bestMs)) + } + out.println + // scalastyle:on + } + + /** + * Runs a single function `f` for iters, returning the average time the function took and + * the rate of the function. 
+ */ + def measure(num: Long, overrideNumIters: Int)(f: Timer => Unit): Result = { + System.gc() // ensures garbage from previous cases don't impact this one + val warmupDeadline = warmupTime.fromNow + while (!warmupDeadline.isOverdue) { + f(new Benchmark.Timer(-1)) + } + val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters + val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos + val runTimes = ArrayBuffer[Long]() + var i = 0 + while (i < minIters || runTimes.sum < minDuration) { + val timer = new Benchmark.Timer(i) + f(timer) + val runTime = timer.totalTime() + runTimes += runTime + + if (outputPerIteration) { + // scalastyle:off + println(s"Iteration $i took ${runTime / 1000} microseconds") + // scalastyle:on + } + i += 1 + } + // scalastyle:off + println(s" Stopped after $i iterations, ${runTimes.sum / 1000000} ms") + // scalastyle:on + val best = runTimes.min + val avg = runTimes.sum / runTimes.size + Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0) + } +} + +private[spark] object Benchmark { + + /** + * Object available to benchmark code to control timing e.g. to exclude set-up time. + * + * @param iteration specifies this is the nth iteration of running the benchmark case + */ + class Timer(val iteration: Int) { + private var accumulatedTime: Long = 0L + private var timeStart: Long = 0L + + def startTiming(): Unit = { + assert(timeStart == 0L, "Already started timing.") + timeStart = System.nanoTime + } + + def stopTiming(): Unit = { + assert(timeStart != 0L, "Have not started timing.") + accumulatedTime += System.nanoTime - timeStart + timeStart = 0L + } + + def totalTime(): Long = { + assert(timeStart == 0L, "Have not stopped timing.") + accumulatedTime + } + } + + case class Case(name: String, fn: Timer => Unit, numIters: Int) + case class Result(avgMs: Double, bestRate: Double, bestMs: Double) + + /** + * This should return a user helpful processor information. Getting at this depends on the OS. 
+ * This should return something like "Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz" + */ + def getProcessorName(): String = { + val cpu = if (SystemUtils.IS_OS_MAC_OSX) { + Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string")) + } else if (SystemUtils.IS_OS_LINUX) { + Try { + val grepPath = Utils.executeAndGetOutput(Seq("which", "grep")).stripLineEnd + Utils.executeAndGetOutput(Seq(grepPath, "-m", "1", "model name", "/proc/cpuinfo")) + .stripLineEnd.replaceFirst("model name[\\s*]:[\\s*]", "") + }.getOrElse("Unknown processor") + } else { + System.getenv("PROCESSOR_IDENTIFIER") + } + cpu + } + + /** + * This should return a user helpful JVM & OS information. + * This should return something like + * "OpenJDK 64-Bit Server VM 1.8.0_65-b17 on Linux 4.1.13-100.fc21.x86_64" + */ + def getJVMOSInfo(): String = { + val vmName = System.getProperty("java.vm.name") + val runtimeVersion = System.getProperty("java.runtime.version") + val osName = System.getProperty("os.name") + val osVersion = System.getProperty("os.version") + s"${vmName} ${runtimeVersion} on ${osName} ${osVersion}" + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala new file mode 100644 index 0000000..9a37e02 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.benchmark + +import java.io.{File, FileOutputStream, OutputStream} + +/** + * A base class for generate benchmark results to a file. + */ +abstract class BenchmarkBase { + var output: Option[OutputStream] = None + + def benchmark(): Unit + + final def runBenchmark(benchmarkName: String)(func: => Any): Unit = { + val separator = "=" * 96 + val testHeader = (separator + '\n' + benchmarkName + '\n' + separator + '\n' + '\n').getBytes + output.foreach(_.write(testHeader)) + func + output.foreach(_.write('\n')) + } + + def main(args: Array[String]): Unit = { + val regenerateBenchmarkFiles: Boolean = System.getenv("SPARK_GENERATE_BENCHMARK_FILES") == "1" + if (regenerateBenchmarkFiles) { + val resultFileName = s"${this.getClass.getSimpleName.replace("$", "")}-results.txt" + val file = new File(s"benchmarks/$resultFileName") + if (!file.exists()) { + file.createNewFile() + } + output = Some(new FileOutputStream(file)) + } + + benchmark() + + output.foreach { o => + if (o != null) { + o.close() + } + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala index 
a1cf357..f4fc008 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala @@ -21,8 +21,8 @@ import scala.reflect.ClassTag import scala.util.Random import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.benchmark.Benchmark import org.apache.spark.serializer.KryoTest._ -import org.apache.spark.util.Benchmark class KryoBenchmark extends SparkFunSuite { val benchmark = new Benchmark("Benchmark Kryo Unsafe vs safe Serialization", 1024 * 1024 * 15, 10) http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/mllib/src/test/scala/org/apache/spark/mllib/linalg/UDTSerializationBenchmark.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/UDTSerializationBenchmark.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/UDTSerializationBenchmark.scala index 5973479..e2976e1 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/UDTSerializationBenchmark.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/UDTSerializationBenchmark.scala @@ -17,8 +17,8 @@ package org.apache.spark.mllib.linalg +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.util.Benchmark /** * Serialization benchmark for VectorUDT. 
http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala index 9a89e62..7a2a66c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashBenchmark.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection import org.apache.spark.sql.types._ -import org.apache.spark.util.Benchmark /** * Benchmark for the previous interpreted hash function(InternalRow.hashCode) vs codegened http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala index f6c8111..a60eb20 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/HashByteArrayBenchmark.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql import java.util.Random +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.catalyst.expressions.{HiveHasher, XXH64} import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.hash.Murmur3_x86_32 -import org.apache.spark.util.Benchmark /** * Synthetic benchmark for MurMurHash 3 and xxHash64. 
http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala index 6c63769..faff681 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import org.apache.spark.sql.types._ -import org.apache.spark.util.Benchmark /** * Benchmark `UnsafeProjection` for fixed-length/primitive-type fields. http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala index 1a0672b..fa2f0b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.StringType -import org.apache.spark.util.Benchmark /** * Benchmark for Dataset typed operations comparing with DataFrame and RDD versions. 
http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala index 59397db..611b2fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql.execution import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext} +import org.apache.spark.benchmark.Benchmark import org.apache.spark.internal.config import org.apache.spark.memory.MemoryTestingUtils import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.util.Benchmark import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter object ExternalAppendOnlyUnsafeRowArrayBenchmark { http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index 8f4ee85..57a6fdb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.benchmark import java.util.HashMap import org.apache.spark.SparkConf 
+import org.apache.spark.benchmark.Benchmark import org.apache.spark.internal.config._ import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager} import org.apache.spark.sql.catalyst.expressions.UnsafeRow @@ -30,7 +31,6 @@ import org.apache.spark.sql.types.{LongType, StructType} import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.hash.Murmur3_x86_32 import org.apache.spark.unsafe.map.BytesToBytesMap -import org.apache.spark.util.Benchmark /** * Benchmark to measure performance for aggregate primitives. @@ -39,7 +39,7 @@ import org.apache.spark.util.Benchmark * * Benchmarks in this file are skipped in normal builds. */ -class AggregateBenchmark extends BenchmarkBase { +class AggregateBenchmark extends BenchmarkWithCodegen { ignore("aggregate without grouping") { val N = 500L << 22 http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkBase.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkBase.scala deleted file mode 100644 index c99a5ae..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkBase.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.benchmark - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.SparkSession -import org.apache.spark.util.Benchmark - -/** - * Common base trait for micro benchmarks that are supposed to run standalone (i.e. not together - * with other test suites). - */ -private[benchmark] trait BenchmarkBase extends SparkFunSuite { - - lazy val sparkSession = SparkSession.builder - .master("local[1]") - .appName("microbenchmark") - .config("spark.sql.shuffle.partitions", 1) - .config("spark.sql.autoBroadcastJoinThreshold", 1) - .getOrCreate() - - /** Runs function `f` with whole stage codegen on and off. 
*/ - def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = { - val benchmark = new Benchmark(name, cardinality) - - benchmark.addCase(s"$name wholestage off", numIters = 2) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", value = false) - f - } - - benchmark.addCase(s"$name wholestage on", numIters = 5) { iter => - sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true) - f - } - - benchmark.run() - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala index 9dcaca0..76367cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWideTable.scala @@ -17,8 +17,7 @@ package org.apache.spark.sql.execution.benchmark -import org.apache.spark.util.Benchmark - +import org.apache.spark.benchmark.Benchmark /** * Benchmark to measure performance for wide table. @@ -27,7 +26,7 @@ import org.apache.spark.util.Benchmark * * Benchmarks in this file are skipped in normal builds. 
*/ -class BenchmarkWideTable extends BenchmarkBase { +class BenchmarkWideTable extends BenchmarkWithCodegen { ignore("project on wide table") { val N = 1 << 20 http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWithCodegen.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWithCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWithCodegen.scala new file mode 100644 index 0000000..5133150 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkWithCodegen.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.SparkFunSuite +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.SparkSession + +/** + * Common base trait for micro benchmarks that are supposed to run standalone (i.e. not together + * with other test suites). 
+ */ +private[benchmark] trait BenchmarkWithCodegen extends SparkFunSuite { + + lazy val sparkSession = SparkSession.builder + .master("local[1]") + .appName("microbenchmark") + .config("spark.sql.shuffle.partitions", 1) + .config("spark.sql.autoBroadcastJoinThreshold", 1) + .getOrCreate() + + /** Runs function `f` with whole stage codegen on and off. */ + def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = { + val benchmark = new Benchmark(name, cardinality) + + benchmark.addCase(s"$name wholestage off", numIters = 2) { iter => + sparkSession.conf.set("spark.sql.codegen.wholeStage", value = false) + f + } + + benchmark.addCase(s"$name wholestage on", numIters = 5) { iter => + sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true) + f + } + + benchmark.run() + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala index 8711f5a..cf9bda2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala @@ -22,13 +22,14 @@ import scala.collection.JavaConverters._ import scala.util.{Random, Try} import org.apache.spark.SparkConf +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.parquet.{SpecificParquetRecordReaderBase, VectorizedParquetRecordReader} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import 
org.apache.spark.sql.vectorized.ColumnVector -import org.apache.spark.util.{Benchmark, Utils} +import org.apache.spark.util.Utils /** http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala index e3463d9..994d6b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.execution.benchmark import org.apache.spark.SparkConf +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.SparkSession import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.Benchmark trait DataSourceWriteBenchmark { val conf = new SparkConf() http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala index 9ecea99..3b7f107 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala @@ -22,22 +22,25 @@ import java.io.File import scala.util.{Random, Try} import org.apache.spark.SparkConf +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} import 
org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.functions.monotonically_increasing_id import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType, TimestampType} -import org.apache.spark.util.{Benchmark, BenchmarkBase => FileBenchmarkBase, Utils} +import org.apache.spark.util.Utils /** * Benchmark to measure read performance with Filter pushdown. * To run this benchmark: - * 1. without sbt: bin/spark-submit --class <this class> <spark sql test jar> - * 2. build/sbt "sql/test:runMain <this class>" - * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>" - * Results will be written to "benchmarks/FilterPushdownBenchmark-results.txt". + * {{{ + * 1. without sbt: bin/spark-submit --class <this class> <spark sql test jar> + * 2. build/sbt "sql/test:runMain <this class>" + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>" + * Results will be written to "benchmarks/FilterPushdownBenchmark-results.txt". 
+ * }}} */ -object FilterPushdownBenchmark extends FileBenchmarkBase { +object FilterPushdownBenchmark extends BenchmarkBase { private val conf = new SparkConf() .setAppName(this.getClass.getSimpleName) http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala index 5a25d72..37744dc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types.IntegerType * * Benchmarks in this file are skipped in normal builds. */ -class JoinBenchmark extends BenchmarkBase { +class JoinBenchmark extends BenchmarkWithCodegen { ignore("broadcast hash join, long key") { val N = 20 << 20 http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala index f039aea..f44da24 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.benchmark -import org.apache.spark.util.Benchmark +import org.apache.spark.benchmark.Benchmark /** * Benchmark to measure whole stage codegen performance. 
@@ -26,7 +26,7 @@ import org.apache.spark.util.Benchmark * * Benchmarks in this file are skipped in normal builds. */ -class MiscBenchmark extends BenchmarkBase { +class MiscBenchmark extends BenchmarkWithCodegen { ignore("filter & aggregate without group") { val N = 500L << 22 http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala index 7f467d1..8b27518 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/PrimitiveArrayBenchmark.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.execution.benchmark +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} import org.apache.spark.sql.SparkSession -import org.apache.spark.util.{Benchmark, BenchmarkBase => FileBenchmarkBase} /** * Benchmark primitive arrays via DataFrame and Dataset program using primitive arrays @@ -28,7 +28,7 @@ import org.apache.spark.util.{Benchmark, BenchmarkBase => FileBenchmarkBase} * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>" * Results will be written to "benchmarks/PrimitiveArrayBenchmark-results.txt". 
*/ -object PrimitiveArrayBenchmark extends FileBenchmarkBase { +object PrimitiveArrayBenchmark extends BenchmarkBase { lazy val sparkSession = SparkSession.builder .master("local[1]") .appName("microbenchmark") http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala index 50ae26a..17619ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.execution.benchmark import java.util.{Arrays, Comparator} +import org.apache.spark.benchmark.Benchmark import org.apache.spark.unsafe.array.LongArray import org.apache.spark.unsafe.memory.MemoryBlock -import org.apache.spark.util.Benchmark import org.apache.spark.util.collection.Sorter import org.apache.spark.util.collection.unsafe.sort._ import org.apache.spark.util.random.XORShiftRandom @@ -33,7 +33,7 @@ import org.apache.spark.util.random.XORShiftRandom * * Benchmarks in this file are skipped in normal builds. 
*/ -class SortBenchmark extends BenchmarkBase { +class SortBenchmark extends BenchmarkWithCodegen { private def referenceKeyPrefixSort(buf: LongArray, lo: Int, hi: Int, refCmp: PrefixComparator) { val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt))) http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index fccee97..2d72b1c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -18,13 +18,13 @@ package org.apache.spark.sql.execution.benchmark import org.apache.spark.SparkConf +import org.apache.spark.benchmark.Benchmark import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.util.Benchmark /** * Benchmark to measure TPCDS query performance. 
http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala index 6c7779b..51ab0e1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala @@ -19,10 +19,10 @@ package org.apache.spark.sql.execution.benchmark import scala.util.Random +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter} -import org.apache.spark.util.Benchmark /** * Benchmark [[UnsafeArrayDataBenchmark]] for UnsafeArrayData @@ -32,7 +32,7 @@ import org.apache.spark.util.Benchmark * * Benchmarks in this file are skipped in normal builds. 
*/ -class UnsafeArrayDataBenchmark extends BenchmarkBase { +class UnsafeArrayDataBenchmark extends BenchmarkWithCodegen { def calculateHeaderPortionInBytes(count: Int) : Int = { /* 4 + 4 * count // Use this expression for SPARK-15962 */ http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala index c368f17..81017a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala @@ -22,8 +22,9 @@ import java.io.{File, FileOutputStream, OutputStream} import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.functions._ -import org.apache.spark.util.{Benchmark, Utils} +import org.apache.spark.util.Utils /** * Benchmark for performance with very wide and nested DataFrames. 
http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala index 619b76f..9c26d67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala @@ -23,10 +23,10 @@ import java.nio.charset.StandardCharsets import org.apache.commons.lang3.RandomStringUtils import org.apache.commons.math3.distribution.LogNormalDistribution +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.execution.columnar.{BOOLEAN, INT, LONG, NativeColumnType, SHORT, STRING} import org.apache.spark.sql.types.AtomicType -import org.apache.spark.util.Benchmark import org.apache.spark.util.Utils._ /** http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala index 24f5f55..6d319eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala @@ -19,10 +19,11 @@ package org.apache.spark.sql.execution.datasources.csv import java.io.File import 
org.apache.spark.SparkConf +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{Column, Row, SparkSession} import org.apache.spark.sql.functions.lit import org.apache.spark.sql.types._ -import org.apache.spark.util.{Benchmark, Utils} +import org.apache.spark.util.Utils /** * Benchmark to measure CSV read/write performance. http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala index a2b747e..e40cb9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala @@ -19,10 +19,11 @@ package org.apache.spark.sql.execution.datasources.json import java.io.File import org.apache.spark.SparkConf +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.functions.lit import org.apache.spark.sql.types._ -import org.apache.spark.util.{Benchmark, Utils} +import org.apache.spark.util.Utils /** * The benchmarks aims to measure performance of JSON parsing when encoding is set and isn't. 
http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala index 8aeb06d..d69cf11 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala @@ -21,11 +21,11 @@ import java.nio.charset.StandardCharsets import scala.util.Random +import org.apache.spark.benchmark.Benchmark import org.apache.spark.memory.MemoryMode import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.types.{ArrayType, BinaryType, IntegerType} import org.apache.spark.unsafe.Platform -import org.apache.spark.util.Benchmark import org.apache.spark.util.collection.BitSet /** http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala index e599d1a..3b33785 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/ObjectHashAggregateExecBenchmark.scala @@ -21,6 +21,7 @@ import scala.concurrent.duration._ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFPercentileApprox +import org.apache.spark.benchmark.Benchmark import 
org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogFunction @@ -31,9 +32,8 @@ import org.apache.spark.sql.hive.execution.TestingTypedCount import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.LongType -import org.apache.spark.util.Benchmark -class ObjectHashAggregateExecBenchmark extends BenchmarkBase with TestHiveSingleton { +class ObjectHashAggregateExecBenchmark extends BenchmarkWithCodegen with TestHiveSingleton { ignore("Hive UDAF vs Spark AF") { val N = 2 << 15 http://git-wip-us.apache.org/repos/asf/spark/blob/d25f425c/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala index bf6efa7..0eab7d1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala @@ -22,11 +22,11 @@ import java.io.File import scala.util.{Random, Try} import org.apache.spark.SparkConf +import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.spark.util.{Benchmark, Utils} - +import org.apache.spark.util.Utils /** * Benchmark to measure ORC read performance. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
