This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new ee3ddf8536c [SPARK-44340][SQL][FOLLOWUP][3.5] Set partition index correctly for WindowGroupLimitExec ee3ddf8536c is described below commit ee3ddf8536ccf822d8c8e975872658a7ff15666d Author: Jiaan Geng <belie...@163.com> AuthorDate: Mon Jul 31 20:59:23 2023 +0800 [SPARK-44340][SQL][FOLLOWUP][3.5] Set partition index correctly for WindowGroupLimitExec ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/41899, to set the partition index correctly even if it's not used for now. It also contains a few code cleanups. ### Why are the changes needed? future-proof ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? existing tests Closes #42233 from beliefer/SPARK-44340_followup_3.5. Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/execution/window/WindowGroupLimitExec.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala index 98969f60c2b..e975f3b219a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala @@ -72,8 +72,6 @@ case class WindowGroupLimitExec( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) protected override def doExecute(): RDD[InternalRow] = { - val numOutputRows = longMetric("numOutputRows") - val evaluatorFactory = new WindowGroupLimitEvaluatorFactory( partitionSpec, @@ -81,14 +79,14 @@ case class WindowGroupLimitExec( rankLikeFunction, limit, child.output, - numOutputRows) + longMetric("numOutputRows")) if (conf.usePartitionEvaluator) { 
child.execute().mapPartitionsWithEvaluator(evaluatorFactory) } else { - child.execute().mapPartitionsInternal { iter => + child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) => val evaluator = evaluatorFactory.createEvaluator() - evaluator.eval(0, iter) + evaluator.eval(index, rowIterator) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org