This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8ec9d490fd83 [SPARK-55385][CORE][SQL] Mitigate the recomputation in `zipWithIndex`
8ec9d490fd83 is described below
commit 8ec9d490fd830aa4c01a7825665f226b64169590
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon Feb 9 07:12:42 2026 +0900
[SPARK-55385][CORE][SQL] Mitigate the recomputation in `zipWithIndex`
### What changes were proposed in this pull request?
Mitigate the recomputation in `zipWithIndex`
### Why are the changes needed?
`zipWithIndex` triggers an extra job to compute the `startIndices`.
If a parent RDD holds the same data distribution, that is, the same number of
partitions and the same number of rows per partition, then that parent can be used
to compute the `startIndices`, so the intermediate computation (which might be
expensive) can be skipped.
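The start offsets themselves are just a running sum over per-partition row counts. A minimal sketch of the arithmetic (the row counts here are made up; the `scanLeft` mirrors the one in `ZippedWithIndexRDD` below):
```scala
// Hypothetical sizes of partitions 0..n-2 (the last partition never needs
// to be counted, since no later partition depends on its size).
val counts = Seq(3L, 2L, 4L)
// Running sum: each partition starts where the previous ones end.
val startIndices = counts.scanLeft(0L)(_ + _) // Seq(0, 3, 5, 9)
```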
It should benefit patterns such as:
```
rdd.map(expensive computation).zipWithIndex
df.select(expensive computation).zipWithIndex
```
It should be possible to support other operators as well, but this PR focuses on
`RDD.map` and `ProjectExec`.
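To illustrate which lineages qualify (a sketch, not taken from the PR itself): `map` keeps both the number of partitions and the number of rows per partition, so the counting job can run on the cheap parent, while `filter` changes row counts and therefore does not qualify:
```scala
val base = sc.range(0, 1000, 1, 8)
base.map(_ * 2).zipWithIndex()         // counting job can run on `base`
base.filter(_ % 2 == 0).zipWithIndex() // row counts change: the filtered RDD itself is counted
```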
For example, with a slow `map` function:
```scala
val rdd = sc.range(0, 10, 1, 4)
val start = System.currentTimeMillis()
rdd.map(x => {Thread.sleep(10000); x + 1}).zipWithIndex().collect()
val duration = System.currentTimeMillis() - start
```
master:
```scala
val rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[1] at range at <console>:1
val start: Long = 1770351594037
val duration: Long = 60651
```
this PR:
```scala
val rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[12] at range at <console>:1
val start: Long = 1770351397114
val duration: Long = 30040
```
The parent RDD `sc.range(0, 10, 1, 4)` was used to compute `startIndices`, so the
expensive function `x => {Thread.sleep(10000); x + 1}` was skipped in that step:
the query makes one ~30 s pass over the data instead of two, roughly halving the
wall time.
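Note also that the lineage walk stops early at any persisted RDD (the first case in `getAncestorWithSameDistribution` below), since counting a cached RDD avoids recomputation anyway. A hedged sketch:
```scala
val cached = sc.range(0, 10, 1, 4).map(x => x + 1).persist()
// The walk stops at `cached` rather than continuing to the range RDD above it,
// so the counting job runs on the cached data.
cached.map(_ * 2).zipWithIndex().collect()
```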
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
CI
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #54169 from zhengruifeng/opt_zip_with_index.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/rdd/MapPartitionsRDD.scala | 7 ++++++-
.../spark/rdd/MapPartitionsWithEvaluatorRDD.scala | 3 ++-
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 24 ++++++++++++++++++----
.../org/apache/spark/rdd/ZippedWithIndexRDD.scala | 17 +++++++++++++--
.../sql/execution/basicPhysicalOperators.scala | 14 ++++++++-----
5 files changed, 52 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 39520a9734b0..5b37ceca8a60 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -35,13 +35,18 @@ import org.apache.spark.{Partition, TaskContext}
 * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
 *                         sensitive, it may return totally different result when the input order
 *                         is changed. Mostly stateful functions are order-sensitive.
+ * @param preservesDistribution whether the input function preserves the data distribution,
+ *                              that is, (1) the number of partitions and (2) the number of
+ *                              rows per partition. This param is used to optimize the
+ *                              performance of RDD.zipWithIndex.
*/
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
- isOrderSensitive: Boolean = false)
+ isOrderSensitive: Boolean = false,
+ val preservesDistribution: Boolean = false)
extends RDD[U](prev) {
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala
index 5d2faa420890..6896a5bee2a6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala
@@ -23,7 +23,8 @@ import org.apache.spark.{Partition, PartitionEvaluatorFactory, TaskContext}
private[spark] class MapPartitionsWithEvaluatorRDD[T : ClassTag, U : ClassTag](
var prev: RDD[T],
- evaluatorFactory: PartitionEvaluatorFactory[T, U])
+ evaluatorFactory: PartitionEvaluatorFactory[T, U],
+ val preservesDistribution: Boolean = false)
extends RDD[U](prev) {
override def getPartitions: Array[Partition] = firstParent[T].partitions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index d1408ee774ce..b86d8c8bb5bd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -423,7 +423,10 @@ abstract class RDD[T: ClassTag](
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
- new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
+ new MapPartitionsRDD[U, T](
+ this,
+ (_, _, iter) => iter.map(cleanF),
+ preservesDistribution = true)
}
/**
@@ -878,16 +881,22 @@ abstract class RDD[T: ClassTag](
 * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
 *                         sensitive, it may return totally different result when the input order
 *                         is changed. Mostly stateful functions are order-sensitive.
+ * @param preservesDistribution whether the input function preserves the data distribution,
+ *                              that is, (1) the number of partitions and (2) the number of
+ *                              rows per partition. This param is used to optimize the
+ *                              performance of RDD.zipWithIndex.
*/
private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false,
- isOrderSensitive: Boolean = false): RDD[U] = withScope {
+ isOrderSensitive: Boolean = false,
+ preservesDistribution: Boolean = false): RDD[U] = withScope {
new MapPartitionsRDD(
this,
(_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
preservesPartitioning = preservesPartitioning,
- isOrderSensitive = isOrderSensitive)
+ isOrderSensitive = isOrderSensitive,
+ preservesDistribution = preservesDistribution)
}
/**
@@ -928,7 +937,14 @@ abstract class RDD[T: ClassTag](
@Since("3.5.0")
def mapPartitionsWithEvaluator[U: ClassTag](
evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U] = withScope {
- new MapPartitionsWithEvaluatorRDD(this, evaluatorFactory)
+ mapPartitionsWithEvaluator(evaluatorFactory, false)
+ }
+
+ private[spark] def mapPartitionsWithEvaluator[U: ClassTag](
+ evaluatorFactory: PartitionEvaluatorFactory[T, U],
+ preservesDistribution: Boolean): RDD[U] = withScope {
+ new MapPartitionsWithEvaluatorRDD(this, evaluatorFactory,
+ preservesDistribution = preservesDistribution)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index 8425b211d6ec..7afef839aa5f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
private[spark]
@@ -39,6 +40,17 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
private[spark]
class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {
+ private def getAncestorWithSameDistribution(rdd: RDD[_]): RDD[_] = {
+ rdd match {
+ case c: RDD[_] if c.getStorageLevel != StorageLevel.NONE => c
+ case m: MapPartitionsRDD[_, _] if m.preservesDistribution =>
+ getAncestorWithSameDistribution(m.prev)
+ case m: MapPartitionsWithEvaluatorRDD[_, _] if m.preservesDistribution =>
+ getAncestorWithSameDistribution(m.prev)
+ case _ => rdd
+ }
+ }
+
/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
val n = prev.partitions.length
@@ -47,8 +59,9 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
} else if (n == 1) {
Array(0L)
} else {
- prev.context.runJob(
- prev,
+ val ancestor = getAncestorWithSameDistribution(prev)
+ ancestor.context.runJob(
+ ancestor,
Utils.getIteratorSize _,
0 until n - 1 // do not need to count the last partition
).scanLeft(0L)(_ + _)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index fd34519ff61b..14d9621fbfab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -93,12 +93,16 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
protected override def doExecute(): RDD[InternalRow] = {
val evaluatorFactory = new ProjectEvaluatorFactory(projectList, child.output)
if (conf.usePartitionEvaluator) {
- child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
+ child.execute().mapPartitionsWithEvaluator(
+ evaluatorFactory, preservesDistribution = true
+ )
} else {
- child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
- val evaluator = evaluatorFactory.createEvaluator()
- evaluator.eval(index, iter)
- }
+ child.execute().mapPartitionsWithIndexInternal(
+ f = (index, iter) => {
+ val evaluator = evaluatorFactory.createEvaluator()
+ evaluator.eval(index, iter)
+ }, preservesDistribution = true
+ )
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]