This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ea0a35e065d3 [SPARK-54555][PYTHON] Enable Arrow-optimized Python UDFs and Arrow-based PySpark IPC by default
ea0a35e065d3 is described below
commit ea0a35e065d3eba14c5ed24252e14d872e5b371b
Author: Amanda Liu <[email protected]>
AuthorDate: Tue Dec 2 13:24:29 2025 -0800
[SPARK-54555][PYTHON] Enable Arrow-optimized Python UDFs and Arrow-based PySpark IPC by default
### What changes were proposed in this pull request?
Enable PySpark Arrow-based optimizations by default in Spark 4.2.0,
updating default conf values:
- Set `spark.sql.execution.pythonUDF.arrow.enabled` and
`spark.sql.execution.pythonUDTF.arrow.enabled` to `true` by default to enable
Arrow-optimized execution for regular Python UDFs and UDTFs.
- Set `spark.sql.execution.arrow.pyspark.enabled` to `true` by default to
enable Arrow-based columnar data exchange for PySpark APIs such as
`DataFrame.toPandas` and `SparkSession.createDataFrame` when the input is a
pandas DataFrame or NumPy array.
Update user-facing docs and migration guides to reflect the change.
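For illustration, a minimal sketch (not part of this patch) of the two
conversion paths that now use Arrow by default, assuming a Spark 4.2 session
with pandas and PyArrow installed:

```python
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# pandas -> Spark: createDataFrame now uses Arrow-based exchange by default.
sdf = spark.createDataFrame(pd.DataFrame({"id": [1, 2, 3]}))

# Spark -> pandas: toPandas now uses Arrow-based exchange by default.
pdf = sdf.toPandas()
```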
### Why are the changes needed?
Arrow’s columnar IPC significantly improves JVM↔Python throughput and
reduces serialization/deserialization overhead, speeding up Python UDFs and
DataFrame conversions. Additionally, Arrow provides consistent, well-defined
rules for type coercion when Python return values differ from declared UDF
return types, reducing ambiguous behavior.
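For example (an illustrative sketch, not part of this patch; the full rules
are in the coercion tables linked from the updated docs): with the legacy
pickled path a mismatched return value falls back to NULL, while the Arrow
path applies Arrow's coercion rules where a well-defined conversion exists.

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Legacy pickled path: returning a str where IntegerType is declared
# falls back to NULL for the mismatched value.
@udf(returnType=IntegerType(), useArrow=False)
def legacy_udf(x):
    return str(x)

# Arrow path: Arrow's coercion rules cast the string back to the
# declared integer type where a well-defined conversion exists.
@udf(returnType=IntegerType(), useArrow=True)
def arrow_udf(x):
    return str(x)
```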
Enabling Arrow by default brings performance and correctness improvements
to the majority of PySpark users with minimal configuration. Users who depend
on the previous (non-Arrow) implementation can opt out by explicitly setting
`spark.sql.execution.pythonUDF.arrow.enabled`,
`spark.sql.execution.pythonUDTF.arrow.enabled`, and
`spark.sql.execution.arrow.pyspark.enabled` to `false`.
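A minimal sketch of the opt-out for an existing session (the same values can
also be set in `spark-defaults.conf` or with `--conf` at submit time):

```python
spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "false")
spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", "false")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
```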
### Does this PR introduce _any_ user-facing change?
Yes, changes the default configuration of
`spark.sql.execution.pythonUDF.arrow.enabled`,
`spark.sql.execution.pythonUDTF.arrow.enabled`, and
`spark.sql.execution.arrow.pyspark.enabled` to `true` and updates user-facing
docs.
### How was this patch tested?
Existing PySpark test suites are run with the Arrow configurations both
enabled and disabled.
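For reference, the toggling uses the `sql_conf` context-manager pattern shown
in the test diff below, e.g.:

```python
# Re-run the same assertions with the Arrow path explicitly disabled,
# exercising the legacy pickled UDF execution.
with self.sql_conf({"spark.sql.execution.pythonUDF.arrow.enabled": False}):
    ...
```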
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53264 from asl3/enablearrowbydefault.
Authored-by: Amanda Liu <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
---
.../source/migration_guide/pyspark_upgrade.rst | 6 +++
python/docs/source/tutorial/sql/arrow_pandas.rst | 20 +++++---
python/docs/source/tutorial/sql/python_udtf.rst | 3 +-
.../docs/source/tutorial/sql/type_conversions.rst | 2 +-
python/pyspark/sql/tests/test_unified_udf.py | 56 +++++++++++-----------
.../org/apache/spark/sql/internal/SQLConf.scala | 6 +--
6 files changed, 55 insertions(+), 38 deletions(-)
diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst b/python/docs/source/migration_guide/pyspark_upgrade.rst
index c6cf69dadc93..fbd63539b380 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -19,6 +19,12 @@
Upgrading PySpark
==================
+Upgrading from PySpark 4.1 to 4.2
+---------------------------------
+* In Spark 4.2, columnar data exchange between PySpark and the JVM uses Apache Arrow by default. The configuration ``spark.sql.execution.arrow.pyspark.enabled`` now defaults to ``true``. To restore the legacy (non-Arrow) row-based data exchange, set ``spark.sql.execution.arrow.pyspark.enabled`` to ``false``.
+* In Spark 4.2, regular Python UDFs are Arrow-optimized by default. The configuration ``spark.sql.execution.pythonUDF.arrow.enabled`` now defaults to ``true``. To restore the legacy behavior for Python UDF execution, set ``spark.sql.execution.pythonUDF.arrow.enabled`` to ``false``.
+* In Spark 4.2, regular Python UDTFs are Arrow-optimized by default. The configuration ``spark.sql.execution.pythonUDTF.arrow.enabled`` now defaults to ``true``. To restore the legacy behavior for Python UDTF execution, set ``spark.sql.execution.pythonUDTF.arrow.enabled`` to ``false``.
+
Upgrading from PySpark 4.0 to 4.1
---------------------------------
diff --git a/python/docs/source/tutorial/sql/arrow_pandas.rst b/python/docs/source/tutorial/sql/arrow_pandas.rst
index 3bef50874d7f..386fe83b4821 100644
--- a/python/docs/source/tutorial/sql/arrow_pandas.rst
+++ b/python/docs/source/tutorial/sql/arrow_pandas.rst
@@ -60,8 +60,8 @@ Enabling for Conversion to/from Pandas
Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
using the call :meth:`DataFrame.toPandas` and when creating a Spark DataFrame from a Pandas DataFrame with
-:meth:`SparkSession.createDataFrame`. To use Arrow when executing these calls, users need to first set
-the Spark configuration ``spark.sql.execution.arrow.pyspark.enabled`` to ``true``. This is disabled by default.
+:meth:`SparkSession.createDataFrame`. To use Arrow when executing these calls,
+the Spark configuration ``spark.sql.execution.arrow.pyspark.enabled`` must be set to ``true``. This is enabled by default.
In addition, optimizations enabled by ``spark.sql.execution.arrow.pyspark.enabled`` could fallback automatically
to non-Arrow optimization implementation if an error occurs before the actual computation within Spark.
@@ -368,6 +368,9 @@ Here's an example that demonstrates the usage of both a default, pickled Python
:lines: 298-316
:dedent: 4
+Type coercion:
+~~~~~~~~~~~~~~
+
Compared to the default, pickled Python UDFs, Arrow Python UDFs provide a more coherent type coercion mechanism. UDF
type coercion poses challenges when the Python instances returned by UDFs do not align with the user-specified
return type. The default, pickled Python UDFs' type coercion has certain limitations, such as relying on None as a
@@ -375,11 +378,16 @@ fallback for type mismatches, leading to potential ambiguity and data loss. Addi
and tuples to strings can yield ambiguous results. Arrow Python UDFs, on the other hand, leverage Arrow's
capabilities to standardize type coercion and address these issues effectively.
-A note on Arrow Python UDF type coercion: In Spark 4.1, unnecessary conversion to pandas instances is removed in the serializer
-when ``spark.sql.execution.pythonUDF.arrow.enabled`` is enabled. As a result, the type coercion changes
-when the produced output has a schema different from the specified schema. To restore the previous behavior,
-enable ``spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled``.
+Type coercion differences are introduced by the following changes:
+* Since Spark 4.2, Arrow optimization is enabled by default for regular Python UDFs.
+The full type coercion difference is summarized in the tables `here <https://github.com/apache/spark/pull/41706>`__.
+To disable Arrow optimization, set ``spark.sql.execution.pythonUDF.arrow.enabled`` to ``false``.
+
+* Since Spark 4.1, unnecessary conversion to pandas instances in Arrow-optimized Python UDF is removed in the serializer
+when ``spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled`` is disabled.
+
The behavior difference is summarized in the tables `here <https://github.com/apache/spark/pull/51225>`__.
+To restore the legacy behavior, set ``spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled`` to ``true``.
Usage Notes
-----------
diff --git a/python/docs/source/tutorial/sql/python_udtf.rst b/python/docs/source/tutorial/sql/python_udtf.rst
index 26142e42e363..e2d240a6f9de 100644
--- a/python/docs/source/tutorial/sql/python_udtf.rst
+++ b/python/docs/source/tutorial/sql/python_udtf.rst
@@ -429,7 +429,8 @@ Arrow Optimization
------------------
Apache Arrow is an in-memory columnar data format used in Spark to efficiently transfer
-data between Java and Python processes. Apache Arrow is disabled by default for Python UDTFs.
+data between Java and Python processes. Beginning in Spark 4.2, Apache Arrow is enabled by default for Python UDTFs.
+To disable Arrow optimization, set ``spark.sql.execution.pythonUDTF.arrow.enabled`` to ``false``.
Arrow can improve performance when each input row generates a large result table from the UDTF.
diff --git a/python/docs/source/tutorial/sql/type_conversions.rst b/python/docs/source/tutorial/sql/type_conversions.rst
index 2a88731ce819..625a68340f20 100644
--- a/python/docs/source/tutorial/sql/type_conversions.rst
+++ b/python/docs/source/tutorial/sql/type_conversions.rst
@@ -57,7 +57,7 @@ are listed below:
- Default
* - spark.sql.execution.pythonUDF.arrow.enabled
- Enable PyArrow in PySpark. See more `here <arrow_pandas.rst>`_.
- - False
+ - True
* - spark.sql.pyspark.inferNestedDictAsStruct.enabled
- When enabled, nested dictionaries are inferred as StructType. Otherwise, they are inferred as MapType.
- False
diff --git a/python/pyspark/sql/tests/test_unified_udf.py b/python/pyspark/sql/tests/test_unified_udf.py
index 2d3446bd0b5b..4b89d92111c8 100644
--- a/python/pyspark/sql/tests/test_unified_udf.py
+++ b/python/pyspark/sql/tests/test_unified_udf.py
@@ -351,39 +351,41 @@ class UnifiedUDFTestsMixin:
import pandas as pd
import pyarrow as pa
-        @udf(returnType=LongType())
-        def f1(x):
-            return x + 1
+        with self.sql_conf({"spark.sql.execution.pythonUDF.arrow.enabled": False}):

-        @udf(returnType=LongType())
-        def f2(x: int) -> int:
-            return x + 1
+            @udf(returnType=LongType())
+            def f1(x):
+                return x + 1

-        # Cannot infer a vectorized UDF type
-        @udf(returnType=LongType())
-        def f3(x: int) -> pd.Series:
-            return x + 1
+            @udf(returnType=LongType())
+            def f2(x: int) -> int:
+                return x + 1

-        # Cannot infer a vectorized UDF type
-        @udf(returnType=LongType())
-        def f4(x: int) -> pa.Array:
-            return x + 1
+            # Cannot infer a vectorized UDF type
+            @udf(returnType=LongType())
+            def f3(x: int) -> pd.Series:
+                return x + 1

-        # useArrow is explicitly set to false
-        @udf(returnType=LongType(), useArrow=False)
-        def f5(x: pd.Series) -> pd.Series:
-            return x + 1
+            # Cannot infer a vectorized UDF type
+            @udf(returnType=LongType())
+            def f4(x: int) -> pa.Array:
+                return x + 1

-        # useArrow is explicitly set to false
-        @udf(returnType=LongType(), useArrow=False)
-        def f6(x: pa.Array) -> pa.Array:
-            return x + 1
+            # useArrow is explicitly set to false
+            @udf(returnType=LongType(), useArrow=False)
+            def f5(x: pd.Series) -> pd.Series:
+                return x + 1

-        expected = self.spark.range(10).select((sf.col("id") + 1).alias("res")).collect()
-        for f in [f1, f2, f3, f4, f5, f6]:
-            self.assertEqual(f.evalType, PythonEvalType.SQL_BATCHED_UDF)
-            result = self.spark.range(10).select(f("id").alias("res")).collect()
-            self.assertEqual(result, expected)
+            # useArrow is explicitly set to false
+            @udf(returnType=LongType(), useArrow=False)
+            def f6(x: pa.Array) -> pa.Array:
+                return x + 1
+
+            expected = self.spark.range(10).select((sf.col("id") + 1).alias("res")).collect()
+            for f in [f1, f2, f3, f4, f5, f6]:
+                self.assertEqual(f.evalType, PythonEvalType.SQL_BATCHED_UDF)
+                result = self.spark.range(10).select(f("id").alias("res")).collect()
+                self.assertEqual(result, expected)
def test_arrow_optimized_python_udf(self):
import pandas as pd
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d6fa8b21b21a..c331f1724854 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3917,7 +3917,7 @@ object SQLConf {
.doc("(Deprecated since Spark 3.0, please set
'spark.sql.execution.arrow.pyspark.enabled'.)")
.version("2.3.0")
.booleanConf
- .createWithDefault(false)
+ .createWithDefault(true)
val ARROW_PYSPARK_EXECUTION_ENABLED =
buildConf("spark.sql.execution.arrow.pyspark.enabled")
@@ -4263,7 +4263,7 @@ object SQLConf {
"can only be enabled when the given function takes at least one
argument.")
.version("3.4.0")
.booleanConf
- .createWithDefault(false)
+ .createWithDefault(true)
val PYTHON_UDF_ARROW_CONCURRENCY_LEVEL =
buildConf("spark.sql.execution.pythonUDF.arrow.concurrency.level")
@@ -4299,7 +4299,7 @@ object SQLConf {
.doc("Enable Arrow optimization for Python UDTFs.")
.version("3.5.0")
.booleanConf
- .createWithDefault(false)
+ .createWithDefault(true)
val PYTHON_TABLE_UDF_LEGACY_PANDAS_CONVERSION_ENABLED =
buildConf("spark.sql.legacy.execution.pythonUDTF.pandas.conversion.enabled")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]