Repository: spark Updated Branches: refs/heads/master 0e318acd0 -> 42b6c1fb0
[SPARK-25931][SQL] Benchmarking creation of Jackson parser ## What changes were proposed in this pull request? Added new benchmark which forcibly invokes Jackson parser to check overhead of its creation for short and wide JSON strings. Existing benchmarks do not allow to check that due to an optimisation introduced by #21909 for empty schema pushed down to JSON datasource. The `count()` action passes empty schema as required schema to the datasource, and Jackson parser is not created at all in that case. Besides of new benchmark I also refactored existing benchmarks: - Added `numIters` to control number of iteration in each benchmark - Renamed `JSON per-line parsing` -> `count a short column`, `JSON parsing of wide lines` -> `count a wide column`, and `Count a dataset with 10 columns` -> `Select a subset of 10 columns`. Closes #22920 from MaxGekk/json-benchmark-follow-up. Lead-authored-by: Maxim Gekk <[email protected]> Co-authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42b6c1fb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42b6c1fb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42b6c1fb Branch: refs/heads/master Commit: 42b6c1fb05ead89331791cd27ea7c97ff7fd8e16 Parents: 0e318ac Author: Maxim Gekk <[email protected]> Authored: Sat Nov 3 09:09:39 2018 -0700 Committer: Dongjoon Hyun <[email protected]> Committed: Sat Nov 3 09:09:39 2018 -0700 ---------------------------------------------------------------------- sql/core/benchmarks/JSONBenchmark-results.txt | 35 +++-- .../datasources/json/JsonBenchmark.scala | 156 +++++++++++++------ 2 files changed, 131 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/42b6c1fb/sql/core/benchmarks/JSONBenchmark-results.txt ---------------------------------------------------------------------- diff --git a/sql/core/benchmarks/JSONBenchmark-results.txt b/sql/core/benchmarks/JSONBenchmark-results.txt index 9993730..4774294 100644 --- a/sql/core/benchmarks/JSONBenchmark-results.txt +++ b/sql/core/benchmarks/JSONBenchmark-results.txt @@ -7,31 +7,42 @@ OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz JSON schema inferring: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -No encoding 62946 / 63310 1.6 629.5 1.0X -UTF-8 is set 112814 / 112866 0.9 1128.1 0.6X +No encoding 71832 / 72149 1.4 718.3 1.0X +UTF-8 is set 101700 / 101819 1.0 1017.0 0.7X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz -JSON per-line parsing: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +count a short column: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -No encoding 16468 / 16553 6.1 164.7 1.0X -UTF-8 is set 16420 / 16441 6.1 164.2 1.0X +No encoding 16501 / 16519 6.1 165.0 1.0X +UTF-8 is set 16477 / 16516 6.1 164.8 1.0X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz -JSON parsing of wide lines: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +count a wide column: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -No encoding 39789 / 40053 0.3 3978.9 1.0X -UTF-8 is set 39505 / 39584 0.3 3950.5 1.0X +No encoding 39871 / 40242 0.3 3987.1 1.0X +UTF-8 is set 39581 / 39721 0.3 3958.1 1.0X +Preparing data for benchmarking ... +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +Select a subset of 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +Select 10 columns + count() 16011 / 16033 0.6 1601.1 1.0X +Select 1 column + count() 14350 / 14392 0.7 1435.0 1.1X +count() 3007 / 3034 3.3 300.7 5.3X + +Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz -Count a dataset with 10 columns: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +creation of JSON parser per line: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Select 10 columns + count() 15997 / 16015 0.6 1599.7 1.0X -Select 1 column + count() 13280 / 13326 0.8 1328.0 1.2X -count() 3006 / 3021 3.3 300.6 5.3X +Short column without encoding 8334 / 8453 1.2 833.4 1.0X +Short column with UTF-8 13627 / 13784 0.7 1362.7 0.6X +Wide column without encoding 155073 / 155351 0.1 15507.3 0.1X +Wide column with UTF-8 212114 / 212263 0.0 21211.4 0.0X http://git-wip-us.apache.org/repos/asf/spark/blob/42b6c1fb/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala index 04f724e..f50c25e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala @@ -39,13 +39,17 @@ import org.apache.spark.sql.types._ object JSONBenchmark extends SqlBasedBenchmark { import spark.implicits._ - def schemaInferring(rowsNum: Int): Unit = { + def prepareDataInfo(benchmark: Benchmark): Unit = { + // scalastyle:off println + benchmark.out.println("Preparing data for benchmarking ...") + // scalastyle:on println + } + + def schemaInferring(rowsNum: Int, numIters: Int): Unit = { val benchmark = new Benchmark("JSON schema inferring", rowsNum, output = output) withTempPath { path => - // scalastyle:off println - benchmark.out.println("Preparing data for benchmarking ...") - // scalastyle:on println + prepareDataInfo(benchmark) spark.sparkContext.range(0, rowsNum, 1) .map(_ => "a") @@ -54,11 +58,11 @@ object JSONBenchmark extends SqlBasedBenchmark { .option("encoding", "UTF-8") .json(path.getAbsolutePath) - benchmark.addCase("No encoding", 3) { _ => + benchmark.addCase("No encoding", numIters) { _ => spark.read.json(path.getAbsolutePath) } - benchmark.addCase("UTF-8 is set", 3) { _ => + benchmark.addCase("UTF-8 is set", numIters) { _ => spark.read .option("encoding", "UTF-8") .json(path.getAbsolutePath) @@ -68,28 +72,29 @@ object JSONBenchmark extends SqlBasedBenchmark { } } - def perlineParsing(rowsNum: Int): Unit = { - val benchmark = new Benchmark("JSON per-line parsing", rowsNum, output = output) + def writeShortColumn(path: String, rowsNum: Int): StructType = { + spark.sparkContext.range(0, rowsNum, 1) + .map(_ => "a") + .toDF("fieldA") + .write.json(path) + new StructType().add("fieldA", StringType) + } - withTempPath { path => - // scalastyle:off println - benchmark.out.println("Preparing data for benchmarking ...") - // scalastyle:on println + def countShortColumn(rowsNum: Int, numIters: Int): Unit = { + val benchmark = new Benchmark("count a short column", rowsNum, output = output) - spark.sparkContext.range(0, rowsNum, 1) - .map(_ => "a") - .toDF("fieldA") - .write.json(path.getAbsolutePath) - val schema = new StructType().add("fieldA", StringType) + withTempPath { path => + prepareDataInfo(benchmark) + val schema = writeShortColumn(path.getAbsolutePath, rowsNum) - benchmark.addCase("No encoding", 3) { _ => + benchmark.addCase("No encoding", numIters) { _ => spark.read .schema(schema) .json(path.getAbsolutePath) .count() } - benchmark.addCase("UTF-8 is set", 3) { _ => + benchmark.addCase("UTF-8 is set", numIters) { _ => spark.read .option("encoding", "UTF-8") .schema(schema) @@ -101,35 +106,36 @@ object JSONBenchmark extends SqlBasedBenchmark { } } - def perlineParsingOfWideColumn(rowsNum: Int): Unit = { - val benchmark = new Benchmark("JSON parsing of wide lines", rowsNum, output = output) + def writeWideColumn(path: String, rowsNum: Int): StructType = { + spark.sparkContext.range(0, rowsNum, 1) + .map { i => + val s = "abcdef0123456789ABCDEF" * 20 + s"""{"a":"$s","b": $i,"c":"$s","d":$i,"e":"$s","f":$i,"x":"$s","y":$i,"z":"$s"}""" + } + .toDF().write.text(path) + new StructType() + .add("a", StringType).add("b", LongType) + .add("c", StringType).add("d", LongType) + .add("e", StringType).add("f", LongType) + .add("x", StringType).add("y", LongType) + .add("z", StringType) + } + + def countWideColumn(rowsNum: Int, numIters: Int): Unit = { + val benchmark = new Benchmark("count a wide column", rowsNum, output = output) withTempPath { path => - // scalastyle:off println - benchmark.out.println("Preparing data for benchmarking ...") - // scalastyle:on println + prepareDataInfo(benchmark) + val schema = writeWideColumn(path.getAbsolutePath, rowsNum) - spark.sparkContext.range(0, rowsNum, 1) - .map { i => - val s = "abcdef0123456789ABCDEF" * 20 - s"""{"a":"$s","b": $i,"c":"$s","d":$i,"e":"$s","f":$i,"x":"$s","y":$i,"z":"$s"}""" - } - .toDF().write.text(path.getAbsolutePath) - val schema = new StructType() - .add("a", StringType).add("b", LongType) - .add("c", StringType).add("d", LongType) - .add("e", StringType).add("f", LongType) - .add("x", StringType).add("y", LongType) - .add("z", StringType) - - benchmark.addCase("No encoding", 3) { _ => + benchmark.addCase("No encoding", numIters) { _ => spark.read .schema(schema) .json(path.getAbsolutePath) .count() } - benchmark.addCase("UTF-8 is set", 3) { _ => + benchmark.addCase("UTF-8 is set", numIters) { _ => spark.read .option("encoding", "UTF-8") .schema(schema) @@ -141,12 +147,14 @@ object JSONBenchmark extends SqlBasedBenchmark { } } - def countBenchmark(rowsNum: Int): Unit = { + def selectSubsetOfColumns(rowsNum: Int, numIters: Int): Unit = { val colsNum = 10 val benchmark = - new Benchmark(s"Count a dataset with $colsNum columns", rowsNum, output = output) + new Benchmark(s"Select a subset of $colsNum columns", rowsNum, output = output) withTempPath { path => + prepareDataInfo(benchmark) + val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType)) val schema = StructType(fields) val columnNames = schema.fieldNames @@ -158,13 +166,13 @@ object JSONBenchmark extends SqlBasedBenchmark { val ds = spark.read.schema(schema).json(path.getAbsolutePath) - benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ => + benchmark.addCase(s"Select $colsNum columns + count()", numIters) { _ => ds.select("*").filter((_: Row) => true).count() } - benchmark.addCase(s"Select 1 column + count()", 3) { _ => + benchmark.addCase(s"Select 1 column + count()", numIters) { _ => ds.select($"col1").filter((_: Row) => true).count() } - benchmark.addCase(s"count()", 3) { _ => + benchmark.addCase(s"count()", numIters) { _ => ds.count() } @@ -172,12 +180,64 @@ object JSONBenchmark extends SqlBasedBenchmark { } } + def jsonParserCreation(rowsNum: Int, numIters: Int): Unit = { + val benchmark = new Benchmark("creation of JSON parser per line", rowsNum, output = output) + + withTempPath { path => + prepareDataInfo(benchmark) + + val shortColumnPath = path.getAbsolutePath + "/short" + val shortSchema = writeShortColumn(shortColumnPath, rowsNum) + + val wideColumnPath = path.getAbsolutePath + "/wide" + val wideSchema = writeWideColumn(wideColumnPath, rowsNum) + + benchmark.addCase("Short column without encoding", numIters) { _ => + spark.read + .schema(shortSchema) + .json(shortColumnPath) + .filter((_: Row) => true) + .count() + } + + benchmark.addCase("Short column with UTF-8", numIters) { _ => + spark.read + .option("encoding", "UTF-8") + .schema(shortSchema) + .json(shortColumnPath) + .filter((_: Row) => true) + .count() + } + + benchmark.addCase("Wide column without encoding", numIters) { _ => + spark.read + .schema(wideSchema) + .json(wideColumnPath) + .filter((_: Row) => true) + .count() + } + + benchmark.addCase("Wide column with UTF-8", numIters) { _ => + spark.read + .option("encoding", "UTF-8") + .schema(wideSchema) + .json(wideColumnPath) + .filter((_: Row) => true) + .count() + } + + benchmark.run() + } + } + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + val numIters = 3 runBenchmark("Benchmark for performance of JSON parsing") { - schemaInferring(100 * 1000 * 1000) - perlineParsing(100 * 1000 * 1000) - perlineParsingOfWideColumn(10 * 1000 * 1000) - countBenchmark(10 * 1000 * 1000) + schemaInferring(100 * 1000 * 1000, numIters) + countShortColumn(100 * 1000 * 1000, numIters) + countWideColumn(10 * 1000 * 1000, numIters) + selectSubsetOfColumns(10 * 1000 * 1000, numIters) + jsonParserCreation(10 * 1000 * 1000, numIters) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
