This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8ab69992584a [SPARK-46772][SQL][TESTS] Benchmarking Avro with
Compression Codecs
8ab69992584a is described below
commit 8ab69992584aa68e882c4a4aa4863049e6a58e7e
Author: Kent Yao <[email protected]>
AuthorDate: Tue Jan 23 08:06:21 2024 -0800
[SPARK-46772][SQL][TESTS] Benchmarking Avro with Compression Codecs
### What changes were proposed in this pull request?
This PR improves AvroWriteBenchmark by adding benchmarks for each Avro
compression codec and for the codec-specific options they support.
- Avro compression with different codecs
- Avro deflate/xz/zstandard with different compression levels
- zstandard with the buffer pool setting flipped from its default
### Why are the changes needed?
To observe the write performance of the Avro compression codecs and their settings.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
connector/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #44849 from yaooqinn/SPARK-46772.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../AvroWriteBenchmark-jdk21-results.txt | 58 ++++++++++++++++++----
.../avro/benchmarks/AvroWriteBenchmark-results.txt | 58 ++++++++++++++++++----
.../execution/benchmark/AvroWriteBenchmark.scala | 52 ++++++++++++++++---
3 files changed, 143 insertions(+), 25 deletions(-)
diff --git a/connector/avro/benchmarks/AvroWriteBenchmark-jdk21-results.txt
b/connector/avro/benchmarks/AvroWriteBenchmark-jdk21-results.txt
index f3e1dfa39829..86c6b6647f2f 100644
--- a/connector/avro/benchmarks/AvroWriteBenchmark-jdk21-results.txt
+++ b/connector/avro/benchmarks/AvroWriteBenchmark-jdk21-results.txt
@@ -1,16 +1,56 @@
-OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 5.15.0-1053-azure
AMD EPYC 7763 64-Core Processor
Avro writer benchmark: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Output Single Int Column 1389 1404
21 11.3 88.3 1.0X
-Output Single Double Column 1522 1523
1 10.3 96.8 0.9X
-Output Int and String Column 3398 3400
3 4.6 216.0 0.4X
-Output Partitions 2855 2874
27 5.5 181.5 0.5X
-Output Buckets 3857 3903
66 4.1 245.2 0.4X
+Output Single Int Column 1433 1505
101 11.0 91.1 1.0X
+Output Single Double Column 1467 1487
28 10.7 93.3 1.0X
+Output Int and String Column 3187 3203
23 4.9 202.6 0.4X
+Output Partitions 2759 2796
52 5.7 175.4 0.5X
+Output Buckets 3760 3767
9 4.2 239.1 0.4X
-OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 5.15.0-1053-azure
AMD EPYC 7763 64-Core Processor
-Write wide rows into 20 files: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+Avro compression with different codec: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Write wide rows 22729 22774
63 0.0 45458.0 1.0X
+BZIP2: 116001 116248
349 0.0 1160008.1 1.0X
+DEFLATE: 6867 6870
4 0.0 68672.5 16.9X
+UNCOMPRESSED: 5339 5354
21 0.0 53388.4 21.7X
+SNAPPY: 5077 5096
28 0.0 50769.3 22.8X
+XZ: 61387 61501
161 0.0 613871.9 1.9X
+ZSTANDARD: 5333 5349
23 0.0 53331.0 21.8X
+
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 5.15.0-1053-azure
+AMD EPYC 7763 64-Core Processor
+Avro deflate with different levels: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+DEFLATE: deflate.level=1 5247 5287
58 0.0 52466.1 1.0X
+DEFLATE: deflate.level=3 5248 5252
5 0.0 52481.3 1.0X
+DEFLATE: deflate.level=5 6849 6856
10 0.0 68487.1 0.8X
+DEFLATE: deflate.level=7 6792 6826
48 0.0 67917.8 0.8X
+DEFLATE: deflate.level=9 7112 7140
39 0.0 71119.2 0.7X
+
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 5.15.0-1053-azure
+AMD EPYC 7763 64-Core Processor
+Avro xz with different levels: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+XZ: xz.level=1 12727 12744
25 0.0 127267.4 1.0X
+XZ: xz.level=3 23046 23197
214 0.0 230463.9 0.6X
+XZ: xz.level=5 48373 48750
534 0.0 483725.9 0.3X
+XZ: xz.level=7 69288 69342
76 0.0 692879.0 0.2X
+XZ: xz.level=9 148517 148563
65 0.0 1485173.4 0.1X
+
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 5.15.0-1053-azure
+AMD EPYC 7763 64-Core Processor
+Avro zstandard with different levels: Best Time(ms)
Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+-----------------------------------------------------------------------------------------------------------------------------------------------
+ZSTANDARD: zstandard.level=1 4771
4775 6 0.0 47705.1 1.0X
+ZSTANDARD: zstandard.level=1, zstandard.bufferPool.enabled=true 4772
4826 76 0.0 47722.9 1.0X
+ZSTANDARD: zstandard.level=3 4811
4836 36 0.0 48109.7 1.0X
+ZSTANDARD: zstandard.level=3, zstandard.bufferPool.enabled=true 4702
4727 35 0.0 47016.3 1.0X
+ZSTANDARD: zstandard.level=5 5048
5092 62 0.0 50475.8 0.9X
+ZSTANDARD: zstandard.level=5, zstandard.bufferPool.enabled=true 4874
4896 31 0.0 48735.6 1.0X
+ZSTANDARD: zstandard.level=7 5353
5360 10 0.0 53527.3 0.9X
+ZSTANDARD: zstandard.level=7, zstandard.bufferPool.enabled=true 5288
5367 112 0.0 52882.8 0.9X
+ZSTANDARD: zstandard.level=9 6048
6136 124 0.0 60481.1 0.8X
+ZSTANDARD: zstandard.level=9, zstandard.bufferPool.enabled=true 6212
6225 18 0.0 62122.6 0.8X
diff --git a/connector/avro/benchmarks/AvroWriteBenchmark-results.txt
b/connector/avro/benchmarks/AvroWriteBenchmark-results.txt
index fef427d28379..c6cdb9338427 100644
--- a/connector/avro/benchmarks/AvroWriteBenchmark-results.txt
+++ b/connector/avro/benchmarks/AvroWriteBenchmark-results.txt
@@ -1,16 +1,56 @@
-OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 5.15.0-1053-azure
AMD EPYC 7763 64-Core Processor
Avro writer benchmark: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Output Single Int Column 1350 1437
124 11.7 85.8 1.0X
-Output Single Double Column 1578 1588
13 10.0 100.3 0.9X
-Output Int and String Column 3090 3130
56 5.1 196.5 0.4X
-Output Partitions 2814 2884
99 5.6 178.9 0.5X
-Output Buckets 3642 3667
35 4.3 231.5 0.4X
+Output Single Int Column 1412 1423
17 11.1 89.7 1.0X
+Output Single Double Column 1598 1598
0 9.8 101.6 0.9X
+Output Int and String Column 3115 3118
4 5.0 198.0 0.5X
+Output Partitions 3096 3124
39 5.1 196.9 0.5X
+Output Buckets 3684 3718
48 4.3 234.2 0.4X
-OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 5.15.0-1053-azure
AMD EPYC 7763 64-Core Processor
-Write wide rows into 20 files: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+Avro compression with different codec: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Write wide rows 22719 22778
83 0.0 45438.9 1.0X
+BZIP2: 132270 133173
1277 0.0 1322703.8 1.0X
+DEFLATE: 6540 6554
19 0.0 65404.2 20.2X
+UNCOMPRESSED: 5116 5300
260 0.0 51159.4 25.9X
+SNAPPY: 4775 4794
26 0.0 47752.6 27.7X
+XZ: 54274 54290
22 0.0 542742.9 2.4X
+ZSTANDARD: 4877 4880
5 0.0 48769.5 27.1X
+
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 5.15.0-1053-azure
+AMD EPYC 7763 64-Core Processor
+Avro deflate with different levels: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+DEFLATE: deflate.level=1 4910 4939
41 0.0 49101.4 1.0X
+DEFLATE: deflate.level=3 4853 4866
18 0.0 48534.7 1.0X
+DEFLATE: deflate.level=5 6414 6435
29 0.0 64144.4 0.8X
+DEFLATE: deflate.level=7 6485 6494
13 0.0 64851.4 0.8X
+DEFLATE: deflate.level=9 6835 6853
25 0.0 68351.7 0.7X
+
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 5.15.0-1053-azure
+AMD EPYC 7763 64-Core Processor
+Avro xz with different levels: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+XZ: xz.level=1 12175 12235
84 0.0 121752.9 1.0X
+XZ: xz.level=3 22352 22387
50 0.0 223518.9 0.5X
+XZ: xz.level=5 48169 48403
332 0.0 481688.2 0.3X
+XZ: xz.level=7 71237 71484
349 0.0 712371.0 0.2X
+XZ: xz.level=9 152722 155057
3302 0.0 1527215.6 0.1X
+
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 5.15.0-1053-azure
+AMD EPYC 7763 64-Core Processor
+Avro zstandard with different levels: Best Time(ms)
Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+-----------------------------------------------------------------------------------------------------------------------------------------------
+ZSTANDARD: zstandard.level=1 5015
5018 4 0.0 50150.1 1.0X
+ZSTANDARD: zstandard.level=1, zstandard.bufferPool.enabled=true 5015
5050 49 0.0 50154.4 1.0X
+ZSTANDARD: zstandard.level=3 5208
5219 16 0.0 52077.4 1.0X
+ZSTANDARD: zstandard.level=3, zstandard.bufferPool.enabled=true 5072
5077 8 0.0 50720.9 1.0X
+ZSTANDARD: zstandard.level=5 5339
5342 4 0.0 53391.7 0.9X
+ZSTANDARD: zstandard.level=5, zstandard.bufferPool.enabled=true 5323
5369 65 0.0 53231.9 0.9X
+ZSTANDARD: zstandard.level=7 5852
5881 41 0.0 58518.8 0.9X
+ZSTANDARD: zstandard.level=7, zstandard.bufferPool.enabled=true 5631
5657 37 0.0 56310.2 0.9X
+ZSTANDARD: zstandard.level=9 6687
6729 59 0.0 66871.4 0.7X
+ZSTANDARD: zstandard.level=9, zstandard.bufferPool.enabled=true 6594
6596 3 0.0 65941.4 0.8X
diff --git
a/connector/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala
b/connector/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala
index e61ac43ae996..465949097d9b 100644
---
a/connector/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala
+++
b/connector/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.benchmark
import scala.util.Random
import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.avro.AvroCompressionCodec
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
/**
@@ -42,8 +44,8 @@ object AvroWriteBenchmark extends DataSourceWriteBenchmark {
withTempPath { dir =>
withTempTable("t1") {
val width = 1000
- val values = 500000
- val files = 20
+ val values = 100000
+ val files = 12
val selectExpr = (1 to width).map(i => s"value as c$i")
// repartition to ensure we will write multiple files
val df = spark.range(values)
@@ -52,12 +54,48 @@ object AvroWriteBenchmark extends DataSourceWriteBenchmark {
// cache the data to ensure we are not benchmarking range or
repartition
df.noop()
df.createOrReplaceTempView("t1")
- val benchmark = new Benchmark(s"Write wide rows into $files files",
values, output = output)
- benchmark.addCase("Write wide rows") { _ =>
- spark.sql("SELECT * FROM t1").
-
write.format("avro").save(s"${dir.getCanonicalPath}/${Random.nextLong().abs}")
+
+ def addBenchmark(
+ benchmark: Benchmark,
+ codec: String,
+ conf: Map[String, String] = Map.empty): Unit = {
+ val name = conf.map(kv => kv._1.stripPrefix("spark.sql.avro.") + "="
+ kv._2)
+ .mkString(codec + ": ", ", ", "")
+ benchmark.addCase(name) { _ =>
+ withSQLConf(conf.toSeq: _*) {
+ spark
+ .table("t1")
+ .write
+ .option("compression", codec)
+ .format("avro")
+ .save(s"${dir.getCanonicalPath}/${Random.nextLong().abs}")
+ }
+ }
+ }
+
+ val bm = new Benchmark(s"Avro compression with different codec",
values, output = output)
+ AvroCompressionCodec.values().sortBy(_.getCodecName).foreach { codec =>
+ addBenchmark(bm, codec.name)
+ }
+ bm.run()
+
+
AvroCompressionCodec.values().filter(_.getSupportCompressionLevel).foreach {
codec =>
+ val bm = new Benchmark(
+ s"Avro ${codec.getCodecName} with different levels", values,
output = output)
+ Seq(1, 3, 5, 7, 9).foreach { level =>
+ val conf = Map(s"spark.sql.avro.${codec.getCodecName}.level" ->
level.toString)
+ addBenchmark(bm, codec.name, conf)
+ if (codec == AvroCompressionCodec.ZSTANDARD) {
+ val nondft =
+
!spark.sessionState.conf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED)
+ addBenchmark(
+ bm,
+ codec.name,
+ conf + (SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED.key ->
nondft.toString))
+ }
+ }
+ bm.run()
}
- benchmark.run()
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]