This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 69f1d5c5e461 [SPARK-55025][PS] Improve performance in pandas by using list comprehension
69f1d5c5e461 is described below

commit 69f1d5c5e4616265f7fc3300febbddc6d7a05c89
Author: Devin Petersohn <[email protected]>
AuthorDate: Fri Jan 16 19:42:23 2026 -0800

    [SPARK-55025][PS] Improve performance in pandas by using list comprehension
    
    ### What changes were proposed in this pull request?
    
    Improve the performance of various metadata and precomputation operations in the pandas API on Spark by using list comprehensions.
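    
    The same pattern is applied throughout frame.py: an accumulator loop that calls append is replaced with an equivalent list comprehension. For example, from the first hunk of the diff:
    
        # before: build the list imperatively
        applied = []
        for label in self._internal.column_labels:
            applied.append(op(self._psser_for(label)))
    
        # after: the equivalent comprehension
        applied = [op(self._psser_for(label)) for label in self._internal.column_labels]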
    
    ### Why are the changes needed?
    
    Performance and maintainability: a list comprehension avoids the per-iteration attribute lookup and call overhead of list.append, and states the intent more concisely.
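    
    As a minimal, self-contained sketch (illustrative only, not part of this patch) of the effect being relied on:
    
        # Micro-benchmark: the loop pays a list.append lookup and call per
        # item, while the comprehension compiles to a specialized append
        # opcode in CPython.
        import timeit
    
        def with_loop(n):
            out = []
            for i in range(n):
                out.append(i * i)
            return out
    
        def with_comprehension(n):
            return [i * i for i in range(n)]
    
        if __name__ == "__main__":
            for fn in (with_loop, with_comprehension):
                t = timeit.timeit(lambda: fn(10_000), number=1_000)
                print(f"{fn.__name__}: {t:.3f}s")
    
    The lists built in frame.py are metadata-sized (one entry per column), so the per-call win is modest; the comprehension form is as much a readability and consistency improvement as a performance one.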
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53701 from devin-petersohn/devin/pandas_maintain_03.
    
    Authored-by: Devin Petersohn <[email protected]>
    Signed-off-by: Huaxin Gao <[email protected]>
---
 python/pyspark/pandas/frame.py | 126 ++++++++++++++++++-----------------------
 1 file changed, 56 insertions(+), 70 deletions(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 63a8998487f5..e7ec1ea28b65 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -891,9 +891,7 @@ class DataFrame(Frame, Generic[T]):
         op: Callable[["Series"], Union["Series", PySparkColumn]],
         should_resolve: bool = False,
     ) -> "DataFrame":
-        applied = []
-        for label in self._internal.column_labels:
-            applied.append(op(self._psser_for(label)))
+        applied = [op(self._psser_for(label)) for label in self._internal.column_labels]
         internal = self._internal.with_new_columns(applied)
         if should_resolve:
             internal = internal.resolved_copy
@@ -1612,17 +1610,16 @@ class DataFrame(Frame, Generic[T]):
         # |  4|  1|NULL|
         # +---+---+----+
 
-        pair_scols = []
-        for i in range(0, num_scols):
-            for j in range(i, num_scols):
-                pair_scols.append(
-                    F.struct(
-                        F.lit(i).alias(index_1_col_name),
-                        F.lit(j).alias(index_2_col_name),
-                        numeric_scols[i].alias(CORRELATION_VALUE_1_COLUMN),
-                        numeric_scols[j].alias(CORRELATION_VALUE_2_COLUMN),
-                    )
-                )
+        pair_scols = [
+            F.struct(
+                F.lit(i).alias(index_1_col_name),
+                F.lit(j).alias(index_2_col_name),
+                numeric_scols[i].alias(CORRELATION_VALUE_1_COLUMN),
+                numeric_scols[j].alias(CORRELATION_VALUE_2_COLUMN),
+            )
+            for i in range(0, num_scols)
+            for j in range(i, num_scols)
+        ]
 
         # +-------------------+-------------------+-------------------+-------------------+
         # |__tmp_index_1_col__|__tmp_index_2_col__|__tmp_value_1_col__|__tmp_value_2_col__|
@@ -1851,16 +1848,16 @@ class DataFrame(Frame, Generic[T]):
         sdf = combined._internal.spark_frame
         index_col_name = verify_temp_column_name(sdf, "__corrwith_index_temp_column__")
 
-        this_numeric_column_labels: List[Label] = []
-        for column_label in this._internal.column_labels:
-            if isinstance(this._internal.spark_type_for(column_label), (NumericType, BooleanType)):
-                this_numeric_column_labels.append(column_label)
-
-        that_numeric_column_labels: List[Label] = []
-        for column_label in that._internal.column_labels:
-            if isinstance(that._internal.spark_type_for(column_label), (NumericType, BooleanType)):
-                that_numeric_column_labels.append(column_label)
-
+        this_numeric_column_labels: List[Label] = [
+            column_label
+            for column_label in this._internal.column_labels
+            if isinstance(this._internal.spark_type_for(column_label), (NumericType, BooleanType))
+        ]
+        that_numeric_column_labels: List[Label] = [
+            column_label
+            for column_label in that._internal.column_labels
+            if isinstance(that._internal.spark_type_for(column_label), (NumericType, BooleanType))
+        ]
         intersect_numeric_column_labels: List[Label] = []
         diff_numeric_column_labels: List[Label] = []
         pair_scols = []
@@ -4074,17 +4071,15 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         # |                4|  4|500|             false|                 -4|             false|  ...
         # +-----------------+---+---+------------------+-------------------+------------------+--...
 
-        data_spark_columns = []
-        for label in self._internal.column_labels:
-            data_spark_columns.append(
-                F.when(
-                    psdf[tmp_cond_col_name(name_like_string(label))].spark.column,
-                    psdf._internal.spark_column_for(label),
-                )
-                .otherwise(psdf[tmp_other_col_name(name_like_string(label))].spark.column)
-                .alias(psdf._internal.spark_column_name_for(label))
+        data_spark_columns = [
+            F.when(
+                psdf[tmp_cond_col_name(name_like_string(label))].spark.column,
+                psdf._internal.spark_column_for(label),
             )
-
+            .otherwise(psdf[tmp_other_col_name(name_like_string(label))].spark.column)
+            .alias(psdf._internal.spark_column_name_for(label))
+            for label in self._internal.column_labels
+        ]
         return DataFrame(
             psdf._internal.with_new_columns(
                 data_spark_columns, column_labels=self._internal.column_labels  # TODO: dtypes?
@@ -6076,15 +6071,12 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
                 internal = internal.with_filter(cond)
 
             psdf: DataFrame = DataFrame(internal)
-
-            null_counts = []
-            for label in internal.column_labels:
-                psser = psdf._psser_for(label)
-                cond = psser.isnull().spark.column
-                null_counts.append(
-                    F.sum(F.when(~cond, 1).otherwise(0)).alias(name_like_string(label))
+            null_counts = [
+                F.sum(F.when(~psdf._psser_for(label).isnull().spark.column, 1).otherwise(0)).alias(
+                    name_like_string(label)
                 )
-
+                for label in internal.column_labels
+            ]
             counts = internal.spark_frame.select(null_counts + [F.count("*")]).head()
 
             if thresh is not None:
@@ -6281,13 +6273,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
                     "future version. Convert to a specific numeric type before 
interpolating.",
                     FutureWarning,
                 )
-
-        numeric_col_names = []
-        for label in self._internal.column_labels:
-            psser = self._psser_for(label)
-            if isinstance(psser.spark.data_type, (NumericType, BooleanType)):
-                numeric_col_names.append(psser.name)
-
+        numeric_col_names = [
+            self._psser_for(label).name
+            for label in self._internal.column_labels
+            if isinstance(self._psser_for(label).spark.data_type, (NumericType, BooleanType))
+        ]
         if len(numeric_col_names) == 0:
             raise TypeError(
                 "Cannot interpolate with all object-dtype columns in the 
DataFrame. "
@@ -9936,13 +9926,12 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             # If not all columns are timestamp type,
             # we also need to calculate the `std` for numeric columns
             if has_numeric_type:
-                std_exprs = []
-                for label, spark_data_type in zip(column_labels, spark_data_types):
-                    column_name = label[0]
-                    if isinstance(spark_data_type, (TimestampType, TimestampNTZType)):
-                        std_exprs.append(F.lit(None).alias("stddev_samp({})".format(column_name)))
-                    else:
-                        std_exprs.append(F.stddev(column_name))
+                std_exprs = [
+                    F.lit(None).alias("stddev_samp({})".format(label[0]))
+                    if isinstance(spark_data_type, (TimestampType, TimestampNTZType))
+                    else F.stddev(label[0])
+                    for label, spark_data_type in zip(column_labels, spark_data_types)
+                ]
                 exprs.extend(std_exprs)
                 stats_names.append("std")
 
@@ -13507,11 +13496,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         ):
             raise NotImplementedError("`on` currently works only for TimestampType")
 
-        agg_columns: List[ps.Series] = []
-        for column_label in self._internal.column_labels:
-            if isinstance(self._internal.spark_type_for(column_label), (NumericType, BooleanType)):
-                agg_columns.append(self._psser_for(column_label))
-
+        agg_columns: List[ps.Series] = [
+            self._psser_for(column_label)
+            for column_label in self._internal.column_labels
+            if isinstance(self._internal.spark_type_for(column_label), (NumericType, BooleanType))
+        ]
         if len(agg_columns) == 0:
             raise ValueError("No available aggregation columns!")
 
@@ -13836,17 +13825,14 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             return align_diff_frames(apply_op, this, that, fillna=True, how="full")
         else:
             # DataFrame and Series
-            applied = []
             this = inputs[0]
             assert all(inp is this for inp in inputs if isinstance(inp, DataFrame))
-
-            for label in this._internal.column_labels:
-                arguments = []
-                for inp in inputs:
-                    arguments.append(inp[label] if isinstance(inp, DataFrame) else inp)
-                # both binary and unary.
-                applied.append(ufunc(*arguments, **kwargs).rename(label))
-
+            applied = [
+                ufunc(
+                    *[inp[label] if isinstance(inp, DataFrame) else inp for inp in inputs], **kwargs
+                ).rename(label)
+                for label in this._internal.column_labels
+            ]
             internal = this._internal.with_new_columns(applied)
             return DataFrame(internal)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
