This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 165c754652eb [SPARK-50183][PYTHON] Unify the internal functions for
Pandas API and PySpark Plotting
165c754652eb is described below
commit 165c754652ebf80310426376caac324d7ab2bad0
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Thu Oct 31 15:47:08 2024 +0900
[SPARK-50183][PYTHON] Unify the internal functions for Pandas API and
PySpark Plotting
### What changes were proposed in this pull request?
Unify the internal functions for Pandas API and PySpark Plotting
### Why are the changes needed?
to make internal functions easier to reuse
### Does this PR introduce _any_ user-facing change?
no, refactor only
### How was this patch tested?
ci
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #48715 from zhengruifeng/py_internal_fn.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../pyspark/pandas/data_type_ops/datetime_ops.py | 2 +-
python/pyspark/pandas/frame.py | 2 +-
python/pyspark/pandas/generic.py | 2 +-
python/pyspark/pandas/groupby.py | 2 +-
python/pyspark/pandas/internal.py | 2 +-
python/pyspark/pandas/plot/core.py | 2 +-
python/pyspark/pandas/resample.py | 2 +-
python/pyspark/pandas/series.py | 2 +-
python/pyspark/pandas/spark/functions.py | 91 --------------
python/pyspark/pandas/window.py | 2 +-
python/pyspark/sql/internal.py | 132 +++++++++++++++++++++
python/pyspark/sql/plot/core.py | 17 +--
python/pyspark/sql/utils.py | 20 ----
13 files changed, 145 insertions(+), 133 deletions(-)
diff --git a/python/pyspark/pandas/data_type_ops/datetime_ops.py
b/python/pyspark/pandas/data_type_ops/datetime_ops.py
index dc2f68232e73..22bd7a6d329d 100644
--- a/python/pyspark/pandas/data_type_ops/datetime_ops.py
+++ b/python/pyspark/pandas/data_type_ops/datetime_ops.py
@@ -34,7 +34,7 @@ from pyspark.sql.types import (
)
from pyspark.sql.utils import pyspark_column_op
from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex
-from pyspark.pandas.spark import functions as SF
+from pyspark.sql.internal import InternalFunction as SF
from pyspark.pandas.base import IndexOpsMixin
from pyspark.pandas.data_type_ops.base import (
DataTypeOps,
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index ddfcc2126f2c..49aa49f65e35 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -70,6 +70,7 @@ from pyspark.errors import PySparkValueError
from pyspark import StorageLevel
from pyspark.sql import Column as PySparkColumn, DataFrame as
PySparkDataFrame, functions as F
from pyspark.sql.functions import pandas_udf
+from pyspark.sql.internal import InternalFunction as SF
from pyspark.sql.types import (
ArrayType,
BooleanType,
@@ -105,7 +106,6 @@ from pyspark.pandas.correlation import (
CORRELATION_CORR_OUTPUT_COLUMN,
CORRELATION_COUNT_OUTPUT_COLUMN,
)
-from pyspark.pandas.spark import functions as SF
from pyspark.pandas.spark.accessors import SparkFrameMethods,
CachedSparkFrameMethods
from pyspark.pandas.utils import (
align_diff_frames,
diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index 55f15fd2eb1a..1244ee2d88aa 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -40,6 +40,7 @@ import pandas as pd
from pandas.api.types import is_list_like # type: ignore[attr-defined]
from pyspark.sql import Column, functions as F
+from pyspark.sql.internal import InternalFunction as SF
from pyspark.sql.types import (
BooleanType,
DoubleType,
@@ -58,7 +59,6 @@ from pyspark.pandas._typing import (
)
from pyspark.pandas.indexing import AtIndexer, iAtIndexer, iLocIndexer,
LocIndexer
from pyspark.pandas.internal import InternalFrame
-from pyspark.pandas.spark import functions as SF
from pyspark.pandas.typedef import spark_type_to_pandas_dtype
from pyspark.pandas.utils import (
is_name_like_tuple,
diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index 55627a4c740c..8de03918a4cd 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -47,6 +47,7 @@ from pandas.api.types import is_number, is_hashable,
is_list_like # type: ignor
from pandas.core.common import _builtin_table # type: ignore[attr-defined]
from pyspark.sql import Column, DataFrame as SparkDataFrame, Window, functions
as F
+from pyspark.sql.internal import InternalFunction as SF
from pyspark.sql.types import (
BooleanType,
DataType,
@@ -74,7 +75,6 @@ from pyspark.pandas.missing.groupby import (
MissingPandasLikeSeriesGroupBy,
)
from pyspark.pandas.series import Series, first_series
-from pyspark.pandas.spark import functions as SF
from pyspark.pandas.config import get_option
from pyspark.pandas.correlation import (
compute,
diff --git a/python/pyspark/pandas/internal.py
b/python/pyspark/pandas/internal.py
index 363ef7330254..3f6831b60067 100644
--- a/python/pyspark/pandas/internal.py
+++ b/python/pyspark/pandas/internal.py
@@ -43,7 +43,7 @@ from pyspark.sql.types import ( # noqa: F401
)
from pyspark.sql.utils import is_timestamp_ntz_preferred, is_remote
from pyspark import pandas as ps
-from pyspark.pandas.spark import functions as SF
+from pyspark.sql.internal import InternalFunction as SF
from pyspark.pandas._typing import Label
from pyspark.pandas.spark.utils import as_nullable_spark_type,
force_decimal_precision_scale
from pyspark.pandas.data_type_ops.base import DataTypeOps
diff --git a/python/pyspark/pandas/plot/core.py
b/python/pyspark/pandas/plot/core.py
index f5652177fe4a..7004bae47c90 100644
--- a/python/pyspark/pandas/plot/core.py
+++ b/python/pyspark/pandas/plot/core.py
@@ -24,7 +24,7 @@ from pandas.core.base import PandasObject
from pandas.core.dtypes.inference import is_integer
from pyspark.sql import functions as F, Column
-from pyspark.pandas.spark import functions as SF
+from pyspark.sql.internal import InternalFunction as SF
from pyspark.pandas.missing import unsupported_function
from pyspark.pandas.config import get_option
from pyspark.pandas.utils import name_like_string
diff --git a/python/pyspark/pandas/resample.py
b/python/pyspark/pandas/resample.py
index 02db8fd91b9b..152bf90e60cf 100644
--- a/python/pyspark/pandas/resample.py
+++ b/python/pyspark/pandas/resample.py
@@ -32,6 +32,7 @@ import pandas as pd
from pandas.tseries.frequencies import to_offset
from pyspark.sql import Column, functions as F
+from pyspark.sql.internal import InternalFunction as SF
from pyspark.sql.types import (
NumericType,
StructField,
@@ -41,7 +42,6 @@ from pyspark.sql.types import (
from pyspark import pandas as ps # For running doctests and reference
resolution in PyCharm.
from pyspark.pandas._typing import FrameLike
from pyspark.pandas.frame import DataFrame
-from pyspark.pandas.spark import functions as SF
from pyspark.pandas.internal import (
InternalField,
InternalFrame,
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 7e276860fbab..18a4dfaee1d5 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -60,6 +60,7 @@ from pyspark.sql import (
DataFrame as SparkDataFrame,
Window as PySparkWindow,
)
+from pyspark.sql.internal import InternalFunction as SF
from pyspark.sql.types import (
ArrayType,
BooleanType,
@@ -117,7 +118,6 @@ from pyspark.pandas.utils import (
log_advice,
)
from pyspark.pandas.datetimes import DatetimeMethods
-from pyspark.pandas.spark import functions as SF
from pyspark.pandas.spark.accessors import SparkSeriesMethods
from pyspark.pandas.strings import StringMethods
from pyspark.pandas.typedef import (
diff --git a/python/pyspark/pandas/spark/functions.py
b/python/pyspark/pandas/spark/functions.py
deleted file mode 100644
index a6b8e79ca50f..000000000000
--- a/python/pyspark/pandas/spark/functions.py
+++ /dev/null
@@ -1,91 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-"""
-Additional Spark functions used in pandas-on-Spark.
-"""
-from pyspark.sql import Column, functions as F
-from pyspark.sql.utils import _invoke_internal_function_over_columns
-from typing import Union
-
-
-def timestamp_ntz_to_long(col: Column) -> Column:
- return _invoke_internal_function_over_columns("timestamp_ntz_to_long", col)
-
-
-def product(col: Column, dropna: bool) -> Column:
- return _invoke_internal_function_over_columns("pandas_product", col,
F.lit(dropna))
-
-
-def stddev(col: Column, ddof: int) -> Column:
- return _invoke_internal_function_over_columns("pandas_stddev", col,
F.lit(ddof))
-
-
-def var(col: Column, ddof: int) -> Column:
- return _invoke_internal_function_over_columns("pandas_var", col,
F.lit(ddof))
-
-
-def skew(col: Column) -> Column:
- return _invoke_internal_function_over_columns("pandas_skew", col)
-
-
-def kurt(col: Column) -> Column:
- return _invoke_internal_function_over_columns("pandas_kurt", col)
-
-
-def mode(col: Column, dropna: bool) -> Column:
- return _invoke_internal_function_over_columns("pandas_mode", col,
F.lit(dropna))
-
-
-def covar(col1: Column, col2: Column, ddof: int) -> Column:
- return _invoke_internal_function_over_columns("pandas_covar", col1, col2,
F.lit(ddof))
-
-
-def ewm(col: Column, alpha: float, ignorena: bool) -> Column:
- return _invoke_internal_function_over_columns("ewm", col, F.lit(alpha),
F.lit(ignorena))
-
-
-def null_index(col: Column) -> Column:
- return _invoke_internal_function_over_columns("null_index", col)
-
-
-def distributed_id() -> Column:
- return _invoke_internal_function_over_columns("distributed_id")
-
-
-def distributed_sequence_id() -> Column:
- return _invoke_internal_function_over_columns("distributed_sequence_id")
-
-
-def collect_top_k(col: Column, num: int, reverse: bool) -> Column:
- return _invoke_internal_function_over_columns("collect_top_k", col,
F.lit(num), F.lit(reverse))
-
-
-def array_binary_search(col: Column, value: Column) -> Column:
- return _invoke_internal_function_over_columns("array_binary_search", col,
value)
-
-
-def make_interval(unit: str, e: Union[Column, int, float]) -> Column:
- unit_mapping = {
- "YEAR": "years",
- "MONTH": "months",
- "WEEK": "weeks",
- "DAY": "days",
- "HOUR": "hours",
- "MINUTE": "mins",
- "SECOND": "secs",
- }
- return F.make_interval(**{unit_mapping[unit]: F.lit(e)})
diff --git a/python/pyspark/pandas/window.py b/python/pyspark/pandas/window.py
index fb5dd29169e9..591fa7d82875 100644
--- a/python/pyspark/pandas/window.py
+++ b/python/pyspark/pandas/window.py
@@ -22,6 +22,7 @@ import numpy as np
from pyspark.sql import Window
from pyspark.sql import functions as F
+from pyspark.sql.internal import InternalFunction as SF
from pyspark.pandas.missing.window import (
MissingPandasLikeRolling,
MissingPandasLikeRollingGroupby,
@@ -34,7 +35,6 @@ from pyspark import pandas as ps # noqa: F401
from pyspark.pandas._typing import FrameLike
from pyspark.pandas.groupby import GroupBy, DataFrameGroupBy
from pyspark.pandas.internal import NATURAL_ORDER_COLUMN_NAME,
SPARK_INDEX_NAME_FORMAT
-from pyspark.pandas.spark import functions as SF
from pyspark.pandas.utils import scol_for
from pyspark.sql.column import Column
from pyspark.sql.types import (
diff --git a/python/pyspark/sql/internal.py b/python/pyspark/sql/internal.py
new file mode 100644
index 000000000000..eb4a2153cdaa
--- /dev/null
+++ b/python/pyspark/sql/internal.py
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.sql import Column, functions as F, is_remote
+
+from typing import Union, TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from pyspark.sql._typing import ColumnOrName
+
+"""
+Internal Spark functions used in Pandas API on Spark & PySpark Native Plotting.
+"""
+
+
+class InternalFunction:
+ @staticmethod
+ def _invoke_internal_function_over_columns(name: str, *cols:
"ColumnOrName") -> Column:
+ if is_remote():
+ from pyspark.sql.connect.functions.builtin import
_invoke_function_over_columns
+
+ return _invoke_function_over_columns(name, *cols)
+
+ else:
+ from pyspark.sql.classic.column import Column, _to_seq,
_to_java_column
+ from pyspark import SparkContext
+
+ sc = SparkContext._active_spark_context
+ return Column(
+ sc._jvm.PythonSQLUtils.internalFn( # type: ignore
+ name, _to_seq(sc, cols, _to_java_column) # type: ignore
+ )
+ )
+
+ @staticmethod
+ def timestamp_ntz_to_long(col: Column) -> Column:
+ return
InternalFunction._invoke_internal_function_over_columns("timestamp_ntz_to_long",
col)
+
+ @staticmethod
+ def product(col: Column, dropna: bool) -> Column:
+ return InternalFunction._invoke_internal_function_over_columns(
+ "pandas_product", col, F.lit(dropna)
+ )
+
+ @staticmethod
+ def stddev(col: Column, ddof: int) -> Column:
+ return InternalFunction._invoke_internal_function_over_columns(
+ "pandas_stddev", col, F.lit(ddof)
+ )
+
+ @staticmethod
+ def var(col: Column, ddof: int) -> Column:
+ return InternalFunction._invoke_internal_function_over_columns(
+ "pandas_var", col, F.lit(ddof)
+ )
+
+ @staticmethod
+ def skew(col: Column) -> Column:
+ return
InternalFunction._invoke_internal_function_over_columns("pandas_skew", col)
+
+ @staticmethod
+ def kurt(col: Column) -> Column:
+ return
InternalFunction._invoke_internal_function_over_columns("pandas_kurt", col)
+
+ @staticmethod
+ def mode(col: Column, dropna: bool) -> Column:
+ return InternalFunction._invoke_internal_function_over_columns(
+ "pandas_mode", col, F.lit(dropna)
+ )
+
+ @staticmethod
+ def covar(col1: Column, col2: Column, ddof: int) -> Column:
+ return InternalFunction._invoke_internal_function_over_columns(
+ "pandas_covar", col1, col2, F.lit(ddof)
+ )
+
+ @staticmethod
+ def ewm(col: Column, alpha: float, ignorena: bool) -> Column:
+ return InternalFunction._invoke_internal_function_over_columns(
+ "ewm", col, F.lit(alpha), F.lit(ignorena)
+ )
+
+ @staticmethod
+ def null_index(col: Column) -> Column:
+ return
InternalFunction._invoke_internal_function_over_columns("null_index", col)
+
+ @staticmethod
+ def distributed_id() -> Column:
+ return
InternalFunction._invoke_internal_function_over_columns("distributed_id")
+
+ @staticmethod
+ def distributed_sequence_id() -> Column:
+ return
InternalFunction._invoke_internal_function_over_columns("distributed_sequence_id")
+
+ @staticmethod
+ def collect_top_k(col: Column, num: int, reverse: bool) -> Column:
+ return InternalFunction._invoke_internal_function_over_columns(
+ "collect_top_k", col, F.lit(num), F.lit(reverse)
+ )
+
+ @staticmethod
+ def array_binary_search(col: Column, value: Column) -> Column:
+ return InternalFunction._invoke_internal_function_over_columns(
+ "array_binary_search", col, value
+ )
+
+ @staticmethod
+ def make_interval(unit: str, e: Union[Column, int, float]) -> Column:
+ unit_mapping = {
+ "YEAR": "years",
+ "MONTH": "months",
+ "WEEK": "weeks",
+ "DAY": "days",
+ "HOUR": "hours",
+ "MINUTE": "mins",
+ "SECOND": "secs",
+ }
+ return F.make_interval(**{unit_mapping[unit]: F.lit(e)})
diff --git a/python/pyspark/sql/plot/core.py b/python/pyspark/sql/plot/core.py
index ab8f3463302b..f836924e3dfb 100644
--- a/python/pyspark/sql/plot/core.py
+++ b/python/pyspark/sql/plot/core.py
@@ -25,9 +25,10 @@ from pyspark.errors import (
PySparkValueError,
)
from pyspark.sql import Column, functions as F
+from pyspark.sql.internal import InternalFunction as SF
from pyspark.sql.pandas.utils import require_minimum_numpy_version,
require_minimum_pandas_version
from pyspark.sql.types import NumericType
-from pyspark.sql.utils import require_minimum_plotly_version,
_invoke_internal_function_over_columns
+from pyspark.sql.utils import require_minimum_plotly_version
from pandas.core.dtypes.inference import is_integer
@@ -568,10 +569,6 @@ class PySparkKdePlotBase:
class PySparkHistogramPlotBase:
- @staticmethod
- def array_binary_search(col: Column, value: Column) -> Column:
- return _invoke_internal_function_over_columns("array_binary_search",
col, value)
-
@staticmethod
def get_bins(sdf: "DataFrame", bins: int) -> "np.ndarray":
require_minimum_numpy_version()
@@ -623,7 +620,7 @@ class PySparkHistogramPlotBase:
# determines which bucket a given value falls into, based on
predefined bin intervals
# refers to
org.apache.spark.ml.feature.Bucketizer#binarySearchForBuckets
def binary_search_for_buckets(value: Column) -> Column:
- index = PySparkHistogramPlotBase.array_binary_search(F.lit(bins),
value)
+ index = SF.array_binary_search(F.lit(bins), value)
bucket = F.when(index >= 0, index).otherwise(-index - 2)
unboundErrMsg = F.lit(f"value %s out of the bins bounds:
[{bins[0]}, {bins[-1]}]")
return (
@@ -717,12 +714,6 @@ class PySparkHistogramPlotBase:
class PySparkBoxPlotBase:
- @staticmethod
- def collect_top_k(col: Column, num: int, reverse: bool) -> Column:
- return _invoke_internal_function_over_columns(
- "collect_top_k", col, F.lit(num), F.lit(reverse)
- )
-
@staticmethod
def compute_box(
sdf: "DataFrame", colnames: List[str], whis: float, precision: float,
showfliers: bool
@@ -777,7 +768,7 @@ class PySparkBoxPlotBase:
outlier,
F.struct(F.abs(value - med), value.alias("val")),
).otherwise(F.lit(None))
- topk = PySparkBoxPlotBase.collect_top_k(pair, 1001, False)
+ topk = SF.collect_top_k(pair, 1001, False)
fliers = F.when(F.size(topk) > 0,
topk["val"]).otherwise(F.lit(None))
else:
fliers = F.lit(None)
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index b961fae19151..5d9ec92cbc83 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -58,10 +58,8 @@ if TYPE_CHECKING:
JVMView,
)
from pyspark import SparkContext
- from pyspark.sql import Column
from pyspark.sql.session import SparkSession
from pyspark.sql.dataframe import DataFrame
- from pyspark.sql._typing import ColumnOrName
from pyspark.pandas._typing import IndexOpsLike, SeriesOrIndex
has_numpy: bool = False
@@ -218,24 +216,6 @@ def enum_to_value(value: Any) -> Any:
return enum_to_value(value.value) if value is not None and
isinstance(value, Enum) else value
-def _invoke_internal_function_over_columns(name: str, *cols: "ColumnOrName")
-> "Column":
- if is_remote():
- from pyspark.sql.connect.functions.builtin import
_invoke_function_over_columns
-
- return _invoke_function_over_columns(name, *cols)
-
- else:
- from pyspark.sql.classic.column import Column, _to_seq, _to_java_column
- from pyspark import SparkContext
-
- sc = SparkContext._active_spark_context
- return Column(
- sc._jvm.PythonSQLUtils.internalFn( # type: ignore
- name, _to_seq(sc, cols, _to_java_column) # type: ignore
- )
- )
-
-
def is_timestamp_ntz_preferred() -> bool:
"""
Return a bool if TimestampNTZType is preferred according to the SQL
configuration set.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]