This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b6aea1a8d99 [SPARK-39325][CORE] Improve MapOutputTracker
convertMapStatuses performance
b6aea1a8d99 is described below
commit b6aea1a8d99b3d99e91f7f195b23169d3d61b6a7
Author: Kun Wan <[email protected]>
AuthorDate: Sat Jun 11 18:16:43 2022 +0800
[SPARK-39325][CORE] Improve MapOutputTracker convertMapStatuses performance
### What changes were proposed in this pull request?
Optimize `MapOutputTracker.convertMapStatuses()` method.
### Why are the changes needed?
`MapOutputTracker.convertMapStatuses()` is very slow when there are tens
of thousands of MapStatuses and MergeStatuses.
Benchmark code:
```scala
val benchmark = new Benchmark("MapStatuses Convert", 1, output = output)
val blockManagerNumber = 1000
val mapNumber = 50000
val shufflePartitions = 10000
val shuffleId: Int = 0
// First reduce task will fetch map data from startPartition to
endPartition
val startPartition = 0
val startMapIndex = 0
val endMapIndex = mapNumber
val blockManagers = Array.tabulate(blockManagerNumber) { i =>
BlockManagerId("a", "host" + i, 7337)
}
val mapStatuses: Array[MapStatus] = Array.tabulate(mapNumber) {
mapTaskId =>
HighlyCompressedMapStatus(
blockManagers(mapTaskId % blockManagerNumber),
Array.tabulate(shufflePartitions)(i => if (i % 50 == 0) 1 else 0),
mapTaskId)
}
val bitmap = new RoaringBitmap()
Range(0, 4000).foreach(bitmap.add(_))
val mergeStatuses = Array.tabulate(shufflePartitions) { part =>
MergeStatus(blockManagers(part % blockManagerNumber), shuffleId,
bitmap, 100)
}
Array(499, 999, 1499).foreach { endPartition =>
benchmark.addCase(
s"Num Maps: $mapNumber Fetch partitions:${endPartition -
startPartition + 1}",
numIters) { _ =>
MapOutputTracker.convertMapStatuses(
shuffleId,
startPartition,
endPartition,
mapStatuses,
startMapIndex,
endMapIndex,
Some(mergeStatuses))
}
}
```
Before this PR
```
================================================================================================
MapStatuses Convert Benchmark
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU 2.40GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 3393 3483
96 0.0 3393439257.0 1.0X
Num Maps: 50000 Fetch partitions:1000 6640 6772
121 0.0 6639654832.0 0.5X
Num Maps: 50000 Fetch partitions:1500 10035 10143
108 0.0 10035100069.0 0.3X
```
After this PR
```
================================================================================================
MapStatuses Convert Benchmark
================================================================================================
Java HotSpot(TM) 64-Bit Server VM 1.8.0_281-b09 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU 2.40GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 667 679
15 0.0 666562302.0 1.0X
Num Maps: 50000 Fetch partitions:1000 1285 1397
115 0.0 1284808865.0 0.5X
Num Maps: 50000 Fetch partitions:1500 2045 2068
32 0.0 2044951906.0 0.3X
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UTs.
Closes #36709 from wankunde/convert_mapstatus.
Lead-authored-by: Kun Wan <[email protected]>
Co-authored-by: wankun <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
---
.../MapStatusesConvertBenchmark-results.txt | 13 +++
.../scala/org/apache/spark/MapOutputTracker.scala | 50 ++++++------
.../org/apache/spark/scheduler/MergeStatus.scala | 9 ---
.../apache/spark/MapStatusesConvertBenchmark.scala | 92 ++++++++++++++++++++++
4 files changed, 130 insertions(+), 34 deletions(-)
diff --git a/core/benchmarks/MapStatusesConvertBenchmark-results.txt
b/core/benchmarks/MapStatusesConvertBenchmark-results.txt
new file mode 100644
index 00000000000..f41401bbe2e
--- /dev/null
+++ b/core/benchmarks/MapStatusesConvertBenchmark-results.txt
@@ -0,0 +1,13 @@
+================================================================================================
+MapStatuses Convert Benchmark
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1025-azure
+Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+MapStatuses Convert: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+Num Maps: 50000 Fetch partitions:500 1330 1359
26 0.0 1329827185.0 1.0X
+Num Maps: 50000 Fetch partitions:1000 2648 2666
20 0.0 2647944453.0 0.5X
+Num Maps: 50000 Fetch partitions:1500 4155 4436
383 0.0 4154563448.0 0.3X
+
+
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index e6ed469250b..79cae483b22 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1596,7 +1596,7 @@ private[spark] object MapOutputTracker extends Logging {
mapStatuses: Array[MapStatus],
startMapIndex : Int,
endMapIndex: Int,
- mergeStatuses: Option[Array[MergeStatus]] = None): MapSizesByExecutorId
= {
+ mergeStatusesOpt: Option[Array[MergeStatus]] = None):
MapSizesByExecutorId = {
assert (mapStatuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId,
Long, Int)]]
var enableBatchFetch = true
@@ -1608,39 +1608,39 @@ private[spark] object MapOutputTracker extends Logging {
// TODO: SPARK-35036: Instead of reading map blocks in case of AQE with
Push based shuffle,
// TODO: improve push based shuffle to read partial merged blocks
satisfying the start/end
// TODO: map indexes
- if (mergeStatuses.exists(_.exists(_ != null)) && startMapIndex == 0
+ if (mergeStatusesOpt.exists(_.exists(_ != null)) && startMapIndex == 0
&& endMapIndex == mapStatuses.length) {
enableBatchFetch = false
logDebug(s"Disable shuffle batch fetch as Push based shuffle is enabled
for $shuffleId.")
- // We have MergeStatus and full range of mapIds are requested so return
a merged block.
- val numMaps = mapStatuses.length
- mergeStatuses.get.zipWithIndex.slice(startPartition,
endPartition).foreach {
- case (mergeStatus, partId) =>
- val remainingMapStatuses = if (mergeStatus != null &&
mergeStatus.totalSize > 0) {
- // If MergeStatus is available for the given partition, add
location of the
- // pre-merged shuffle partition for this partition ID. Here we
create a
- // ShuffleMergedBlockId to indicate this is a merged shuffle block.
- splitsByAddress.getOrElseUpdate(mergeStatus.location,
ListBuffer()) +=
- ((ShuffleMergedBlockId(shuffleId, mergeStatus.shuffleMergeId,
partId),
- mergeStatus.totalSize, SHUFFLE_PUSH_MAP_ID))
- // For the "holes" in this pre-merged shuffle partition, i.e.,
unmerged mapper
- // shuffle partition blocks, fetch the original map produced
shuffle partition blocks
- val mapStatusesWithIndex = mapStatuses.zipWithIndex
- mergeStatus.getMissingMaps(numMaps).map(mapStatusesWithIndex)
- } else {
- // If MergeStatus is not available for the given partition, fall
back to
- // fetching all the original mapper shuffle partition blocks
- mapStatuses.zipWithIndex.toSeq
- }
- // Add location for the mapper shuffle partition blocks
- for ((mapStatus, mapIndex) <- remainingMapStatuses) {
- validateStatus(mapStatus, shuffleId, partId)
+ val mergeStatuses = mergeStatusesOpt.get
+ for (partId <- startPartition until endPartition) {
+ val mergeStatus = mergeStatuses(partId)
+ if (mergeStatus != null && mergeStatus.totalSize > 0) {
+ // If MergeStatus is available for the given partition, add location
of the
+ // pre-merged shuffle partition for this partition ID. Here we
create a
+ // ShuffleMergedBlockId to indicate this is a merged shuffle block.
+ splitsByAddress.getOrElseUpdate(mergeStatus.location, ListBuffer())
+=
+ ((ShuffleMergedBlockId(shuffleId, mergeStatus.shuffleMergeId,
partId),
+ mergeStatus.totalSize, SHUFFLE_PUSH_MAP_ID))
+ }
+ }
+
+ // Add location for the mapper shuffle partition blocks
+ for ((mapStatus, mapIndex) <- mapStatuses.iterator.zipWithIndex) {
+ validateStatus(mapStatus, shuffleId, startPartition)
+ for (partId <- startPartition until endPartition) {
+ // For the "holes" in this pre-merged shuffle partition, i.e.,
unmerged mapper
+ // shuffle partition blocks, fetch the original map produced shuffle
partition blocks
+ val mergeStatus = mergeStatuses(partId)
+ if (mergeStatus == null || mergeStatus.totalSize == 0 ||
+ !mergeStatus.tracker.contains(mapIndex)) {
val size = mapStatus.getSizeForBlock(partId)
if (size != 0) {
splitsByAddress.getOrElseUpdate(mapStatus.location,
ListBuffer()) +=
((ShuffleBlockId(shuffleId, mapStatus.mapId, partId), size,
mapIndex))
}
}
+ }
}
} else {
val iter = mapStatuses.iterator.zipWithIndex
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
b/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
index 6d160264538..850756b50a3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
@@ -58,15 +58,6 @@ private[spark] class MergeStatus(
def tracker: RoaringBitmap = mapTracker
- /**
- * Get the list of mapper IDs for missing mapper partition blocks that are
not merged.
- * The reducer will use this information to decide which shuffle partition
blocks to
- * fetch in the original way.
- */
- def getMissingMaps(numMaps: Int): Seq[Int] = {
- (0 until numMaps).filter(i => !mapTracker.contains(i))
- }
-
/**
* Get the number of missing map outputs for missing mapper partition blocks
that are not merged.
*/
diff --git
a/core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala
b/core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala
new file mode 100644
index 00000000000..7f25c86497f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/MapStatusesConvertBenchmark.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.roaringbitmap.RoaringBitmap
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.scheduler.{HighlyCompressedMapStatus, MapStatus,
MergeStatus}
+import org.apache.spark.storage.BlockManagerId
+
+/**
+ * Benchmark to measure performance for converting mapStatuses and
mergeStatuses.
+ * To run this benchmark:
+ * {{{
+ * 1. without sbt:
+ * bin/spark-submit --class <this class> --jars <spark core test jar>
+ * 2. build/sbt "core/test:runMain <this class>"
+ * 3. generate result:
+ * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this
class>"
+ * Results will be written to
"benchmarks/MapStatusesConvertBenchmark-results.txt".
+ * }}}
+ * */
+object MapStatusesConvertBenchmark extends BenchmarkBase {
+
+ private def convertMapStatus(numIters: Int): Unit = {
+
+ val benchmark = new Benchmark("MapStatuses Convert", 1, output = output)
+
+ val blockManagerNumber = 1000
+ val mapNumber = 50000
+ val shufflePartitions = 10000
+
+ val shuffleId: Int = 0
+ // First reduce task will fetch map data from startPartition to
endPartition
+ val startPartition = 0
+ val startMapIndex = 0
+ val endMapIndex = mapNumber
+ val blockManagers = Array.tabulate(blockManagerNumber) { i =>
+ BlockManagerId("a", "host" + i, 7337)
+ }
+ val mapStatuses: Array[MapStatus] = Array.tabulate(mapNumber) { mapTaskId
=>
+ HighlyCompressedMapStatus(
+ blockManagers(mapTaskId % blockManagerNumber),
+ Array.tabulate(shufflePartitions)(i => if (i % 50 == 0) 1 else 0),
+ mapTaskId)
+ }
+ val bitmap = new RoaringBitmap()
+ Range(0, 4000).foreach(bitmap.add(_))
+ val mergeStatuses = Array.tabulate(shufflePartitions) { part =>
+ MergeStatus(blockManagers(part % blockManagerNumber), shuffleId, bitmap,
100)
+ }
+
+ Array(499, 999, 1499).foreach { endPartition =>
+ benchmark.addCase(
+ s"Num Maps: $mapNumber Fetch partitions:${endPartition -
startPartition + 1}",
+ numIters) { _ =>
+ MapOutputTracker.convertMapStatuses(
+ shuffleId,
+ startPartition,
+ endPartition,
+ mapStatuses,
+ startMapIndex,
+ endMapIndex,
+ Some(mergeStatuses))
+ }
+ }
+
+ benchmark.run()
+ }
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ val numIters = 3
+ runBenchmark("MapStatuses Convert Benchmark") {
+ convertMapStatus(numIters)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]