This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ec9d490fd83 [SPARK-55385][CORE][SQL] Mitigate the recomputation in 
`zipWithIndex`
8ec9d490fd83 is described below

commit 8ec9d490fd830aa4c01a7825665f226b64169590
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon Feb 9 07:12:42 2026 +0900

    [SPARK-55385][CORE][SQL] Mitigate the recomputation in `zipWithIndex`
    
    ### What changes were proposed in this pull request?
    Mitigate the recomputation in `zipWithIndex`
    
    ### Why are the changes needed?
    `zipWithIndex` triggers an extra job to compute the `startIndices`.
    If the parent RDD has the same data distribution (the same number of 
partitions and the same number of rows per partition), then we can use the 
parent RDD to compute the `startIndices`, so that the intermediate computation 
(which might be expensive) can be skipped.
    
    This should benefit patterns such as:
    
    ```
    rdd.map(expensive computation).zipWithIndex
    
    df.select(expensive computation).zipWithIndex
    ```
    
    The same approach should be able to support other operators, but this PR 
focuses on `RDD.map` and `ProjectExec`.
    
    For example
    ```scala
    val rdd = sc.range(0, 10, 1, 4)
    val start = System.currentTimeMillis()
    rdd.map(x => {Thread.sleep(10000); x + 1}).zipWithIndex().collect()
    val duration = System.currentTimeMillis() - start
    ```
    
    master:
    ```scala
    val rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[1] at range at 
<console>:1
    val start: Long = 1770351594037
    val duration: Long = 60651
    ```
    
    this PR:
    ```scala
    val rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[12] at range at 
<console>:1
    val start: Long = 1770351397114
    val duration: Long = 30040
    ```
    
    The parent RDD `sc.range(0, 10, 1, 4)` was used to compute `startIndices`, 
so the expensive computation `x => {Thread.sleep(10000); x + 1}` was skipped 
in this step.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #54169 from zhengruifeng/opt_zip_with_index.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../org/apache/spark/rdd/MapPartitionsRDD.scala    |  7 ++++++-
 .../spark/rdd/MapPartitionsWithEvaluatorRDD.scala  |  3 ++-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 24 ++++++++++++++++++----
 .../org/apache/spark/rdd/ZippedWithIndexRDD.scala  | 17 +++++++++++++--
 .../sql/execution/basicPhysicalOperators.scala     | 14 ++++++++-----
 5 files changed, 52 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 39520a9734b0..5b37ceca8a60 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -35,13 +35,18 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If 
it's order
  *                         sensitive, it may return totally different result 
when the input order
  *                         is changed. Mostly stateful functions are 
order-sensitive.
+ * @param preservesDistribution Whether the input function preserves the data 
distribution,
+ *                              that is, 1 number of partitions, and 2 number 
of rows per each
+ *                              partition. This param is used to optimize the 
performance of
+ *                              RDD.zipWithIndex.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, 
partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    val preservesDistribution: Boolean = false)
   extends RDD[U](prev) {
 
   override val partitioner = if (preservesPartitioning) 
firstParent[T].partitioner else None
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala
index 5d2faa420890..6896a5bee2a6 100644
--- 
a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala
+++ 
b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala
@@ -23,7 +23,8 @@ import org.apache.spark.{Partition, 
PartitionEvaluatorFactory, TaskContext}
 
 private[spark] class MapPartitionsWithEvaluatorRDD[T : ClassTag, U : ClassTag](
     var prev: RDD[T],
-    evaluatorFactory: PartitionEvaluatorFactory[T, U])
+    evaluatorFactory: PartitionEvaluatorFactory[T, U],
+    val preservesDistribution: Boolean = false)
   extends RDD[U](prev) {
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index d1408ee774ce..b86d8c8bb5bd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -423,7 +423,10 @@ abstract class RDD[T: ClassTag](
    */
   def map[U: ClassTag](f: T => U): RDD[U] = withScope {
     val cleanF = sc.clean(f)
-    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
+    new MapPartitionsRDD[U, T](
+      this,
+      (_, _, iter) => iter.map(cleanF),
+      preservesDistribution = true)
   }
 
   /**
@@ -878,16 +881,22 @@ abstract class RDD[T: ClassTag](
    * @param isOrderSensitive whether or not the function is order-sensitive. 
If it's order
    *                         sensitive, it may return totally different result 
when the input order
    *                         is changed. Mostly stateful functions are 
order-sensitive.
+   * @param preservesDistribution Whether the input function preserves the 
data distribution,
+   *                              that is, 1 number of partitions, and 2 
number of rows per each
+   *                              partition. This param is used to optimize 
the performance of
+   *                              RDD.zipWithIndex.
    */
   private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U],
       preservesPartitioning: Boolean = false,
-      isOrderSensitive: Boolean = false): RDD[U] = withScope {
+      isOrderSensitive: Boolean = false,
+      preservesDistribution: Boolean = false): RDD[U] = withScope {
     new MapPartitionsRDD(
       this,
       (_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
       preservesPartitioning = preservesPartitioning,
-      isOrderSensitive = isOrderSensitive)
+      isOrderSensitive = isOrderSensitive,
+      preservesDistribution = preservesDistribution)
   }
 
   /**
@@ -928,7 +937,14 @@ abstract class RDD[T: ClassTag](
   @Since("3.5.0")
   def mapPartitionsWithEvaluator[U: ClassTag](
       evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U] = withScope {
-    new MapPartitionsWithEvaluatorRDD(this, evaluatorFactory)
+    mapPartitionsWithEvaluator(evaluatorFactory, false)
+  }
+
+  private[spark] def mapPartitionsWithEvaluator[U: ClassTag](
+      evaluatorFactory: PartitionEvaluatorFactory[T, U],
+      preservesDistribution: Boolean): RDD[U] = withScope {
+    new MapPartitionsWithEvaluatorRDD(this, evaluatorFactory,
+      preservesDistribution = preservesDistribution)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index 8425b211d6ec..7afef839aa5f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
 private[spark]
@@ -39,6 +40,17 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val 
startIndex: Long)
 private[spark]
 class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, 
Long)](prev) {
 
+  private def getAncestorWithSameDistribution(rdd: RDD[_]): RDD[_] = {
+    rdd match {
+      case c: RDD[_] if c.getStorageLevel != StorageLevel.NONE => c
+      case m: MapPartitionsRDD[_, _] if m.preservesDistribution =>
+        getAncestorWithSameDistribution(m.prev)
+      case m: MapPartitionsWithEvaluatorRDD[_, _] if m.preservesDistribution =>
+        getAncestorWithSameDistribution(m.prev)
+      case _ => rdd
+    }
+  }
+
   /** The start index of each partition. */
   @transient private val startIndices: Array[Long] = {
     val n = prev.partitions.length
@@ -47,8 +59,9 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends 
RDD[(T, Long)](prev)
     } else if (n == 1) {
       Array(0L)
     } else {
-      prev.context.runJob(
-        prev,
+      val ancestor = getAncestorWithSameDistribution(prev)
+      ancestor.context.runJob(
+        ancestor,
         Utils.getIteratorSize _,
         0 until n - 1 // do not need to count the last partition
       ).scanLeft(0L)(_ + _)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index fd34519ff61b..14d9621fbfab 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -93,12 +93,16 @@ case class ProjectExec(projectList: Seq[NamedExpression], 
child: SparkPlan)
   protected override def doExecute(): RDD[InternalRow] = {
     val evaluatorFactory = new ProjectEvaluatorFactory(projectList, 
child.output)
     if (conf.usePartitionEvaluator) {
-      child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
+      child.execute().mapPartitionsWithEvaluator(
+        evaluatorFactory, preservesDistribution = true
+      )
     } else {
-      child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
-        val evaluator = evaluatorFactory.createEvaluator()
-        evaluator.eval(index, iter)
-      }
+      child.execute().mapPartitionsWithIndexInternal(
+        f = (index, iter) => {
+          val evaluator = evaluatorFactory.createEvaluator()
+          evaluator.eval(index, iter)
+        }, preservesDistribution = true
+      )
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to