This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new ee3ddf8536c [SPARK-44340][SQL][FOLLOWUP][3.5] Set partition index 
correctly for WindowGroupLimitExec
ee3ddf8536c is described below

commit ee3ddf8536ccf822d8c8e975872658a7ff15666d
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Mon Jul 31 20:59:23 2023 +0800

    [SPARK-44340][SQL][FOLLOWUP][3.5] Set partition index correctly for 
WindowGroupLimitExec
    
    ### What changes were proposed in this pull request?
    This is a followup of https://github.com/apache/spark/pull/41899, to set 
the partition index correctly even if it's not used for now. It also contains a 
few code cleanup.
    
    ### Why are the changes needed?
    future-proof
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    
    ### How was this patch tested?
    existing tests
    
    Closes #42233 from beliefer/SPARK-44340_followup_3.5.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/window/WindowGroupLimitExec.scala  | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
index 98969f60c2b..e975f3b219a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala
@@ -72,8 +72,6 @@ case class WindowGroupLimitExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-
     val evaluatorFactory =
       new WindowGroupLimitEvaluatorFactory(
         partitionSpec,
@@ -81,14 +79,14 @@ case class WindowGroupLimitExec(
         rankLikeFunction,
         limit,
         child.output,
-        numOutputRows)
+        longMetric("numOutputRows"))
 
     if (conf.usePartitionEvaluator) {
       child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      child.execute().mapPartitionsInternal { iter =>
+      child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) =>
         val evaluator = evaluatorFactory.createEvaluator()
-        evaluator.eval(0, iter)
+        evaluator.eval(index, rowIterator)
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to