This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a85070baae74 [SPARK-46256][CORE] Parallel Compression Support for ZSTD
a85070baae74 is described below

commit a85070baae749596ef1645cb3feaf13427116020
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Dec 5 19:15:56 2023 +0800

    [SPARK-46256][CORE] Parallel Compression Support for ZSTD
    
    ### What changes were proposed in this pull request?
    
    This PR adds a new configuration named `spark.io.compression.zstd.workers`, 
which enables Parallel Compression Support for ZSTD.
    
    ### Why are the changes needed?
    
    Sufficient performance improvement.
    
    ```
    [info] OpenJDK 64-Bit Server VM 17.0.9+0 on Mac OS X 14.1.2
    [info] Apple M2 Max
    [info] Parallel Compression at level 9:          Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    [info] 
------------------------------------------------------------------------------------------------------------------------
    [info] Parallel Compression with 0 workers                 883            
885           3          0.0     6896156.6       1.0X
    [info] Parallel Compression with 1 workers                 785            
789           3          0.0     6136529.9       1.1X
    [info] Parallel Compression with 2 workers                 428            
431           3          0.0     3346625.6       2.1X
    [info] Parallel Compression with 4 workers                 273            
275           1          0.0     2131454.4       3.2X
    [info] Parallel Compression with 8 workers                 215            
218           2          0.0     1681013.7       4.1X
    [info] Parallel Compression with 16 workers                222            
226           4          0.0     1733805.3       4.0X
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, we introduced a new configuration for this new feature.
    
    ### How was this patch tested?
    
    It's done by new benchmarks added in ZStandardCompressionCodec.
    
    https://github.com/yaooqinn/spark/actions/runs/7095872839
    https://github.com/yaooqinn/spark/actions/runs/7095873975
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #44172 from yaooqinn/SPARK-46256.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../ZStandardBenchmark-jdk21-results.txt           | 46 ++++++++++++++++------
 core/benchmarks/ZStandardBenchmark-results.txt     | 46 ++++++++++++++++------
 .../org/apache/spark/internal/config/package.scala | 11 ++++++
 .../org/apache/spark/io/CompressionCodec.scala     |  8 +++-
 .../org/apache/spark/io/ZStandardBenchmark.scala   | 30 +++++++++++++-
 docs/configuration.md                              | 11 ++++++
 6 files changed, 124 insertions(+), 28 deletions(-)

diff --git a/core/benchmarks/ZStandardBenchmark-jdk21-results.txt 
b/core/benchmarks/ZStandardBenchmark-jdk21-results.txt
index 9f06fffc5088..f5383af55c3a 100644
--- a/core/benchmarks/ZStandardBenchmark-jdk21-results.txt
+++ b/core/benchmarks/ZStandardBenchmark-jdk21-results.txt
@@ -6,22 +6,44 @@ OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 
5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
 Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
--------------------------------------------------------------------------------------------------------------------------------------
-Compression 10000 times at level 1 without buffer pool            675          
  925         281          0.0       67464.0       1.0X
-Compression 10000 times at level 2 without buffer pool            901          
  902           3          0.0       90079.7       0.7X
-Compression 10000 times at level 3 without buffer pool            994          
  998           6          0.0       99413.5       0.7X
-Compression 10000 times at level 1 with buffer pool               813          
  813           0          0.0       81311.2       0.8X
-Compression 10000 times at level 2 with buffer pool               873          
  874           0          0.0       87335.4       0.8X
-Compression 10000 times at level 3 with buffer pool               987          
  987           1          0.0       98671.2       0.7X
+Compression 10000 times at level 1 without buffer pool            672          
  681          10          0.0       67171.0       1.0X
+Compression 10000 times at level 2 without buffer pool            715          
  718           4          0.0       71458.8       0.9X
+Compression 10000 times at level 3 without buffer pool            831          
  835           4          0.0       83139.1       0.8X
+Compression 10000 times at level 1 with buffer pool               609          
  611           2          0.0       60881.5       1.1X
+Compression 10000 times at level 2 with buffer pool               648          
  649           1          0.0       64791.0       1.0X
+Compression 10000 times at level 3 with buffer pool               744          
  751           6          0.0       74392.4       0.9X
 
 OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
 Benchmark ZStandardCompressionCodec:                        Best Time(ms)   
Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------------------------
-Decompression 10000 times from level 1 without buffer pool            826      
      829           3          0.0       82582.8       1.0X
-Decompression 10000 times from level 2 without buffer pool            827      
      829           2          0.0       82728.9       1.0X
-Decompression 10000 times from level 3 without buffer pool            821      
      823           2          0.0       82099.0       1.0X
-Decompression 10000 times from level 1 with buffer pool               755      
      755           1          0.0       75470.0       1.1X
-Decompression 10000 times from level 2 with buffer pool               756      
      758           3          0.0       75579.0       1.1X
-Decompression 10000 times from level 3 with buffer pool               755      
      755           0          0.0       75526.6       1.1X
+Decompression 10000 times from level 1 without buffer pool            842      
      849          12          0.0       84240.0       1.0X
+Decompression 10000 times from level 2 without buffer pool            842      
      846           6          0.0       84185.2       1.0X
+Decompression 10000 times from level 3 without buffer pool            843      
      844           1          0.0       84285.4       1.0X
+Decompression 10000 times from level 1 with buffer pool               770      
      771           1          0.0       77024.9       1.1X
+Decompression 10000 times from level 2 with buffer pool               771      
      771           0          0.0       77120.4       1.1X
+Decompression 10000 times from level 3 with buffer pool               770      
      771           0          0.0       77031.9       1.1X
+
+OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                  48             50         
  3          0.0      376597.0       1.0X
+Parallel Compression with 1 workers                  41             42         
  3          0.0      318927.3       1.2X
+Parallel Compression with 2 workers                  38             40         
  2          0.0      297410.2       1.3X
+Parallel Compression with 4 workers                  37             39         
  1          0.0      287605.8       1.3X
+Parallel Compression with 8 workers                  39             40         
  1          0.0      301948.1       1.2X
+Parallel Compression with 16 workers                 41             43         
  1          0.0      317095.6       1.2X
+
+OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                 174            175         
  1          0.0     1360596.3       1.0X
+Parallel Compression with 1 workers                 189            228         
 24          0.0     1477060.7       0.9X
+Parallel Compression with 2 workers                 109            118         
 15          0.0      851455.9       1.6X
+Parallel Compression with 4 workers                 114            118         
  3          0.0      891964.9       1.5X
+Parallel Compression with 8 workers                 115            122         
  4          0.0      899748.7       1.5X
+Parallel Compression with 16 workers                119            123         
  2          0.0      931210.7       1.5X
 
 
diff --git a/core/benchmarks/ZStandardBenchmark-results.txt 
b/core/benchmarks/ZStandardBenchmark-results.txt
index 2c5e894ca782..64375b7a379a 100644
--- a/core/benchmarks/ZStandardBenchmark-results.txt
+++ b/core/benchmarks/ZStandardBenchmark-results.txt
@@ -6,22 +6,44 @@ OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 
5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
 Benchmark ZStandardCompressionCodec:                    Best Time(ms)   Avg 
Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
--------------------------------------------------------------------------------------------------------------------------------------
-Compression 10000 times at level 1 without buffer pool            657          
  659           2          0.0       65716.9       1.0X
-Compression 10000 times at level 2 without buffer pool            706          
  707           1          0.0       70617.1       0.9X
-Compression 10000 times at level 3 without buffer pool            788          
  788           0          0.0       78755.6       0.8X
-Compression 10000 times at level 1 with buffer pool               585          
  586           1          0.0       58455.1       1.1X
-Compression 10000 times at level 2 with buffer pool               614          
  616           1          0.0       61437.2       1.1X
-Compression 10000 times at level 3 with buffer pool               717          
  717           0          0.0       71705.1       0.9X
+Compression 10000 times at level 1 without buffer pool            666          
  669           3          0.0       66598.6       1.0X
+Compression 10000 times at level 2 without buffer pool            711          
  711           1          0.0       71077.5       0.9X
+Compression 10000 times at level 3 without buffer pool            816          
  816           0          0.0       81575.8       0.8X
+Compression 10000 times at level 1 with buffer pool               591          
  592           1          0.0       59095.6       1.1X
+Compression 10000 times at level 2 with buffer pool               630          
  632           1          0.0       62995.1       1.1X
+Compression 10000 times at level 3 with buffer pool               742          
  742           0          0.0       74180.7       0.9X
 
 OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
 Benchmark ZStandardCompressionCodec:                        Best Time(ms)   
Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------------------------
-Decompression 10000 times from level 1 without buffer pool            595      
      596           0          0.0       59535.3       1.0X
-Decompression 10000 times from level 2 without buffer pool            594      
      595           1          0.0       59432.7       1.0X
-Decompression 10000 times from level 3 without buffer pool            595      
      600          10          0.0       59485.8       1.0X
-Decompression 10000 times from level 1 with buffer pool               549      
      549           1          0.0       54859.6       1.1X
-Decompression 10000 times from level 2 with buffer pool               549      
      550           0          0.0       54948.0       1.1X
-Decompression 10000 times from level 3 with buffer pool               549      
      549           0          0.0       54917.9       1.1X
+Decompression 10000 times from level 1 without buffer pool            600      
      602           1          0.0       60024.1       1.0X
+Decompression 10000 times from level 2 without buffer pool            600      
      603           2          0.0       59973.0       1.0X
+Decompression 10000 times from level 3 without buffer pool            601      
      602           1          0.0       60075.9       1.0X
+Decompression 10000 times from level 1 with buffer pool               553      
      553           0          0.0       55316.4       1.1X
+Decompression 10000 times from level 2 with buffer pool               553      
      554           1          0.0       55271.5       1.1X
+Decompression 10000 times from level 3 with buffer pool               553      
      553           0          0.0       55261.4       1.1X
+
+OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 3:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                  49             50         
  1          0.0      380070.1       1.0X
+Parallel Compression with 1 workers                  41             42         
  4          0.0      319807.1       1.2X
+Parallel Compression with 2 workers                  38             41         
  2          0.0      297706.4       1.3X
+Parallel Compression with 4 workers                  38             40         
  2          0.0      296505.8       1.3X
+Parallel Compression with 8 workers                  39             41         
  1          0.0      305793.6       1.2X
+Parallel Compression with 16 workers                 43             44         
  1          0.0      332233.1       1.1X
+
+OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
+AMD EPYC 7763 64-Core Processor
+Parallel Compression at level 9:          Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Parallel Compression with 0 workers                 175            175         
  0          0.0     1363800.8       1.0X
+Parallel Compression with 1 workers                 186            187         
  3          0.0     1455096.4       0.9X
+Parallel Compression with 2 workers                 110            115         
  6          0.0      863272.6       1.6X
+Parallel Compression with 4 workers                 104            108         
  2          0.0      810721.1       1.7X
+Parallel Compression with 8 workers                 110            112         
  2          0.0      859303.5       1.6X
+Parallel Compression with 16 workers                109            112         
  2          0.0      847863.6       1.6X
 
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e7e69dcb01ea..bce7ee70c7b2 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1910,6 +1910,17 @@ package object config {
       .booleanConf
       .createWithDefault(true)
 
+  private[spark] val IO_COMPRESSION_ZSTD_WORKERS =
+    ConfigBuilder("spark.io.compression.zstd.workers")
+      .doc("Thread size spawned to compress in parallel when using Zstd. When 
the value is 0, " +
+        "no worker is spawned, it works in single-threaded mode. When value > 
0, it triggers " +
+        "asynchronous mode, corresponding number of threads are spawned. More 
workers improve " +
+        "performance, but also increase memory cost.")
+      .version("4.0.0")
+      .intConf
+      .checkValue(_ >= 0, "The number of workers must not be negative.")
+      .createWithDefault(0)
+
   private[spark] val IO_COMPRESSION_ZSTD_LEVEL =
     ConfigBuilder("spark.io.compression.zstd.level")
       .doc("Compression level for Zstd compression codec. Increasing the 
compression " +
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index a6a5b1f67c6f..cc96437a1d21 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -233,10 +233,12 @@ class ZStdCompressionCodec(conf: SparkConf) extends 
CompressionCodec {
     NoPool.INSTANCE
   }
 
+  private val workers = conf.get(IO_COMPRESSION_ZSTD_WORKERS)
+
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     // Wrap the zstd output stream in a buffered output stream, so that we can
     // avoid overhead excessive of JNI call while trying to compress small 
amount of data.
-    val os = new ZstdOutputStreamNoFinalizer(s, bufferPool).setLevel(level)
+    val os = new ZstdOutputStreamNoFinalizer(s, 
bufferPool).setLevel(level).setWorkers(workers)
     new BufferedOutputStream(os, bufferSize)
   }
 
@@ -244,7 +246,9 @@ class ZStdCompressionCodec(conf: SparkConf) extends 
CompressionCodec {
     // SPARK-29322: Set "closeFrameOnFlush" to 'true' to let continuous input 
stream not being
     // stuck on reading open frame.
     val os = new ZstdOutputStreamNoFinalizer(s, bufferPool)
-      .setLevel(level).setCloseFrameOnFlush(true)
+      .setLevel(level)
+      .setWorkers(workers)
+      .setCloseFrameOnFlush(true)
     new BufferedOutputStream(os, bufferSize)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala 
b/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala
index e23416177aef..e5b7bb927831 100644
--- a/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/io/ZStandardBenchmark.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.io
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
ObjectOutputStream}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
-import 
org.apache.spark.internal.config.{IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, 
IO_COMPRESSION_ZSTD_BUFFERSIZE, IO_COMPRESSION_ZSTD_LEVEL}
+import 
org.apache.spark.internal.config.{IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED, 
IO_COMPRESSION_ZSTD_BUFFERSIZE, IO_COMPRESSION_ZSTD_LEVEL, 
IO_COMPRESSION_ZSTD_WORKERS}
 
 
 /**
@@ -49,6 +49,7 @@ object ZStandardBenchmark extends BenchmarkBase {
       val benchmark2 = new Benchmark(name, N, output = output)
       decompressionBenchmark(benchmark2, N)
       benchmark2.run()
+      parallelCompressionBenchmark()
     }
   }
 
@@ -101,4 +102,29 @@ object ZStandardBenchmark extends BenchmarkBase {
       }
     }
   }
+
+  private def parallelCompressionBenchmark(): Unit = {
+    val numberOfLargeObjectToWrite = 128
+    val data: Array[Byte] = (1 until 256 * 1024 * 1024).map(_.toByte).toArray
+
+    Seq(3, 9).foreach { level =>
+      val benchmark = new Benchmark(
+        s"Parallel Compression at level $level", numberOfLargeObjectToWrite, 
output = output)
+      Seq(0, 1, 2, 4, 8, 16).foreach { workers =>
+        val conf = new SparkConf(false)
+          .set(IO_COMPRESSION_ZSTD_LEVEL, level)
+          .set(IO_COMPRESSION_ZSTD_WORKERS, workers)
+        benchmark.addCase(s"Parallel Compression with $workers workers") { _ =>
+          val baos = new ByteArrayOutputStream()
+          val zcos = new 
ZStdCompressionCodec(conf).compressedOutputStream(baos)
+          val oos = new ObjectOutputStream(zcos)
+          1 to numberOfLargeObjectToWrite foreach { _ =>
+            oos.writeObject(data)
+          }
+          oos.close()
+        }
+      }
+      benchmark.run()
+    }
+  }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index b13250a7786e..2ad07cf59f71 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1760,6 +1760,17 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.io.compression.zstd.workers</code></td>
+  <td>0</td>
+  <td>
+    Thread size spawned to compress in parallel when using Zstd. When value is 0
+    no worker is spawned, it works in single-threaded mode. When value > 0, it 
triggers
+    asynchronous mode, corresponding number of threads are spawned. More 
workers improve
+    performance, but also increase memory cost.
+  </td>
+  <td>4.0.0</td>
+</tr>
 <tr>
   <td><code>spark.kryo.classesToRegister</code></td>
   <td>(none)</td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to