This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f6ee3d0 [SPARK-31082][CORE] MapOutputTrackerMaster.getMapLocation
should handle last mapIndex correctly
f6ee3d0 is described below
commit f6ee3d061409128c301e5f0d9ae9733132173cc8
Author: yi.wu <[email protected]>
AuthorDate: Mon Mar 9 15:53:34 2020 +0800
[SPARK-31082][CORE] MapOutputTrackerMaster.getMapLocation should handle
last mapIndex correctly
### What changes were proposed in this pull request?
In `getMapLocation`, change the condition from `...endMapIndex <
statuses.length` to `...endMapIndex <= statuses.length`.
### Why are the changes needed?
`endMapIndex` is exclusive, so it may equal `statuses.length` and the bounds check
must use `<=`. Otherwise, we can't get the location for the last mapIndex.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Updated an existing test.
Closes #27850 from Ngone51/fix_getmaploction.
Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit ef51ff9dc8c220fcbed76cdd1783f58f400df48c)
Signed-off-by: Wenchen Fan <[email protected]>
---
core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 5 +++--
.../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 10 +++++++---
2 files changed, 10 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index f229061..ec8621b 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -696,7 +696,7 @@ private[spark] class MapOutputTrackerMaster(
*
* @param dep shuffle dependency object
* @param startMapIndex the start map index
- * @param endMapIndex the end map index
+ * @param endMapIndex the end map index (exclusive)
* @return a sequence of locations where task runs.
*/
def getMapLocation(
@@ -707,7 +707,8 @@ private[spark] class MapOutputTrackerMaster(
val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
if (shuffleStatus != null) {
shuffleStatus.withMapStatuses { statuses =>
- if (startMapIndex < endMapIndex && (startMapIndex >= 0 && endMapIndex
< statuses.length)) {
+ if (startMapIndex < endMapIndex &&
+ (startMapIndex >= 0 && endMapIndex <= statuses.length)) {
val statusesPicked = statuses.slice(startMapIndex,
endMapIndex).filter(_ != null)
statusesPicked.map(_.location.host).toSeq
} else {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 25b1f89..94947a8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -114,9 +114,13 @@ class AdaptiveQueryExecSuite
val numLocalReaders = collect(plan) {
case reader @ CustomShuffleReaderExec(_, _,
LOCAL_SHUFFLE_READER_DESCRIPTION) => reader
- }.length
-
- assert(numShuffles === (numLocalReaders + numShufflesWithoutLocalReader))
+ }
+ numLocalReaders.foreach { r =>
+ val rdd = r.execute()
+ val parts = rdd.partitions
+ assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
+ }
+ assert(numShuffles === (numLocalReaders.length +
numShufflesWithoutLocalReader))
}
test("Change merge join to broadcast join") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]