This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d31c8596cd71 [SPARK-45965][SQL] Move DSv2 partitioning expressions 
into functions.partitioning
d31c8596cd71 is described below

commit d31c8596cd714766892d1395e30358bd1cd3cb84
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Sun Nov 19 12:00:30 2023 +0900

    [SPARK-45965][SQL] Move DSv2 partitioning expressions into 
functions.partitioning
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to move the partitioning expressions for DSv2 `hours`, 
`days`, `months`, `years`, and `bucket` from `org.apache.spark.sql.functions` 
to `org.apache.spark.sql.functions.partitioning`.
    
    **Note** that the old references are deprecated in Python only, because a 
Scala 3 upgrade 
([SIP-30](https://docs.scala-lang.org/sips/static-members.html)) is required 
for Java to access `org.apache.spark.sql.functions.partitioning` in the same 
way as before (e.g., `import static 
org.apache.spark.sql.functions.partitioning.*`). I filed a separate JIRA for 
this, SPARK-45970. Python has no such limitation, so the functions are 
deprecated properly there.
    
    **Note** that this PR includes the necessary refactoring that turns 
`pyspark.sql.functions` into a package in order to allow `import 
pyspark.sql.functions.partitioning`, just as Scala allows `import 
org.apache.spark.sql.functions.partitioning._`.
    
    From
    
    ```bash
    pyspark
    └── sql
        └── functions.py
    ```
    
    to
    
    ```bash
    pyspark
    └── sql
        └── functions
            ├── __init__.py     # Maintain the backward compatibility about 
imports.
            ├── builtin.py      # Previous `functions.py`
            └── partitioning.py # New partitioning functions.
    ```
    
    There are no user-facing changes here except that several non-API private 
attributes are no longer accessible (e.g., `from pyspark.sql.functions import 
_invoke_function_over_columns`). In that case, users have to import them 
explicitly: `from pyspark.sql.functions.builtin import _invoke_function_over_columns`.
    
    ### Why are the changes needed?
    
    Those expressions can ONLY be used in DSv2 `partitionedBy`, but they live in 
the same place as other expressions, which misleads users into believing they 
can be used elsewhere, such as in `select`. They should at least be grouped 
separately so that users are not confused.
    
    Just to be doubly sure, I had an offline discussion with the original 
author and other related people a while ago.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes:
    - Moves the partitioning expressions for DSv2 `hours`, `days`, `months`, 
`years`, and `bucket` from `org.apache.spark.sql.functions` to 
`org.apache.spark.sql.functions.partitioning`.
    - Deprecates those expressions in `pyspark.sql.functions` in Python 
(from 4.0.0).
    
    ### How was this patch tested?
    
    Reusing the existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43858 from HyukjinKwon/SPARK-45965.
    
    Lead-authored-by: Hyukjin Kwon <[email protected]>
    Co-authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../scala/org/apache/spark/sql/functions.scala     |  68 +++++-
 dev/sparktestsupport/modules.py                    |   6 +-
 python/pyspark/ml/connect/functions.py             |   2 +-
 python/pyspark/pandas/spark/functions.py           |  20 +-
 python/pyspark/sql/connect/avro/functions.py       |   2 +-
 python/pyspark/sql/connect/dataframe.py            |   2 +-
 .../connect/functions/__init__.py}                 |  18 +-
 .../connect/{functions.py => functions/builtin.py} |  37 ++-
 .../pyspark/sql/connect/functions/partitioning.py  | 111 +++++++++
 python/pyspark/sql/connect/group.py                |   2 +-
 python/pyspark/sql/connect/protobuf/functions.py   |   2 +-
 .../functions.py => sql/functions/__init__.py}     |  18 +-
 .../sql/{functions.py => functions/builtin.py}     |  65 ++++--
 python/pyspark/sql/functions/partitioning.py       | 248 +++++++++++++++++++++
 python/pyspark/sql/tests/test_functions.py         |   1 +
 python/pyspark/sql/tests/test_readwriter.py        |   2 +-
 python/pyspark/sql/utils.py                        |  17 +-
 python/setup.py                                    |   2 +
 .../scala/org/apache/spark/sql/functions.scala     |  85 +++++--
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  |  12 +-
 20 files changed, 602 insertions(+), 118 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 1c8f5993d29e..700f71dc6e40 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -7550,7 +7550,7 @@ object functions {
    * @group partition_transforms
    * @since 3.4.0
    */
-  def years(e: Column): Column = Column.fn("years", e)
+  def years(e: Column): Column = partitioning.years(e)
 
   /**
    * A transform for timestamps and dates to partition data into months.
@@ -7558,7 +7558,7 @@ object functions {
    * @group partition_transforms
    * @since 3.4.0
    */
-  def months(e: Column): Column = Column.fn("months", e)
+  def months(e: Column): Column = partitioning.months(e)
 
   /**
    * A transform for timestamps and dates to partition data into days.
@@ -7566,7 +7566,7 @@ object functions {
    * @group partition_transforms
    * @since 3.4.0
    */
-  def days(e: Column): Column = Column.fn("days", e)
+  def days(e: Column): Column = partitioning.days(e)
 
   /**
    * A transform for timestamps to partition data into hours.
@@ -7574,7 +7574,7 @@ object functions {
    * @group partition_transforms
    * @since 3.4.0
    */
-  def hours(e: Column): Column = Column.fn("hours", e)
+  def hours(e: Column): Column = partitioning.hours(e)
 
   /**
    * Converts the timestamp without time zone `sourceTs` from the `sourceTz` 
time zone to
@@ -7861,8 +7861,7 @@ object functions {
    * @group partition_transforms
    * @since 3.4.0
    */
-  def bucket(numBuckets: Column, e: Column): Column =
-    Column.fn("bucket", numBuckets, e)
+  def bucket(numBuckets: Column, e: Column): Column = 
partitioning.bucket(numBuckets, e)
 
   /**
    * A transform for any type that partitions by a hash of the input column.
@@ -7870,8 +7869,7 @@ object functions {
    * @group partition_transforms
    * @since 3.4.0
    */
-  def bucket(numBuckets: Int, e: Column): Column =
-    Column.fn("bucket", lit(numBuckets), e)
+  def bucket(numBuckets: Int, e: Column): Column = 
partitioning.bucket(numBuckets, e)
 
   
//////////////////////////////////////////////////////////////////////////////////////////////
   // Predicates functions
@@ -8404,4 +8402,58 @@ object functions {
       .addAllArguments(cols.map(_.expr).asJava)
   }
 
+  // scalastyle:off
+  // TODO(SPARK-45970): Use the @static annotation so Java can access these
+  //   APIs in the same way. Once that fix lands, we should deprecate
+  //   functions.hours, days, months, years and bucket.
+  object partitioning {
+    // scalastyle:on
+    /**
+     * A transform for timestamps and dates to partition data into years.
+     *
+     * @group partition_transforms
+     * @since 4.0.0
+     */
+    def years(e: Column): Column = Column.fn("years", e)
+
+    /**
+     * A transform for timestamps and dates to partition data into months.
+     *
+     * @group partition_transforms
+     * @since 4.0.0
+     */
+    def months(e: Column): Column = Column.fn("months", e)
+
+    /**
+     * A transform for timestamps and dates to partition data into days.
+     *
+     * @group partition_transforms
+     * @since 4.0.0
+     */
+    def days(e: Column): Column = Column.fn("days", e)
+
+    /**
+     * A transform for timestamps to partition data into hours.
+     *
+     * @group partition_transforms
+     * @since 4.0.0
+     */
+    def hours(e: Column): Column = Column.fn("hours", e)
+
+    /**
+     * A transform for any type that partitions by a hash of the input column.
+     *
+     * @group partition_transforms
+     * @since 4.0.0
+     */
+    def bucket(numBuckets: Column, e: Column): Column = Column.fn("bucket", 
numBuckets, e)
+
+    /**
+     * A transform for any type that partitions by a hash of the input column.
+     *
+     * @group partition_transforms
+     * @since 4.0.0
+     */
+    def bucket(numBuckets: Int, e: Column): Column = Column.fn("bucket", 
lit(numBuckets), e)
+  }
 }
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index c8981c779ca2..89b2ff7976d9 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -471,7 +471,8 @@ pyspark_sql = Module(
         "pyspark.sql.dataframe",
         "pyspark.sql.datasource",
         "pyspark.sql.group",
-        "pyspark.sql.functions",
+        "pyspark.sql.functions.builtin",
+        "pyspark.sql.functions.partitioning",
         "pyspark.sql.readwriter",
         "pyspark.sql.streaming.query",
         "pyspark.sql.streaming.readwriter",
@@ -859,7 +860,8 @@ pyspark_connect = Module(
         "pyspark.sql.connect.column",
         "pyspark.sql.connect.readwriter",
         "pyspark.sql.connect.dataframe",
-        "pyspark.sql.connect.functions",
+        "pyspark.sql.connect.functions.builtin",
+        "pyspark.sql.connect.functions.partitioning",
         "pyspark.sql.connect.observation",
         "pyspark.sql.connect.avro.functions",
         "pyspark.sql.connect.protobuf.functions",
diff --git a/python/pyspark/ml/connect/functions.py 
b/python/pyspark/ml/connect/functions.py
index c681bf5926b4..b305c04519ae 100644
--- a/python/pyspark/ml/connect/functions.py
+++ b/python/pyspark/ml/connect/functions.py
@@ -16,7 +16,7 @@
 #
 from pyspark.ml import functions as PyMLFunctions
 from pyspark.sql.connect.column import Column
-from pyspark.sql.connect.functions import _invoke_function, _to_col, lit
+from pyspark.sql.connect.functions.builtin import _invoke_function, _to_col, 
lit
 
 
 def vector_to_array(col: Column, dtype: str = "float64") -> Column:
diff --git a/python/pyspark/pandas/spark/functions.py 
b/python/pyspark/pandas/spark/functions.py
index 36ea007c4d7b..df8de08a27ee 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -24,7 +24,7 @@ from pyspark.sql.utils import is_remote
 
 def product(col: Column, dropna: bool) -> Column:
     if is_remote():
-        from pyspark.sql.connect.functions import 
_invoke_function_over_columns, lit
+        from pyspark.sql.connect.functions.builtin import 
_invoke_function_over_columns, lit
 
         return _invoke_function_over_columns(  # type: ignore[return-value]
             "pandas_product",
@@ -39,7 +39,7 @@ def product(col: Column, dropna: bool) -> Column:
 
 def stddev(col: Column, ddof: int) -> Column:
     if is_remote():
-        from pyspark.sql.connect.functions import 
_invoke_function_over_columns, lit
+        from pyspark.sql.connect.functions.builtin import 
_invoke_function_over_columns, lit
 
         return _invoke_function_over_columns(  # type: ignore[return-value]
             "pandas_stddev",
@@ -54,7 +54,7 @@ def stddev(col: Column, ddof: int) -> Column:
 
 def var(col: Column, ddof: int) -> Column:
     if is_remote():
-        from pyspark.sql.connect.functions import 
_invoke_function_over_columns, lit
+        from pyspark.sql.connect.functions.builtin import 
_invoke_function_over_columns, lit
 
         return _invoke_function_over_columns(  # type: ignore[return-value]
             "pandas_var",
@@ -69,7 +69,7 @@ def var(col: Column, ddof: int) -> Column:
 
 def skew(col: Column) -> Column:
     if is_remote():
-        from pyspark.sql.connect.functions import _invoke_function_over_columns
+        from pyspark.sql.connect.functions.builtin import 
_invoke_function_over_columns
 
         return _invoke_function_over_columns(  # type: ignore[return-value]
             "pandas_skew",
@@ -83,7 +83,7 @@ def skew(col: Column) -> Column:
 
 def kurt(col: Column) -> Column:
     if is_remote():
-        from pyspark.sql.connect.functions import _invoke_function_over_columns
+        from pyspark.sql.connect.functions.builtin import 
_invoke_function_over_columns
 
         return _invoke_function_over_columns(  # type: ignore[return-value]
             "pandas_kurt",
@@ -97,7 +97,7 @@ def kurt(col: Column) -> Column:
 
 def mode(col: Column, dropna: bool) -> Column:
     if is_remote():
-        from pyspark.sql.connect.functions import 
_invoke_function_over_columns, lit
+        from pyspark.sql.connect.functions.builtin import 
_invoke_function_over_columns, lit
 
         return _invoke_function_over_columns(  # type: ignore[return-value]
             "pandas_mode",
@@ -112,7 +112,7 @@ def mode(col: Column, dropna: bool) -> Column:
 
 def covar(col1: Column, col2: Column, ddof: int) -> Column:
     if is_remote():
-        from pyspark.sql.connect.functions import 
_invoke_function_over_columns, lit
+        from pyspark.sql.connect.functions.builtin import 
_invoke_function_over_columns, lit
 
         return _invoke_function_over_columns(  # type: ignore[return-value]
             "pandas_covar",
@@ -128,7 +128,7 @@ def covar(col1: Column, col2: Column, ddof: int) -> Column:
 
 def ewm(col: Column, alpha: float, ignore_na: bool) -> Column:
     if is_remote():
-        from pyspark.sql.connect.functions import 
_invoke_function_over_columns, lit
+        from pyspark.sql.connect.functions.builtin import 
_invoke_function_over_columns, lit
 
         return _invoke_function_over_columns(  # type: ignore[return-value]
             "ewm",
@@ -144,7 +144,7 @@ def ewm(col: Column, alpha: float, ignore_na: bool) -> 
Column:
 
 def null_index(col: Column) -> Column:
     if is_remote():
-        from pyspark.sql.connect.functions import _invoke_function_over_columns
+        from pyspark.sql.connect.functions.builtin import 
_invoke_function_over_columns
 
         return _invoke_function_over_columns(  # type: ignore[return-value]
             "null_index",
@@ -158,7 +158,7 @@ def null_index(col: Column) -> Column:
 
 def timestampdiff(unit: str, start: Column, end: Column) -> Column:
     if is_remote():
-        from pyspark.sql.connect.functions import 
_invoke_function_over_columns, lit
+        from pyspark.sql.connect.functions.builtin import 
_invoke_function_over_columns, lit
 
         return _invoke_function_over_columns(  # type: ignore[return-value]
             "timestampdiff",
diff --git a/python/pyspark/sql/connect/avro/functions.py 
b/python/pyspark/sql/connect/avro/functions.py
index bf019ef8fe7d..1d28fd077b18 100644
--- a/python/pyspark/sql/connect/avro/functions.py
+++ b/python/pyspark/sql/connect/avro/functions.py
@@ -28,7 +28,7 @@ from typing import Dict, Optional, TYPE_CHECKING
 from pyspark.sql.avro import functions as PyAvroFunctions
 
 from pyspark.sql.connect.column import Column
-from pyspark.sql.connect.functions import _invoke_function, _to_col, 
_options_to_col, lit
+from pyspark.sql.connect.functions.builtin import _invoke_function, _to_col, 
_options_to_col, lit
 
 if TYPE_CHECKING:
     from pyspark.sql.connect._typing import ColumnOrName
diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 75cecd5f6108..35e4882fb031 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -68,7 +68,7 @@ from pyspark.sql.connect.readwriter import DataFrameWriter, 
DataFrameWriterV2
 from pyspark.sql.connect.streaming.readwriter import DataStreamWriter
 from pyspark.sql.connect.column import Column
 from pyspark.sql.connect.expressions import UnresolvedRegex
-from pyspark.sql.connect.functions import (
+from pyspark.sql.connect.functions.builtin import (
     _to_col_with_plan_id,
     _to_col,
     _invoke_function,
diff --git a/python/pyspark/ml/connect/functions.py 
b/python/pyspark/sql/connect/functions/__init__.py
similarity index 58%
copy from python/pyspark/ml/connect/functions.py
copy to python/pyspark/sql/connect/functions/__init__.py
index c681bf5926b4..e0179d4d56cf 100644
--- a/python/pyspark/ml/connect/functions.py
+++ b/python/pyspark/sql/connect/functions/__init__.py
@@ -14,20 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from pyspark.ml import functions as PyMLFunctions
-from pyspark.sql.connect.column import Column
-from pyspark.sql.connect.functions import _invoke_function, _to_col, lit
 
+"""PySpark Functions with Spark Connect"""
 
-def vector_to_array(col: Column, dtype: str = "float64") -> Column:
-    return _invoke_function("vector_to_array", _to_col(col), lit(dtype))
-
-
-vector_to_array.__doc__ = PyMLFunctions.vector_to_array.__doc__
-
-
-def array_to_vector(col: Column) -> Column:
-    return _invoke_function("array_to_vector", _to_col(col))
-
-
-array_to_vector.__doc__ = PyMLFunctions.array_to_vector.__doc__
+from pyspark.sql.connect.functions.builtin import *  # noqa: F401,F403
+from pyspark.sql.connect.functions import partitioning  # noqa: F401,F403
diff --git a/python/pyspark/sql/connect/functions.py 
b/python/pyspark/sql/connect/functions/builtin.py
similarity index 99%
rename from python/pyspark/sql/connect/functions.py
rename to python/pyspark/sql/connect/functions/builtin.py
index 38eb814247c1..d1cfabb147ee 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -3406,48 +3406,45 @@ to_timestamp_ntz.__doc__ = 
pysparkfuncs.to_timestamp_ntz.__doc__
 
 
 def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
-    if isinstance(numBuckets, int):
-        _numBuckets = lit(numBuckets)
-    elif isinstance(numBuckets, Column):
-        _numBuckets = numBuckets
-    else:
-        raise PySparkTypeError(
-            error_class="NOT_COLUMN_OR_INT",
-            message_parameters={
-                "arg_name": "numBuckets",
-                "arg_type": type(numBuckets).__name__,
-            },
-        )
+    from pyspark.sql.connect.functions import partitioning
 
-    return _invoke_function("bucket", _numBuckets, _to_col(col))
+    return partitioning.bucket(numBuckets, col)
 
 
 bucket.__doc__ = pysparkfuncs.bucket.__doc__
 
 
 def years(col: "ColumnOrName") -> Column:
-    return _invoke_function_over_columns("years", col)
+    from pyspark.sql.connect.functions import partitioning
+
+    return partitioning.years(col)
 
 
 years.__doc__ = pysparkfuncs.years.__doc__
 
 
 def months(col: "ColumnOrName") -> Column:
-    return _invoke_function_over_columns("months", col)
+    from pyspark.sql.connect.functions import partitioning
+
+    return partitioning.months(col)
 
 
 months.__doc__ = pysparkfuncs.months.__doc__
 
 
 def days(col: "ColumnOrName") -> Column:
-    return _invoke_function_over_columns("days", col)
+    from pyspark.sql.connect.functions import partitioning
+
+    return partitioning.days(col)
 
 
 days.__doc__ = pysparkfuncs.days.__doc__
 
 
 def hours(col: "ColumnOrName") -> Column:
-    return _invoke_function_over_columns("hours", col)
+    from pyspark.sql.connect.functions import partitioning
+
+    return partitioning.hours(col)
 
 
 hours.__doc__ = pysparkfuncs.hours.__doc__
@@ -3994,9 +3991,9 @@ def _test() -> None:
     import sys
     import doctest
     from pyspark.sql import SparkSession as PySparkSession
-    import pyspark.sql.connect.functions
+    import pyspark.sql.connect.functions.builtin
 
-    globs = pyspark.sql.connect.functions.__dict__.copy()
+    globs = pyspark.sql.connect.functions.builtin.__dict__.copy()
 
     globs["spark"] = (
         PySparkSession.builder.appName("sql.connect.functions tests")
@@ -4005,7 +4002,7 @@ def _test() -> None:
     )
 
     (failure_count, test_count) = doctest.testmod(
-        pyspark.sql.connect.functions,
+        pyspark.sql.connect.functions.builtin,
         globs=globs,
         optionflags=doctest.ELLIPSIS
         | doctest.NORMALIZE_WHITESPACE
diff --git a/python/pyspark/sql/connect/functions/partitioning.py 
b/python/pyspark/sql/connect/functions/partitioning.py
new file mode 100644
index 000000000000..ef319cad2e72
--- /dev/null
+++ b/python/pyspark/sql/connect/functions/partitioning.py
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from pyspark.sql.connect.utils import check_dependencies
+
+check_dependencies(__name__)
+
+from typing import Union, TYPE_CHECKING
+
+from pyspark.errors import PySparkTypeError
+from pyspark.sql import functions as pysparkfuncs
+from pyspark.sql.connect.column import Column
+from pyspark.sql.connect.functions.builtin import _to_col, 
_invoke_function_over_columns
+from pyspark.sql.connect.functions.builtin import lit, _invoke_function
+
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect._typing import ColumnOrName
+
+
+def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
+    if isinstance(numBuckets, int):
+        _numBuckets = lit(numBuckets)
+    elif isinstance(numBuckets, Column):
+        _numBuckets = numBuckets
+    else:
+        raise PySparkTypeError(
+            error_class="NOT_COLUMN_OR_INT",
+            message_parameters={
+                "arg_name": "numBuckets",
+                "arg_type": type(numBuckets).__name__,
+            },
+        )
+
+    return _invoke_function("bucket", _numBuckets, _to_col(col))
+
+
+bucket.__doc__ = pysparkfuncs.partitioning.bucket.__doc__
+
+
+def years(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("years", col)
+
+
+years.__doc__ = pysparkfuncs.partitioning.years.__doc__
+
+
+def months(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("months", col)
+
+
+months.__doc__ = pysparkfuncs.partitioning.months.__doc__
+
+
+def days(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("days", col)
+
+
+days.__doc__ = pysparkfuncs.partitioning.days.__doc__
+
+
+def hours(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("hours", col)
+
+
+hours.__doc__ = pysparkfuncs.partitioning.hours.__doc__
+
+
+def _test() -> None:
+    import sys
+    import doctest
+    from pyspark.sql import SparkSession as PySparkSession
+    import pyspark.sql.connect.functions.partitioning
+
+    globs = pyspark.sql.connect.functions.partitioning.__dict__.copy()
+
+    globs["spark"] = (
+        PySparkSession.builder.appName("sql.connect.functions tests")
+        .remote("local[4]")
+        .getOrCreate()
+    )
+
+    (failure_count, test_count) = doctest.testmod(
+        pyspark.sql.connect.functions.partitioning,
+        globs=globs,
+        optionflags=doctest.ELLIPSIS
+        | doctest.NORMALIZE_WHITESPACE
+        | doctest.IGNORE_EXCEPTION_DETAIL,
+    )
+
+    globs["spark"].stop()
+
+    if failure_count:
+        sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
diff --git a/python/pyspark/sql/connect/group.py 
b/python/pyspark/sql/connect/group.py
index 9b7dce360b2a..7b71a43c1121 100644
--- a/python/pyspark/sql/connect/group.py
+++ b/python/pyspark/sql/connect/group.py
@@ -40,7 +40,7 @@ from pyspark.sql.types import StructType
 
 import pyspark.sql.connect.plan as plan
 from pyspark.sql.connect.column import Column
-from pyspark.sql.connect.functions import _invoke_function, col, lit
+from pyspark.sql.connect.functions.builtin import _invoke_function, col, lit
 from pyspark.errors import PySparkNotImplementedError, PySparkTypeError
 
 if TYPE_CHECKING:
diff --git a/python/pyspark/sql/connect/protobuf/functions.py 
b/python/pyspark/sql/connect/protobuf/functions.py
index 56119f4bc4eb..8bcc2218f06c 100644
--- a/python/pyspark/sql/connect/protobuf/functions.py
+++ b/python/pyspark/sql/connect/protobuf/functions.py
@@ -28,7 +28,7 @@ from typing import Dict, Optional, TYPE_CHECKING
 from pyspark.sql.protobuf import functions as PyProtobufFunctions
 
 from pyspark.sql.connect.column import Column
-from pyspark.sql.connect.functions import _invoke_function, _to_col, 
_options_to_col, lit
+from pyspark.sql.connect.functions.builtin import _invoke_function, _to_col, 
_options_to_col, lit
 
 if TYPE_CHECKING:
     from pyspark.sql.connect._typing import ColumnOrName
diff --git a/python/pyspark/ml/connect/functions.py 
b/python/pyspark/sql/functions/__init__.py
similarity index 58%
copy from python/pyspark/ml/connect/functions.py
copy to python/pyspark/sql/functions/__init__.py
index c681bf5926b4..dd09c4aa5c77 100644
--- a/python/pyspark/ml/connect/functions.py
+++ b/python/pyspark/sql/functions/__init__.py
@@ -14,20 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from pyspark.ml import functions as PyMLFunctions
-from pyspark.sql.connect.column import Column
-from pyspark.sql.connect.functions import _invoke_function, _to_col, lit
 
+"""PySpark Functions"""
 
-def vector_to_array(col: Column, dtype: str = "float64") -> Column:
-    return _invoke_function("vector_to_array", _to_col(col), lit(dtype))
-
-
-vector_to_array.__doc__ = PyMLFunctions.vector_to_array.__doc__
-
-
-def array_to_vector(col: Column) -> Column:
-    return _invoke_function("array_to_vector", _to_col(col))
-
-
-array_to_vector.__doc__ = PyMLFunctions.array_to_vector.__doc__
+from pyspark.sql.functions.builtin import *  # noqa: F401,F403
+from pyspark.sql.functions import partitioning  # noqa: F401,F403
diff --git a/python/pyspark/sql/functions.py 
b/python/pyspark/sql/functions/builtin.py
similarity index 99%
rename from python/pyspark/sql/functions.py
rename to python/pyspark/sql/functions/builtin.py
index 655806e83778..e23c38be4a0a 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -15467,6 +15467,9 @@ def years(col: "ColumnOrName") -> Column:
     .. versionchanged:: 3.4.0
         Supports Spark Connect.
 
+    .. deprecated:: 4.0.0
+        Use :func:`partitioning.years` instead.
+
     Parameters
     ----------
     col : :class:`~pyspark.sql.Column` or str
@@ -15490,7 +15493,11 @@ def years(col: "ColumnOrName") -> Column:
     method of the `DataFrameWriterV2`.
 
     """
-    return _invoke_function_over_columns("years", col)
+    from pyspark.sql.functions import partitioning
+
+    warnings.warn("Deprecated in 4.0.0, use partitioning.years instead.", 
FutureWarning)
+
+    return partitioning.years(col)
 
 
 @_try_remote_functions
@@ -15504,6 +15511,9 @@ def months(col: "ColumnOrName") -> Column:
     .. versionchanged:: 3.4.0
         Supports Spark Connect.
 
+    .. deprecated:: 4.0.0
+        Use :func:`partitioning.months` instead.
+
     Parameters
     ----------
     col : :class:`~pyspark.sql.Column` or str
@@ -15527,7 +15537,11 @@ def months(col: "ColumnOrName") -> Column:
     method of the `DataFrameWriterV2`.
 
     """
-    return _invoke_function_over_columns("months", col)
+    from pyspark.sql.functions import partitioning
+
+    warnings.warn("Deprecated in 4.0.0, use partitioning.months instead.", 
FutureWarning)
+
+    return partitioning.months(col)
 
 
 @_try_remote_functions
@@ -15541,6 +15555,9 @@ def days(col: "ColumnOrName") -> Column:
     .. versionchanged:: 3.4.0
         Supports Spark Connect.
 
+    .. deprecated:: 4.0.0
+        Use :func:`partitioning.days` instead.
+
     Parameters
     ----------
     col : :class:`~pyspark.sql.Column` or str
@@ -15564,7 +15581,11 @@ def days(col: "ColumnOrName") -> Column:
     method of the `DataFrameWriterV2`.
 
     """
-    return _invoke_function_over_columns("days", col)
+    from pyspark.sql.functions import partitioning
+
+    warnings.warn("Deprecated in 4.0.0, use partitioning.days instead.", 
FutureWarning)
+
+    return partitioning.days(col)
 
 
 @_try_remote_functions
@@ -15578,6 +15599,9 @@ def hours(col: "ColumnOrName") -> Column:
     .. versionchanged:: 3.4.0
         Supports Spark Connect.
 
+    .. deprecated:: 4.0.0
+        Use :func:`partitioning.hours` instead.
+
     Parameters
     ----------
     col : :class:`~pyspark.sql.Column` or str
@@ -15601,7 +15625,11 @@ def hours(col: "ColumnOrName") -> Column:
     method of the `DataFrameWriterV2`.
 
     """
-    return _invoke_function_over_columns("hours", col)
+    from pyspark.sql.functions import partitioning
+
+    warnings.warn("Deprecated in 4.0.0, use partitioning.hours instead.", 
FutureWarning)
+
+    return partitioning.hours(col)
 
 
 @_try_remote_functions
@@ -16081,6 +16109,9 @@ def bucket(numBuckets: Union[Column, int], col: 
"ColumnOrName") -> Column:
     .. versionchanged:: 3.4.0
         Supports Spark Connect.
 
+    .. deprecated:: 4.0.0
+        Use :func:`partitioning.bucket` instead.
+
     Examples
     --------
     >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
@@ -16104,19 +16135,11 @@ def bucket(numBuckets: Union[Column, int], col: 
"ColumnOrName") -> Column:
     method of the `DataFrameWriterV2`.
 
     """
-    if not isinstance(numBuckets, (int, Column)):
-        raise PySparkTypeError(
-            error_class="NOT_COLUMN_OR_INT",
-            message_parameters={"arg_name": "numBuckets", "arg_type": 
type(numBuckets).__name__},
-        )
+    from pyspark.sql.functions import partitioning
 
-    _get_active_spark_context()
-    numBuckets = (
-        _create_column_from_literal(numBuckets)
-        if isinstance(numBuckets, int)
-        else _to_java_column(numBuckets)
-    )
-    return _invoke_function("bucket", numBuckets, _to_java_column(col))
+    warnings.warn("Deprecated in 4.0.0, use partitioning.bucket instead.", 
FutureWarning)
+
+    return partitioning.bucket(numBuckets, col)
 
 
 @_try_remote_functions
@@ -17474,15 +17497,17 @@ def udtf(
 def _test() -> None:
     import doctest
     from pyspark.sql import SparkSession
-    import pyspark.sql.functions
+    import pyspark.sql.functions.builtin
 
-    globs = pyspark.sql.functions.__dict__.copy()
-    spark = SparkSession.builder.master("local[4]").appName("sql.functions 
tests").getOrCreate()
+    globs = pyspark.sql.functions.builtin.__dict__.copy()
+    spark = (
+        SparkSession.builder.master("local[4]").appName("sql.functions.builtin 
tests").getOrCreate()
+    )
     sc = spark.sparkContext
     globs["sc"] = sc
     globs["spark"] = spark
     (failure_count, test_count) = doctest.testmod(
-        pyspark.sql.functions,
+        pyspark.sql.functions.builtin,
         globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
     )
diff --git a/python/pyspark/sql/functions/partitioning.py 
b/python/pyspark/sql/functions/partitioning.py
new file mode 100644
index 000000000000..59c293577b08
--- /dev/null
+++ b/python/pyspark/sql/functions/partitioning.py
@@ -0,0 +1,248 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A collection of partitioning functions
+"""
+
+import sys
+from typing import (
+    TYPE_CHECKING,
+    Union,
+)
+
+from pyspark.errors import PySparkTypeError
+from pyspark.sql.column import Column, _to_java_column, 
_create_column_from_literal
+from pyspark.sql.functions.builtin import _invoke_function_over_columns, 
_invoke_function
+from pyspark.sql.utils import (
+    try_partitioning_remote_functions as _try_partitioning_remote_functions,
+    get_active_spark_context as _get_active_spark_context,
+)
+
+if TYPE_CHECKING:
+    from pyspark.sql._typing import ColumnOrName
+
+
+@_try_partitioning_remote_functions
+def years(col: "ColumnOrName") -> Column:
+    """
+    Partition transform function: A transform for timestamps and dates
+    to partition data into years.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target date or timestamp column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        data partitioned by years.
+
+    Examples
+    --------
+    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
+    ...     partitioning.years("ts")
+    ... ).createOrReplace()
+
+    Notes
+    -----
+    This function can be used only in combination with
+    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+    method of the `DataFrameWriterV2`.
+
+    """
+    return _invoke_function_over_columns("years", col)
+
+
+@_try_partitioning_remote_functions
+def months(col: "ColumnOrName") -> Column:
+    """
+    Partition transform function: A transform for timestamps and dates
+    to partition data into months.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target date or timestamp column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        data partitioned by months.
+
+    Examples
+    --------
+    >>> df.writeTo("catalog.db.table").partitionedBy(
+    ...     partitioning.months("ts")
+    ... ).createOrReplace()  # doctest: +SKIP
+
+    Notes
+    -----
+    This function can be used only in combination with
+    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+    method of the `DataFrameWriterV2`.
+
+    """
+    return _invoke_function_over_columns("months", col)
+
+
+@_try_partitioning_remote_functions
+def days(col: "ColumnOrName") -> Column:
+    """
+    Partition transform function: A transform for timestamps and dates
+    to partition data into days.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target date or timestamp column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        data partitioned by days.
+
+    Examples
+    --------
+    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
+    ...     partitioning.days("ts")
+    ... ).createOrReplace()
+
+    Notes
+    -----
+    This function can be used only in combination with
+    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+    method of the `DataFrameWriterV2`.
+
+    """
+    return _invoke_function_over_columns("days", col)
+
+
+@_try_partitioning_remote_functions
+def hours(col: "ColumnOrName") -> Column:
+    """
+    Partition transform function: A transform for timestamps
+    to partition data into hours.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target timestamp column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        data partitioned by hours.
+
+    Examples
+    --------
+    >>> df.writeTo("catalog.db.table").partitionedBy(   # doctest: +SKIP
+    ...     partitioning.hours("ts")
+    ... ).createOrReplace()
+
+    Notes
+    -----
+    This function can be used only in combination with
+    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+    method of the `DataFrameWriterV2`.
+
+    """
+    return _invoke_function_over_columns("hours", col)
+
+
+@_try_partitioning_remote_functions
+def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
+    """
+    Partition transform function: A transform for any type that partitions
+    by a hash of the input column.
+
+    .. versionadded:: 4.0.0
+
+    Examples
+    --------
+    >>> df.writeTo("catalog.db.table").partitionedBy(  # doctest: +SKIP
+    ...     partitioning.bucket(42, "ts")
+    ... ).createOrReplace()
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column of any type, partitioned by its hash.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        data partitioned by given columns.
+
+    Notes
+    -----
+    This function can be used only in combination with
+    :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+    method of the `DataFrameWriterV2`.
+
+    """
+    if not isinstance(numBuckets, (int, Column)):
+        raise PySparkTypeError(
+            error_class="NOT_COLUMN_OR_INT",
+            message_parameters={
+                "arg_name": "numBuckets",
+                "arg_type": type(numBuckets).__name__,
+            },
+        )
+
+    _get_active_spark_context()
+    numBuckets = (
+        _create_column_from_literal(numBuckets)
+        if isinstance(numBuckets, int)
+        else _to_java_column(numBuckets)
+    )
+    return _invoke_function("bucket", numBuckets, _to_java_column(col))
+
+
+def _test() -> None:
+    import doctest
+    from pyspark.sql import SparkSession
+    import pyspark.sql.functions.partitioning
+
+    globs = pyspark.sql.functions.partitioning.__dict__.copy()
+    spark = (
+        SparkSession.builder.master("local[4]")
+        .appName("sql.functions.partitioning tests")
+        .getOrCreate()
+    )
+    globs["spark"] = spark
+    (failure_count, test_count) = doctest.testmod(
+        pyspark.sql.functions.partitioning,
+        globs=globs,
+        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
+    )
+    spark.stop()
+    if failure_count:
+        sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
diff --git a/python/pyspark/sql/tests/test_functions.py 
b/python/pyspark/sql/tests/test_functions.py
index 9753ba5c532a..b04127b3d5c1 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -67,6 +67,7 @@ class FunctionsTestsMixin:
             "uuid",  # namespace conflict with python built-in module
             "chr",  # namespace conflict with python built-in function
             "session_user",  # Scala only for now, needs implementation
+            "partitioning$",  # partitioning expressions for DSv2
         ]
 
         jvm_fn_set.difference_update(jvm_excluded_fn)
diff --git a/python/pyspark/sql/tests/test_readwriter.py 
b/python/pyspark/sql/tests/test_readwriter.py
index 6bcef51732f8..70a320fc53b6 100644
--- a/python/pyspark/sql/tests/test_readwriter.py
+++ b/python/pyspark/sql/tests/test_readwriter.py
@@ -202,7 +202,7 @@ class ReadwriterV2TestsMixin:
 
     def check_partitioning_functions(self, tpe):
         import datetime
-        from pyspark.sql.functions import years, months, days, hours, bucket
+        from pyspark.sql.functions.partitioning import years, months, days, 
hours, bucket
 
         df = self.spark.createDataFrame(
             [(1, datetime.datetime(2000, 1, 1), "foo")], ("id", "ts", "value")
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index b366c6c8bd8d..9f817341724a 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -50,7 +50,7 @@ if TYPE_CHECKING:
     from pyspark.sql.window import Window
     from pyspark.pandas._typing import IndexOpsLike, SeriesOrIndex
 
-has_numpy = False
+has_numpy: bool = False
 try:
     import numpy as np  # noqa: F401
 
@@ -194,6 +194,21 @@ def try_remote_functions(f: FuncT) -> FuncT:
     return cast(FuncT, wrapped)
 
 
+def try_partitioning_remote_functions(f: FuncT) -> FuncT:
+    """Mark API supported from Spark Connect."""
+
+    @functools.wraps(f)
+    def wrapped(*args: Any, **kwargs: Any) -> Any:
+        if is_remote() and "PYSPARK_NO_NAMESPACE_SHARE" not in os.environ:
+            from pyspark.sql.connect.functions import partitioning
+
+            return getattr(partitioning, f.__name__)(*args, **kwargs)
+        else:
+            return f(*args, **kwargs)
+
+    return cast(FuncT, wrapped)
+
+
 def try_remote_avro_functions(f: FuncT) -> FuncT:
     """Mark API supported from Spark Connect."""
 
diff --git a/python/setup.py b/python/setup.py
index 7e5e4ebd8510..c75ab46931ba 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -249,9 +249,11 @@ try:
             "pyspark.sql.connect",
             "pyspark.sql.connect.avro",
             "pyspark.sql.connect.client",
+            "pyspark.sql.connect.functions",
             "pyspark.sql.connect.proto",
             "pyspark.sql.connect.streaming",
             "pyspark.sql.connect.streaming.worker",
+            "pyspark.sql.functions",
             "pyspark.sql.pandas",
             "pyspark.sql.protobuf",
             "pyspark.sql.streaming",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index a42df5bbcc29..2576c9b08313 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -7268,7 +7268,7 @@ object functions {
    * @group partition_transforms
    * @since 3.0.0
    */
-  def years(e: Column): Column = withExpr { Years(e.expr) }
+  def years(e: Column): Column = partitioning.years(e)
 
   /**
    * A transform for timestamps and dates to partition data into months.
@@ -7276,7 +7276,7 @@ object functions {
    * @group partition_transforms
    * @since 3.0.0
    */
-  def months(e: Column): Column = withExpr { Months(e.expr) }
+  def months(e: Column): Column = partitioning.months(e)
 
   /**
    * A transform for timestamps and dates to partition data into days.
@@ -7284,7 +7284,7 @@ object functions {
    * @group partition_transforms
    * @since 3.0.0
    */
-  def days(e: Column): Column = withExpr { Days(e.expr) }
+  def days(e: Column): Column = partitioning.days(e)
 
   /**
    * Returns a string array of values within the nodes of xml that match the 
XPath expression.
@@ -7379,7 +7379,7 @@ object functions {
    * @group partition_transforms
    * @since 3.0.0
    */
-  def hours(e: Column): Column = withExpr { Hours(e.expr) }
+  def hours(e: Column): Column = partitioning.hours(e)
 
   /**
    * Converts the timestamp without time zone `sourceTs`
@@ -7662,14 +7662,7 @@ object functions {
    * @group partition_transforms
    * @since 3.0.0
    */
-  def bucket(numBuckets: Column, e: Column): Column = withExpr {
-    numBuckets.expr match {
-      case lit @ Literal(_, IntegerType) =>
-        Bucket(lit, e.expr)
-      case _ =>
-        throw 
QueryCompilationErrors.invalidBucketsNumberError(numBuckets.toString, 
e.toString)
-    }
-  }
+  def bucket(numBuckets: Column, e: Column): Column = 
partitioning.bucket(numBuckets, e)
 
   /**
    * A transform for any type that partitions by a hash of the input column.
@@ -7677,9 +7670,7 @@ object functions {
    * @group partition_transforms
    * @since 3.0.0
    */
-  def bucket(numBuckets: Int, e: Column): Column = withExpr {
-    Bucket(Literal(numBuckets), e.expr)
-  }
+  def bucket(numBuckets: Int, e: Column): Column = 
partitioning.bucket(numBuckets, e)
 
   
//////////////////////////////////////////////////////////////////////////////////////////////
   // Predicates functions
@@ -8289,4 +8280,68 @@ object functions {
   def unwrap_udt(column: Column): Column = withExpr {
     UnwrapUDT(column.expr)
   }
+
+  // scalastyle:off
+  // TODO(SPARK-45970): Use @static annotation so Java can access those
+  //   APIs in the same way. Once we land this fix, we should deprecate
+  //   functions.hours, days, months, years and bucket.
+  object partitioning {
+  // scalastyle:on
+    /**
+     * A transform for timestamps and dates to partition data into years.
+     *
+     * @group partition_transforms
+     * @since 4.0.0
+     */
+    def years(e: Column): Column = withExpr { Years(e.expr) }
+
+    /**
+     * A transform for timestamps and dates to partition data into months.
+     *
+     * @group partition_transforms
+     * @since 4.0.0
+     */
+    def months(e: Column): Column = withExpr { Months(e.expr) }
+
+    /**
+     * A transform for timestamps and dates to partition data into days.
+     *
+     * @group partition_transforms
+     * @since 4.0.0
+     */
+    def days(e: Column): Column = withExpr { Days(e.expr) }
+
+    /**
+     * A transform for timestamps to partition data into hours.
+     *
+     * @group partition_transforms
+     * @since 4.0.0
+     */
+    def hours(e: Column): Column = withExpr { Hours(e.expr) }
+
+    /**
+     * A transform for any type that partitions by a hash of the input column.
+     *
+     * @group partition_transforms
+     * @since 4.0.0
+     */
+    def bucket(numBuckets: Column, e: Column): Column = withExpr {
+      numBuckets.expr match {
+        case lit @ Literal(_, IntegerType) =>
+          Bucket(lit, e.expr)
+        case _ =>
+          throw 
QueryCompilationErrors.invalidBucketsNumberError(numBuckets.toString, 
e.toString)
+      }
+    }
+
+    /**
+     * A transform for any type that partitions by a hash of the input column.
+     *
+     * @group partition_transforms
+     * @since 4.0.0
+     */
+    def bucket(numBuckets: Int, e: Column): Column = withExpr {
+      Bucket(Literal(numBuckets), e.expr)
+    }
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index 860d6fce604b..44d47abc93fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -463,7 +463,7 @@ class DataFrameWriterV2Suite extends QueryTest with 
SharedSparkSession with Befo
     spark.table("source")
         .withColumn("ts", lit("2019-06-01 10:00:00.000000").cast("timestamp"))
         .writeTo("testcat.table_name")
-        .partitionedBy(years($"ts"))
+        .partitionedBy(partitioning.years($"ts"))
         .create()
 
     val table = catalog("testcat").loadTable(Identifier.of(Array(), 
"table_name"))
@@ -476,7 +476,7 @@ class DataFrameWriterV2Suite extends QueryTest with 
SharedSparkSession with Befo
     spark.table("source")
         .withColumn("ts", lit("2019-06-01 10:00:00.000000").cast("timestamp"))
         .writeTo("testcat.table_name")
-        .partitionedBy(months($"ts"))
+        .partitionedBy(partitioning.months($"ts"))
         .create()
 
     val table = catalog("testcat").loadTable(Identifier.of(Array(), 
"table_name"))
@@ -489,7 +489,7 @@ class DataFrameWriterV2Suite extends QueryTest with 
SharedSparkSession with Befo
     spark.table("source")
         .withColumn("ts", lit("2019-06-01 10:00:00.000000").cast("timestamp"))
         .writeTo("testcat.table_name")
-        .partitionedBy(days($"ts"))
+        .partitionedBy(partitioning.days($"ts"))
         .create()
 
     val table = catalog("testcat").loadTable(Identifier.of(Array(), 
"table_name"))
@@ -502,7 +502,7 @@ class DataFrameWriterV2Suite extends QueryTest with 
SharedSparkSession with Befo
     spark.table("source")
         .withColumn("ts", lit("2019-06-01 10:00:00.000000").cast("timestamp"))
         .writeTo("testcat.table_name")
-        .partitionedBy(hours($"ts"))
+        .partitionedBy(partitioning.hours($"ts"))
         .create()
 
     val table = catalog("testcat").loadTable(Identifier.of(Array(), 
"table_name"))
@@ -514,7 +514,7 @@ class DataFrameWriterV2Suite extends QueryTest with 
SharedSparkSession with Befo
   test("Create: partitioned by bucket(4, id)") {
     spark.table("source")
         .writeTo("testcat.table_name")
-        .partitionedBy(bucket(4, $"id"))
+        .partitionedBy(partitioning.bucket(4, $"id"))
         .create()
 
     val table = catalog("testcat").loadTable(Identifier.of(Array(), 
"table_name"))
@@ -764,7 +764,7 @@ class DataFrameWriterV2Suite extends QueryTest with 
SharedSparkSession with Befo
         lit("2019-09-02 07:00:00.000000").cast("timestamp") as "modified",
         lit("America/Los_Angeles") as "timezone"))
       .writeTo("testcat.table_name")
-      .partitionedBy(bucket(4, $"ts.timezone"))
+      .partitionedBy(partitioning.bucket(4, $"ts.timezone"))
       .create()
 
     val table = catalog("testcat").loadTable(Identifier.of(Array(), 
"table_name"))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to