This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 110c0575448 [SPARK-41503][CONNECT][PYTHON] Implement Partition Transformation Functions
110c0575448 is described below
commit 110c0575448c63c1d40e670e8e27b7ee6fb74907
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Sat Dec 24 13:11:12 2022 -0800
[SPARK-41503][CONNECT][PYTHON] Implement Partition Transformation Functions
### What changes were proposed in this pull request?
Implement the [Partition Transformation Functions](https://github.com/apache/spark/blob/master/python/docs/source/reference/pyspark.sql/functions.rst#partition-transformation-functions) (`bucket`, `years`, `months`, `days`, and `hours`) in the Spark Connect Python client.
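A minimal usage sketch of the new API (illustrative only: the endpoint and table names are placeholders, session creation uses the Spark 3.4 style `builder.remote`, and `writeTo`/`partitionedBy` support on the Connect client is an assumption, not shown in this commit):

```python
from pyspark.sql import SparkSession
from pyspark.sql.connect.functions import bucket, col, years

# Placeholder Spark Connect endpoint.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

# Hypothetical source table with an `id` column and a timestamp column `ts`.
df = spark.table("db.events")

# Partition the target table by year(ts) and by 16 hash buckets of id.
df.writeTo("db.events_by_year").partitionedBy(
    years(col("ts")),
    bucket(16, col("id")),
).createOrReplace()
```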
### Why are the changes needed?
For API coverage: these functions already exist in `pyspark.sql.functions` but were missing from the Spark Connect Python client.
### Does this PR introduce _any_ user-facing change?
Yes, the `bucket`, `years`, `months`, `days`, and `hours` functions become available in the Spark Connect Python client.
### How was this patch tested?
Existing unit tests.
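Since each new function copies its docstring from `pyspark.sql.functions` (see the `__doc__` assignments in the diff below), docstring parity can be sanity-checked without a running server. A minimal sketch, not one of the existing tests:

```python
from pyspark.sql import functions as SF
from pyspark.sql.connect import functions as CF

# Each Spark Connect function reuses the docstring of its
# pyspark.sql.functions counterpart, per this commit.
for name in ("bucket", "years", "months", "days", "hours"):
    assert getattr(CF, name).__doc__ == getattr(SF, name).__doc__
```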
Closes #39203 from zhengruifeng/connect_function_transform_partition.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/connect/planner/SparkConnectPlanner.scala | 21 +++++++++++
python/pyspark/sql/connect/functions.py | 44 ++++++++++++++++++++++
2 files changed, 65 insertions(+)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index dce3a8c8e55..cb8d30b180c 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -646,6 +646,27 @@ class SparkConnectPlanner(session: SparkSession) {
         }
         Some(NthValue(children(0), children(1), ignoreNulls))
 
+      case "bucket" if fun.getArgumentsCount == 2 =>
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        (children.head, children.last) match {
+          case (numBuckets: Literal, child) if numBuckets.dataType == IntegerType =>
+            Some(Bucket(numBuckets, child))
+          case (other, _) =>
+            throw InvalidPlanInput(s"numBuckets should be a literal integer, but got $other")
+        }
+
+      case "years" if fun.getArgumentsCount == 1 =>
+        Some(Years(transformExpression(fun.getArguments(0))))
+
+      case "months" if fun.getArgumentsCount == 1 =>
+        Some(Months(transformExpression(fun.getArguments(0))))
+
+      case "days" if fun.getArgumentsCount == 1 =>
+        Some(Days(transformExpression(fun.getArguments(0))))
+
+      case "hours" if fun.getArgumentsCount == 1 =>
+        Some(Hours(transformExpression(fun.getArguments(0))))
+
       case _ => None
     }
   }
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index c2b1c7d61c8..407e7536f03 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2070,6 +2070,50 @@ def timestamp_seconds(col: "ColumnOrName") -> Column:
 timestamp_seconds.__doc__ = pysparkfuncs.timestamp_seconds.__doc__
 
 
+# Partition Transformation Functions
+
+
+def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
+    if isinstance(numBuckets, int):
+        _numBuckets = lit(numBuckets)
+    elif isinstance(numBuckets, Column):
+        _numBuckets = numBuckets
+    else:
+        raise TypeError("numBuckets should be a Column or an int, got {}".format(type(numBuckets)))
+
+    return _invoke_function("bucket", _numBuckets, _to_col(col))
+
+
+bucket.__doc__ = pysparkfuncs.bucket.__doc__
+
+
+def years(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("years", col)
+
+
+years.__doc__ = pysparkfuncs.years.__doc__
+
+
+def months(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("months", col)
+
+
+months.__doc__ = pysparkfuncs.months.__doc__
+
+
+def days(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("days", col)
+
+
+days.__doc__ = pysparkfuncs.days.__doc__
+
+
+def hours(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("hours", col)
+
+
+hours.__doc__ = pysparkfuncs.hours.__doc__
+
 # Misc Functions
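For reference, the type check added in `bucket` above is enforced client-side; building these expressions is lazy, so no server round-trip is involved (illustrative sketch):

```python
from pyspark.sql.connect.functions import bucket, col

bucket(col("n"), "id")  # OK: numBuckets given as a Column
bucket(16, "id")        # OK: a Python int is wrapped with lit()

try:
    bucket("16", "id")  # anything else is rejected client-side
except TypeError as e:
    print(e)  # numBuckets should be a Column or an int, got <class 'str'>
```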
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]