This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d31c8596cd71 [SPARK-45965][SQL] Move DSv2 partitioning expressions
into functions.partitioning
d31c8596cd71 is described below
commit d31c8596cd714766892d1395e30358bd1cd3cb84
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Sun Nov 19 12:00:30 2023 +0900
[SPARK-45965][SQL] Move DSv2 partitioning expressions into
functions.partitioning
### What changes were proposed in this pull request?
This PR proposes to move the DSv2 partitioning expressions `hours`, `days`,
`months`, `years`, and `bucket` from `org.apache.spark.sql.functions`
to `org.apache.spark.sql.functions.partitioning`.
**Note** that the old references are deprecated only in Python, because the
Scala 3 upgrade ([SIP-30](https://docs.scala-lang.org/sips/static-members.html))
is required for Java to access `org.apache.spark.sql.functions.partitioning`
in the same way as before (e.g., `import static
org.apache.spark.sql.functions.partitioning.*`). I filed SPARK-45970 separately
for this. Python has no such limitation, so the functions are deprecated there
properly.
**Note** that this PR includes the refactoring needed to turn
`pyspark.sql.functions` into a package, so that `import
pyspark.sql.functions.partitioning` works the same way Scala allows `import
org.apache.spark.sql.functions.partitioning._` (a usage sketch follows the layout below).
From
```bash
pyspark
└── sql
    └── functions.py
```
to
```bash
pyspark
└── sql
    └── functions
        ├── __init__.py      # Maintains backward compatibility for imports.
        ├── builtin.py       # Previous `functions.py`
        └── partitioning.py  # New partitioning functions.
```
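As an illustration of the new Python layout, here is a minimal sketch of how the relocated functions are meant to be used. The local session setup and the `testcat.db.table` DSv2 catalog/table are assumptions made only for this example, not part of the change:
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import partitioning  # new subpackage introduced by this PR

spark = SparkSession.builder.master("local[1]").appName("partitioning-demo").getOrCreate()

df = spark.createDataFrame(
    [(1, "2019-06-01 10:00:00")], ["id", "ts"]
).withColumn("ts", F.col("ts").cast("timestamp"))

# The transforms are only meaningful inside DataFrameWriterV2.partitionedBy();
# "testcat" is a hypothetical DSv2 catalog assumed to be configured elsewhere.
df.writeTo("testcat.db.table").partitionedBy(
    partitioning.years("ts"),
    partitioning.bucket(4, "id"),
).create()
```
The same calls through `F.years`/`F.bucket` still work but are deprecated in Python, as described below.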
The package refactoring itself introduces no user-facing changes, except that
several private, non-API attributes are no longer importable from the old
location (e.g., `from pyspark.sql.functions import
_invoke_function_over_columns`). In that case, they have to be imported from
`pyspark.sql.functions.builtin` instead, e.g., `from
pyspark.sql.functions.builtin import _invoke_function_over_columns`.
### Why are the changes needed?
These expressions can only be used with the DSv2 `partitionedBy` API, but they
live in the same place as the other expressions, which misleads users into
believing they can be used elsewhere, such as in `select`. At a minimum they
should be grouped separately so users are not confused.
To be doubly sure, I had an offline discussion a while ago that included the
original author and other related people.
### Does this PR introduce _any_ user-facing change?
Yes:
- Moves the DSv2 partitioning expressions `hours`, `days`, `months`, `years`,
and `bucket` from `org.apache.spark.sql.functions` to
`org.apache.spark.sql.functions.partitioning`.
- Deprecates the old references in `pyspark.sql.functions` in Python
(from 4.0.0); a sketch of the deprecation behavior follows below.
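For reference, a minimal sketch of the Python-only deprecation behavior described above (assuming a locally started session; the column name `ts` is arbitrary):
```python
import warnings

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import partitioning

spark = SparkSession.builder.master("local[1]").appName("deprecation-demo").getOrCreate()

# Old location: still callable, but emits a FutureWarning as of 4.0.0.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    _ = F.years("ts")
assert any(issubclass(w.category, FutureWarning) for w in caught)

# New location: no warning.
_ = partitioning.years("ts")

spark.stop()
```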
### How was this patch tested?
Reused the existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43858 from HyukjinKwon/SPARK-45965.
Lead-authored-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../scala/org/apache/spark/sql/functions.scala | 68 +++++-
dev/sparktestsupport/modules.py | 6 +-
python/pyspark/ml/connect/functions.py | 2 +-
python/pyspark/pandas/spark/functions.py | 20 +-
python/pyspark/sql/connect/avro/functions.py | 2 +-
python/pyspark/sql/connect/dataframe.py | 2 +-
.../connect/functions/__init__.py} | 18 +-
.../connect/{functions.py => functions/builtin.py} | 37 ++-
.../pyspark/sql/connect/functions/partitioning.py | 111 +++++++++
python/pyspark/sql/connect/group.py | 2 +-
python/pyspark/sql/connect/protobuf/functions.py | 2 +-
.../functions.py => sql/functions/__init__.py} | 18 +-
.../sql/{functions.py => functions/builtin.py} | 65 ++++--
python/pyspark/sql/functions/partitioning.py | 248 +++++++++++++++++++++
python/pyspark/sql/tests/test_functions.py | 1 +
python/pyspark/sql/tests/test_readwriter.py | 2 +-
python/pyspark/sql/utils.py | 17 +-
python/setup.py | 2 +
.../scala/org/apache/spark/sql/functions.scala | 85 +++++--
.../apache/spark/sql/DataFrameWriterV2Suite.scala | 12 +-
20 files changed, 602 insertions(+), 118 deletions(-)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 1c8f5993d29e..700f71dc6e40 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -7550,7 +7550,7 @@ object functions {
* @group partition_transforms
* @since 3.4.0
*/
- def years(e: Column): Column = Column.fn("years", e)
+ def years(e: Column): Column = partitioning.years(e)
/**
* A transform for timestamps and dates to partition data into months.
@@ -7558,7 +7558,7 @@ object functions {
* @group partition_transforms
* @since 3.4.0
*/
- def months(e: Column): Column = Column.fn("months", e)
+ def months(e: Column): Column = partitioning.months(e)
/**
* A transform for timestamps and dates to partition data into days.
@@ -7566,7 +7566,7 @@ object functions {
* @group partition_transforms
* @since 3.4.0
*/
- def days(e: Column): Column = Column.fn("days", e)
+ def days(e: Column): Column = partitioning.days(e)
/**
* A transform for timestamps to partition data into hours.
@@ -7574,7 +7574,7 @@ object functions {
* @group partition_transforms
* @since 3.4.0
*/
- def hours(e: Column): Column = Column.fn("hours", e)
+ def hours(e: Column): Column = partitioning.hours(e)
/**
* Converts the timestamp without time zone `sourceTs` from the `sourceTz` time zone to
@@ -7861,8 +7861,7 @@ object functions {
* @group partition_transforms
* @since 3.4.0
*/
- def bucket(numBuckets: Column, e: Column): Column =
- Column.fn("bucket", numBuckets, e)
- def bucket(numBuckets: Column, e: Column): Column =
- Column.fn("bucket", numBuckets, e)
+ def bucket(numBuckets: Column, e: Column): Column = partitioning.bucket(numBuckets, e)
/**
* A transform for any type that partitions by a hash of the input column.
@@ -7870,8 +7869,7 @@ object functions {
* @group partition_transforms
* @since 3.4.0
*/
- def bucket(numBuckets: Int, e: Column): Column =
- Column.fn("bucket", lit(numBuckets), e)
+ def bucket(numBuckets: Int, e: Column): Column = partitioning.bucket(numBuckets, e)
//////////////////////////////////////////////////////////////////////////////////////////////
// Predicates functions
@@ -8404,4 +8402,58 @@ object functions {
.addAllArguments(cols.map(_.expr).asJava)
}
+ // scalastyle:off
+ // TODO(SPARK-45970): Use @static annotation so Java can access to those
+ // API in the same way. Once we land this fix, should deprecate
+ // functions.hours, days, months, years and bucket.
+ object partitioning {
+ // scalastyle:on
+ /**
+ * A transform for timestamps and dates to partition data into years.
+ *
+ * @group partition_transforms
+ * @since 4.0.0
+ */
+ def years(e: Column): Column = Column.fn("years", e)
+
+ /**
+ * A transform for timestamps and dates to partition data into months.
+ *
+ * @group partition_transforms
+ * @since 4.0.0
+ */
+ def months(e: Column): Column = Column.fn("months", e)
+
+ /**
+ * A transform for timestamps and dates to partition data into days.
+ *
+ * @group partition_transforms
+ * @since 4.0.0
+ */
+ def days(e: Column): Column = Column.fn("days", e)
+
+ /**
+ * A transform for timestamps to partition data into hours.
+ *
+ * @group partition_transforms
+ * @since 4.0.0
+ */
+ def hours(e: Column): Column = Column.fn("hours", e)
+
+ /**
+ * A transform for any type that partitions by a hash of the input column.
+ *
+ * @group partition_transforms
+ * @since 4.0.0
+ */
+ def bucket(numBuckets: Column, e: Column): Column = Column.fn("bucket", numBuckets, e)
+
+ /**
+ * A transform for any type that partitions by a hash of the input column.
+ *
+ * @group partition_transforms
+ * @since 4.0.0
+ */
+ def bucket(numBuckets: Int, e: Column): Column = Column.fn("bucket", lit(numBuckets), e)
+ }
}
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index c8981c779ca2..89b2ff7976d9 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -471,7 +471,8 @@ pyspark_sql = Module(
"pyspark.sql.dataframe",
"pyspark.sql.datasource",
"pyspark.sql.group",
- "pyspark.sql.functions",
+ "pyspark.sql.functions.builtin",
+ "pyspark.sql.functions.partitioning",
"pyspark.sql.readwriter",
"pyspark.sql.streaming.query",
"pyspark.sql.streaming.readwriter",
@@ -859,7 +860,8 @@ pyspark_connect = Module(
"pyspark.sql.connect.column",
"pyspark.sql.connect.readwriter",
"pyspark.sql.connect.dataframe",
- "pyspark.sql.connect.functions",
+ "pyspark.sql.connect.functions.builtin",
+ "pyspark.sql.connect.functions.partitioning",
"pyspark.sql.connect.observation",
"pyspark.sql.connect.avro.functions",
"pyspark.sql.connect.protobuf.functions",
diff --git a/python/pyspark/ml/connect/functions.py
b/python/pyspark/ml/connect/functions.py
index c681bf5926b4..b305c04519ae 100644
--- a/python/pyspark/ml/connect/functions.py
+++ b/python/pyspark/ml/connect/functions.py
@@ -16,7 +16,7 @@
#
from pyspark.ml import functions as PyMLFunctions
from pyspark.sql.connect.column import Column
-from pyspark.sql.connect.functions import _invoke_function, _to_col, lit
+from pyspark.sql.connect.functions.builtin import _invoke_function, _to_col, lit
def vector_to_array(col: Column, dtype: str = "float64") -> Column:
diff --git a/python/pyspark/pandas/spark/functions.py
b/python/pyspark/pandas/spark/functions.py
index 36ea007c4d7b..df8de08a27ee 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -24,7 +24,7 @@ from pyspark.sql.utils import is_remote
def product(col: Column, dropna: bool) -> Column:
if is_remote():
- from pyspark.sql.connect.functions import _invoke_function_over_columns, lit
+ from pyspark.sql.connect.functions.builtin import _invoke_function_over_columns, lit
return _invoke_function_over_columns( # type: ignore[return-value]
"pandas_product",
@@ -39,7 +39,7 @@ def product(col: Column, dropna: bool) -> Column:
def stddev(col: Column, ddof: int) -> Column:
if is_remote():
- from pyspark.sql.connect.functions import _invoke_function_over_columns, lit
+ from pyspark.sql.connect.functions.builtin import _invoke_function_over_columns, lit
return _invoke_function_over_columns( # type: ignore[return-value]
"pandas_stddev",
@@ -54,7 +54,7 @@ def stddev(col: Column, ddof: int) -> Column:
def var(col: Column, ddof: int) -> Column:
if is_remote():
- from pyspark.sql.connect.functions import _invoke_function_over_columns, lit
+ from pyspark.sql.connect.functions.builtin import _invoke_function_over_columns, lit
return _invoke_function_over_columns( # type: ignore[return-value]
"pandas_var",
@@ -69,7 +69,7 @@ def var(col: Column, ddof: int) -> Column:
def skew(col: Column) -> Column:
if is_remote():
- from pyspark.sql.connect.functions import _invoke_function_over_columns
+ from pyspark.sql.connect.functions.builtin import _invoke_function_over_columns
return _invoke_function_over_columns( # type: ignore[return-value]
"pandas_skew",
@@ -83,7 +83,7 @@ def skew(col: Column) -> Column:
def kurt(col: Column) -> Column:
if is_remote():
- from pyspark.sql.connect.functions import _invoke_function_over_columns
+ from pyspark.sql.connect.functions.builtin import _invoke_function_over_columns
return _invoke_function_over_columns( # type: ignore[return-value]
"pandas_kurt",
@@ -97,7 +97,7 @@ def kurt(col: Column) -> Column:
def mode(col: Column, dropna: bool) -> Column:
if is_remote():
- from pyspark.sql.connect.functions import _invoke_function_over_columns, lit
+ from pyspark.sql.connect.functions.builtin import _invoke_function_over_columns, lit
return _invoke_function_over_columns( # type: ignore[return-value]
"pandas_mode",
@@ -112,7 +112,7 @@ def mode(col: Column, dropna: bool) -> Column:
def covar(col1: Column, col2: Column, ddof: int) -> Column:
if is_remote():
- from pyspark.sql.connect.functions import _invoke_function_over_columns, lit
+ from pyspark.sql.connect.functions.builtin import _invoke_function_over_columns, lit
return _invoke_function_over_columns( # type: ignore[return-value]
"pandas_covar",
@@ -128,7 +128,7 @@ def covar(col1: Column, col2: Column, ddof: int) -> Column:
def ewm(col: Column, alpha: float, ignore_na: bool) -> Column:
if is_remote():
- from pyspark.sql.connect.functions import _invoke_function_over_columns, lit
+ from pyspark.sql.connect.functions.builtin import _invoke_function_over_columns, lit
return _invoke_function_over_columns( # type: ignore[return-value]
"ewm",
@@ -144,7 +144,7 @@ def ewm(col: Column, alpha: float, ignore_na: bool) -> Column:
def null_index(col: Column) -> Column:
if is_remote():
- from pyspark.sql.connect.functions import _invoke_function_over_columns
+ from pyspark.sql.connect.functions.builtin import _invoke_function_over_columns
return _invoke_function_over_columns( # type: ignore[return-value]
"null_index",
@@ -158,7 +158,7 @@ def null_index(col: Column) -> Column:
def timestampdiff(unit: str, start: Column, end: Column) -> Column:
if is_remote():
- from pyspark.sql.connect.functions import _invoke_function_over_columns, lit
+ from pyspark.sql.connect.functions.builtin import _invoke_function_over_columns, lit
return _invoke_function_over_columns( # type: ignore[return-value]
"timestampdiff",
diff --git a/python/pyspark/sql/connect/avro/functions.py
b/python/pyspark/sql/connect/avro/functions.py
index bf019ef8fe7d..1d28fd077b18 100644
--- a/python/pyspark/sql/connect/avro/functions.py
+++ b/python/pyspark/sql/connect/avro/functions.py
@@ -28,7 +28,7 @@ from typing import Dict, Optional, TYPE_CHECKING
from pyspark.sql.avro import functions as PyAvroFunctions
from pyspark.sql.connect.column import Column
-from pyspark.sql.connect.functions import _invoke_function, _to_col, _options_to_col, lit
+from pyspark.sql.connect.functions.builtin import _invoke_function, _to_col, _options_to_col, lit
if TYPE_CHECKING:
from pyspark.sql.connect._typing import ColumnOrName
diff --git a/python/pyspark/sql/connect/dataframe.py
b/python/pyspark/sql/connect/dataframe.py
index 75cecd5f6108..35e4882fb031 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -68,7 +68,7 @@ from pyspark.sql.connect.readwriter import DataFrameWriter, DataFrameWriterV2
from pyspark.sql.connect.streaming.readwriter import DataStreamWriter
from pyspark.sql.connect.column import Column
from pyspark.sql.connect.expressions import UnresolvedRegex
-from pyspark.sql.connect.functions import (
+from pyspark.sql.connect.functions.builtin import (
_to_col_with_plan_id,
_to_col,
_invoke_function,
diff --git a/python/pyspark/ml/connect/functions.py
b/python/pyspark/sql/connect/functions/__init__.py
similarity index 58%
copy from python/pyspark/ml/connect/functions.py
copy to python/pyspark/sql/connect/functions/__init__.py
index c681bf5926b4..e0179d4d56cf 100644
--- a/python/pyspark/ml/connect/functions.py
+++ b/python/pyspark/sql/connect/functions/__init__.py
@@ -14,20 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from pyspark.ml import functions as PyMLFunctions
-from pyspark.sql.connect.column import Column
-from pyspark.sql.connect.functions import _invoke_function, _to_col, lit
+"""PySpark Functions with Spark Connect"""
-def vector_to_array(col: Column, dtype: str = "float64") -> Column:
- return _invoke_function("vector_to_array", _to_col(col), lit(dtype))
-
-
-vector_to_array.__doc__ = PyMLFunctions.vector_to_array.__doc__
-
-
-def array_to_vector(col: Column) -> Column:
- return _invoke_function("array_to_vector", _to_col(col))
-
-
-array_to_vector.__doc__ = PyMLFunctions.array_to_vector.__doc__
+from pyspark.sql.connect.functions.builtin import * # noqa: F401,F403
+from pyspark.sql.connect.functions import partitioning # noqa: F401,F403
diff --git a/python/pyspark/sql/connect/functions.py
b/python/pyspark/sql/connect/functions/builtin.py
similarity index 99%
rename from python/pyspark/sql/connect/functions.py
rename to python/pyspark/sql/connect/functions/builtin.py
index 38eb814247c1..d1cfabb147ee 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -3406,48 +3406,45 @@ to_timestamp_ntz.__doc__ = pysparkfuncs.to_timestamp_ntz.__doc__
def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
- if isinstance(numBuckets, int):
- _numBuckets = lit(numBuckets)
- elif isinstance(numBuckets, Column):
- _numBuckets = numBuckets
- else:
- raise PySparkTypeError(
- error_class="NOT_COLUMN_OR_INT",
- message_parameters={
- "arg_name": "numBuckets",
- "arg_type": type(numBuckets).__name__,
- },
- )
+ from pyspark.sql.connect.functions import partitioning
- return _invoke_function("bucket", _numBuckets, _to_col(col))
+ return partitioning.bucket(numBuckets, col)
bucket.__doc__ = pysparkfuncs.bucket.__doc__
def years(col: "ColumnOrName") -> Column:
- return _invoke_function_over_columns("years", col)
+ from pyspark.sql.connect.functions import partitioning
+
+ return partitioning.years(col)
years.__doc__ = pysparkfuncs.years.__doc__
def months(col: "ColumnOrName") -> Column:
- return _invoke_function_over_columns("months", col)
+ from pyspark.sql.connect.functions import partitioning
+
+ return partitioning.months(col)
months.__doc__ = pysparkfuncs.months.__doc__
def days(col: "ColumnOrName") -> Column:
- return _invoke_function_over_columns("days", col)
+ from pyspark.sql.connect.functions import partitioning
+
+ return partitioning.days(col)
days.__doc__ = pysparkfuncs.days.__doc__
def hours(col: "ColumnOrName") -> Column:
- return _invoke_function_over_columns("hours", col)
+ from pyspark.sql.connect.functions import partitioning
+
+ return partitioning.hours(col)
hours.__doc__ = pysparkfuncs.hours.__doc__
@@ -3994,9 +3991,9 @@ def _test() -> None:
import sys
import doctest
from pyspark.sql import SparkSession as PySparkSession
- import pyspark.sql.connect.functions
+ import pyspark.sql.connect.functions.builtin
- globs = pyspark.sql.connect.functions.__dict__.copy()
+ globs = pyspark.sql.connect.functions.builtin.__dict__.copy()
globs["spark"] = (
PySparkSession.builder.appName("sql.connect.functions tests")
@@ -4005,7 +4002,7 @@ def _test() -> None:
)
(failure_count, test_count) = doctest.testmod(
- pyspark.sql.connect.functions,
+ pyspark.sql.connect.functions.builtin,
globs=globs,
optionflags=doctest.ELLIPSIS
| doctest.NORMALIZE_WHITESPACE
diff --git a/python/pyspark/sql/connect/functions/partitioning.py
b/python/pyspark/sql/connect/functions/partitioning.py
new file mode 100644
index 000000000000..ef319cad2e72
--- /dev/null
+++ b/python/pyspark/sql/connect/functions/partitioning.py
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from pyspark.sql.connect.utils import check_dependencies
+
+check_dependencies(__name__)
+
+from typing import Union, TYPE_CHECKING
+
+from pyspark.errors import PySparkTypeError
+from pyspark.sql import functions as pysparkfuncs
+from pyspark.sql.connect.column import Column
+from pyspark.sql.connect.functions.builtin import _to_col, _invoke_function_over_columns
+from pyspark.sql.connect.functions.builtin import lit, _invoke_function
+
+
+if TYPE_CHECKING:
+ from pyspark.sql.connect._typing import ColumnOrName
+
+
+def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
+ if isinstance(numBuckets, int):
+ _numBuckets = lit(numBuckets)
+ elif isinstance(numBuckets, Column):
+ _numBuckets = numBuckets
+ else:
+ raise PySparkTypeError(
+ error_class="NOT_COLUMN_OR_INT",
+ message_parameters={
+ "arg_name": "numBuckets",
+ "arg_type": type(numBuckets).__name__,
+ },
+ )
+
+ return _invoke_function("bucket", _numBuckets, _to_col(col))
+
+
+bucket.__doc__ = pysparkfuncs.partitioning.bucket.__doc__
+
+
+def years(col: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("years", col)
+
+
+years.__doc__ = pysparkfuncs.partitioning.years.__doc__
+
+
+def months(col: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("months", col)
+
+
+months.__doc__ = pysparkfuncs.partitioning.months.__doc__
+
+
+def days(col: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("days", col)
+
+
+days.__doc__ = pysparkfuncs.partitioning.days.__doc__
+
+
+def hours(col: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("hours", col)
+
+
+hours.__doc__ = pysparkfuncs.partitioning.hours.__doc__
+
+
+def _test() -> None:
+ import sys
+ import doctest
+ from pyspark.sql import SparkSession as PySparkSession
+ import pyspark.sql.connect.functions.partitioning
+
+ globs = pyspark.sql.connect.functions.partitioning.__dict__.copy()
+
+ globs["spark"] = (
+ PySparkSession.builder.appName("sql.connect.functions tests")
+ .remote("local[4]")
+ .getOrCreate()
+ )
+
+ (failure_count, test_count) = doctest.testmod(
+ pyspark.sql.connect.functions.partitioning,
+ globs=globs,
+ optionflags=doctest.ELLIPSIS
+ | doctest.NORMALIZE_WHITESPACE
+ | doctest.IGNORE_EXCEPTION_DETAIL,
+ )
+
+ globs["spark"].stop()
+
+ if failure_count:
+ sys.exit(-1)
+
+
+if __name__ == "__main__":
+ _test()
diff --git a/python/pyspark/sql/connect/group.py
b/python/pyspark/sql/connect/group.py
index 9b7dce360b2a..7b71a43c1121 100644
--- a/python/pyspark/sql/connect/group.py
+++ b/python/pyspark/sql/connect/group.py
@@ -40,7 +40,7 @@ from pyspark.sql.types import StructType
import pyspark.sql.connect.plan as plan
from pyspark.sql.connect.column import Column
-from pyspark.sql.connect.functions import _invoke_function, col, lit
+from pyspark.sql.connect.functions.builtin import _invoke_function, col, lit
from pyspark.errors import PySparkNotImplementedError, PySparkTypeError
if TYPE_CHECKING:
diff --git a/python/pyspark/sql/connect/protobuf/functions.py
b/python/pyspark/sql/connect/protobuf/functions.py
index 56119f4bc4eb..8bcc2218f06c 100644
--- a/python/pyspark/sql/connect/protobuf/functions.py
+++ b/python/pyspark/sql/connect/protobuf/functions.py
@@ -28,7 +28,7 @@ from typing import Dict, Optional, TYPE_CHECKING
from pyspark.sql.protobuf import functions as PyProtobufFunctions
from pyspark.sql.connect.column import Column
-from pyspark.sql.connect.functions import _invoke_function, _to_col, _options_to_col, lit
+from pyspark.sql.connect.functions.builtin import _invoke_function, _to_col, _options_to_col, lit
if TYPE_CHECKING:
from pyspark.sql.connect._typing import ColumnOrName
diff --git a/python/pyspark/ml/connect/functions.py
b/python/pyspark/sql/functions/__init__.py
similarity index 58%
copy from python/pyspark/ml/connect/functions.py
copy to python/pyspark/sql/functions/__init__.py
index c681bf5926b4..dd09c4aa5c77 100644
--- a/python/pyspark/ml/connect/functions.py
+++ b/python/pyspark/sql/functions/__init__.py
@@ -14,20 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from pyspark.ml import functions as PyMLFunctions
-from pyspark.sql.connect.column import Column
-from pyspark.sql.connect.functions import _invoke_function, _to_col, lit
+"""PySpark Functions"""
-def vector_to_array(col: Column, dtype: str = "float64") -> Column:
- return _invoke_function("vector_to_array", _to_col(col), lit(dtype))
-
-
-vector_to_array.__doc__ = PyMLFunctions.vector_to_array.__doc__
-
-
-def array_to_vector(col: Column) -> Column:
- return _invoke_function("array_to_vector", _to_col(col))
-
-
-array_to_vector.__doc__ = PyMLFunctions.array_to_vector.__doc__
+from pyspark.sql.functions.builtin import * # noqa: F401,F403
+from pyspark.sql.functions import partitioning # noqa: F401,F403
diff --git a/python/pyspark/sql/functions.py
b/python/pyspark/sql/functions/builtin.py
similarity index 99%
rename from python/pyspark/sql/functions.py
rename to python/pyspark/sql/functions/builtin.py
index 655806e83778..e23c38be4a0a 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -15467,6 +15467,9 @@ def years(col: "ColumnOrName") -> Column:
.. versionchanged:: 3.4.0
Supports Spark Connect.
+ .. deprecated:: 4.0.0
+ Use :func:`partitioning.years` instead.
+
Parameters
----------
col : :class:`~pyspark.sql.Column` or str
@@ -15490,7 +15493,11 @@ def years(col: "ColumnOrName") -> Column:
method of the `DataFrameWriterV2`.
"""
- return _invoke_function_over_columns("years", col)
+ from pyspark.sql.functions import partitioning
+
+ warnings.warn("Deprecated in 4.0.0, use partitioning.years instead.",
FutureWarning)
+
+ return partitioning.years(col)
@_try_remote_functions
@@ -15504,6 +15511,9 @@ def months(col: "ColumnOrName") -> Column:
.. versionchanged:: 3.4.0
Supports Spark Connect.
+ .. deprecated:: 4.0.0
+ Use :func:`partitioning.months` instead.
+
Parameters
----------
col : :class:`~pyspark.sql.Column` or str
@@ -15527,7 +15537,11 @@ def months(col: "ColumnOrName") -> Column:
method of the `DataFrameWriterV2`.
"""
- return _invoke_function_over_columns("months", col)
+ from pyspark.sql.functions import partitioning
+
+ warnings.warn("Deprecated in 4.0.0, use partitioning.months instead.",
FutureWarning)
+
+ return partitioning.months(col)
@_try_remote_functions
@@ -15541,6 +15555,9 @@ def days(col: "ColumnOrName") -> Column:
.. versionchanged:: 3.4.0
Supports Spark Connect.
+ .. deprecated:: 4.0.0
+ Use :func:`partitioning.days` instead.
+
Parameters
----------
col : :class:`~pyspark.sql.Column` or str
@@ -15564,7 +15581,11 @@ def days(col: "ColumnOrName") -> Column:
method of the `DataFrameWriterV2`.
"""
- return _invoke_function_over_columns("days", col)
+ from pyspark.sql.functions import partitioning
+
+ warnings.warn("Deprecated in 4.0.0, use partitioning.days instead.",
FutureWarning)
+
+ return partitioning.days(col)
@_try_remote_functions
@@ -15578,6 +15599,9 @@ def hours(col: "ColumnOrName") -> Column:
.. versionchanged:: 3.4.0
Supports Spark Connect.
+ .. deprecated:: 4.0.0
+ Use :func:`partitioning.hours` instead.
+
Parameters
----------
col : :class:`~pyspark.sql.Column` or str
@@ -15601,7 +15625,11 @@ def hours(col: "ColumnOrName") -> Column:
method of the `DataFrameWriterV2`.
"""
- return _invoke_function_over_columns("hours", col)
+ from pyspark.sql.functions import partitioning
+
+ warnings.warn("Deprecated in 4.0.0, use partitioning.hours instead.",
FutureWarning)
+
+ return partitioning.hours(col)
@_try_remote_functions
@@ -16081,6 +16109,9 @@ def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
.. versionchanged:: 3.4.0
Supports Spark Connect.
+ .. deprecated:: 4.0.0
+ Use :func:`partitioning.bucket` instead.
+
Examples
--------
>>> df.writeTo("catalog.db.table").partitionedBy( # doctest: +SKIP
@@ -16104,19 +16135,11 @@ def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
method of the `DataFrameWriterV2`.
"""
- if not isinstance(numBuckets, (int, Column)):
- raise PySparkTypeError(
- error_class="NOT_COLUMN_OR_INT",
- message_parameters={"arg_name": "numBuckets", "arg_type":
type(numBuckets).__name__},
- )
+ from pyspark.sql.functions import partitioning
- _get_active_spark_context()
- numBuckets = (
- _create_column_from_literal(numBuckets)
- if isinstance(numBuckets, int)
- else _to_java_column(numBuckets)
- )
- return _invoke_function("bucket", numBuckets, _to_java_column(col))
+ warnings.warn("Deprecated in 4.0.0, use partitioning.bucket instead.",
FutureWarning)
+
+ return partitioning.bucket(numBuckets, col)
@_try_remote_functions
@@ -17474,15 +17497,17 @@ def udtf(
def _test() -> None:
import doctest
from pyspark.sql import SparkSession
- import pyspark.sql.functions
+ import pyspark.sql.functions.builtin
- globs = pyspark.sql.functions.__dict__.copy()
- spark = SparkSession.builder.master("local[4]").appName("sql.functions tests").getOrCreate()
+ globs = pyspark.sql.functions.builtin.__dict__.copy()
+ spark = (
+ SparkSession.builder.master("local[4]").appName("sql.functions.builtin tests").getOrCreate()
+ )
sc = spark.sparkContext
globs["sc"] = sc
globs["spark"] = spark
(failure_count, test_count) = doctest.testmod(
- pyspark.sql.functions,
+ pyspark.sql.functions.builtin,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
diff --git a/python/pyspark/sql/functions/partitioning.py
b/python/pyspark/sql/functions/partitioning.py
new file mode 100644
index 000000000000..59c293577b08
--- /dev/null
+++ b/python/pyspark/sql/functions/partitioning.py
@@ -0,0 +1,248 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A collections of partitioning functions
+"""
+
+import sys
+from typing import (
+ TYPE_CHECKING,
+ Union,
+)
+
+from pyspark.errors import PySparkTypeError
+from pyspark.sql.column import Column, _to_java_column, _create_column_from_literal
+from pyspark.sql.functions.builtin import _invoke_function_over_columns, _invoke_function
+from pyspark.sql.utils import (
+ try_partitioning_remote_functions as _try_partitioning_remote_functions,
+ get_active_spark_context as _get_active_spark_context,
+)
+
+if TYPE_CHECKING:
+ from pyspark.sql._typing import ColumnOrName
+
+
+@_try_partitioning_remote_functions
+def years(col: "ColumnOrName") -> Column:
+ """
+ Partition transform function: A transform for timestamps and dates
+ to partition data into years.
+
+ .. versionadded:: 4.0.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target date or timestamp column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ data partitioned by years.
+
+ Examples
+ --------
+ >>> df.writeTo("catalog.db.table").partitionedBy( # doctest: +SKIP
+ ... partitioning.years("ts")
+ ... ).createOrReplace()
+
+ Notes
+ -----
+ This function can be used only in combination with
+ :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+ method of the `DataFrameWriterV2`.
+
+ """
+ return _invoke_function_over_columns("years", col)
+
+
+@_try_partitioning_remote_functions
+def months(col: "ColumnOrName") -> Column:
+ """
+ Partition transform function: A transform for timestamps and dates
+ to partition data into months.
+
+ .. versionadded:: 4.0.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target date or timestamp column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ data partitioned by months.
+
+ Examples
+ --------
+ >>> df.writeTo("catalog.db.table").partitionedBy(
+ ... partitioning.months("ts")
+ ... ).createOrReplace() # doctest: +SKIP
+
+ Notes
+ -----
+ This function can be used only in combination with
+ :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+ method of the `DataFrameWriterV2`.
+
+ """
+ return _invoke_function_over_columns("months", col)
+
+
+@_try_partitioning_remote_functions
+def days(col: "ColumnOrName") -> Column:
+ """
+ Partition transform function: A transform for timestamps and dates
+ to partition data into days.
+
+ .. versionadded:: 4.0.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target date or timestamp column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ data partitioned by days.
+
+ Examples
+ --------
+ >>> df.writeTo("catalog.db.table").partitionedBy( # doctest: +SKIP
+ ... partitioning.days("ts")
+ ... ).createOrReplace()
+
+ Notes
+ -----
+ This function can be used only in combination with
+ :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+ method of the `DataFrameWriterV2`.
+
+ """
+ return _invoke_function_over_columns("days", col)
+
+
+@_try_partitioning_remote_functions
+def hours(col: "ColumnOrName") -> Column:
+ """
+ Partition transform function: A transform for timestamps
+ to partition data into hours.
+
+ .. versionadded:: 4.0.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target date or timestamp column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ data partitioned by hours.
+
+ Examples
+ --------
+ >>> df.writeTo("catalog.db.table").partitionedBy( # doctest: +SKIP
+ ... partitioning.hours("ts")
+ ... ).createOrReplace()
+
+ Notes
+ -----
+ This function can be used only in combination with
+ :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+ method of the `DataFrameWriterV2`.
+
+ """
+ return _invoke_function_over_columns("hours", col)
+
+
+@_try_partitioning_remote_functions
+def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
+ """
+ Partition transform function: A transform for any type that partitions
+ by a hash of the input column.
+
+ .. versionadded:: 4.0.0
+
+ Examples
+ --------
+ >>> df.writeTo("catalog.db.table").partitionedBy( # doctest: +SKIP
+ ... partitioning.bucket(42, "ts")
+ ... ).createOrReplace()
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target date or timestamp column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ data partitioned by given columns.
+
+ Notes
+ -----
+ This function can be used only in combination with
+ :py:meth:`~pyspark.sql.readwriter.DataFrameWriterV2.partitionedBy`
+ method of the `DataFrameWriterV2`.
+
+ """
+ if not isinstance(numBuckets, (int, Column)):
+ raise PySparkTypeError(
+ error_class="NOT_COLUMN_OR_INT",
+ message_parameters={
+ "arg_name": "numBuckets",
+ "arg_type": type(numBuckets).__name__,
+ },
+ )
+
+ _get_active_spark_context()
+ numBuckets = (
+ _create_column_from_literal(numBuckets)
+ if isinstance(numBuckets, int)
+ else _to_java_column(numBuckets)
+ )
+ return _invoke_function("bucket", numBuckets, _to_java_column(col))
+
+
+def _test() -> None:
+ import doctest
+ from pyspark.sql import SparkSession
+ import pyspark.sql.functions.partitioning
+
+ globs = pyspark.sql.functions.partitioning.__dict__.copy()
+ spark = (
+ SparkSession.builder.master("local[4]")
+ .appName("sql.functions.partitioning tests")
+ .getOrCreate()
+ )
+ globs["spark"] = spark
+ (failure_count, test_count) = doctest.testmod(
+ pyspark.sql.functions.partitioning,
+ globs=globs,
+ optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
+ )
+ spark.stop()
+ if failure_count:
+ sys.exit(-1)
+
+
+if __name__ == "__main__":
+ _test()
diff --git a/python/pyspark/sql/tests/test_functions.py
b/python/pyspark/sql/tests/test_functions.py
index 9753ba5c532a..b04127b3d5c1 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -67,6 +67,7 @@ class FunctionsTestsMixin:
"uuid", # namespace conflict with python built-in module
"chr", # namespace conflict with python built-in function
"session_user", # Scala only for now, needs implementation
+ "partitioning$", # partitioning expressions for DSv2
]
jvm_fn_set.difference_update(jvm_excluded_fn)
diff --git a/python/pyspark/sql/tests/test_readwriter.py
b/python/pyspark/sql/tests/test_readwriter.py
index 6bcef51732f8..70a320fc53b6 100644
--- a/python/pyspark/sql/tests/test_readwriter.py
+++ b/python/pyspark/sql/tests/test_readwriter.py
@@ -202,7 +202,7 @@ class ReadwriterV2TestsMixin:
def check_partitioning_functions(self, tpe):
import datetime
- from pyspark.sql.functions import years, months, days, hours, bucket
+ from pyspark.sql.functions.partitioning import years, months, days, hours, bucket
df = self.spark.createDataFrame(
[(1, datetime.datetime(2000, 1, 1), "foo")], ("id", "ts", "value")
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index b366c6c8bd8d..9f817341724a 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -50,7 +50,7 @@ if TYPE_CHECKING:
from pyspark.sql.window import Window
from pyspark.pandas._typing import IndexOpsLike, SeriesOrIndex
-has_numpy = False
+has_numpy: bool = False
try:
import numpy as np # noqa: F401
@@ -194,6 +194,21 @@ def try_remote_functions(f: FuncT) -> FuncT:
return cast(FuncT, wrapped)
+def try_partitioning_remote_functions(f: FuncT) -> FuncT:
+ """Mark API supported from Spark Connect."""
+
+ @functools.wraps(f)
+ def wrapped(*args: Any, **kwargs: Any) -> Any:
+ if is_remote() and "PYSPARK_NO_NAMESPACE_SHARE" not in os.environ:
+ from pyspark.sql.connect.functions import partitioning
+
+ return getattr(partitioning, f.__name__)(*args, **kwargs)
+ else:
+ return f(*args, **kwargs)
+
+ return cast(FuncT, wrapped)
+
+
def try_remote_avro_functions(f: FuncT) -> FuncT:
"""Mark API supported from Spark Connect."""
diff --git a/python/setup.py b/python/setup.py
index 7e5e4ebd8510..c75ab46931ba 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -249,9 +249,11 @@ try:
"pyspark.sql.connect",
"pyspark.sql.connect.avro",
"pyspark.sql.connect.client",
+ "pyspark.sql.connect.functions",
"pyspark.sql.connect.proto",
"pyspark.sql.connect.streaming",
"pyspark.sql.connect.streaming.worker",
+ "pyspark.sql.functions",
"pyspark.sql.pandas",
"pyspark.sql.protobuf",
"pyspark.sql.streaming",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index a42df5bbcc29..2576c9b08313 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -7268,7 +7268,7 @@ object functions {
* @group partition_transforms
* @since 3.0.0
*/
- def years(e: Column): Column = withExpr { Years(e.expr) }
+ def years(e: Column): Column = partitioning.years(e)
/**
* A transform for timestamps and dates to partition data into months.
@@ -7276,7 +7276,7 @@ object functions {
* @group partition_transforms
* @since 3.0.0
*/
- def months(e: Column): Column = withExpr { Months(e.expr) }
+ def months(e: Column): Column = partitioning.months(e)
/**
* A transform for timestamps and dates to partition data into days.
@@ -7284,7 +7284,7 @@ object functions {
* @group partition_transforms
* @since 3.0.0
*/
- def days(e: Column): Column = withExpr { Days(e.expr) }
+ def days(e: Column): Column = partitioning.days(e)
/**
* Returns a string array of values within the nodes of xml that match the XPath expression.
@@ -7379,7 +7379,7 @@ object functions {
* @group partition_transforms
* @since 3.0.0
*/
- def hours(e: Column): Column = withExpr { Hours(e.expr) }
+ def hours(e: Column): Column = partitioning.hours(e)
/**
* Converts the timestamp without time zone `sourceTs`
@@ -7662,14 +7662,7 @@ object functions {
* @group partition_transforms
* @since 3.0.0
*/
- def bucket(numBuckets: Column, e: Column): Column = withExpr {
- numBuckets.expr match {
- case lit @ Literal(_, IntegerType) =>
- Bucket(lit, e.expr)
- case _ =>
- throw QueryCompilationErrors.invalidBucketsNumberError(numBuckets.toString, e.toString)
- }
- }
+ def bucket(numBuckets: Column, e: Column): Column = partitioning.bucket(numBuckets, e)
/**
* A transform for any type that partitions by a hash of the input column.
@@ -7677,9 +7670,7 @@ object functions {
* @group partition_transforms
* @since 3.0.0
*/
- def bucket(numBuckets: Int, e: Column): Column = withExpr {
- Bucket(Literal(numBuckets), e.expr)
- }
+ def bucket(numBuckets: Int, e: Column): Column = partitioning.bucket(numBuckets, e)
//////////////////////////////////////////////////////////////////////////////////////////////
// Predicates functions
@@ -8289,4 +8280,68 @@ object functions {
def unwrap_udt(column: Column): Column = withExpr {
UnwrapUDT(column.expr)
}
+
+ // scalastyle:off
+ // TODO(SPARK-45970): Use @static annotation so Java can access to those
+ // API in the same way. Once we land this fix, should deprecate
+ // functions.hours, days, months, years and bucket.
+ object partitioning {
+ // scalastyle:on
+ /**
+ * A transform for timestamps and dates to partition data into years.
+ *
+ * @group partition_transforms
+ * @since 4.0.0
+ */
+ def years(e: Column): Column = withExpr { Years(e.expr) }
+
+ /**
+ * A transform for timestamps and dates to partition data into months.
+ *
+ * @group partition_transforms
+ * @since 4.0.0
+ */
+ def months(e: Column): Column = withExpr { Months(e.expr) }
+
+ /**
+ * A transform for timestamps and dates to partition data into days.
+ *
+ * @group partition_transforms
+ * @since 4.0.0
+ */
+ def days(e: Column): Column = withExpr { Days(e.expr) }
+
+ /**
+ * A transform for timestamps to partition data into hours.
+ *
+ * @group partition_transforms
+ * @since 4.0.0
+ */
+ def hours(e: Column): Column = withExpr { Hours(e.expr) }
+
+ /**
+ * A transform for any type that partitions by a hash of the input column.
+ *
+ * @group partition_transforms
+ * @since 4.0.0
+ */
+ def bucket(numBuckets: Column, e: Column): Column = withExpr {
+ numBuckets.expr match {
+ case lit @ Literal(_, IntegerType) =>
+ Bucket(lit, e.expr)
+ case _ =>
+ throw QueryCompilationErrors.invalidBucketsNumberError(numBuckets.toString, e.toString)
+ }
+ }
+
+ /**
+ * A transform for any type that partitions by a hash of the input column.
+ *
+ * @group partition_transforms
+ * @since 4.0.0
+ */
+ def bucket(numBuckets: Int, e: Column): Column = withExpr {
+ Bucket(Literal(numBuckets), e.expr)
+ }
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index 860d6fce604b..44d47abc93fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -463,7 +463,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
spark.table("source")
.withColumn("ts", lit("2019-06-01 10:00:00.000000").cast("timestamp"))
.writeTo("testcat.table_name")
- .partitionedBy(years($"ts"))
+ .partitionedBy(partitioning.years($"ts"))
.create()
val table = catalog("testcat").loadTable(Identifier.of(Array(), "table_name"))
@@ -476,7 +476,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
spark.table("source")
.withColumn("ts", lit("2019-06-01 10:00:00.000000").cast("timestamp"))
.writeTo("testcat.table_name")
- .partitionedBy(months($"ts"))
+ .partitionedBy(partitioning.months($"ts"))
.create()
val table = catalog("testcat").loadTable(Identifier.of(Array(), "table_name"))
@@ -489,7 +489,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
spark.table("source")
.withColumn("ts", lit("2019-06-01 10:00:00.000000").cast("timestamp"))
.writeTo("testcat.table_name")
- .partitionedBy(days($"ts"))
+ .partitionedBy(partitioning.days($"ts"))
.create()
val table = catalog("testcat").loadTable(Identifier.of(Array(), "table_name"))
@@ -502,7 +502,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
spark.table("source")
.withColumn("ts", lit("2019-06-01 10:00:00.000000").cast("timestamp"))
.writeTo("testcat.table_name")
- .partitionedBy(hours($"ts"))
+ .partitionedBy(partitioning.hours($"ts"))
.create()
val table = catalog("testcat").loadTable(Identifier.of(Array(), "table_name"))
@@ -514,7 +514,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
test("Create: partitioned by bucket(4, id)") {
spark.table("source")
.writeTo("testcat.table_name")
- .partitionedBy(bucket(4, $"id"))
+ .partitionedBy(partitioning.bucket(4, $"id"))
.create()
val table = catalog("testcat").loadTable(Identifier.of(Array(), "table_name"))
@@ -764,7 +764,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
lit("2019-09-02 07:00:00.000000").cast("timestamp") as "modified",
lit("America/Los_Angeles") as "timezone"))
.writeTo("testcat.table_name")
- .partitionedBy(bucket(4, $"ts.timezone"))
+ .partitionedBy(partitioning.bucket(4, $"ts.timezone"))
.create()
val table = catalog("testcat").loadTable(Identifier.of(Array(), "table_name"))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]