This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new deb09eec6176 [SPARK-55385][CORE][SQL][FOLLOWUP] Rename preservesDistribution to preservesPartitionSizes
deb09eec6176 is described below
commit deb09eec6176f921982efa50532d335ce2410a04
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Feb 10 08:05:12 2026 +0800
[SPARK-55385][CORE][SQL][FOLLOWUP] Rename preservesDistribution to preservesPartitionSizes
### What changes were proposed in this pull request?
This is a follow-up to #54169. Rename `preservesDistribution` to
`preservesPartitionSizes` and `getAncestorWithSameDistribution` to
`getAncestorWithSamePartitionSizes`.
### Why are the changes needed?
The original name `preservesDistribution` is confusing because in Spark,
"distribution" typically refers to how tuples are partitioned across cluster
nodes (e.g., `HashPartitioning`, `ClusteredDistribution`). This field actually
tracks whether the number of rows per partition is preserved (i.e., a 1:1
element mapping like `map`), which is used to optimize `RDD.zipWithIndex`. The
new name `preservesPartitionSizes` is more precise and avoids confusion with
the nearby `preservesPartition [...]
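As an illustration of the behavior the flag describes (a minimal sketch, not part of this patch; the object name and the toy data are made up): a 1:1 transformation such as `map` never changes how many rows each partition holds, so `zipWithIndex` can take its per-partition row counts from a cheaper ancestor, e.g. a cached parent, instead of re-evaluating the mapped RDD.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PreservesPartitionSizesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("preserves-partition-sizes-sketch"))

    // A cached ancestor: its per-partition row counts are cheap to obtain.
    val base = sc.parallelize(1 to 10, numSlices = 2).cache()
    base.count()

    // `map` is a 1:1 element mapping: every partition of `mapped` holds
    // exactly as many rows as the corresponding partition of `base`.
    val mapped = base.map { i => Thread.sleep(1); i * i } // stands in for a costly function

    // zipWithIndex needs each partition's size to assign global indices.
    // Because `map` preserves partition sizes, those sizes can be counted
    // on the cached `base` instead of re-running the costly function.
    mapped.zipWithIndex().collect().foreach(println)

    sc.stop()
  }
}
```

Whether the cheaper ancestor is actually used is decided internally by `getAncestorWithSamePartitionSizes` (renamed below), which walks up the lineage while each RDD preserves partition sizes and stops at the first cached RDD it finds; the user-facing API does not change.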
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests. This is a pure rename with no functional changes.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor
Closes #54226 from
cloud-fan/followup-spark-55385-rename-preserves-distribution.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../scala/org/apache/spark/rdd/MapPartitionsRDD.scala | 10 +++++-----
.../spark/rdd/MapPartitionsWithEvaluatorRDD.scala | 2 +-
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 18 +++++++++---------
.../org/apache/spark/rdd/ZippedWithIndexRDD.scala | 12 ++++++------
.../spark/sql/execution/basicPhysicalOperators.scala | 4 ++--
5 files changed, 23 insertions(+), 23 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 5b37ceca8a60..d24047dd98af 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -35,10 +35,10 @@ import org.apache.spark.{Partition, TaskContext}
 * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
 *                         sensitive, it may return totally different result when the input order
 *                         is changed. Mostly stateful functions are order-sensitive.
- * @param preservesDistribution Whether the input function preserves the data distribution,
- *                              that is, 1 number of partitions, and 2 number of rows per each
- *                              partition. This param is used to optimize the performance of
- *                              RDD.zipWithIndex.
+ * @param preservesPartitionSizes Whether the input function preserves the number of rows in each
+ *                                partition. This is true for 1:1 element mappings like `map`.
+ *                                Used to optimize `RDD.zipWithIndex` by counting rows on a
+ *                                cheaper ancestor RDD instead of the immediate parent.
*/
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
@@ -46,7 +46,7 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
isOrderSensitive: Boolean = false,
- val preservesDistribution: Boolean = false)
+ val preservesPartitionSizes: Boolean = false)
extends RDD[U](prev) {
  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala
index 6896a5bee2a6..d6e878b81dae 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala
@@ -24,7 +24,7 @@ import org.apache.spark.{Partition, PartitionEvaluatorFactory, TaskContext}
private[spark] class MapPartitionsWithEvaluatorRDD[T : ClassTag, U : ClassTag](
var prev: RDD[T],
evaluatorFactory: PartitionEvaluatorFactory[T, U],
- val preservesDistribution: Boolean = false)
+ val preservesPartitionSizes: Boolean = false)
extends RDD[U](prev) {
override def getPartitions: Array[Partition] = firstParent[T].partitions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index b86d8c8bb5bd..5500f085de1e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -426,7 +426,7 @@ abstract class RDD[T: ClassTag](
new MapPartitionsRDD[U, T](
this,
(_, _, iter) => iter.map(cleanF),
- preservesDistribution = true)
+ preservesPartitionSizes = true)
}
/**
@@ -881,22 +881,22 @@ abstract class RDD[T: ClassTag](
 * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
 *                         sensitive, it may return totally different result when the input order
 *                         is changed. Mostly stateful functions are order-sensitive.
- * @param preservesDistribution Whether the input function preserves the data distribution,
- *                              that is, 1 number of partitions, and 2 number of rows per each
- *                              partition. This param is used to optimize the performance of
- *                              RDD.zipWithIndex.
+ * @param preservesPartitionSizes Whether the input function preserves the number of rows in each
+ *                                partition. This is true for 1:1 element mappings like `map`.
+ *                                Used to optimize `RDD.zipWithIndex` by counting rows on a
+ *                                cheaper ancestor RDD instead of the immediate parent.
*/
private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false,
isOrderSensitive: Boolean = false,
- preservesDistribution: Boolean = false): RDD[U] = withScope {
+ preservesPartitionSizes: Boolean = false): RDD[U] = withScope {
new MapPartitionsRDD(
this,
(_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
preservesPartitioning = preservesPartitioning,
isOrderSensitive = isOrderSensitive,
- preservesDistribution = preservesDistribution)
+ preservesPartitionSizes = preservesPartitionSizes)
}
/**
@@ -942,9 +942,9 @@ abstract class RDD[T: ClassTag](
private[spark] def mapPartitionsWithEvaluator[U: ClassTag](
evaluatorFactory: PartitionEvaluatorFactory[T, U],
- preservesDistribution: Boolean): RDD[U] = withScope {
+ preservesPartitionSizes: Boolean): RDD[U] = withScope {
new MapPartitionsWithEvaluatorRDD(this, evaluatorFactory,
- preservesDistribution = preservesDistribution)
+ preservesPartitionSizes = preservesPartitionSizes)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index 7afef839aa5f..98b1f60dbbef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -40,13 +40,13 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
private[spark]
class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {
- private def getAncestorWithSameDistribution(rdd: RDD[_]): RDD[_] = {
+ private def getAncestorWithSamePartitionSizes(rdd: RDD[_]): RDD[_] = {
rdd match {
case c: RDD[_] if c.getStorageLevel != StorageLevel.NONE => c
- case m: MapPartitionsRDD[_, _] if m.preservesDistribution =>
- getAncestorWithSameDistribution(m.prev)
- case m: MapPartitionsWithEvaluatorRDD[_, _] if m.preservesDistribution =>
- getAncestorWithSameDistribution(m.prev)
+ case m: MapPartitionsRDD[_, _] if m.preservesPartitionSizes =>
+ getAncestorWithSamePartitionSizes(m.prev)
+ case m: MapPartitionsWithEvaluatorRDD[_, _] if m.preservesPartitionSizes =>
+ getAncestorWithSamePartitionSizes(m.prev)
case _ => rdd
}
}
@@ -59,7 +59,7 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
} else if (n == 1) {
Array(0L)
} else {
- val ancestor = getAncestorWithSameDistribution(prev)
+ val ancestor = getAncestorWithSamePartitionSizes(prev)
ancestor.context.runJob(
ancestor,
Utils.getIteratorSize _,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 14d9621fbfab..f2cbfd2154a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -94,14 +94,14 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
    val evaluatorFactory = new ProjectEvaluatorFactory(projectList, child.output)
if (conf.usePartitionEvaluator) {
child.execute().mapPartitionsWithEvaluator(
- evaluatorFactory, preservesDistribution = true
+ evaluatorFactory, preservesPartitionSizes = true
)
} else {
child.execute().mapPartitionsWithIndexInternal(
f = (index, iter) => {
val evaluator = evaluatorFactory.createEvaluator()
evaluator.eval(index, iter)
- }, preservesDistribution = true
+ }, preservesPartitionSizes = true
)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]