This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new a85070baae74 [SPARK-46256][CORE] Parallel Compression Support for ZSTD
a85070baae74 is described below

commit a85070baae749596ef1645cb3feaf13427116020
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Dec 5 19:15:56 2023 +0800

    [SPARK-46256][CORE] Parallel Compression Support for ZSTD

    ### What changes were proposed in this pull request?

    This PR adds a new configuration named `spark.io.compression.zstd.workers`,
    which enables parallel compression support for ZSTD.

    ### Why are the changes needed?

    Significant performance improvement:

    ```
    [info] OpenJDK 64-Bit Server VM 17.0.9+0 on Mac OS X 14.1.2
    [info] Apple M2 Max
    [info] Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    [info] ------------------------------------------------------------------------------------------------------------------------
    [info] Parallel Compression with 0 workers                 883            885           3          0.0     6896156.6       1.0X
    [info] Parallel Compression with 1 workers                 785            789           3          0.0     6136529.9       1.1X
    [info] Parallel Compression with 2 workers                 428            431           3          0.0     3346625.6       2.1X
    [info] Parallel Compression with 4 workers                 273            275           1          0.0     2131454.4       3.2X
    [info] Parallel Compression with 8 workers                 215            218           2          0.0     1681013.7       4.1X
    [info] Parallel Compression with 16 workers                222            226           4          0.0     1733805.3       4.0X
    ```

    ### Does this PR introduce _any_ user-facing change?

    Yes, we introduced a new configuration for this new feature.

    ### How was this patch tested?

    It is covered by the new benchmark cases added in ZStandardBenchmark:
    https://github.com/yaooqinn/spark/actions/runs/7095872839
    https://github.com/yaooqinn/spark/actions/runs/7095873975

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #44172 from yaooqinn/SPARK-46256.
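As a usage illustration (not part of the commit itself): once this change is available, the feature amounts to selecting the zstd codec and setting a positive worker count, e.g. in `spark-defaults.conf`. The worker count of 4 below is an arbitrary example value, not a recommendation from the patch:

```
spark.io.compression.codec           zstd
spark.io.compression.zstd.workers    4
```

As the benchmark above suggests, returns diminish once the worker count exceeds the cores available to the compressor, and each additional worker adds memory overhead.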
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../ZStandardBenchmark-jdk21-results.txt           | 46 ++++++++++++++++------
 core/benchmarks/ZStandardBenchmark-results.txt     | 46 ++++++++++++++++------
 .../org/apache/spark/internal/config/package.scala | 11 ++++++
 .../org/apache/spark/io/CompressionCodec.scala     |  8 +++-
 .../org/apache/spark/io/ZStandardBenchmark.scala   | 30 +++++++++++++-
 docs/configuration.md                              | 11 ++++++
 6 files changed, 124 insertions(+), 28 deletions(-)

diff --git a/core/benchmarks/ZStandardBenchmark-jdk21-results.txt b/core/benchmarks/ZStandardBenchmark-jdk21-results.txt
index 9f06fffc5088..f5383af55c3a 100644
--- a/core/benchmarks/ZStandardBenchmark-jdk21-results.txt
+++ b/core/benchmarks/ZStandardBenchmark-jdk21-results.txt
@@ -6,22 +6,44 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
 Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 --------------------------------------------------------------------------------------------------------------------------------------
-Compression 10000 times at level 1 without buffer pool            675            925         281          0.0       67464.0       1.0X
-Compression 10000 times at level 2 without buffer pool            901            902           3          0.0       90079.7       0.7X
-Compression 10000 times at level 3 without buffer pool            994            998           6          0.0       99413.5       0.7X
-Compression 10000 times at level 1 with buffer pool               813            813           0          0.0       81311.2       0.8X
-Compression 10000 times at level 2 with buffer pool               873            874           0          0.0       87335.4       0.8X
-Compression 10000 times at level 3 with buffer pool               987            987           1          0.0       98671.2       0.7X
+Compression 10000 times at level 1 without buffer pool            672            681          10          0.0       67171.0       1.0X
+Compression 10000 times at level 2 without buffer pool            715            718           4          0.0       71458.8       0.9X
+Compression 10000 times at level 3 without buffer pool            831            835           4          0.0       83139.1       0.8X
+Compression 10000 times at level 1 with buffer pool               609            611           2          0.0       60881.5       1.1X
+Compression 10000 times at level 2 with buffer pool               648            649           1          0.0       64791.0       1.0X
+Compression 10000 times at level 3 with buffer pool               744            751           6          0.0       74392.4       0.9X
 
 OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
 Benchmark ZStandardCompressionCodec:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------------------------
-Decompression 10000 times from level 1 without buffer pool            826            829           3          0.0       82582.8       1.0X
-Decompression 10000 times from level 2 without buffer pool            827            829           2          0.0       82728.9       1.0X
-Decompression 10000 times from level 3 without buffer pool            821            823           2          0.0       82099.0       1.0X
-Decompression 10000 times from level 1 with buffer pool               755            755           1          0.0       75470.0       1.1X
-Decompression 10000 times from level 2 with buffer pool               756            758           3          0.0       75579.0       1.1X
-Decompression 10000 times from level 3 with buffer pool               755            755           0          0.0       75526.6       1.1X
+Decompression 10000 times from level 1 without buffer pool            842            849          12          0.0       84240.0       1.0X
+Decompression 10000 times from level 2 without buffer pool            842            846           6          0.0       84185.2       1.0X
+Decompression 10000 times from level 3 without buffer pool            843            844           1          0.0       84285.4       1.0X
+Decompression 10000 times from level 1 with buffer pool               770            771           1          0.0       77024.9       1.1X
+Decompression 10000 times from level 2 with buffer pool               771            771           0          0.0       77120.4       1.1X
+Decompression 10000 times from level 3 with buffer pool               770            771           0          0.0       77031.9       1.1X
+
+OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                  48             50           3          0.0      376597.0       1.0X
+Parallel Compression with 1 workers                  41             42           3          0.0      318927.3       1.2X
+Parallel Compression with 2 workers                  38             40           2          0.0      297410.2       1.3X
+Parallel Compression with 4 workers                  37             39           1          0.0      287605.8       1.3X
+Parallel Compression with 8 workers                  39             40           1          0.0      301948.1       1.2X
+Parallel Compression with 16 workers                 41             43           1          0.0      317095.6       1.2X
+
+OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                 174            175           1          0.0     1360596.3       1.0X
+Parallel Compression with 1 workers                 189            228          24          0.0     1477060.7       0.9X
+Parallel Compression with 2 workers                 109            118          15          0.0      851455.9       1.6X
+Parallel Compression with 4 workers                 114            118           3          0.0      891964.9       1.5X
+Parallel Compression with 8 workers                 115            122           4          0.0      899748.7       1.5X
+Parallel Compression with 16 workers                119            123           2          0.0      931210.7       1.5X
diff --git a/core/benchmarks/ZStandardBenchmark-results.txt b/core/benchmarks/ZStandardBenchmark-results.txt
index 2c5e894ca782..64375b7a379a 100644
--- a/core/benchmarks/ZStandardBenchmark-results.txt
+++ b/core/benchmarks/ZStandardBenchmark-results.txt
@@ -6,22 +6,44 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
 Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 --------------------------------------------------------------------------------------------------------------------------------------
-Compression 10000 times at level 1 without buffer pool            657            659           2          0.0       65716.9       1.0X
-Compression 10000 times at level 2 without buffer pool            706            707           1          0.0       70617.1       0.9X
-Compression 10000 times at level 3 without buffer pool            788            788           0          0.0       78755.6       0.8X
-Compression 10000 times at level 1 with buffer pool               585            586           1          0.0       58455.1       1.1X
-Compression 10000 times at level 2 with buffer pool               614            616           1          0.0       61437.2       1.1X
-Compression 10000 times at level 3 with buffer pool               717            717           0          0.0       71705.1       0.9X
+Compression 10000 times at level 1 without buffer pool            666            669           3          0.0       66598.6       1.0X
+Compression 10000 times at level 2 without buffer pool            711            711           1          0.0       71077.5       0.9X
+Compression 10000 times at level 3 without buffer pool            816            816           0          0.0       81575.8       0.8X
+Compression 10000 times at level 1 with buffer pool               591            592           1          0.0       59095.6       1.1X
+Compression 10000 times at level 2 with buffer pool               630            632           1          0.0       62995.1       1.1X
+Compression 10000 times at level 3 with buffer pool               742            742           0          0.0       74180.7       0.9X
 
 OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
 Benchmark ZStandardCompressionCodec:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------------------------
-Decompression 10000 times from level 1 without buffer pool            595            596           0          0.0       59535.3       1.0X
-Decompression 10000 times from level 2 without buffer pool            594            595           1          0.0       59432.7       1.0X
-Decompression 10000 times from level 3 without buffer pool            595            600          10          0.0       59485.8       1.0X
-Decompression 10000 times from level 1 with buffer pool               549            549           1          0.0       54859.6       1.1X
-Decompression 10000 times from level 2 with buffer pool               549            550           0          0.0       54948.0       1.1X
-Decompression 10000 times from level 3 with buffer pool               549            549           0          0.0       54917.9       1.1X
+Decompression 10000 times from level 1 without buffer pool            600            602           1          0.0       60024.1       1.0X
+Decompression 10000 times from level 2 without buffer pool            600            603           2          0.0       59973.0       1.0X
+Decompression 10000 times from level 3 without buffer pool            601            602           1          0.0       60075.9       1.0X
+Decompression 10000 times from level 1 with buffer pool               553            553           0          0.0       55316.4       1.1X
+Decompression 10000 times from level 2 with buffer pool               553            554           1          0.0       55271.5       1.1X
+Decompression 10000 times from level 3 with buffer pool               553            553           0          0.0       55261.4       1.1X
+
+OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                  49             50           1          0.0      380070.1       1.0X
+Parallel Compression with 1 workers                  41             42           4          0.0      319807.1       1.2X
+Parallel Compression with 2 workers                  38             41           2          0.0      297706.4       1.3X
+Parallel Compression with 4 workers                  38             40           2          0.0      296505.8       1.3X
+Parallel Compression with 8 workers                  39             41           1          0.0      305793.6       1.2X
+Parallel Compression with 16 workers                 43             44           1          0.0      332233.1       1.1X
+
+OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                 175            175           0          0.0     1363800.8       1.0X
+Parallel Compression with 1 workers                 186            187           3          0.0     1455096.4       0.9X
+Parallel Compression with 2 workers                 110            115           6          0.0      863272.6       1.6X
+Parallel Compression with 4 workers                 104            108           2          0.0      810721.1       1.7X
+Parallel Compression with 8 workers                 110            112           2          0.0      859303.5       1.6X
+Parallel Compression with 16 workers                109            112           2          0.0      847863.6       1.6X
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e7e69dcb01ea..bce7ee70c7b2 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1910,6 +1910,17 @@ package object config {
       .booleanConf
       .createWithDefault(true)
 
+  private[spark] val IO_COMPRESSION_ZSTD_WORKERS =
+    ConfigBuilder("spark.io.compression.zstd.workers")
+      .doc("Thread size spawned to compress in parallel when using Zstd. When the value is 0, " +
+        "no worker is spawned, it works in single-threaded mode. When value > 0, it triggers " +
+        "asynchronous mode, corresponding number of threads are spawned. More workers improve " +
+        "performance, but also increase memory cost.")
+      .version("4.0.0")
+      .intConf
+      .checkValue(_ >= 0, "The number of workers must not be negative.")
+      .createWithDefault(0)
+
   private[spark] val IO_COMPRESSION_ZSTD_LEVEL =
     ConfigBuilder("spark.io.compression.zstd.level")
       .doc("Compression level for Zstd compression codec. Increasing the compression " +
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index a6a5b1f67c6f..cc96437a1d21 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -233,10 +233,12 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     NoPool.INSTANCE
   }
 
+  private val workers = conf.get(IO_COMPRESSION_ZSTD_WORKERS)
+
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     // Wrap the zstd output stream in a buffered output stream, so that we can
     // avoid overhead excessive of JNI call while trying to compress small amount of data.
-    val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level)
+    val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level).setWorkers(workers)
     new BufferedOutputStream(os, bufferSize)
   }
 
@@ -244,7 +246,9 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // SPARK-29322: Set "closeFrameOnFlush" to 'true' to let continuous input stream not being
     // stuck on reading open frame.
     val os = new ZstdOutputStreamNoFinalizer(s, bufferPool)
-      .setLevel(level).setCloseFrameOnFlush(true)
+      .setLevel(level)
+      .setWorkers(workers)
+      .setCloseFrameOnFlush(true)
     new BufferedOutputStream(os, bufferSize)
   }
diff --git a/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala b/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala
index e23416177aef..e5b7bb927831 100644
--- a/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.io
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
-import org.apache.spark.internal.config.{IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, IO_COMPRESSION_ZSTD_BUFFERSIZE, IO_COMPRESSION_ZSTD_LEVEL}
+import org.apache.spark.internal.config.{IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, IO_COMPRESSION_ZSTD_BUFFERSIZE, IO_COMPRESSION_ZSTD_LEVEL, IO_COMPRESSION_ZSTD_WORKERS}
 
 /**
@@ -49,6 +49,7 @@ object ZStandardBenchmark extends BenchmarkBase {
       val benchmark2 = new Benchmark(name, N, output = output)
       decompressionBenchmark(benchmark2, N)
       benchmark2.run()
+      parallelCompressionBenchmark()
     }
   }
 
@@ -101,4 +102,29 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val numberOfLargeObjectToWrite = 128
+    val data: Array[Byte] = (1 until 256 * 1024 * 1024).map(_.toByte).toArray
+
+    Seq(3, 9).foreach { level =>
+      val benchmark = new Benchmark(
+        s"Parallel Compression at level $level", numberOfLargeObjectToWrite, output = output)
+      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>
+        val conf = new SparkConf(false)
+          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
+          .set(IO_COMPRESSION_ZSTD_WORKERS, workers)
+        benchmark.addCase(s"Parallel Compression with $workers workers") { _ =>
+          val baos = new ByteArrayOutputStream()
+          val zcos = new ZStdCompressionCodec(conf).compressedOutputStream(baos)
+          val oos = new ObjectOutputStream(zcos)
+          1 to numberOfLargeObjectToWrite foreach { _ =>
+            oos.writeObject(data)
+          }
+          oos.close()
+        }
+      }
+      benchmark.run()
+    }
+  }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index b13250a7786e..2ad07cf59f71 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1760,6 +1760,17 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.io.compression.zstd.workers</code></td>
+  <td>0</td>
+  <td>
+    Thread size spawned to compress in parallel when using Zstd. When value is 0
+    no worker is spawned, it works in single-threaded mode. When value > 0, it triggers
+    asynchronous mode, corresponding number of threads are spawned. More workers improve
+    performance, but also increase memory cost.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.kryo.classesToRegister</code></td>
   <td>(none)</td>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org