This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 020c1bd2b635 [SPARK-54226][SQL][FOLLOWUP] Remove zstd compression 
level value check and update config names
020c1bd2b635 is described below

commit 020c1bd2b6356bd8a82e2eaf993a6d141e71ab87
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sun Nov 9 21:29:00 2025 -0800

    [SPARK-54226][SQL][FOLLOWUP] Remove zstd compression level value check and 
update config names
    
    ### What changes were proposed in this pull request?
    
    This is a followup to https://github.com/apache/spark/pull/52925.
    
    ### Why are the changes needed?
    
    To address review comments.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code v2.0.14
    
    Closes #52962 from viirya/arrow_compress_udf_followup.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 python/pyspark/sql/tests/arrow/test_arrow.py              |  8 ++++----
 .../pyspark/sql/tests/pandas/test_pandas_grouped_map.py   |  6 +++---
 .../sql/tests/pandas/test_pandas_udf_grouped_agg.py       |  4 ++--
 python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py |  6 +++---
 .../scala/org/apache/spark/sql/internal/SQLConf.scala     | 15 +++++++--------
 5 files changed, 19 insertions(+), 20 deletions(-)

diff --git a/python/pyspark/sql/tests/arrow/test_arrow.py 
b/python/pyspark/sql/tests/arrow/test_arrow.py
index af08f8c8c101..14f8fbe33c8e 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow.py
@@ -1817,7 +1817,7 @@ class ArrowTestsMixin:
 
         for codec in ["none", "zstd", "lz4"]:
             with self.subTest(compressionCodec=codec):
-                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
                     pdf = df.toPandas()
                     assert_frame_equal(expected, pdf)
 
@@ -1846,7 +1846,7 @@ class ArrowTestsMixin:
 
         for codec in ["none", "zstd", "lz4"]:
             with self.subTest(compressionCodec=codec):
-                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
                     t_out = df.toArrow()
                     self.assertTrue(t_out.equals(t_in))
 
@@ -1863,7 +1863,7 @@ class ArrowTestsMixin:
 
         for codec in ["none", "zstd", "lz4"]:
             with self.subTest(compressionCodec=codec):
-                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
                     pdf = df.toPandas()
                     self.assertEqual(len(pdf), 10000)
                     self.assertEqual(pdf.columns.tolist(), ["id", "str_col", 
"mod_col"])
@@ -1880,7 +1880,7 @@ class ArrowTestsMixin:
 
         for codec in ["none", "zstd", "lz4"]:
             with self.subTest(compressionCodec=codec):
-                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
                     t = df.toArrow()
                     self.assertEqual(t.num_rows, 10000)
                     self.assertEqual(t.column_names, ["id", "str_col", 
"mod_col"])
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py 
b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index b60c5a187fbf..a2f31bacf812 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -1418,7 +1418,7 @@ class ApplyInPandasTestsMixin:
 
         for codec in ["none", "zstd", "lz4"]:
             with self.subTest(compressionCodec=codec):
-                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
                     result = df.groupby("id").apply(foo).sort("id").toPandas()
                     assert_frame_equal(expected, result)
 
@@ -1432,7 +1432,7 @@ class ApplyInPandasTestsMixin:
 
         for codec in ["none", "zstd", "lz4"]:
             with self.subTest(compressionCodec=codec):
-                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
                     result = (
                         df.groupby("id")
                         .applyInPandas(stats, schema="id long, mean double")
@@ -1457,7 +1457,7 @@ class ApplyInPandasTestsMixin:
 
         for codec in ["none", "zstd", "lz4"]:
             with self.subTest(compressionCodec=codec):
-                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
                     result = (
                         df.groupby("id")
                         .applyInPandas(sum_func, schema="v double")
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py 
b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
index 2958d0e67f1e..6915e8aee948 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
@@ -872,7 +872,7 @@ class GroupedAggPandasUDFTestsMixin:
 
         for codec in ["none", "zstd", "lz4"]:
             with self.subTest(compressionCodec=codec):
-                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
                     result = 
df.groupby("id").agg(sum_udf(df.v)).sort("id").toPandas()
                     assert_frame_equal(expected, result)
 
@@ -891,7 +891,7 @@ class GroupedAggPandasUDFTestsMixin:
 
         for codec in ["none", "zstd", "lz4"]:
             with self.subTest(compressionCodec=codec):
-                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
                     result = (
                         df.groupby("id").agg(mean_udf(df.v), 
sum_udf(df.v)).sort("id").toPandas()
                     )
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py 
b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
index 554c994afc1e..563e0b789a96 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
@@ -1999,7 +1999,7 @@ class ScalarPandasUDFTestsMixin:
 
         for codec in ["none", "zstd", "lz4"]:
             with self.subTest(compressionCodec=codec):
-                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
                     result = 
df.select(plus_one("id").alias("result")).collect()
                     self.assertEqual(expected, result)
 
@@ -2017,7 +2017,7 @@ class ScalarPandasUDFTestsMixin:
 
         for codec in ["none", "zstd", "lz4"]:
             with self.subTest(compressionCodec=codec):
-                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
                     # Test string UDF
                     result = 
df.select(concat_string("id").alias("result")).collect()
                     expected = [Row(result=f"value_{i}") for i in range(50)]
@@ -2040,7 +2040,7 @@ class ScalarPandasUDFTestsMixin:
 
         for codec in ["none", "zstd", "lz4"]:
             with self.subTest(compressionCodec=codec):
-                with 
self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+                with 
self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
                     result = 
df.select(plus_two("id").alias("result")).collect()
                     self.assertEqual(expected, result)
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5a16ca0814fc..36ded2bd7b63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4064,7 +4064,7 @@ object SQLConf {
       .createWithDefaultString("64MB")
 
   val ARROW_EXECUTION_COMPRESSION_CODEC =
-    buildConf("spark.sql.execution.arrow.compressionCodec")
+    buildConf("spark.sql.execution.arrow.compression.codec")
       .doc("Compression codec used to compress Arrow IPC data when 
transferring data " +
         "between JVM and Python processes (e.g., toPandas, toArrow). This can 
significantly " +
         "reduce memory usage and network bandwidth when transferring large 
datasets. " +
@@ -4078,16 +4078,15 @@ object SQLConf {
       .createWithDefault("none")
 
   val ARROW_EXECUTION_ZSTD_COMPRESSION_LEVEL =
-    buildConf("spark.sql.execution.arrow.zstd.compressionLevel")
+    buildConf("spark.sql.execution.arrow.compression.zstd.level")
       .doc("Compression level for Zstandard (zstd) codec when compressing 
Arrow IPC data. " +
-        "This config is only used when 
spark.sql.execution.arrow.compressionCodec is set to " +
-        "'zstd'. Valid values are integers from 1 (fastest, lowest 
compression) to 22 " +
-        "(slowest, highest compression). The default value 3 provides a good 
balance between " +
-        "compression speed and compression ratio.")
+        "This config is only used when 
spark.sql.execution.arrow.compression.codec is set to " +
+        "'zstd'. Negative values provide ultra-fast compression with lower " +
+        "compression ratios. Positive values provide normal to maximum 
compression, " +
+        "with higher values giving better compression but slower speed. The 
default value 3 " +
+        "provides a good balance between compression speed and compression 
ratio.")
       .version("4.1.0")
       .intConf
-      .checkValue(level => level >= 1 && level <= 22,
-        "Zstd compression level must be between 1 and 22")
       .createWithDefault(3)
 
   val ARROW_TRANSFORM_WITH_STATE_IN_PYSPARK_MAX_STATE_RECORDS_PER_BATCH =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to