Repository: spark Updated Branches: refs/heads/master 77e52448e -> 950ab7995
[SPARK-24777][SQL] Add write benchmark for AVRO ## What changes were proposed in this pull request? Refactor `DataSourceWriteBenchmark` and add write benchmark for AVRO. ## How was this patch tested? Build and run the benchmark. Closes #22451 from gengliangwang/avroWriteBenchmark. Authored-by: Gengliang Wang <gengliang.w...@databricks.com> Signed-off-by: gatorsmile <gatorsm...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/950ab799 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/950ab799 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/950ab799 Branch: refs/heads/master Commit: 950ab79957fc0cdc2dafac94765787e87ece9e74 Parents: 77e5244 Author: Gengliang Wang <gengliang.w...@databricks.com> Authored: Thu Sep 20 17:41:24 2018 -0700 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Thu Sep 20 17:41:24 2018 -0700 ---------------------------------------------------------------------- .../benchmark/AvroWriteBenchmark.scala | 40 ++++++++++ .../BuiltInDataSourceWriteBenchmark.scala | 79 ++++++++++++++++++++ .../benchmark/DataSourceWriteBenchmark.scala | 75 +++---------------- 3 files changed, 131 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/950ab799/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala new file mode 100644 index 0000000..df13b4a --- /dev/null +++ b/external/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +/** + * Benchmark to measure Avro data source write performance. + * Usage: + * 1. with spark-submit: bin/spark-submit --class <this class> --jars <spark sql test jar> <spark avro test jar> + * 2. with sbt: build/sbt "avro/test:runMain <this class>" + */ +object AvroWriteBenchmark extends DataSourceWriteBenchmark { + def main(args: Array[String]): Unit = { + /* + Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + Avro writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Output Single Int Column 2481 / 2499 6.3 157.8 1.0X + Output Single Double Column 2705 / 2710 5.8 172.0 0.9X + Output Int and String Column 5539 / 5639 2.8 352.2 0.4X + Output Partitions 4613 / 5004 3.4 293.3 0.5X + Output Buckets 5554 / 5561 2.8 353.1 0.4X + */ + runBenchmark("Avro") + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/950ab799/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala new file mode 100644 index 0000000..2de516c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.benchmark + +/** + * Benchmark to measure built-in data source write performance. + * By default it measures 4 data source formats: Parquet, ORC, JSON, CSV. Run it with spark-submit: + * spark-submit --class <this class> <spark sql test jar> + * Or with sbt: + * build/sbt "sql/test:runMain <this class>" + * + * To measure specified formats, run it with arguments: + * spark-submit --class <this class> <spark sql test jar> format1 [format2] [...]
+ * Or with sbt: + * build/sbt "sql/test:runMain <this class> format1 [format2] [...]" + */ +object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark { + def main(args: Array[String]): Unit = { + val formats: Seq[String] = if (args.isEmpty) { + Seq("Parquet", "ORC", "JSON", "CSV") + } else { + args + } + + spark.conf.set("spark.sql.parquet.compression.codec", "snappy") + spark.conf.set("spark.sql.orc.compression.codec", "snappy") + /* + Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + Parquet writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Output Single Int Column 1815 / 1932 8.7 115.4 1.0X + Output Single Double Column 1877 / 1878 8.4 119.3 1.0X + Output Int and String Column 6265 / 6543 2.5 398.3 0.3X + Output Partitions 4067 / 4457 3.9 258.6 0.4X + Output Buckets 5608 / 5820 2.8 356.6 0.3X + + ORC writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Output Single Int Column 1201 / 1239 13.1 76.3 1.0X + Output Single Double Column 1542 / 1600 10.2 98.0 0.8X + Output Int and String Column 6495 / 6580 2.4 412.9 0.2X + Output Partitions 3648 / 3842 4.3 231.9 0.3X + Output Buckets 5022 / 5145 3.1 319.3 0.2X + + JSON writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Output Single Int Column 1988 / 2093 7.9 126.4 1.0X + Output Single Double Column 2854 / 2911 5.5 181.4 0.7X + Output Int and String Column 6467 / 6653 2.4 411.1 0.3X + Output Partitions 4548 / 5055 3.5 289.1 0.4X + Output Buckets 5664 / 5765 2.8 360.1 0.4X + + CSV writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + Output Single Int 
Column 3025 / 3190 5.2 192.3 1.0X + Output Single Double Column 3575 / 3634 4.4 227.3 0.8X + Output Int and String Column 7313 / 7399 2.2 464.9 0.4X + Output Partitions 5105 / 5190 3.1 324.6 0.6X + Output Buckets 6986 / 6992 2.3 444.1 0.4X + */ + formats.foreach { format => + runBenchmark(format) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/950ab799/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala index 2d2cdeb..e3463d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala @@ -21,25 +21,14 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.Benchmark -/** - * Benchmark to measure data source write performance. - * By default it measures 4 data source format: Parquet, ORC, JSON, CSV: - * spark-submit --class <this class> <spark sql test jar> - * To measure specified formats, run it with arguments: - * spark-submit --class <this class> <spark sql test jar> format1 [format2] [...] - */ -object DataSourceWriteBenchmark { +trait DataSourceWriteBenchmark { val conf = new SparkConf() .setAppName("DataSourceWriteBenchmark") .setIfMissing("spark.master", "local[1]") - .set("spark.sql.parquet.compression.codec", "snappy") - .set("spark.sql.orc.compression.codec", "snappy") + .set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") val spark = SparkSession.builder.config(conf).getOrCreate() - // Set default configs. Individual cases will change them if necessary. 
- spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") - val tempTable = "temp" val numRows = 1024 * 1024 * 15 @@ -86,64 +75,24 @@ object DataSourceWriteBenchmark { } } - def main(args: Array[String]): Unit = { + def runBenchmark(format: String): Unit = { val tableInt = "tableInt" val tableDouble = "tableDouble" val tableIntString = "tableIntString" val tablePartition = "tablePartition" val tableBucket = "tableBucket" - val formats: Seq[String] = if (args.isEmpty) { - Seq("Parquet", "ORC", "JSON", "CSV") - } else { - args - } - /* - Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz - Parquet writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - Output Single Int Column 1815 / 1932 8.7 115.4 1.0X - Output Single Double Column 1877 / 1878 8.4 119.3 1.0X - Output Int and String Column 6265 / 6543 2.5 398.3 0.3X - Output Partitions 4067 / 4457 3.9 258.6 0.4X - Output Buckets 5608 / 5820 2.8 356.6 0.3X - - ORC writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - Output Single Int Column 1201 / 1239 13.1 76.3 1.0X - Output Single Double Column 1542 / 1600 10.2 98.0 0.8X - Output Int and String Column 6495 / 6580 2.4 412.9 0.2X - Output Partitions 3648 / 3842 4.3 231.9 0.3X - Output Buckets 5022 / 5145 3.1 319.3 0.2X - - JSON writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - Output Single Int Column 1988 / 2093 7.9 126.4 1.0X - Output Single Double Column 2854 / 2911 5.5 181.4 0.7X - Output Int and String Column 6467 / 6653 2.4 411.1 0.3X - Output Partitions 4548 / 5055 3.5 289.1 0.4X - Output Buckets 5664 / 5765 2.8 360.1 0.4X - - CSV writer benchmark: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - 
------------------------------------------------------------------------------------------------ - Output Single Int Column 3025 / 3190 5.2 192.3 1.0X - Output Single Double Column 3575 / 3634 4.4 227.3 0.8X - Output Int and String Column 7313 / 7399 2.2 464.9 0.4X - Output Partitions 5105 / 5190 3.1 324.6 0.6X - Output Buckets 6986 / 6992 2.3 444.1 0.4X - */ withTempTable(tempTable) { spark.range(numRows).createOrReplaceTempView(tempTable) - formats.foreach { format => - withTable(tableInt, tableDouble, tableIntString, tablePartition, tableBucket) { - val benchmark = new Benchmark(s"$format writer benchmark", numRows) - writeNumeric(tableInt, format, benchmark, "Int") - writeNumeric(tableDouble, format, benchmark, "Double") - writeIntString(tableIntString, format, benchmark) - writePartition(tablePartition, format, benchmark) - writeBucket(tableBucket, format, benchmark) - benchmark.run() - } + withTable(tableInt, tableDouble, tableIntString, tablePartition, tableBucket) { + val benchmark = new Benchmark(s"$format writer benchmark", numRows) + writeNumeric(tableInt, format, benchmark, "Int") + writeNumeric(tableDouble, format, benchmark, "Double") + writeIntString(tableIntString, format, benchmark) + writePartition(tablePartition, format, benchmark) + writeBucket(tableBucket, format, benchmark) + benchmark.run() } } } } + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org