This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e5728e2fb70 [SPARK-44999][CORE] Refactor `ExternalSorter` to reduce
checks on `shouldPartition` when calling `getPartition`
e5728e2fb70 is described below
commit e5728e2fb706b0b611b371bdb7216acbdfe5c49b
Author: yangjie01 <[email protected]>
AuthorDate: Tue Aug 29 08:30:06 2023 -0700
[SPARK-44999][CORE] Refactor `ExternalSorter` to reduce checks on
`shouldPartition` when calling `getPartition`
### What changes were proposed in this pull request?
The `getPartition` method checks `shouldPartition` every time it is called.
However, `shouldPartition` is a `val` and cannot change after the `ExternalSorter`
is instantiated, so this PR makes the following changes to avoid re-checking
it on every call to `getPartition`:
1. Added `val actualPartitioner`: when `shouldPartition` is true, it is
`partitioner.get`; otherwise it is a new `ConstantPartitioner`, where
`ConstantPartitioner` is defined as follows:
https://github.com/apache/spark/blob/df63adf734370f5c2d71a348f9d36658718b302c/core/src/main/scala/org/apache/spark/Partitioner.scala#L156-L162
2. After step 1, the private method `getPartition` would only delegate to
`actualPartitioner.getPartition`. To shorten the call stack, this PR removes
that method and replaces its calls in `ExternalSorter` with direct calls to
`actualPartitioner.getPartition`.
3. Checked `numPartitions > 1` directly when initializing
`val actualPartitioner` and removed `val shouldPartition`, because it is no
longer used elsewhere.
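The three steps above can be illustrated with a minimal, self-contained sketch. It does not depend on Spark: `SimplePartitioner`, `SingleBucketPartitioner`, `ModPartitioner`, `SorterBefore`, `SorterAfter`, and `partitionOf` are hypothetical stand-ins invented for this example, and only the shape of the `shouldPartition` / `actualPartitioner` logic follows the diff in this commit.

```scala
// Hypothetical stand-in for Spark's Partitioner; not the real class.
trait SimplePartitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

// Assumed to mirror the idea of ConstantPartitioner: one partition, index 0.
final class SingleBucketPartitioner extends SimplePartitioner {
  override def numPartitions: Int = 1
  override def getPartition(key: Any): Int = 0
}

// Hypothetical hash-based partitioner used for the multi-partition case.
final class ModPartitioner(val numPartitions: Int) extends SimplePartitioner {
  override def getPartition(key: Any): Int = {
    val raw = key.hashCode % numPartitions
    // Keep the result non-negative for keys with negative hash codes.
    if (raw < 0) raw + numPartitions else raw
  }
}

// Before: the immutable flag is re-checked on every call.
final class SorterBefore(partitioner: Option[SimplePartitioner]) {
  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
  private val shouldPartition = numPartitions > 1
  def partitionOf(key: Any): Int =
    if (shouldPartition) partitioner.get.getPartition(key) else 0
}

// After: the decision is made once, when the instance is constructed.
final class SorterAfter(partitioner: Option[SimplePartitioner]) {
  private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
  private val actualPartitioner: SimplePartitioner =
    if (numPartitions > 1) partitioner.get else new SingleBucketPartitioner
  def partitionOf(key: Any): Int = actualPartitioner.getPartition(key)
}
```

Under these assumptions both classes return the same partition index for any key, while `SorterAfter` selects the partitioner once instead of branching on each record.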
### Why are the changes needed?
To reduce checks on `shouldPartition` when calling `getPartition`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #42713 from LuciferYang/ExternalSorter-partitioner.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/util/collection/ExternalSorter.scala | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 7153bb72476..a68e0de70c5 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -102,10 +102,8 @@ private[spark] class ExternalSorter[K, V, C](
private val conf = SparkEnv.get.conf
private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1)
- private val shouldPartition = numPartitions > 1
- private def getPartition(key: K): Int = {
- if (shouldPartition) partitioner.get.getPartition(key) else 0
- }
+ private val actualPartitioner =
+ if (numPartitions > 1) partitioner.get else new ConstantPartitioner
private val blockManager = SparkEnv.get.blockManager
private val diskBlockManager = blockManager.diskBlockManager
@@ -197,7 +195,7 @@ private[spark] class ExternalSorter[K, V, C](
while (records.hasNext) {
addElementsRead()
kv = records.next()
- map.changeValue((getPartition(kv._1), kv._1), update)
+ map.changeValue((actualPartitioner.getPartition(kv._1), kv._1), update)
maybeSpillCollection(usingMap = true)
}
} else {
@@ -205,7 +203,7 @@ private[spark] class ExternalSorter[K, V, C](
while (records.hasNext) {
addElementsRead()
val kv = records.next()
- buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
+ buffer.insert(actualPartitioner.getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
maybeSpillCollection(usingMap = false)
}
}