This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 69f1d5c5e461 [SPARK-55025][PS] Improve performance in pandas by using list comprehension
69f1d5c5e461 is described below
commit 69f1d5c5e4616265f7fc3300febbddc6d7a05c89
Author: Devin Petersohn <[email protected]>
AuthorDate: Fri Jan 16 19:42:23 2026 -0800
[SPARK-55025][PS] Improve performance in pandas by using list comprehension
### What changes were proposed in this pull request?
Improve the performance of various metadata and precomputing operations in pandas by using list comprehensions.
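The same pattern is applied throughout `frame.py`: a loop that grows a list with repeated `.append()` calls is replaced by a single list comprehension. A minimal, self-contained sketch of the pattern (the names here are illustrative stand-ins, not the actual `pyspark.pandas` internals):

```python
# Illustrative stand-ins for the real column labels and per-column op.
labels = ["a", "b", "c"]

def op(label: str) -> str:
    return label.upper()

# Before: grow the list imperatively, paying a method lookup and a
# call to .append() on every iteration.
applied = []
for label in labels:
    applied.append(op(label))

# After: a single list comprehension; CPython compiles this to the
# specialized LIST_APPEND bytecode, so each element is cheaper to add.
applied_fast = [op(label) for label in labels]

assert applied == applied_fast == ["A", "B", "C"]
```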
### Why are the changes needed?
Performance and maintainability
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
CI
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53701 from devin-petersohn/devin/pandas_maintain_03.
Authored-by: Devin Petersohn <[email protected]>
Signed-off-by: Huaxin Gao <[email protected]>
---
python/pyspark/pandas/frame.py | 126 ++++++++++++++++++-----------------------
1 file changed, 56 insertions(+), 70 deletions(-)
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 63a8998487f5..e7ec1ea28b65 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -891,9 +891,7 @@ class DataFrame(Frame, Generic[T]):
         op: Callable[["Series"], Union["Series", PySparkColumn]],
         should_resolve: bool = False,
     ) -> "DataFrame":
-        applied = []
-        for label in self._internal.column_labels:
-            applied.append(op(self._psser_for(label)))
+        applied = [op(self._psser_for(label)) for label in self._internal.column_labels]
         internal = self._internal.with_new_columns(applied)
         if should_resolve:
             internal = internal.resolved_copy
@@ -1612,17 +1610,16 @@ class DataFrame(Frame, Generic[T]):
# | 4| 1|NULL|
# +---+---+----+
- pair_scols = []
- for i in range(0, num_scols):
- for j in range(i, num_scols):
- pair_scols.append(
- F.struct(
- F.lit(i).alias(index_1_col_name),
- F.lit(j).alias(index_2_col_name),
- numeric_scols[i].alias(CORRELATION_VALUE_1_COLUMN),
- numeric_scols[j].alias(CORRELATION_VALUE_2_COLUMN),
- )
- )
+ pair_scols = [
+ F.struct(
+ F.lit(i).alias(index_1_col_name),
+ F.lit(j).alias(index_2_col_name),
+ numeric_scols[i].alias(CORRELATION_VALUE_1_COLUMN),
+ numeric_scols[j].alias(CORRELATION_VALUE_2_COLUMN),
+ )
+ for i in range(0, num_scols)
+ for j in range(i, num_scols)
+ ]
#
+-------------------+-------------------+-------------------+-------------------+
#
|__tmp_index_1_col__|__tmp_index_2_col__|__tmp_value_1_col__|__tmp_value_2_col__|
@@ -1851,16 +1848,16 @@ class DataFrame(Frame, Generic[T]):
         sdf = combined._internal.spark_frame
         index_col_name = verify_temp_column_name(sdf, "__corrwith_index_temp_column__")

-        this_numeric_column_labels: List[Label] = []
-        for column_label in this._internal.column_labels:
-            if isinstance(this._internal.spark_type_for(column_label), (NumericType, BooleanType)):
-                this_numeric_column_labels.append(column_label)
-
-        that_numeric_column_labels: List[Label] = []
-        for column_label in that._internal.column_labels:
-            if isinstance(that._internal.spark_type_for(column_label), (NumericType, BooleanType)):
-                that_numeric_column_labels.append(column_label)
-
+        this_numeric_column_labels: List[Label] = [
+            column_label
+            for column_label in this._internal.column_labels
+            if isinstance(this._internal.spark_type_for(column_label), (NumericType, BooleanType))
+        ]
+        that_numeric_column_labels: List[Label] = [
+            column_label
+            for column_label in that._internal.column_labels
+            if isinstance(that._internal.spark_type_for(column_label), (NumericType, BooleanType))
+        ]
         intersect_numeric_column_labels: List[Label] = []
         diff_numeric_column_labels: List[Label] = []
         pair_scols = []
@@ -4074,17 +4071,15 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         # |                4|  4|500|             false|                 -4|             false| ...
         # +-----------------+---+---+------------------+-------------------+------------------+--...

-        data_spark_columns = []
-        for label in self._internal.column_labels:
-            data_spark_columns.append(
-                F.when(
-                    psdf[tmp_cond_col_name(name_like_string(label))].spark.column,
-                    psdf._internal.spark_column_for(label),
-                )
-                .otherwise(psdf[tmp_other_col_name(name_like_string(label))].spark.column)
-                .alias(psdf._internal.spark_column_name_for(label))
+        data_spark_columns = [
+            F.when(
+                psdf[tmp_cond_col_name(name_like_string(label))].spark.column,
+                psdf._internal.spark_column_for(label),
             )
-
+            .otherwise(psdf[tmp_other_col_name(name_like_string(label))].spark.column)
+            .alias(psdf._internal.spark_column_name_for(label))
+            for label in self._internal.column_labels
+        ]
         return DataFrame(
             psdf._internal.with_new_columns(
                 data_spark_columns, column_labels=self._internal.column_labels
                 # TODO: dtypes?
@@ -6076,15 +6071,12 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             internal = internal.with_filter(cond)
         psdf: DataFrame = DataFrame(internal)
-
-        null_counts = []
-        for label in internal.column_labels:
-            psser = psdf._psser_for(label)
-            cond = psser.isnull().spark.column
-            null_counts.append(
-                F.sum(F.when(~cond, 1).otherwise(0)).alias(name_like_string(label))
+        null_counts = [
+            F.sum(F.when(~psdf._psser_for(label).isnull().spark.column, 1).otherwise(0)).alias(
+                name_like_string(label)
             )
-
+            for label in internal.column_labels
+        ]
         counts = internal.spark_frame.select(null_counts + [F.count("*")]).head()

         if thresh is not None:
@@ -6281,13 +6273,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
"future version. Convert to a specific numeric type before
interpolating.",
FutureWarning,
)
-
- numeric_col_names = []
- for label in self._internal.column_labels:
- psser = self._psser_for(label)
- if isinstance(psser.spark.data_type, (NumericType, BooleanType)):
- numeric_col_names.append(psser.name)
-
+ numeric_col_names = [
+ self._psser_for(label).name
+ for label in self._internal.column_labels
+ if isinstance(self._psser_for(label).spark.data_type,
(NumericType, BooleanType))
+ ]
if len(numeric_col_names) == 0:
raise TypeError(
"Cannot interpolate with all object-dtype columns in the
DataFrame. "
@@ -9936,13 +9926,12 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         # If not all columns are timestamp type,
         # we also need to calculate the `std` for numeric columns
         if has_numeric_type:
-            std_exprs = []
-            for label, spark_data_type in zip(column_labels, spark_data_types):
-                column_name = label[0]
-                if isinstance(spark_data_type, (TimestampType, TimestampNTZType)):
-                    std_exprs.append(F.lit(None).alias("stddev_samp({})".format(column_name)))
-                else:
-                    std_exprs.append(F.stddev(column_name))
+            std_exprs = [
+                F.lit(None).alias("stddev_samp({})".format(label[0]))
+                if isinstance(spark_data_type, (TimestampType, TimestampNTZType))
+                else F.stddev(label[0])
+                for label, spark_data_type in zip(column_labels, spark_data_types)
+            ]
             exprs.extend(std_exprs)
             stats_names.append("std")
@@ -13507,11 +13496,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
):
raise NotImplementedError("`on` currently works only for
TimestampType")
- agg_columns: List[ps.Series] = []
- for column_label in self._internal.column_labels:
- if isinstance(self._internal.spark_type_for(column_label),
(NumericType, BooleanType)):
- agg_columns.append(self._psser_for(column_label))
-
+ agg_columns: List[ps.Series] = [
+ self._psser_for(column_label)
+ for column_label in self._internal.column_labels
+ if isinstance(self._internal.spark_type_for(column_label),
(NumericType, BooleanType))
+ ]
if len(agg_columns) == 0:
raise ValueError("No available aggregation columns!")
@@ -13836,17 +13825,14 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             return align_diff_frames(apply_op, this, that, fillna=True, how="full")
         else:
             # DataFrame and Series
-            applied = []
             this = inputs[0]
             assert all(inp is this for inp in inputs if isinstance(inp, DataFrame))
-
-            for label in this._internal.column_labels:
-                arguments = []
-                for inp in inputs:
-                    arguments.append(inp[label] if isinstance(inp, DataFrame) else inp)
-                # both binary and unary.
-                applied.append(ufunc(*arguments, **kwargs).rename(label))
-
+            applied = [
+                ufunc(
+                    *[inp[label] if isinstance(inp, DataFrame) else inp for inp in inputs], **kwargs
+                ).rename(label)
+                for label in this._internal.column_labels
+            ]
             internal = this._internal.with_new_columns(applied)
             return DataFrame(internal)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]