This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4686c2733702 [SPARK-45073][PS][CONNECT] Replace `LastNonNull` with `Last(ignoreNulls=True)`
4686c2733702 is described below

commit 4686c27337025dd1a616da73b19abe7ea00a4624
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Tue Sep 5 11:13:35 2023 +0800

    [SPARK-45073][PS][CONNECT] Replace `LastNonNull` with `Last(ignoreNulls=True)`

    ### What changes were proposed in this pull request?
    Replace the Pandas-on-Spark-dedicated expression `LastNonNull` with the built-in `Last(ignoreNulls=True)`.

    ### Why are the changes needed?
    https://github.com/apache/spark/pull/36127 introduced `LastNonNull`, a window expression dedicated to the Pandas API on Spark. It is not actually needed: the built-in `Last` with `ignoreNulls=True` provides the same behavior.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    CI.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #42808 from zhengruifeng/del_last_not_none.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../sql/connect/planner/SparkConnectPlanner.scala  |  4 ---
 python/pyspark/pandas/series.py                    |  2 +-
 python/pyspark/pandas/spark/functions.py           | 14 --------
 .../catalyst/expressions/windowExpressions.scala   | 37 ---------------------
 .../spark/sql/api/python/PythonSQLUtils.scala      |  2 --
 5 files changed, 1 insertion(+), 58 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 579b378d09f6..1a63c9fc27c6 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1905,10 +1905,6 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
         val ignoreNA = extractBoolean(children(2), "ignoreNA")
         Some(EWM(children(0), alpha, ignoreNA))
 
-      case "last_non_null" if fun.getArgumentsCount == 1 =>
-        val children = fun.getArgumentsList.asScala.map(transformExpression)
-        Some(LastNonNull(children(0)))
-
       case "null_index" if fun.getArgumentsCount == 1 =>
         val children = fun.getArgumentsList.asScala.map(transformExpression)
         Some(NullIndex(children(0)))
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index 7fa08c6d9b24..863e98c42ead 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -2257,7 +2257,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
             return self._psdf.copy()._psser_for(self._column_label)
 
         scol = self.spark.column
-        last_non_null = SF.last_non_null(scol)
+        last_non_null = F.last(scol, True)
         null_index = SF.null_index(scol)
 
         Window = get_window_class()
diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py
index d6f6c6fdeebc..b0bc6efcd56e 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -159,20 +159,6 @@ def ewm(col: Column, alpha: float, ignore_na: bool) -> Column:
         return Column(sc._jvm.PythonSQLUtils.ewm(col._jc, alpha, ignore_na))
 
 
-def last_non_null(col: Column) -> Column:
-    if is_remote():
-        from pyspark.sql.connect.functions import _invoke_function_over_columns
-
-        return _invoke_function_over_columns(  # type: ignore[return-value]
-            "last_non_null",
-            col,  # type: ignore[arg-type]
-        )
-
-    else:
-        sc = SparkContext._active_spark_context
-        return Column(sc._jvm.PythonSQLUtils.lastNonNull(col._jc))
-
-
 def null_index(col: Column) -> Column:
     if is_remote():
         from pyspark.sql.connect.functions import _invoke_function_over_columns
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 50c98c01645d..bc61170f567f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -1152,43 +1152,6 @@ case class EWM(input: Expression, alpha: Double, ignoreNA: Boolean)
 }
 
 
-/**
- * Keep the last non-null value seen if any. This expression is dedicated only for
- * Pandas API on Spark.
- * For example,
- * Input: null, 1, 2, 3, null, 4, 5, null
- * Output: null, 1, 2, 3, 3, 4, 5, 5
- */
-case class LastNonNull(input: Expression)
-  extends AggregateWindowFunction with UnaryLike[Expression] {
-
-  override def dataType: DataType = input.dataType
-
-  private lazy val last = AttributeReference("last", dataType, nullable = true)()
-
-  override def aggBufferAttributes: Seq[AttributeReference] = last :: Nil
-
-  override lazy val initialValues: Seq[Expression] = Seq(Literal.create(null, dataType))
-
-  override lazy val updateExpressions: Seq[Expression] = {
-    Seq(
-      /* last = */ If(IsNull(input), last, input)
-    )
-  }
-
-  override lazy val evaluateExpression: Expression = last
-
-  override def prettyName: String = "last_non_null"
-
-  override def sql: String = s"$prettyName(${input.sql})"
-
-  override def child: Expression = input
-
-  override protected def withNewChildInternal(newChild: Expression): LastNonNull =
-    copy(input = newChild)
-}
-
-
 /**
  * Return the indices for consecutive null values, for non-null values, it returns 0.
  * This expression is dedicated only for Pandas API on Spark.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 196377cce2ae..3f0e9369c619 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -145,8 +145,6 @@ private[sql] object PythonSQLUtils extends Logging {
   def ewm(e: Column, alpha: Double, ignoreNA: Boolean): Column =
     Column(EWM(e.expr, alpha, ignoreNA))
 
-  def lastNonNull(e: Column): Column = Column(LastNonNull(e.expr))
-
   def nullIndex(e: Column): Column = Column(NullIndex(e.expr))
 
   def makeInterval(unit: String, e: Column): Column = {
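For context on why the built-in is sufficient: the doc comment on the removed `LastNonNull` states that input `null, 1, 2, 3, null, 4, 5, null` yields `null, 1, 2, 3, 3, 4, 5, 5`. The sketch below is not part of the patch (the DataFrame, column names, and window are illustrative); it shows that the built-in `last` with `ignorenulls=True` over a running window produces the same forward fill:

```python
# Minimal sketch of forward-filling with the built-in last(ignorenulls=True)
# window function, i.e. the behavior the removed LastNonNull expression provided.
# The data and column names are illustrative, not taken from the patch.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(0, None), (1, 1), (2, 2), (3, 3), (4, None), (5, 4), (6, 5), (7, None)],
    schema="idx INT, value INT",
)

# Running frame from the start of the partition to the current row;
# last(ignorenulls=True) carries the most recent non-null value forward.
w = Window.orderBy("idx").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn("filled", F.last("value", ignorenulls=True).over(w)).show()
# value:  null, 1, 2, 3, null, 4, 5, null
# filled: null, 1, 2, 3, 3,    4, 5, 5
```

This mirrors the one-line change in `python/pyspark/pandas/series.py`, where `SF.last_non_null(scol)` becomes `F.last(scol, True)` over the window the surrounding code already builds.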