This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dc57455589a9 [SPARK-54571][CORE][SQL] Use LZ4 safeDecompressor to 
mitigate perf regression
dc57455589a9 is described below

commit dc57455589a9f160638ab3526ca359eb84f76d67
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Feb 27 20:27:21 2026 +0800

    [SPARK-54571][CORE][SQL] Use LZ4 safeDecompressor to mitigate perf 
regression
    
    ### What changes were proposed in this pull request?
    
    Previously, lz4-java was upgraded to 1.10.x to address CVEs,
    
    - https://github.com/apache/spark/pull/53327
    - https://github.com/apache/spark/pull/53347
    - https://github.com/apache/spark/pull/53971
    
    while this causes a significant performance drop, see the benchmark report at
    
    - https://github.com/apache/spark/pull/53453
    
    This PR follows the 
[suggestion](https://github.com/apache/spark/pull/53290#issuecomment-3607045004)
 to migrate to safeDecompressor.
    
    ### Why are the changes needed?
    
    Mitigate performance regression.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, except for performance.
    
    ### How was this patch tested?
    
    GHA for functionality, 
[benchmark](https://github.com/apache/spark/pull/53453#issuecomment-3645530618) 
for performance.
    
    > TL;DR - my test results show lz4-java 1.10.1 is about 10~15% slower on 
lz4 compression than 1.8.0, and is about ~5% slower on lz4 decompression even 
with migrating to suggested safeDecompressor
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #53454 from pan3793/SPARK-54571.
    
    Lead-authored-by: Cheng Pan <[email protected]>
    Co-authored-by: pan3793 <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt       |  4 ++--
 core/benchmarks/LZ4TPCDSDataBenchmark-results.txt             |  4 ++--
 .../src/main/scala/org/apache/spark/io/CompressionCodec.scala | 11 +++++------
 .../apache/spark/sql/catalyst/plans/logical/Statistics.scala  |  7 +++++--
 4 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt 
b/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt
index 578f710b1da5..a066ced506b5 100644
--- a/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt
+++ b/core/benchmarks/LZ4TPCDSDataBenchmark-jdk21-results.txt
@@ -6,12 +6,12 @@ OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 
6.14.0-1017-azure
 AMD EPYC 7763 64-Core Processor
 Compression:                              Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Compression 4 times                                2612           2624         
 17          0.0   652960433.5       1.0X
+Compression 4 times                                2611           2619         
 11          0.0   652760335.3       1.0X
 
 OpenJDK 64-Bit Server VM 21.0.10+7-LTS on Linux 6.14.0-1017-azure
 AMD EPYC 7763 64-Core Processor
 Decompression:                            Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Decompression 4 times                              2219           2220         
  1          0.0   554762743.5       1.0X
+Decompression 4 times                               896            912         
 20          0.0   224050201.0       1.0X
 
 
diff --git a/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt 
b/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt
index 7a5c89955eeb..8d909dfc7f49 100644
--- a/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt
+++ b/core/benchmarks/LZ4TPCDSDataBenchmark-results.txt
@@ -6,12 +6,12 @@ OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 
6.14.0-1017-azure
 AMD EPYC 7763 64-Core Processor
 Compression:                              Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Compression 4 times                                2605           2611         
  8          0.0   651243236.8       1.0X
+Compression 4 times                                2602           2609         
 10          0.0   650438397.0       1.0X
 
 OpenJDK 64-Bit Server VM 17.0.18+8-LTS on Linux 6.14.0-1017-azure
 AMD EPYC 7763 64-Core Processor
 Decompression:                            Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Decompression 4 times                              2361           2367         
 10          0.0   590134148.0       1.0X
+Decompression 4 times                               938            966         
 33          0.0   234424499.5       1.0X
 
 
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index b81e46667323..243ee689cb5f 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -152,12 +152,11 @@ class LZ4CompressionCodec(conf: SparkConf) extends 
CompressionCodec {
   }
 
   override def compressedInputStream(s: InputStream): InputStream = {
-    val disableConcatenationOfByteStream = false
-    new LZ4BlockInputStream(
-      s,
-      lz4Factory.fastDecompressor(),
-      xxHashFactory.newStreamingHash32(defaultSeed).asChecksum,
-      disableConcatenationOfByteStream)
+    LZ4BlockInputStream.newBuilder()
+      .withDecompressor(lz4Factory.safeDecompressor())
+      .withChecksum(xxHashFactory.newStreamingHash32(defaultSeed).asChecksum)
+      .withStopOnEmptyBlock(false)
+      .build(s)
   }
 }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index a2850a0b179f..d88bc62d0bc1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
DataInputStream, Da
 import java.math.{MathContext, RoundingMode}
 import java.util.Base64
 
-import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
 
 import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
 import org.apache.spark.sql.catalyst.expressions._
@@ -210,7 +210,10 @@ object HistogramSerializer {
   final def deserialize(str: String): Histogram = {
     val bytes = Base64.getDecoder().decode(str)
     val bis = new ByteArrayInputStream(bytes)
-    val ins = new DataInputStream(new LZ4BlockInputStream(bis))
+    val ins = new DataInputStream(
+      LZ4BlockInputStream.newBuilder()
+        .withDecompressor(LZ4Factory.fastestInstance().safeDecompressor())
+        .build(bis))
     val height = ins.readDouble()
     val numBins = ins.readInt()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to