This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 165c754652eb [SPARK-50183][PYTHON] Unify the internal functions for Pandas API and PySpark Plotting
165c754652eb is described below

commit 165c754652ebf80310426376caac324d7ab2bad0
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Thu Oct 31 15:47:08 2024 +0900

    [SPARK-50183][PYTHON] Unify the internal functions for Pandas API and PySpark Plotting
    
    ### What changes were proposed in this pull request?
    Unify the internal functions for Pandas API and PySpark Plotting
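    
    As a minimal sketch of the resulting pattern, call sites in both pandas-on-Spark and native plotting now share a single import (the DataFrame and column name `v` below are made up for illustration):
    
    ```python
    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.internal import InternalFunction as SF
    
    spark = SparkSession.builder.getOrCreate()
    df = spark.range(10).withColumnRenamed("id", "v")
    
    # The same internal helpers back pandas-on-Spark aggregations ...
    df.select(SF.stddev(F.col("v"), 1)).show()
    
    # ... and native plotting, e.g. the box plot's top-k outlier collection.
    df.select(SF.collect_top_k(F.col("v"), 1001, False)).show()
    ```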
    
    ### Why are the changes needed?
    to make internal functions easier to reuse
    
    ### Does this PR introduce _any_ user-facing change?
    no, refactor only
    
    ### How was this patch tested?
    CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #48715 from zhengruifeng/py_internal_fn.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../pyspark/pandas/data_type_ops/datetime_ops.py   |   2 +-
 python/pyspark/pandas/frame.py                     |   2 +-
 python/pyspark/pandas/generic.py                   |   2 +-
 python/pyspark/pandas/groupby.py                   |   2 +-
 python/pyspark/pandas/internal.py                  |   2 +-
 python/pyspark/pandas/plot/core.py                 |   2 +-
 python/pyspark/pandas/resample.py                  |   2 +-
 python/pyspark/pandas/series.py                    |   2 +-
 python/pyspark/pandas/spark/functions.py           |  91 --------------
 python/pyspark/pandas/window.py                    |   2 +-
 python/pyspark/sql/internal.py                     | 132 +++++++++++++++++++++
 python/pyspark/sql/plot/core.py                    |  17 +--
 python/pyspark/sql/utils.py                        |  20 ----
 13 files changed, 145 insertions(+), 133 deletions(-)

diff --git a/python/pyspark/pandas/data_type_ops/datetime_ops.py b/python/pyspark/pandas/data_type_ops/datetime_ops.py
index dc2f68232e73..22bd7a6d329d 100644
--- a/python/pyspark/pandas/data_type_ops/datetime_ops.py
+++ b/python/pyspark/pandas/data_type_ops/datetime_ops.py
@@ -34,7 +34,7 @@ from pyspark.sql.types import (
 )
 from pyspark.sql.utils import pyspark_column_op
 from pyspark.pandas._typing import Dtype, IndexOpsLike, SeriesOrIndex
-from pyspark.pandas.spark import functions as SF
+from pyspark.sql.internal import InternalFunction as SF
 from pyspark.pandas.base import IndexOpsMixin
 from pyspark.pandas.data_type_ops.base import (
     DataTypeOps,
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index ddfcc2126f2c..49aa49f65e35 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -70,6 +70,7 @@ from pyspark.errors import PySparkValueError
 from pyspark import StorageLevel
 from pyspark.sql import Column as PySparkColumn, DataFrame as PySparkDataFrame, functions as F
 from pyspark.sql.functions import pandas_udf
+from pyspark.sql.internal import InternalFunction as SF
 from pyspark.sql.types import (
     ArrayType,
     BooleanType,
@@ -105,7 +106,6 @@ from pyspark.pandas.correlation import (
     CORRELATION_CORR_OUTPUT_COLUMN,
     CORRELATION_COUNT_OUTPUT_COLUMN,
 )
-from pyspark.pandas.spark import functions as SF
 from pyspark.pandas.spark.accessors import SparkFrameMethods, CachedSparkFrameMethods
 from pyspark.pandas.utils import (
     align_diff_frames,
diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index 55f15fd2eb1a..1244ee2d88aa 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -40,6 +40,7 @@ import pandas as pd
 from pandas.api.types import is_list_like  # type: ignore[attr-defined]
 
 from pyspark.sql import Column, functions as F
+from pyspark.sql.internal import InternalFunction as SF
 from pyspark.sql.types import (
     BooleanType,
     DoubleType,
@@ -58,7 +59,6 @@ from pyspark.pandas._typing import (
 )
 from pyspark.pandas.indexing import AtIndexer, iAtIndexer, iLocIndexer, LocIndexer
 from pyspark.pandas.internal import InternalFrame
-from pyspark.pandas.spark import functions as SF
 from pyspark.pandas.typedef import spark_type_to_pandas_dtype
 from pyspark.pandas.utils import (
     is_name_like_tuple,
diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index 55627a4c740c..8de03918a4cd 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -47,6 +47,7 @@ from pandas.api.types import is_number, is_hashable, is_list_like  # type: ignor
 from pandas.core.common import _builtin_table  # type: ignore[attr-defined]
 
 from pyspark.sql import Column, DataFrame as SparkDataFrame, Window, functions as F
+from pyspark.sql.internal import InternalFunction as SF
 from pyspark.sql.types import (
     BooleanType,
     DataType,
@@ -74,7 +75,6 @@ from pyspark.pandas.missing.groupby import (
     MissingPandasLikeSeriesGroupBy,
 )
 from pyspark.pandas.series import Series, first_series
-from pyspark.pandas.spark import functions as SF
 from pyspark.pandas.config import get_option
 from pyspark.pandas.correlation import (
     compute,
diff --git a/python/pyspark/pandas/internal.py b/python/pyspark/pandas/internal.py
index 363ef7330254..3f6831b60067 100644
--- a/python/pyspark/pandas/internal.py
+++ b/python/pyspark/pandas/internal.py
@@ -43,7 +43,7 @@ from pyspark.sql.types import (  # noqa: F401
 )
 from pyspark.sql.utils import is_timestamp_ntz_preferred, is_remote
 from pyspark import pandas as ps
-from pyspark.pandas.spark import functions as SF
+from pyspark.sql.internal import InternalFunction as SF
 from pyspark.pandas._typing import Label
 from pyspark.pandas.spark.utils import as_nullable_spark_type, force_decimal_precision_scale
 from pyspark.pandas.data_type_ops.base import DataTypeOps
diff --git a/python/pyspark/pandas/plot/core.py b/python/pyspark/pandas/plot/core.py
index f5652177fe4a..7004bae47c90 100644
--- a/python/pyspark/pandas/plot/core.py
+++ b/python/pyspark/pandas/plot/core.py
@@ -24,7 +24,7 @@ from pandas.core.base import PandasObject
 from pandas.core.dtypes.inference import is_integer
 
 from pyspark.sql import functions as F, Column
-from pyspark.pandas.spark import functions as SF
+from pyspark.sql.internal import InternalFunction as SF
 from pyspark.pandas.missing import unsupported_function
 from pyspark.pandas.config import get_option
 from pyspark.pandas.utils import name_like_string
diff --git a/python/pyspark/pandas/resample.py b/python/pyspark/pandas/resample.py
index 02db8fd91b9b..152bf90e60cf 100644
--- a/python/pyspark/pandas/resample.py
+++ b/python/pyspark/pandas/resample.py
@@ -32,6 +32,7 @@ import pandas as pd
 from pandas.tseries.frequencies import to_offset
 
 from pyspark.sql import Column, functions as F
+from pyspark.sql.internal import InternalFunction as SF
 from pyspark.sql.types import (
     NumericType,
     StructField,
@@ -41,7 +42,6 @@ from pyspark.sql.types import (
 from pyspark import pandas as ps  # For running doctests and reference resolution in PyCharm.
 from pyspark.pandas._typing import FrameLike
 from pyspark.pandas.frame import DataFrame
-from pyspark.pandas.spark import functions as SF
 from pyspark.pandas.internal import (
     InternalField,
     InternalFrame,
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 7e276860fbab..18a4dfaee1d5 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -60,6 +60,7 @@ from pyspark.sql import (
     DataFrame as SparkDataFrame,
     Window as PySparkWindow,
 )
+from pyspark.sql.internal import InternalFunction as SF
 from pyspark.sql.types import (
     ArrayType,
     BooleanType,
@@ -117,7 +118,6 @@ from pyspark.pandas.utils import (
     log_advice,
 )
 from pyspark.pandas.datetimes import DatetimeMethods
-from pyspark.pandas.spark import functions as SF
 from pyspark.pandas.spark.accessors import SparkSeriesMethods
 from pyspark.pandas.strings import StringMethods
 from pyspark.pandas.typedef import (
diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py
deleted file mode 100644
index a6b8e79ca50f..000000000000
--- a/python/pyspark/pandas/spark/functions.py
+++ /dev/null
@@ -1,91 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-"""
-Additional Spark functions used in pandas-on-Spark.
-"""
-from pyspark.sql import Column, functions as F
-from pyspark.sql.utils import _invoke_internal_function_over_columns
-from typing import Union
-
-
-def timestamp_ntz_to_long(col: Column) -> Column:
-    return _invoke_internal_function_over_columns("timestamp_ntz_to_long", col)
-
-
-def product(col: Column, dropna: bool) -> Column:
-    return _invoke_internal_function_over_columns("pandas_product", col, F.lit(dropna))
-
-
-def stddev(col: Column, ddof: int) -> Column:
-    return _invoke_internal_function_over_columns("pandas_stddev", col, F.lit(ddof))
-
-
-def var(col: Column, ddof: int) -> Column:
-    return _invoke_internal_function_over_columns("pandas_var", col, F.lit(ddof))
-
-
-def skew(col: Column) -> Column:
-    return _invoke_internal_function_over_columns("pandas_skew", col)
-
-
-def kurt(col: Column) -> Column:
-    return _invoke_internal_function_over_columns("pandas_kurt", col)
-
-
-def mode(col: Column, dropna: bool) -> Column:
-    return _invoke_internal_function_over_columns("pandas_mode", col, F.lit(dropna))
-
-
-def covar(col1: Column, col2: Column, ddof: int) -> Column:
-    return _invoke_internal_function_over_columns("pandas_covar", col1, col2, F.lit(ddof))
-
-
-def ewm(col: Column, alpha: float, ignorena: bool) -> Column:
-    return _invoke_internal_function_over_columns("ewm", col, F.lit(alpha), F.lit(ignorena))
-
-
-def null_index(col: Column) -> Column:
-    return _invoke_internal_function_over_columns("null_index", col)
-
-
-def distributed_id() -> Column:
-    return _invoke_internal_function_over_columns("distributed_id")
-
-
-def distributed_sequence_id() -> Column:
-    return _invoke_internal_function_over_columns("distributed_sequence_id")
-
-
-def collect_top_k(col: Column, num: int, reverse: bool) -> Column:
-    return _invoke_internal_function_over_columns("collect_top_k", col, F.lit(num), F.lit(reverse))
-
-
-def array_binary_search(col: Column, value: Column) -> Column:
-    return _invoke_internal_function_over_columns("array_binary_search", col, value)
-
-
-def make_interval(unit: str, e: Union[Column, int, float]) -> Column:
-    unit_mapping = {
-        "YEAR": "years",
-        "MONTH": "months",
-        "WEEK": "weeks",
-        "DAY": "days",
-        "HOUR": "hours",
-        "MINUTE": "mins",
-        "SECOND": "secs",
-    }
-    return F.make_interval(**{unit_mapping[unit]: F.lit(e)})
diff --git a/python/pyspark/pandas/window.py b/python/pyspark/pandas/window.py
index fb5dd29169e9..591fa7d82875 100644
--- a/python/pyspark/pandas/window.py
+++ b/python/pyspark/pandas/window.py
@@ -22,6 +22,7 @@ import numpy as np
 
 from pyspark.sql import Window
 from pyspark.sql import functions as F
+from pyspark.sql.internal import InternalFunction as SF
 from pyspark.pandas.missing.window import (
     MissingPandasLikeRolling,
     MissingPandasLikeRollingGroupby,
@@ -34,7 +35,6 @@ from pyspark import pandas as ps  # noqa: F401
 from pyspark.pandas._typing import FrameLike
 from pyspark.pandas.groupby import GroupBy, DataFrameGroupBy
 from pyspark.pandas.internal import NATURAL_ORDER_COLUMN_NAME, SPARK_INDEX_NAME_FORMAT
-from pyspark.pandas.spark import functions as SF
 from pyspark.pandas.utils import scol_for
 from pyspark.sql.column import Column
 from pyspark.sql.types import (
diff --git a/python/pyspark/sql/internal.py b/python/pyspark/sql/internal.py
new file mode 100644
index 000000000000..eb4a2153cdaa
--- /dev/null
+++ b/python/pyspark/sql/internal.py
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.sql import Column, functions as F, is_remote
+
+from typing import Union, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import ColumnOrName
+
+"""
+Internal Spark functions used in Pandas API on Spark & PySpark Native Plotting.
+"""
+
+
+class InternalFunction:
+    @staticmethod
+    def _invoke_internal_function_over_columns(name: str, *cols: "ColumnOrName") -> Column:
+        if is_remote():
+            from pyspark.sql.connect.functions.builtin import _invoke_function_over_columns
+
+            return _invoke_function_over_columns(name, *cols)
+
+        else:
+            from pyspark.sql.classic.column import Column, _to_seq, _to_java_column
+            from pyspark import SparkContext
+
+            sc = SparkContext._active_spark_context
+            return Column(
+                sc._jvm.PythonSQLUtils.internalFn(  # type: ignore
+                    name, _to_seq(sc, cols, _to_java_column)  # type: ignore
+                )
+            )
+
+    @staticmethod
+    def timestamp_ntz_to_long(col: Column) -> Column:
+        return InternalFunction._invoke_internal_function_over_columns("timestamp_ntz_to_long", col)
+
+    @staticmethod
+    def product(col: Column, dropna: bool) -> Column:
+        return InternalFunction._invoke_internal_function_over_columns(
+            "pandas_product", col, F.lit(dropna)
+        )
+
+    @staticmethod
+    def stddev(col: Column, ddof: int) -> Column:
+        return InternalFunction._invoke_internal_function_over_columns(
+            "pandas_stddev", col, F.lit(ddof)
+        )
+
+    @staticmethod
+    def var(col: Column, ddof: int) -> Column:
+        return InternalFunction._invoke_internal_function_over_columns(
+            "pandas_var", col, F.lit(ddof)
+        )
+
+    @staticmethod
+    def skew(col: Column) -> Column:
+        return InternalFunction._invoke_internal_function_over_columns("pandas_skew", col)
+
+    @staticmethod
+    def kurt(col: Column) -> Column:
+        return InternalFunction._invoke_internal_function_over_columns("pandas_kurt", col)
+
+    @staticmethod
+    def mode(col: Column, dropna: bool) -> Column:
+        return InternalFunction._invoke_internal_function_over_columns(
+            "pandas_mode", col, F.lit(dropna)
+        )
+
+    @staticmethod
+    def covar(col1: Column, col2: Column, ddof: int) -> Column:
+        return InternalFunction._invoke_internal_function_over_columns(
+            "pandas_covar", col1, col2, F.lit(ddof)
+        )
+
+    @staticmethod
+    def ewm(col: Column, alpha: float, ignorena: bool) -> Column:
+        return InternalFunction._invoke_internal_function_over_columns(
+            "ewm", col, F.lit(alpha), F.lit(ignorena)
+        )
+
+    @staticmethod
+    def null_index(col: Column) -> Column:
+        return InternalFunction._invoke_internal_function_over_columns("null_index", col)
+
+    @staticmethod
+    def distributed_id() -> Column:
+        return InternalFunction._invoke_internal_function_over_columns("distributed_id")
+
+    @staticmethod
+    def distributed_sequence_id() -> Column:
+        return InternalFunction._invoke_internal_function_over_columns("distributed_sequence_id")
+
+    @staticmethod
+    def collect_top_k(col: Column, num: int, reverse: bool) -> Column:
+        return InternalFunction._invoke_internal_function_over_columns(
+            "collect_top_k", col, F.lit(num), F.lit(reverse)
+        )
+
+    @staticmethod
+    def array_binary_search(col: Column, value: Column) -> Column:
+        return InternalFunction._invoke_internal_function_over_columns(
+            "array_binary_search", col, value
+        )
+
+    @staticmethod
+    def make_interval(unit: str, e: Union[Column, int, float]) -> Column:
+        unit_mapping = {
+            "YEAR": "years",
+            "MONTH": "months",
+            "WEEK": "weeks",
+            "DAY": "days",
+            "HOUR": "hours",
+            "MINUTE": "mins",
+            "SECOND": "secs",
+        }
+        return F.make_interval(**{unit_mapping[unit]: F.lit(e)})
diff --git a/python/pyspark/sql/plot/core.py b/python/pyspark/sql/plot/core.py
index ab8f3463302b..f836924e3dfb 100644
--- a/python/pyspark/sql/plot/core.py
+++ b/python/pyspark/sql/plot/core.py
@@ -25,9 +25,10 @@ from pyspark.errors import (
     PySparkValueError,
 )
 from pyspark.sql import Column, functions as F
+from pyspark.sql.internal import InternalFunction as SF
 from pyspark.sql.pandas.utils import require_minimum_numpy_version, require_minimum_pandas_version
 from pyspark.sql.types import NumericType
-from pyspark.sql.utils import require_minimum_plotly_version, _invoke_internal_function_over_columns
+from pyspark.sql.utils import require_minimum_plotly_version
 
 from pandas.core.dtypes.inference import is_integer
 
@@ -568,10 +569,6 @@ class PySparkKdePlotBase:
 
 
 class PySparkHistogramPlotBase:
-    @staticmethod
-    def array_binary_search(col: Column, value: Column) -> Column:
-        return _invoke_internal_function_over_columns("array_binary_search", col, value)
-
     @staticmethod
     def get_bins(sdf: "DataFrame", bins: int) -> "np.ndarray":
         require_minimum_numpy_version()
@@ -623,7 +620,7 @@ class PySparkHistogramPlotBase:
         # determines which bucket a given value falls into, based on predefined bin intervals
         # refers to org.apache.spark.ml.feature.Bucketizer#binarySearchForBuckets
         def binary_search_for_buckets(value: Column) -> Column:
-            index = PySparkHistogramPlotBase.array_binary_search(F.lit(bins), value)
+            index = SF.array_binary_search(F.lit(bins), value)
             bucket = F.when(index >= 0, index).otherwise(-index - 2)
             unboundErrMsg = F.lit(f"value %s out of the bins bounds: [{bins[0]}, {bins[-1]}]")
             return (
@@ -717,12 +714,6 @@ class PySparkHistogramPlotBase:
 
 
 class PySparkBoxPlotBase:
-    @staticmethod
-    def collect_top_k(col: Column, num: int, reverse: bool) -> Column:
-        return _invoke_internal_function_over_columns(
-            "collect_top_k", col, F.lit(num), F.lit(reverse)
-        )
-
     @staticmethod
     def compute_box(
         sdf: "DataFrame", colnames: List[str], whis: float, precision: float, showfliers: bool
@@ -777,7 +768,7 @@ class PySparkBoxPlotBase:
                     outlier,
                     F.struct(F.abs(value - med), value.alias("val")),
                 ).otherwise(F.lit(None))
-                topk = PySparkBoxPlotBase.collect_top_k(pair, 1001, False)
+                topk = SF.collect_top_k(pair, 1001, False)
                 fliers = F.when(F.size(topk) > 0, topk["val"]).otherwise(F.lit(None))
             else:
                 fliers = F.lit(None)
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index b961fae19151..5d9ec92cbc83 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -58,10 +58,8 @@ if TYPE_CHECKING:
         JVMView,
     )
     from pyspark import SparkContext
-    from pyspark.sql import Column
     from pyspark.sql.session import SparkSession
     from pyspark.sql.dataframe import DataFrame
-    from pyspark.sql._typing import ColumnOrName
     from pyspark.pandas._typing import IndexOpsLike, SeriesOrIndex
 
 has_numpy: bool = False
@@ -218,24 +216,6 @@ def enum_to_value(value: Any) -> Any:
     return enum_to_value(value.value) if value is not None and isinstance(value, Enum) else value
 
 
-def _invoke_internal_function_over_columns(name: str, *cols: "ColumnOrName") -> "Column":
-    if is_remote():
-        from pyspark.sql.connect.functions.builtin import _invoke_function_over_columns
-
-        return _invoke_function_over_columns(name, *cols)
-
-    else:
-        from pyspark.sql.classic.column import Column, _to_seq, _to_java_column
-        from pyspark import SparkContext
-
-        sc = SparkContext._active_spark_context
-        return Column(
-            sc._jvm.PythonSQLUtils.internalFn(  # type: ignore
-                name, _to_seq(sc, cols, _to_java_column)  # type: ignore
-            )
-        )
-
-
 def is_timestamp_ntz_preferred() -> bool:
     """
     Return a bool if TimestampNTZType is preferred according to the SQL configuration set.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
