This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 60849b78204 [SPARK-42719][CORE] `MapOutputTracker#getMapLocation`
should respect `spark.shuffle.reduceLocality.enabled`
60849b78204 is described below
commit 60849b78204e69392976420b9a813bed0790e4e9
Author: roryqi <[email protected]>
AuthorDate: Thu Mar 9 11:50:05 2023 -0600
[SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect
`spark.shuffle.reduceLocality.enabled`
### What changes were proposed in this pull request?
`MapOutputTracker#getMapLocation` should respect
`spark.shuffle.reduceLocality.enabled`
### Why are the changes needed?
As discussed in https://github.com/apache/spark/pull/40307:
Conceptually, getPreferredLocations in ShuffledRowRDD should return Nil right
away when spark.shuffle.reduceLocality.enabled = false.
That check is pushed down into MapOutputTracker, however:
getPreferredLocationsForShuffle honors spark.shuffle.reduceLocality.enabled,
but getMapLocation does not.
So the fix is to make getMapLocation honor the setting as well.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a new unit test in MapOutputTrackerSuite.
Closes #40339 from jerqi/new_feature.
Authored-by: roryqi <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../scala/org/apache/spark/MapOutputTracker.scala | 2 ++
.../org/apache/spark/MapOutputTrackerSuite.scala | 20 ++++++++++++++++++++
2 files changed, 22 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index fade0b86dd8..5772285a63d 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1112,6 +1112,8 @@ private[spark] class MapOutputTrackerMaster(
startMapIndex: Int,
endMapIndex: Int): Seq[String] =
{
+ if (!shuffleLocalityEnabled) return Nil
+
val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
if (shuffleStatus != null) {
shuffleStatus.withMapStatuses { statuses =>
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index dfad4a924d7..6b702df25c1 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -1030,4 +1030,24 @@ class MapOutputTrackerSuite extends SparkFunSuite with
LocalSparkContext {
rpcEnv.shutdown()
assert(npeCounter.intValue() == 0)
}
+
+ test("SPARK-42719: `MapOutputTracker#getMapLocation` should respect the
config option") {
+ val rpcEnv = createRpcEnv("test")
+ val newConf = new SparkConf
+ newConf.set(SHUFFLE_REDUCE_LOCALITY_ENABLE, false)
+ val tracker = newTrackerMaster(newConf)
+ try {
+ tracker.trackerEndpoint =
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+ new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, newConf))
+ tracker.registerShuffle(10, 6, 1)
+ tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA",
1000),
+ Array(2L), 5))
+ val mockShuffleDep = mock(classOf[ShuffleDependency[Int, Int, _]])
+ when(mockShuffleDep.shuffleId).thenReturn(10)
+ assert(tracker.getMapLocation(mockShuffleDep, 0, 1) === Nil)
+ } finally {
+ tracker.stop()
+ rpcEnv.shutdown()
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]