This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a85070baae74 [SPARK-46256][CORE] Parallel Compression Support for ZSTD
a85070baae74 is described below
commit a85070baae749596ef1645cb3feaf13427116020
Author: Kent Yao
AuthorDate: Tue Dec 5 19:15:56 2023 +0800
[SPARK-46256][CORE] Parallel Compression Support for ZSTD
### What changes were proposed in this pull request?
This PR adds a new configuration named `spark.io.compression.zstd.workers`, which enables parallel compression for ZSTD by setting the number of worker threads used by the Zstd output stream (default 0, i.e. single-threaded).
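To make the `workers` semantics concrete, here is a minimal, self-contained sketch of chunked parallel compression. It illustrates the idea only and is not how Spark or zstd-jni implement `setWorkers`. It swaps in the JDK's DEFLATE (`java.util.zip.Deflater`) for zstd, and `ChunkedParallelCompression` and its methods are hypothetical names. With `workers = 0`, every chunk is compressed on the calling thread. With `workers > 0`, chunks are compressed on a fixed pool of that many threads, and each running task holds its own compressor state, which is why more workers use more memory in this sketch. Unlike zstd's multithreaded mode, whose output is still a regular zstd stream, this sketch emits independently compressed chunks.

```scala
import java.io.ByteArrayOutputStream
import java.util.concurrent.{Callable, Executors}
import java.util.zip.{Deflater, Inflater}

// Hypothetical illustration: DEFLATE stands in for zstd, and chunks are
// compressed independently (zstd's own multithreaded mode works differently).
object ChunkedParallelCompression {

  // Compress a single chunk with its own Deflater instance.
  def compressChunk(chunk: Array[Byte], level: Int): Array[Byte] = {
    val deflater = new Deflater(level)
    try {
      deflater.setInput(chunk)
      deflater.finish()
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](64 * 1024)
      while (!deflater.finished()) {
        val n = deflater.deflate(buf)
        out.write(buf, 0, n)
      }
      out.toByteArray
    } finally deflater.end()
  }

  // Inverse of compressChunk; assumes `compressed` is one complete DEFLATE stream.
  def decompressChunk(compressed: Array[Byte]): Array[Byte] = {
    val inflater = new Inflater()
    try {
      inflater.setInput(compressed)
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](64 * 1024)
      while (!inflater.finished()) {
        val n = inflater.inflate(buf)
        if (n == 0 && (inflater.needsInput() || inflater.needsDictionary()))
          throw new IllegalArgumentException("Truncated or unsupported DEFLATE stream")
        out.write(buf, 0, n)
      }
      out.toByteArray
    } finally inflater.end()
  }

  // workers == 0: compress every chunk on the calling thread.
  // workers > 0:  compress chunks on a fixed pool of `workers` threads.
  def compress(data: Array[Byte], chunkSize: Int, level: Int, workers: Int): Vector[Array[Byte]] = {
    require(workers >= 0, "The number of workers must not be negative.")
    require(chunkSize > 0, "chunkSize must be positive.")
    val chunks = data.grouped(chunkSize).toVector
    if (workers == 0) {
      chunks.map(compressChunk(_, level))
    } else {
      val pool = Executors.newFixedThreadPool(workers)
      try {
        val futures = chunks.map { chunk =>
          pool.submit(new Callable[Array[Byte]] {
            override def call(): Array[Byte] = compressChunk(chunk, level)
          })
        }
        futures.map(_.get()) // wait for every chunk, keeping the original order
      } finally {
        pool.shutdown()
      }
    }
  }
}
```

DEFLATE only accepts levels 0 to 9, whereas zstd levels go higher. The levels used in the benchmarks (3 and 9) fit both.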
### Why are the changes needed?
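Significant performance improvement: in the level-9 benchmark below (Apple M2 Max), compression with 8 workers is about 4.1X faster than the single-threaded baseline.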
```
[info] OpenJDK 64-Bit Server VM 17.0.9+0 on Mac OS X 14.1.2
[info] Apple M2 Max
[info] Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] Parallel Compression with 0 workers                 883            885           3          0.0     6896156.6       1.0X
[info] Parallel Compression with 1 workers                 785            789           3          0.0     6136529.9       1.1X
[info] Parallel Compression with 2 workers                 428            431           3          0.0     3346625.6       2.1X
[info] Parallel Compression with 4 workers                 273            275           1          0.0     2131454.4       3.2X
[info] Parallel Compression with 8 workers                 215            218           2          0.0     1681013.7       4.1X
[info] Parallel Compression with 16 workers                222            226           4          0.0     1733805.3       4.0X
```
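The `Relative` column is consistent with the baseline's best time (here the 0-worker case) divided by each case's best time. `Per Row(ns)` is consistent with the best time divided by the 128 objects written. The small Scala sketch below recomputes the `Relative` column from the best times in the table above. `RelativeSpeedup` is a hypothetical helper and is not part of Spark's `Benchmark` class.

```scala
import java.util.Locale

object RelativeSpeedup {
  // Best times (ms) from the level-9 Apple M2 Max run above, keyed by worker count.
  val bestTimesMs: Seq[(Int, Double)] =
    Seq(0 -> 883.0, 1 -> 785.0, 2 -> 428.0, 4 -> 273.0, 8 -> 215.0, 16 -> 222.0)

  // Speedup of one case relative to the baseline (the first case).
  def relative(baselineMs: Double, bestMs: Double): Double = baselineMs / bestMs

  def main(args: Array[String]): Unit = {
    val baselineMs = bestTimesMs.head._2
    bestTimesMs.foreach { case (workers, bestMs) =>
      // Locale.ROOT keeps the decimal separator a '.' regardless of the JVM locale.
      println("%2d workers: %.1fX".formatLocal(Locale.ROOT, workers, relative(baselineMs, bestMs)))
    }
  }
}
```

Rounded to one decimal place, this reproduces 1.0X, 1.1X, 2.1X, 3.2X, 4.1X and 4.0X. The raw best times show why the speedup dips slightly from 8 to 16 workers (215 ms vs 222 ms).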
### Does this PR introduce _any_ user-facing change?
Yes, this PR introduces a new configuration, `spark.io.compression.zstd.workers`, for this feature.
### How was this patch tested?
Tested with new benchmarks added to `ZStandardBenchmark`; the benchmark runs:
https://github.com/yaooqinn/spark/actions/runs/7095872839
https://github.com/yaooqinn/spark/actions/runs/7095873975
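One detail matters when reading the `Per Row(ns)` column of the new parallel benchmarks. `parallelCompressionBenchmark` writes the same `data` array 128 times through a single `ObjectOutputStream`, and Java serialization writes an object it has already written as a short back-reference rather than serializing it again. Each case therefore compresses roughly one serialized copy of `data` plus small handles, which is the same workload for every worker count. The plain-JDK sketch below demonstrates this behavior. `SerializationBackReference` and `bytesPerWrite` are hypothetical names. `ObjectOutputStream.reset()` is the standard way to force a full rewrite.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializationBackReference {
  // Returns the number of bytes each successive writeObject call added to the stream.
  def bytesPerWrite(data: Array[Byte], times: Int, resetBetweenWrites: Boolean): Seq[Int] = {
    val baos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(baos)
    oos.flush() // push the stream header out so it is not counted below
    val sizes = (1 to times).map { _ =>
      val before = baos.size()
      if (resetBetweenWrites) oos.reset() // forget previously written objects
      oos.writeObject(data)
      oos.flush()
      baos.size() - before
    }
    oos.close()
    sizes
  }

  def main(args: Array[String]): Unit = {
    val data = Array.fill[Byte](1024)(7)
    // First write is the full array; later ones are 5-byte back-references.
    println(bytesPerWrite(data, 3, resetBetweenWrites = false))
    // With reset() before each write, every write serializes the full array again.
    println(bytesPerWrite(data, 3, resetBetweenWrites = true))
  }
}
```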
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #44172 from yaooqinn/SPARK-46256.
Authored-by: Kent Yao
Signed-off-by: Kent Yao
---
.../ZStandardBenchmark-jdk21-results.txt | 46 ++++++++++++++++------
core/benchmarks/ZStandardBenchmark-results.txt | 46 ++++++++++++++++------
.../org/apache/spark/internal/config/package.scala | 11 ++++++
.../org/apache/spark/io/CompressionCodec.scala | 8 +++-
.../org/apache/spark/io/ZStandardBenchmark.scala | 30 +++++++++++++-
docs/configuration.md | 11 ++++++
6 files changed, 124 insertions(+), 28 deletions(-)
diff --git a/core/benchmarks/ZStandardBenchmark-jdk21-results.txt b/core/benchmarks/ZStandardBenchmark-jdk21-results.txt
index 9f06fffc5088..f5383af55c3a 100644
--- a/core/benchmarks/ZStandardBenchmark-jdk21-results.txt
+++ b/core/benchmarks/ZStandardBenchmark-jdk21-results.txt
@@ -6,22 +6,44 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
AMD EPYC 7763 64-Core Processor
Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------------
-Compression 10000 times at level 1 without buffer pool            675            925         281          0.0         67464.0       1.0X
-Compression 10000 times at level 2 without buffer pool            901            902           3          0.0         90079.7       0.7X
-Compression 10000 times at level 3 without buffer pool            994            998           6          0.0         99413.5       0.7X
-Compression 10000 times at level 1 with buffer pool               813            813           0          0.0         81311.2       0.8X
-Compression 10000 times at level 2 with buffer pool               873            874           0          0.0         87335.4       0.8X
-Compression 10000 times at level 3 with buffer pool               987            987           1          0.0         98671.2       0.7X
+Compression 10000 times at level 1 without buffer pool            672            681          10          0.0         67171.0       1.0X
+Compression 10000 times at level 2 without buffer pool            715            718           4          0.0         71458.8       0.9X
+Compression 10000 times at level 3 without buffer pool            831            835           4          0.0         83139.1       0.8X
+Compression 10000 times at level 1 with buffer pool               609            611           2          0.0         60881.5       1.1X
+Compression 10000 times at level 2 with buffer pool               648            649           1          0.0         64791.0       1.0X
+Compression 10000 times at level 3 with buffer pool               744            751           6          0.0         74392.4       0.9X
OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
AMD EPYC 7763 64-Core Processor
Benchmark ZStandardCompressionCodec:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------------------
-Decompression 10000 times from level 1 without buffer pool            826            829           3          0.0         82582.8       1.0X
-Decompression 10000 times from level 2 without buffer pool            827            829           2          0.0         82728.9       1.0X
-Decompression 10000 times from level 3 without buffer pool            821            823           2          0.0         82099.0       1.0X
-Decompression 10000 times from level 1 with buffer pool               755            755           1          0.0         75470.0       1.1X
-Decompression 10000 times from level 2 with buffer pool               756            758           3          0.0         75579.0       1.1X
-Decompression 10000 times from level 3 with buffer pool               755            755           0          0.0         75526.6       1.1X
+Decompression 10000 times from level 1 without buffer pool            842            849          12          0.0         84240.0       1.0X
+Decompression 10000 times from level 2 without buffer pool            842            846           6          0.0         84185.2       1.0X
+Decompression 10000 times from level 3 without buffer pool            843            844           1          0.0         84285.4       1.0X
+Decompression 10000 times from level 1 with buffer pool               770            771           1          0.0         77024.9       1.1X
+Decompression 10000 times from level 2 with buffer pool               771            771           0          0.0         77120.4       1.1X
+Decompression 10000 times from level 3 with buffer pool               770            771           0          0.0         77031.9       1.1X
+
+OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                  48             50           3          0.0      376597.0       1.0X
+Parallel Compression with 1 workers                  41             42           3          0.0      318927.3       1.2X
+Parallel Compression with 2 workers                  38             40           2          0.0      297410.2       1.3X
+Parallel Compression with 4 workers                  37             39           1          0.0      287605.8       1.3X
+Parallel Compression with 8 workers                  39             40           1          0.0      301948.1       1.2X
+Parallel Compression with 16 workers                 41             43           1          0.0      317095.6       1.2X
+
+OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                 174            175           1          0.0     1360596.3       1.0X
+Parallel Compression with 1 workers                 189            228          24          0.0     1477060.7       0.9X
+Parallel Compression with 2 workers                 109            118          15          0.0      851455.9       1.6X
+Parallel Compression with 4 workers                 114            118           3          0.0      891964.9       1.5X
+Parallel Compression with 8 workers                 115            122           4          0.0      899748.7       1.5X
+Parallel Compression with 16 workers                119            123           2          0.0      931210.7       1.5X
diff --git a/core/benchmarks/ZStandardBenchmark-results.txt b/core/benchmarks/ZStandardBenchmark-results.txt
index 2c5e894ca782..64375b7a379a 100644
--- a/core/benchmarks/ZStandardBenchmark-results.txt
+++ b/core/benchmarks/ZStandardBenchmark-results.txt
@@ -6,22 +6,44 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
AMD EPYC 7763 64-Core Processor
Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------------
-Compression 10000 times at level 1 without buffer pool            657            659           2          0.0         65716.9       1.0X
-Compression 10000 times at level 2 without buffer pool            706            707           1          0.0         70617.1       0.9X
-Compression 10000 times at level 3 without buffer pool            788            788           0          0.0         78755.6       0.8X
-Compression 10000 times at level 1 with buffer pool               585            586           1          0.0         58455.1       1.1X
-Compression 10000 times at level 2 with buffer pool               614            616           1          0.0         61437.2       1.1X
-Compression 10000 times at level 3 with buffer pool               717            717           0          0.0         71705.1       0.9X
+Compression 10000 times at level 1 without buffer pool            666            669           3          0.0         66598.6       1.0X
+Compression 10000 times at level 2 without buffer pool            711            711           1          0.0         71077.5       0.9X
+Compression 10000 times at level 3 without buffer pool            816            816           0          0.0         81575.8       0.8X
+Compression 10000 times at level 1 with buffer pool               591            592           1          0.0         59095.6       1.1X
+Compression 10000 times at level 2 with buffer pool               630            632           1          0.0         62995.1       1.1X
+Compression 10000 times at level 3 with buffer pool               742            742           0          0.0         74180.7       0.9X
OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
AMD EPYC 7763 64-Core Processor
Benchmark ZStandardCompressionCodec:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------------------
-Decompression 10000 times from level 1 without buffer pool            595            596           0          0.0         59535.3       1.0X
-Decompression 10000 times from level 2 without buffer pool            594            595           1          0.0         59432.7       1.0X
-Decompression 10000 times from level 3 without buffer pool            595            600          10          0.0         59485.8       1.0X
-Decompression 10000 times from level 1 with buffer pool               549            549           1          0.0         54859.6       1.1X
-Decompression 10000 times from level 2 with buffer pool               549            550           0          0.0         54948.0       1.1X
-Decompression 10000 times from level 3 with buffer pool               549            549           0          0.0         54917.9       1.1X
+Decompression 10000 times from level 1 without buffer pool            600            602           1          0.0         60024.1       1.0X
+Decompression 10000 times from level 2 without buffer pool            600            603           2          0.0         59973.0       1.0X
+Decompression 10000 times from level 3 without buffer pool            601            602           1          0.0         60075.9       1.0X
+Decompression 10000 times from level 1 with buffer pool               553            553           0          0.0         55316.4       1.1X
+Decompression 10000 times from level 2 with buffer pool               553            554           1          0.0         55271.5       1.1X
+Decompression 10000 times from level 3 with buffer pool               553            553           0          0.0         55261.4       1.1X
+
+OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                  49             50           1          0.0      380070.1       1.0X
+Parallel Compression with 1 workers                  41             42           4          0.0      319807.1       1.2X
+Parallel Compression with 2 workers                  38             41           2          0.0      297706.4       1.3X
+Parallel Compression with 4 workers                  38             40           2          0.0      296505.8       1.3X
+Parallel Compression with 8 workers                  39             41           1          0.0      305793.6       1.2X
+Parallel Compression with 16 workers                 43             44           1          0.0      332233.1       1.1X
+
+OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                 175            175           0          0.0     1363800.8       1.0X
+Parallel Compression with 1 workers                 186            187           3          0.0     1455096.4       0.9X
+Parallel Compression with 2 workers                 110            115           6          0.0      863272.6       1.6X
+Parallel Compression with 4 workers                 104            108           2          0.0      810721.1       1.7X
+Parallel Compression with 8 workers                 110            112           2          0.0      859303.5       1.6X
+Parallel Compression with 16 workers                109            112           2          0.0      847863.6       1.6X
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e7e69dcb01ea..bce7ee70c7b2 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1910,6 +1910,17 @@ package object config {
.booleanConf
.createWithDefault(true)
+ private[spark] val IO_COMPRESSION_ZSTD_WORKERS =
+ ConfigBuilder("spark.io.compression.zstd.workers")
+ .doc("Thread size spawned to compress in parallel when using Zstd. When the value is 0, " +
+ "no worker is spawned, it works in single-threaded mode. When value > 0, it triggers " +
+ "asynchronous mode, corresponding number of threads are spawned. More workers improve " +
+ "performance, but also increase memory cost.")
+ .version("4.0.0")
+ .intConf
+ .checkValue(_ >= 0, "The number of workers must not be negative.")
+ .createWithDefault(0)
+
private[spark] val IO_COMPRESSION_ZSTD_LEVEL =
ConfigBuilder("spark.io.compression.zstd.level")
.doc("Compression level for Zstd compression codec. Increasing the compression " +
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index a6a5b1f67c6f..cc96437a1d21 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -233,10 +233,12 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
NoPool.INSTANCE
}
+ private val workers = conf.get(IO_COMPRESSION_ZSTD_WORKERS)
+
override def compressedOutputStream(s: OutputStream): OutputStream = {
// Wrap the zstd output stream in a buffered output stream, so that we can
// avoid overhead excessive of JNI call while trying to compress small amount of data.
- val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level)
+ val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level).setWorkers(workers)
new BufferedOutputStream(os, bufferSize)
}
@@ -244,7 +246,9 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
// SPARK-29322: Set "closeFrameOnFlush" to 'true' to let continuous input stream not being
// stuck on reading open frame.
val os = new ZstdOutputStreamNoFinalizer(s, bufferPool)
- .setLevel(level).setCloseFrameOnFlush(true)
+ .setLevel(level)
+ .setWorkers(workers)
+ .setCloseFrameOnFlush(true)
new BufferedOutputStream(os, bufferSize)
}
diff --git a/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala b/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala
index e23416177aef..e5b7bb927831 100644
--- a/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala
@@ -17,11 +17,11 @@
package org.apache.spark.io
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream}
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
-import org.apache.spark.internal.config.{IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, IO_COMPRESSION_ZSTD_BUFFERSIZE, IO_COMPRESSION_ZSTD_LEVEL}
+import org.apache.spark.internal.config.{IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, IO_COMPRESSION_ZSTD_BUFFERSIZE, IO_COMPRESSION_ZSTD_LEVEL, IO_COMPRESSION_ZSTD_WORKERS}
/**
@@ -49,6 +49,7 @@ object ZStandardBenchmark extends BenchmarkBase {
val benchmark2 = new Benchmark(name, N, output = output)
decompressionBenchmark(benchmark2, N)
benchmark2.run()
+ parallelCompressionBenchmark()
}
}
@@ -101,4 +102,29 @@ object ZStandardBenchmark extends BenchmarkBase {
}
}
}
+
+ private def parallelCompressionBenchmark(): Unit = {
+ val numberOfLargeObjectToWrite = 128
+ val data: Array[Byte] = (1 until 256 * 1024 * 1024).map(_.toByte).toArray
+
+ Seq(3, 9).foreach { level =>
+ val benchmark = new Benchmark(
+ s"Parallel Compression at level $level", numberOfLargeObjectToWrite, output = output)
+ Seq(0, 1, 2, 4, 8, 16).foreach { workers =>
+ val conf = new SparkConf(false)
+ .set(IO_COMPRESSION_ZSTD_LEVEL, level)
+ .set(IO_COMPRESSION_ZSTD_WORKERS, workers)
+ benchmark.addCase(s"Parallel Compression with $workers workers") { _ =>
+ val baos = new ByteArrayOutputStream()
+ val zcos = new ZStdCompressionCodec(conf).compressedOutputStream(baos)
+ val oos = new ObjectOutputStream(zcos)
+ 1 to numberOfLargeObjectToWrite foreach { _ =>
+ oos.writeObject(data)
+ }
+ oos.close()
+ }
+ }
+ benchmark.run()
+ }
+ }
}
diff --git a/docs/configuration.md b/docs/configuration.md
index b13250a7786e..2ad07cf59f71 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1760,6 +1760,17 @@ Apart from these, the following properties are also available, and may be useful
</td>
<td>2.3.0</td>
</tr>
+<tr>
+ <td><code>spark.io.compression.zstd.workers</code></td>
+ <td>0</td>
+ <td>
+ Thread size spawned to compress in parallel when using Zstd. When value is 0
+ no worker is spawned, it works in single-threaded mode. When value > 0, it triggers
+ asynchronous mode, corresponding number of threads are spawned. More workers improve
+ performance, but also increase memory cost.
+ </td>
+ <td>4.0.0</td>
+</tr>
<tr>
<td><code>spark.kryo.classesToRegister</code></td>
<td>(none)</td>