This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 34d47161f9af [SPARK-54568][PYTHON] Avoid unnecessary pandas conversion in create dataframe from ndarray
34d47161f9af is described below

commit 34d47161f9afcca32160a6e559683e06d83e2f28
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Wed Dec 3 17:08:37 2025 +0800

    [SPARK-54568][PYTHON] Avoid unnecessary pandas conversion in create dataframe from ndarray
    
    ### What changes were proposed in this pull request?
    Avoid the unnecessary pandas conversion when creating a DataFrame from a NumPy ndarray.
    
    ### Why are the changes needed?
    before:
    ndarray -> pandas dataframe -> arrow data
    
    after:
    ndarray -> arrow data
    
    This also makes the behavior consistent with Spark Connect mode:
    https://github.com/apache/spark/blob/40ba971b7319d74670ba86cc1f280a8a0f7a1dbb/python/pyspark/sql/connect/session.py#L675-L706
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    ci
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #53280 from zhengruifeng/test_np_arrow.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/session.py | 40 +++++++++++++++++++++-------------------
 1 file changed, 21 insertions(+), 19 deletions(-)

diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index dbcfe52e77b6..b59ed9f0a840 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -52,7 +52,6 @@ from pyspark.sql.streaming import DataStreamReader
 from pyspark.sql.types import (
     AtomicType,
     DataType,
-    StructField,
     StructType,
     VariantVal,
     _make_type_verifier,
@@ -60,7 +59,6 @@ from pyspark.sql.types import (
     _has_nulltype,
     _merge_type,
     _create_converter,
-    _from_numpy_type,
 )
 from pyspark.errors.exceptions.captured import install_exception_handler
 from pyspark.sql.utils import (
@@ -1565,32 +1563,36 @@ class SparkSession(SparkConversionMixin):
             has_pyarrow = False
 
         if has_numpy and isinstance(data, np.ndarray):
-            # `data` of numpy.ndarray type will be converted to a pandas DataFrame,
-            # so pandas is required.
-            from pyspark.sql.pandas.utils import require_minimum_pandas_version
+            # `data` of numpy.ndarray type will be converted to an arrow Table,
+            # so pyarrow is required.
+            from pyspark.sql.pandas.utils import require_minimum_pyarrow_version
 
-            require_minimum_pandas_version()
+            require_minimum_pyarrow_version()
             if data.ndim not in [1, 2]:
                 raise PySparkValueError(
                     errorClass="INVALID_NDARRAY_DIMENSION",
                     messageParameters={"dimensions": "1 or 2"},
                 )
 
-            if data.ndim == 1 or data.shape[1] == 1:
-                column_names = ["value"]
+            col_names: list[str] = []
+            if isinstance(schema, StructType):
+                col_names = schema.names
+            elif isinstance(schema, list):
+                col_names = schema
+            elif data.ndim == 1 or data.shape[1] == 1:
+                col_names = ["value"]
             else:
-                column_names = ["_%s" % i for i in range(1, data.shape[1] + 1)]
-
-            if schema is None and not self._jconf.arrowPySparkEnabled():
-                # Construct `schema` from `np.dtype` of the input NumPy array
-                # TODO: Apply the logic below when self._jconf.arrowPySparkEnabled() is True
-                spark_type = _from_numpy_type(data.dtype)
-                if spark_type is not None:
-                    schema = StructType(
-                        [StructField(name, spark_type, nullable=True) for name in column_names]
-                    )
+                col_names = [f"_{i + 1}" for i in range(0, data.shape[1])]
 
-            data = pd.DataFrame(data, columns=column_names)
+            if data.ndim == 1:
+                data = pa.Table.from_arrays(arrays=[pa.array(data)], names=col_names)
+            elif data.shape[1] == 1:
+                data = pa.Table.from_arrays(arrays=[pa.array(data.squeeze())], names=col_names)
+            else:
+                data = pa.Table.from_arrays(
+                    arrays=[pa.array(data[::, i]) for i in range(0, data.shape[1])],
+                    names=col_names,
+                )
 
         if has_pandas and isinstance(data, pd.DataFrame):
             # Create a DataFrame from pandas DataFrame.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to