This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 020c1bd2b635 [SPARK-54226][SQL][FOLLOWUP] Remove zstd compression
level value check and update config names
020c1bd2b635 is described below
commit 020c1bd2b6356bd8a82e2eaf993a6d141e71ab87
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sun Nov 9 21:29:00 2025 -0800
[SPARK-54226][SQL][FOLLOWUP] Remove zstd compression level value check and
update config names
### What changes were proposed in this pull request?
This is a followup to https://github.com/apache/spark/pull/52925.
### Why are the changes needed?
To address review comments.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code v2.0.14
Closes #52962 from viirya/arrow_compress_udf_followup.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
python/pyspark/sql/tests/arrow/test_arrow.py | 8 ++++----
.../pyspark/sql/tests/pandas/test_pandas_grouped_map.py | 6 +++---
.../sql/tests/pandas/test_pandas_udf_grouped_agg.py | 4 ++--
python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py | 6 +++---
.../scala/org/apache/spark/sql/internal/SQLConf.scala | 15 +++++++--------
5 files changed, 19 insertions(+), 20 deletions(-)
diff --git a/python/pyspark/sql/tests/arrow/test_arrow.py
b/python/pyspark/sql/tests/arrow/test_arrow.py
index af08f8c8c101..14f8fbe33c8e 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow.py
@@ -1817,7 +1817,7 @@ class ArrowTestsMixin:
for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
- with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ with self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
pdf = df.toPandas()
assert_frame_equal(expected, pdf)
@@ -1846,7 +1846,7 @@ class ArrowTestsMixin:
for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
- with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ with self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
t_out = df.toArrow()
self.assertTrue(t_out.equals(t_in))
@@ -1863,7 +1863,7 @@ class ArrowTestsMixin:
for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
- with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ with self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
pdf = df.toPandas()
self.assertEqual(len(pdf), 10000)
self.assertEqual(pdf.columns.tolist(), ["id", "str_col",
"mod_col"])
@@ -1880,7 +1880,7 @@ class ArrowTestsMixin:
for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
- with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ with self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
t = df.toArrow()
self.assertEqual(t.num_rows, 10000)
self.assertEqual(t.column_names, ["id", "str_col",
"mod_col"])
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index b60c5a187fbf..a2f31bacf812 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -1418,7 +1418,7 @@ class ApplyInPandasTestsMixin:
for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
- with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ with self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
result = df.groupby("id").apply(foo).sort("id").toPandas()
assert_frame_equal(expected, result)
@@ -1432,7 +1432,7 @@ class ApplyInPandasTestsMixin:
for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
- with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ with self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
result = (
df.groupby("id")
.applyInPandas(stats, schema="id long, mean double")
@@ -1457,7 +1457,7 @@ class ApplyInPandasTestsMixin:
for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
- with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ with self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
result = (
df.groupby("id")
.applyInPandas(sum_func, schema="v double")
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
index 2958d0e67f1e..6915e8aee948 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
@@ -872,7 +872,7 @@ class GroupedAggPandasUDFTestsMixin:
for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
- with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ with self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
result = df.groupby("id").agg(sum_udf(df.v)).sort("id").toPandas()
assert_frame_equal(expected, result)
@@ -891,7 +891,7 @@ class GroupedAggPandasUDFTestsMixin:
for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
- with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ with self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
result = (
df.groupby("id").agg(mean_udf(df.v),
sum_udf(df.v)).sort("id").toPandas()
)
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
index 554c994afc1e..563e0b789a96 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
@@ -1999,7 +1999,7 @@ class ScalarPandasUDFTestsMixin:
for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
- with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ with self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
result = df.select(plus_one("id").alias("result")).collect()
self.assertEqual(expected, result)
@@ -2017,7 +2017,7 @@ class ScalarPandasUDFTestsMixin:
for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
- with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ with self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
# Test string UDF
result = df.select(concat_string("id").alias("result")).collect()
expected = [Row(result=f"value_{i}") for i in range(50)]
@@ -2040,7 +2040,7 @@ class ScalarPandasUDFTestsMixin:
for codec in ["none", "zstd", "lz4"]:
with self.subTest(compressionCodec=codec):
- with self.sql_conf({"spark.sql.execution.arrow.compressionCodec": codec}):
+ with self.sql_conf({"spark.sql.execution.arrow.compression.codec": codec}):
result = df.select(plus_two("id").alias("result")).collect()
self.assertEqual(expected, result)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5a16ca0814fc..36ded2bd7b63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4064,7 +4064,7 @@ object SQLConf {
.createWithDefaultString("64MB")
val ARROW_EXECUTION_COMPRESSION_CODEC =
- buildConf("spark.sql.execution.arrow.compressionCodec")
+ buildConf("spark.sql.execution.arrow.compression.codec")
.doc("Compression codec used to compress Arrow IPC data when transferring data " +
"between JVM and Python processes (e.g., toPandas, toArrow). This can significantly " +
"reduce memory usage and network bandwidth when transferring large datasets. " +
@@ -4078,16 +4078,15 @@ object SQLConf {
.createWithDefault("none")
val ARROW_EXECUTION_ZSTD_COMPRESSION_LEVEL =
- buildConf("spark.sql.execution.arrow.zstd.compressionLevel")
+ buildConf("spark.sql.execution.arrow.compression.zstd.level")
.doc("Compression level for Zstandard (zstd) codec when compressing Arrow IPC data. " +
- "This config is only used when spark.sql.execution.arrow.compressionCodec is set to " +
- "'zstd'. Valid values are integers from 1 (fastest, lowest compression) to 22 " +
- "(slowest, highest compression). The default value 3 provides a good balance between " +
- "compression speed and compression ratio.")
+ "This config is only used when spark.sql.execution.arrow.compression.codec is set to " +
+ "'zstd'. Negative values provide ultra-fast compression with lower " +
+ "compression ratios. Positive values provide normal to maximum compression, " +
+ "with higher values giving better compression but slower speed. The default value 3 " +
+ "provides a good balance between compression speed and compression ratio.")
.version("4.1.0")
.intConf
- .checkValue(level => level >= 1 && level <= 22,
- "Zstd compression level must be between 1 and 22")
.createWithDefault(3)
val ARROW_TRANSFORM_WITH_STATE_IN_PYSPARK_MAX_STATE_RECORDS_PER_BATCH =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]