This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new deb09eec6176 [SPARK-55385][CORE][SQL][FOLLOWUP] Rename preservesDistribution to preservesPartitionSizes
deb09eec6176 is described below

commit deb09eec6176f921982efa50532d335ce2410a04
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Feb 10 08:05:12 2026 +0800

    [SPARK-55385][CORE][SQL][FOLLOWUP] Rename preservesDistribution to preservesPartitionSizes
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up to #54169. Rename `preservesDistribution` to 
`preservesPartitionSizes` and `getAncestorWithSameDistribution` to 
`getAncestorWithSamePartitionSizes`.
    
    ### Why are the changes needed?
    
    The original name `preservesDistribution` is confusing because in Spark, "distribution" typically refers to how tuples are partitioned across cluster nodes (e.g., `HashPartitioning`, `ClusteredDistribution`). This field actually tracks whether the number of rows per partition is preserved (i.e., a 1:1 element mapping like `map`), which is used to optimize `RDD.zipWithIndex`. The new name `preservesPartitionSizes` is more precise and avoids confusion with the nearby `preservesPartitioning` flag.
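    
    For illustration only (not part of this patch; the RDD names and local-mode setup below are hypothetical), a minimal sketch of the optimization this flag drives: `zipWithIndex` needs the row count of every partition except the last, and when the lineage consists of size-preserving maps, those counts can be computed on a cheaper (e.g., cached) ancestor:
    
    ```scala
    import org.apache.spark.{SparkConf, SparkContext}
    
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))
    
    // A cached ancestor: per-partition row counts are cheap once materialized.
    val cached = sc.parallelize(1 to 1000000, numSlices = 8).cache()
    cached.count()
    
    // `map` is a 1:1 element mapping, so the resulting MapPartitionsRDD is
    // created with preservesPartitionSizes = true. zipWithIndex can walk the
    // lineage back to `cached` and count rows there, instead of re-running
    // the (potentially expensive) map function on the immediate parent.
    val mapped = cached.map(_ * 2L)
    val indexed = mapped.zipWithIndex()
    
    indexed.take(3).foreach(println)   // e.g. (2,0), (4,1), (6,2)
    sc.stop()
    ```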
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests. This is a pure rename with no functional changes.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor
    
    Closes #54226 from cloud-fan/followup-spark-55385-rename-preserves-distribution.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../scala/org/apache/spark/rdd/MapPartitionsRDD.scala  | 10 +++++-----
 .../spark/rdd/MapPartitionsWithEvaluatorRDD.scala      |  2 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala     | 18 +++++++++---------
 .../org/apache/spark/rdd/ZippedWithIndexRDD.scala      | 12 ++++++------
 .../spark/sql/execution/basicPhysicalOperators.scala   |  4 ++--
 5 files changed, 23 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 5b37ceca8a60..d24047dd98af 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -35,10 +35,10 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input order
  *                         is changed. Mostly stateful functions are order-sensitive.
- * @param preservesDistribution Whether the input function preserves the data distribution,
- *                              that is, 1 number of partitions, and 2 number of rows per each
- *                              partition. This param is used to optimize the performance of
- *                              RDD.zipWithIndex.
+ * @param preservesPartitionSizes Whether the input function preserves the number of rows in each
+ *                                partition. This is true for 1:1 element mappings like `map`.
+ *                                Used to optimize `RDD.zipWithIndex` by counting rows on a
+ *                                cheaper ancestor RDD instead of the immediate parent.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
@@ -46,7 +46,7 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
     isOrderSensitive: Boolean = false,
-    val preservesDistribution: Boolean = false)
+    val preservesPartitionSizes: Boolean = false)
   extends RDD[U](prev) {
 
   override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala
index 6896a5bee2a6..d6e878b81dae 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala
@@ -24,7 +24,7 @@ import org.apache.spark.{Partition, PartitionEvaluatorFactory, TaskContext}
 private[spark] class MapPartitionsWithEvaluatorRDD[T : ClassTag, U : ClassTag](
     var prev: RDD[T],
     evaluatorFactory: PartitionEvaluatorFactory[T, U],
-    val preservesDistribution: Boolean = false)
+    val preservesPartitionSizes: Boolean = false)
   extends RDD[U](prev) {
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index b86d8c8bb5bd..5500f085de1e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -426,7 +426,7 @@ abstract class RDD[T: ClassTag](
     new MapPartitionsRDD[U, T](
       this,
       (_, _, iter) => iter.map(cleanF),
-      preservesDistribution = true)
+      preservesPartitionSizes = true)
   }
 
   /**
@@ -881,22 +881,22 @@ abstract class RDD[T: ClassTag](
    * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
    *                         sensitive, it may return totally different result when the input order
    *                         is changed. Mostly stateful functions are order-sensitive.
-   * @param preservesDistribution Whether the input function preserves the data distribution,
-   *                              that is, 1 number of partitions, and 2 number of rows per each
-   *                              partition. This param is used to optimize the performance of
-   *                              RDD.zipWithIndex.
+   * @param preservesPartitionSizes Whether the input function preserves the number of rows in each
+   *                                partition. This is true for 1:1 element mappings like `map`.
+   *                                Used to optimize `RDD.zipWithIndex` by counting rows on a
+   *                                cheaper ancestor RDD instead of the immediate parent.
    */
   private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
       f: (Int, Iterator[T]) => Iterator[U],
       preservesPartitioning: Boolean = false,
       isOrderSensitive: Boolean = false,
-      preservesDistribution: Boolean = false): RDD[U] = withScope {
+      preservesPartitionSizes: Boolean = false): RDD[U] = withScope {
     new MapPartitionsRDD(
       this,
       (_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
       preservesPartitioning = preservesPartitioning,
       isOrderSensitive = isOrderSensitive,
-      preservesDistribution = preservesDistribution)
+      preservesPartitionSizes = preservesPartitionSizes)
   }
 
   /**
@@ -942,9 +942,9 @@ abstract class RDD[T: ClassTag](
 
   private[spark] def mapPartitionsWithEvaluator[U: ClassTag](
       evaluatorFactory: PartitionEvaluatorFactory[T, U],
-      preservesDistribution: Boolean): RDD[U] = withScope {
+      preservesPartitionSizes: Boolean): RDD[U] = withScope {
     new MapPartitionsWithEvaluatorRDD(this, evaluatorFactory,
-      preservesDistribution = preservesDistribution)
+      preservesPartitionSizes = preservesPartitionSizes)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index 7afef839aa5f..98b1f60dbbef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -40,13 +40,13 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
 private[spark]
 class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {
 
-  private def getAncestorWithSameDistribution(rdd: RDD[_]): RDD[_] = {
+  private def getAncestorWithSamePartitionSizes(rdd: RDD[_]): RDD[_] = {
     rdd match {
       case c: RDD[_] if c.getStorageLevel != StorageLevel.NONE => c
-      case m: MapPartitionsRDD[_, _] if m.preservesDistribution =>
-        getAncestorWithSameDistribution(m.prev)
-      case m: MapPartitionsWithEvaluatorRDD[_, _] if m.preservesDistribution =>
-        getAncestorWithSameDistribution(m.prev)
+      case m: MapPartitionsRDD[_, _] if m.preservesPartitionSizes =>
+        getAncestorWithSamePartitionSizes(m.prev)
+      case m: MapPartitionsWithEvaluatorRDD[_, _] if m.preservesPartitionSizes =>
+        getAncestorWithSamePartitionSizes(m.prev)
       case _ => rdd
     }
   }
@@ -59,7 +59,7 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
     } else if (n == 1) {
       Array(0L)
     } else {
-      val ancestor = getAncestorWithSameDistribution(prev)
+      val ancestor = getAncestorWithSamePartitionSizes(prev)
       ancestor.context.runJob(
         ancestor,
         Utils.getIteratorSize _,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 14d9621fbfab..f2cbfd2154a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -94,14 +94,14 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
     val evaluatorFactory = new ProjectEvaluatorFactory(projectList, child.output)
     if (conf.usePartitionEvaluator) {
       child.execute().mapPartitionsWithEvaluator(
-        evaluatorFactory, preservesDistribution = true
+        evaluatorFactory, preservesPartitionSizes = true
       )
     } else {
       child.execute().mapPartitionsWithIndexInternal(
         f = (index, iter) => {
           val evaluator = evaluatorFactory.createEvaluator()
           evaluator.eval(index, iter)
-        }, preservesDistribution = true
+        }, preservesPartitionSizes = true
       )
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
