This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ea0a35e065d3 [SPARK-54555][PYTHON] Enable Arrow-optimized Python UDFs 
and Arrow-based PySpark IPC by default
ea0a35e065d3 is described below

commit ea0a35e065d3eba14c5ed24252e14d872e5b371b
Author: Amanda Liu <[email protected]>
AuthorDate: Tue Dec 2 13:24:29 2025 -0800

    [SPARK-54555][PYTHON] Enable Arrow-optimized Python UDFs and Arrow-based 
PySpark IPC by default
    
    ### What changes were proposed in this pull request?
    
    Enable PySpark Arrow-based optimizations by default in Spark 4.2.0, 
updating default conf values:
    - Set `spark.sql.execution.pythonUDF.arrow.enabled` and 
`spark.sql.execution.pythonUDTF.arrow.enabled` to `true` by default to enable 
Arrow-optimized execution for regular Python UDFs and UDTFs.
    - Set `spark.sql.execution.arrow.pyspark.enabled` to `true` by default to 
enable Arrow-based columnar data exchange for PySpark APIs such as 
DataFrame.toPandas and SparkSession.createDataFrame when the input is a pandas 
DataFrame or NumPy array.
    
    Update user-facing docs and migration guides to reflect the change.
    
    ### Why are the changes needed?
    
    Arrow’s columnar IPC significantly improves JVM↔Python throughput and 
reduces serialization/deserialization overhead, speeding up Python UDFs and 
DataFrame conversions. Additionally, Arrow provides consistent, well-defined 
rules for type coercion when Python return values differ from declared UDF 
return types, reducing ambiguous behavior.
    
    Enabling Arrow by default brings performance and correctness improvements 
to the majority of PySpark users with minimal configuration. Users who depend 
on the previous (non-Arrow) implementation can opt out by explicitly setting 
`spark.sql.execution.pythonUDF.arrow.enabled`, 
`spark.sql.execution.pythonUDTF.arrow.enabled`, and 
`spark.sql.execution.arrow.pyspark.enabled` to `false`.
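
As a rough illustration of the opt-out described above, the sketch below collects the three configuration keys and renders them either as `spark-submit` arguments or as calls on a builder-like object. The helper names (`legacy_arrow_confs`, `spark_submit_flags`, `apply_to_builder`) are hypothetical and are not part of PySpark; only the configuration keys come from this commit.

```python
# The three configuration keys named in this commit message.
ARROW_CONF_KEYS = (
    "spark.sql.execution.pythonUDF.arrow.enabled",
    "spark.sql.execution.pythonUDTF.arrow.enabled",
    "spark.sql.execution.arrow.pyspark.enabled",
)


def legacy_arrow_confs():
    """Return settings that opt out of the new Arrow defaults (hypothetical helper)."""
    return {key: "false" for key in ARROW_CONF_KEYS}


def spark_submit_flags(confs):
    """Render settings as `--conf key=value` arguments, sorted by key."""
    flags = []
    for key, value in sorted(confs.items()):
        flags.extend(["--conf", f"{key}={value}"])
    return flags


def apply_to_builder(builder, confs):
    """Apply settings to any object exposing `config(key, value)`.

    Assumption: the object behaves like `SparkSession.builder`, whose
    `config` method returns the builder so calls can be chained.
    """
    for key, value in confs.items():
        builder = builder.config(key, value)
    return builder
```

With PySpark installed, something like `apply_to_builder(SparkSession.builder, legacy_arrow_confs()).getOrCreate()` should start a session that uses the previous non-Arrow code paths. On an existing session, the same keys can be set one at a time with `spark.conf.set(key, "false")`.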
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, changes the default configuration of 
`spark.sql.execution.pythonUDF.arrow.enabled`, 
`spark.sql.execution.pythonUDTF.arrow.enabled`, and 
`spark.sql.execution.arrow.pyspark.enabled` to `true` and updates user-facing 
docs.
    
    ### How was this patch tested?
    
    Existing PySpark test suites are run with the Arrow conf both enabled and 
disabled.
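
To make the "enabled and disabled" testing pattern concrete, here is a minimal sketch of a context manager that temporarily overrides configuration values and restores them afterwards. It is similar in spirit to the `self.sql_conf(...)` helper used in the test diff of this commit, but `temporary_conf` itself is a hypothetical name, and the sketch assumes only that the conf object offers `get(key, default)`, `set(key, value)` and `unset(key)`, as `spark.conf` does.

```python
from contextlib import contextmanager


@contextmanager
def temporary_conf(conf, overrides):
    """Set `overrides` on `conf`, then restore the earlier values on exit.

    Assumption: `conf` exposes get(key, default), set(key, value) and
    unset(key). A key whose previous value reads as None is unset again.
    """
    previous = {key: conf.get(key, None) for key in overrides}
    for key, value in overrides.items():
        conf.set(key, value)
    try:
        yield conf
    finally:
        # Restore even if the body of the `with` block raised.
        for key, old_value in previous.items():
            if old_value is None:
                conf.unset(key)
            else:
                conf.set(key, old_value)
```

A test could then wrap the same assertions in `with temporary_conf(spark.conf, {"spark.sql.execution.pythonUDF.arrow.enabled": "false"}):` and again with `"true"`, so that both code paths are exercised regardless of the default.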
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53264 from asl3/enablearrowbydefault.
    
    Authored-by: Amanda Liu <[email protected]>
    Signed-off-by: DB Tsai <[email protected]>
---
 .../source/migration_guide/pyspark_upgrade.rst     |  6 +++
 python/docs/source/tutorial/sql/arrow_pandas.rst   | 20 +++++---
 python/docs/source/tutorial/sql/python_udtf.rst    |  3 +-
 .../docs/source/tutorial/sql/type_conversions.rst  |  2 +-
 python/pyspark/sql/tests/test_unified_udf.py       | 56 +++++++++++-----------
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 +--
 6 files changed, 55 insertions(+), 38 deletions(-)

diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst 
b/python/docs/source/migration_guide/pyspark_upgrade.rst
index c6cf69dadc93..fbd63539b380 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -19,6 +19,12 @@
 Upgrading PySpark
 ==================
 
+Upgrading from PySpark 4.1 to 4.2
+---------------------------------
+* In Spark 4.2, columnar data exchange between PySpark and the JVM uses Apache 
Arrow by default. The configuration 
``spark.sql.execution.arrow.pyspark.enabled`` now defaults to true. To restore 
the legacy (non-Arrow) row-based data exchange, set 
``spark.sql.execution.arrow.pyspark.enabled`` to ``false``.
+* In Spark 4.2, regular Python UDFs are Arrow-optimized by default. The 
configuration ``spark.sql.execution.pythonUDF.arrow.enabled`` now defaults to 
true. To restore the legacy behavior for Python UDF execution, set 
``spark.sql.execution.pythonUDF.arrow.enabled`` to ``false``.
+* In Spark 4.2, regular Python UDTFs are Arrow-optimized by default. The 
configuration ``spark.sql.execution.pythonUDTF.arrow.enabled`` now defaults to 
true. To restore the legacy behavior for Python UDTF execution, set 
``spark.sql.execution.pythonUDTF.arrow.enabled`` to ``false``.
+
 Upgrading from PySpark 4.0 to 4.1
 ---------------------------------
 
diff --git a/python/docs/source/tutorial/sql/arrow_pandas.rst 
b/python/docs/source/tutorial/sql/arrow_pandas.rst
index 3bef50874d7f..386fe83b4821 100644
--- a/python/docs/source/tutorial/sql/arrow_pandas.rst
+++ b/python/docs/source/tutorial/sql/arrow_pandas.rst
@@ -60,8 +60,8 @@ Enabling for Conversion to/from Pandas
 
 Arrow is available as an optimization when converting a Spark DataFrame to a 
Pandas DataFrame
 using the call :meth:`DataFrame.toPandas` and when creating a Spark DataFrame 
from a Pandas DataFrame with
-:meth:`SparkSession.createDataFrame`. To use Arrow when executing these calls, 
users need to first set
-the Spark configuration ``spark.sql.execution.arrow.pyspark.enabled`` to 
``true``. This is disabled by default.
+:meth:`SparkSession.createDataFrame`. To use Arrow when executing these calls,
+the Spark configuration ``spark.sql.execution.arrow.pyspark.enabled`` must be 
set to ``true``. This is enabled by default.
 
 In addition, optimizations enabled by 
``spark.sql.execution.arrow.pyspark.enabled`` could fallback automatically
 to non-Arrow optimization implementation if an error occurs before the actual 
computation within Spark.
@@ -368,6 +368,9 @@ Here's an example that demonstrates the usage of both a 
default, pickled Python
     :lines: 298-316
     :dedent: 4
 
+Type coercion:
+~~~~~~~~~~~~~~
+
 Compared to the default, pickled Python UDFs, Arrow Python UDFs provide a more 
coherent type coercion mechanism. UDF
 type coercion poses challenges when the Python instances returned by UDFs do 
not align with the user-specified
 return type. The default, pickled Python UDFs' type coercion has certain 
limitations, such as relying on None as a
@@ -375,11 +378,16 @@ fallback for type mismatches, leading to potential 
ambiguity and data loss. Addi
 and tuples to strings can yield ambiguous results. Arrow Python UDFs, on the 
other hand, leverage Arrow's
 capabilities to standardize type coercion and address these issues effectively.
 
-A note on Arrow Python UDF type coercion: In Spark 4.1, unnecessary conversion 
to pandas instances is removed in the serializer
-when ``spark.sql.execution.pythonUDF.arrow.enabled`` is enabled. As a result, 
the type coercion changes
-when the produced output has a schema different from the specified schema. To 
restore the previous behavior,
-enable ``spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled``. 
+Type coercion differences are introduced by the following changes:
+* Since Spark 4.2, Arrow optimization is enabled by default for regular Python 
UDFs.
+The full type coercion difference is summarized in the tables `here 
<https://github.com/apache/spark/pull/41706>`__.
+To disable Arrow optimization, set 
``spark.sql.execution.pythonUDF.arrow.enabled`` to false.
+
+* Since Spark 4.1, unnecessary conversion to pandas instances in 
Arrow-optimized Python UDF is removed in the serializer
+when ``spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled`` is 
disabled.
+
 The behavior difference is summarized in the tables `here 
<https://github.com/apache/spark/pull/51225>`__.
+To restore the legacy behavior, set 
``spark.sql.legacy.execution.pythonUDF.pandas.conversion.enabled`` to true.
 
 Usage Notes
 -----------
diff --git a/python/docs/source/tutorial/sql/python_udtf.rst 
b/python/docs/source/tutorial/sql/python_udtf.rst
index 26142e42e363..e2d240a6f9de 100644
--- a/python/docs/source/tutorial/sql/python_udtf.rst
+++ b/python/docs/source/tutorial/sql/python_udtf.rst
@@ -429,7 +429,8 @@ Arrow Optimization
 ------------------
 
 Apache Arrow is an in-memory columnar data format used in Spark to efficiently 
transfer
-data between Java and Python processes. Apache Arrow is disabled by default 
for Python UDTFs.
+data between Java and Python processes. Beginning in Spark 4.2, Apache Arrow 
is enabled by default for Python UDTFs.
+To disable Arrow optimization, set 
``spark.sql.execution.pythonUDTF.arrow.enabled`` to false.
 
 Arrow can improve performance when each input row generates a large result 
table from the UDTF.
 
diff --git a/python/docs/source/tutorial/sql/type_conversions.rst 
b/python/docs/source/tutorial/sql/type_conversions.rst
index 2a88731ce819..625a68340f20 100644
--- a/python/docs/source/tutorial/sql/type_conversions.rst
+++ b/python/docs/source/tutorial/sql/type_conversions.rst
@@ -57,7 +57,7 @@ are listed below:
       - Default
     * - spark.sql.execution.pythonUDF.arrow.enabled
       - Enable PyArrow in PySpark. See more `here <arrow_pandas.rst>`_.
-      - False
+      - True
     * - spark.sql.pyspark.inferNestedDictAsStruct.enabled
       - When enabled, nested dictionaries are inferred as StructType. 
Otherwise, they are inferred as MapType.
       - False
diff --git a/python/pyspark/sql/tests/test_unified_udf.py 
b/python/pyspark/sql/tests/test_unified_udf.py
index 2d3446bd0b5b..4b89d92111c8 100644
--- a/python/pyspark/sql/tests/test_unified_udf.py
+++ b/python/pyspark/sql/tests/test_unified_udf.py
@@ -351,39 +351,41 @@ class UnifiedUDFTestsMixin:
         import pandas as pd
         import pyarrow as pa
 
-        @udf(returnType=LongType())
-        def f1(x):
-            return x + 1
+        with self.sql_conf({"spark.sql.execution.pythonUDF.arrow.enabled": 
False}):
 
-        @udf(returnType=LongType())
-        def f2(x: int) -> int:
-            return x + 1
+            @udf(returnType=LongType())
+            def f1(x):
+                return x + 1
 
-        # Cannot infer a vectorized UDF type
-        @udf(returnType=LongType())
-        def f3(x: int) -> pd.Series:
-            return x + 1
+            @udf(returnType=LongType())
+            def f2(x: int) -> int:
+                return x + 1
 
-        # Cannot infer a vectorized UDF type
-        @udf(returnType=LongType())
-        def f4(x: int) -> pa.Array:
-            return x + 1
+            # Cannot infer a vectorized UDF type
+            @udf(returnType=LongType())
+            def f3(x: int) -> pd.Series:
+                return x + 1
 
-        # useArrow is explicitly set to false
-        @udf(returnType=LongType(), useArrow=False)
-        def f5(x: pd.Series) -> pd.Series:
-            return x + 1
+            # Cannot infer a vectorized UDF type
+            @udf(returnType=LongType())
+            def f4(x: int) -> pa.Array:
+                return x + 1
 
-        # useArrow is explicitly set to false
-        @udf(returnType=LongType(), useArrow=False)
-        def f6(x: pa.Array) -> pa.Array:
-            return x + 1
+            # useArrow is explicitly set to false
+            @udf(returnType=LongType(), useArrow=False)
+            def f5(x: pd.Series) -> pd.Series:
+                return x + 1
 
-        expected = self.spark.range(10).select((sf.col("id") + 
1).alias("res")).collect()
-        for f in [f1, f2, f3, f4, f5, f6]:
-            self.assertEqual(f.evalType, PythonEvalType.SQL_BATCHED_UDF)
-            result = 
self.spark.range(10).select(f("id").alias("res")).collect()
-            self.assertEqual(result, expected)
+            # useArrow is explicitly set to false
+            @udf(returnType=LongType(), useArrow=False)
+            def f6(x: pa.Array) -> pa.Array:
+                return x + 1
+
+            expected = self.spark.range(10).select((sf.col("id") + 
1).alias("res")).collect()
+            for f in [f1, f2, f3, f4, f5, f6]:
+                self.assertEqual(f.evalType, PythonEvalType.SQL_BATCHED_UDF)
+                result = 
self.spark.range(10).select(f("id").alias("res")).collect()
+                self.assertEqual(result, expected)
 
     def test_arrow_optimized_python_udf(self):
         import pandas as pd
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d6fa8b21b21a..c331f1724854 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3917,7 +3917,7 @@ object SQLConf {
       .doc("(Deprecated since Spark 3.0, please set 
'spark.sql.execution.arrow.pyspark.enabled'.)")
       .version("2.3.0")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val ARROW_PYSPARK_EXECUTION_ENABLED =
     buildConf("spark.sql.execution.arrow.pyspark.enabled")
@@ -4263,7 +4263,7 @@ object SQLConf {
         "can only be enabled when the given function takes at least one 
argument.")
       .version("3.4.0")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val PYTHON_UDF_ARROW_CONCURRENCY_LEVEL =
     buildConf("spark.sql.execution.pythonUDF.arrow.concurrency.level")
@@ -4299,7 +4299,7 @@ object SQLConf {
       .doc("Enable Arrow optimization for Python UDTFs.")
       .version("3.5.0")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val PYTHON_TABLE_UDF_LEGACY_PANDAS_CONVERSION_ENABLED =
     
buildConf("spark.sql.legacy.execution.pythonUDTF.pandas.conversion.enabled")

