This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 110c0575448  [SPARK-41503][CONNECT][PYTHON] Implement Partition Transformation Functions
110c0575448 is described below

commit 110c0575448c63c1d40e670e8e27b7ee6fb74907
Author:     Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Sat Dec 24 13:11:12 2022 -0800

    [SPARK-41503][CONNECT][PYTHON] Implement Partition Transformation Functions

    ### What changes were proposed in this pull request?
    Implement [Partition Transformation Functions](https://github.com/apache/spark/blob/master/python/docs/source/reference/pyspark.sql/functions.rst#partition-transformation-functions)

    ### Why are the changes needed?
    for API coverage

    ### Does this PR introduce _any_ user-facing change?
    yes

    ### How was this patch tested?
    existing UT

    Closes #39203 from zhengruifeng/connect_function_transform_partition.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/connect/planner/SparkConnectPlanner.scala | 21 +++++++++++
 python/pyspark/sql/connect/functions.py           | 44 ++++++++++++++++++++++
 2 files changed, 65 insertions(+)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index dce3a8c8e55..cb8d30b180c 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -646,6 +646,27 @@ class SparkConnectPlanner(session: SparkSession) {
         }
         Some(NthValue(children(0), children(1), ignoreNulls))
 
+      case "bucket" if fun.getArgumentsCount == 2 =>
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        (children.head, children.last) match {
+          case (numBuckets: Literal, child) if numBuckets.dataType == IntegerType =>
+            Some(Bucket(numBuckets, child))
+          case (other, _) =>
+            throw InvalidPlanInput(s"numBuckets should be a literal integer, but got $other")
+        }
+
+      case "years" if fun.getArgumentsCount == 1 =>
+        Some(Years(transformExpression(fun.getArguments(0))))
+
+      case "months" if fun.getArgumentsCount == 1 =>
+        Some(Months(transformExpression(fun.getArguments(0))))
+
+      case "days" if fun.getArgumentsCount == 1 =>
+        Some(Days(transformExpression(fun.getArguments(0))))
+
+      case "hours" if fun.getArgumentsCount == 1 =>
+        Some(Hours(transformExpression(fun.getArguments(0))))
+
       case _ => None
     }
   }
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index c2b1c7d61c8..407e7536f03 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -2070,6 +2070,50 @@ def timestamp_seconds(col: "ColumnOrName") -> Column:
 timestamp_seconds.__doc__ = pysparkfuncs.timestamp_seconds.__doc__
 
 
+# Partition Transformation Functions
+
+
+def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
+    if isinstance(numBuckets, int):
+        _numBuckets = lit(numBuckets)
+    elif isinstance(numBuckets, Column):
+        _numBuckets = numBuckets
+    else:
+        raise TypeError("numBuckets should be a Column or an int, got {}".format(type(numBuckets)))
+
+    return _invoke_function("bucket", _numBuckets, _to_col(col))
+
+
+bucket.__doc__ = pysparkfuncs.bucket.__doc__
+
+
+def years(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("years", col)
+
+
+years.__doc__ = pysparkfuncs.years.__doc__
+
+
+def months(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("months", col)
+
+
+months.__doc__ = pysparkfuncs.months.__doc__
+
+
+def days(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("days", col)
+
+
+days.__doc__ = pysparkfuncs.days.__doc__
+
+
+def hours(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("hours", col)
+
+
+hours.__doc__ = pysparkfuncs.hours.__doc__
+
 
 # Misc Functions

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
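
As a quick illustration of the client-side API added in python/pyspark/sql/connect/functions.py above (a sketch only, not part of the commit; the column names "id" and "ts" are made up for this example):

    from pyspark.sql.connect.functions import bucket, days, hours, months, years

    # Each helper returns a Connect Column wrapping an unresolved function call;
    # SparkConnectPlanner maps it to Bucket/Years/Months/Days/Hours on the server.
    partition_exprs = [
        bucket(4, "id"),   # an int is wrapped with lit(); a Column is passed through
        years("ts"),
        months("ts"),
        days("ts"),
        hours("ts"),
    ]

Per the planner change, only a literal integer is accepted for numBuckets; anything else is rejected server-side with InvalidPlanInput.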