Repository: spark
Updated Branches:
  refs/heads/master a634d66ce -> 889f6cc10


[SPARK-24143] Filter empty blocks when converting MapStatus to (blockId, size) pairs

## What changes were proposed in this pull request?

In the current code (`MapOutputTracker.convertMapStatuses`), map statuses are 
converted to (blockId, size) pairs for all blocks, regardless of whether a block 
is empty. This results in OOM when there are many consecutive empty blocks, 
especially when adaptive execution is enabled.

The (blockId, size) pairs are only used in `ShuffleBlockFetcherIterator` to 
control shuffle reads, and requests are sent only for non-empty blocks. This 
patch therefore filters out the empty blocks in 
`MapOutputTracker.convertMapStatuses` to save memory.

## How was this patch tested?

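Added a unit test in `MapOutputTrackerSuite` ("zero-sized blocks should be 
excluded when getMapSizesByExecutorId") and updated the existing suites for the 
new `Iterator` return type.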

Author: jinxing <[email protected]>

Closes #21212 from jinxing64/SPARK-24143.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/889f6cc1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/889f6cc1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/889f6cc1

Branch: refs/heads/master
Commit: 889f6cc10cbd7781df04f468674a61f0ac5a870b
Parents: a634d66
Author: jinxing <[email protected]>
Authored: Mon May 7 14:16:27 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Mon May 7 14:16:27 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     | 31 +++++++++--------
 .../storage/ShuffleBlockFetcherIterator.scala   | 35 ++++++++++++--------
 .../apache/spark/MapOutputTrackerSuite.scala    | 31 ++++++++++++++++-
 .../shuffle/BlockStoreShuffleReaderSuite.scala  |  2 +-
 .../ShuffleBlockFetcherIteratorSuite.scala      | 19 ++++++-----
 5 files changed, 80 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/889f6cc1/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 195fd4f..7364605 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.{ConcurrentHashMap, 
LinkedBlockingQueue, ThreadPoolE
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer, Map}
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 import scala.reflect.ClassTag
@@ -282,7 +282,7 @@ private[spark] abstract class MapOutputTracker(conf: 
SparkConf) extends Logging
 
   // For testing
   def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
     getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
   }
 
@@ -296,7 +296,7 @@ private[spark] abstract class MapOutputTracker(conf: 
SparkConf) extends Logging
    *         describing the shuffle blocks that are stored at that block 
manager.
    */
   def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, 
endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])]
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])]
 
   /**
    * Deletes map output status information for the specified shuffle stage.
@@ -632,9 +632,10 @@ private[spark] class MapOutputTrackerMaster(
     }
   }
 
+  // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded 
in the result.
   // This method is only called in local-mode.
   def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, 
endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
     logDebug(s"Fetching outputs for shuffle $shuffleId, partitions 
$startPartition-$endPartition")
     shuffleStatuses.get(shuffleId) match {
       case Some (shuffleStatus) =>
@@ -642,7 +643,7 @@ private[spark] class MapOutputTrackerMaster(
           MapOutputTracker.convertMapStatuses(shuffleId, startPartition, 
endPartition, statuses)
         }
       case None =>
-        Seq.empty
+        Iterator.empty
     }
   }
 
@@ -669,8 +670,9 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
   /** Remembers which map output locations are currently being fetched on an 
executor. */
   private val fetching = new HashSet[Int]
 
+  // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded 
in the result.
   override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, 
endPartition: Int)
-      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+      : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
     logDebug(s"Fetching outputs for shuffle $shuffleId, partitions 
$startPartition-$endPartition")
     val statuses = getStatuses(shuffleId)
     try {
@@ -841,6 +843,7 @@ private[spark] object MapOutputTracker extends Logging {
    * Given an array of map statuses and a range of map output partitions, 
returns a sequence that,
    * for each block manager ID, lists the shuffle block IDs and corresponding 
shuffle block sizes
    * stored at that block manager.
+   * Note that empty blocks are filtered in the result.
    *
    * If any of the statuses is null (indicating a missing location due to a 
failed mapper),
    * throws a FetchFailedException.
@@ -857,22 +860,24 @@ private[spark] object MapOutputTracker extends Logging {
       shuffleId: Int,
       startPartition: Int,
       endPartition: Int,
-      statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] 
= {
+      statuses: Array[MapStatus]): Iterator[(BlockManagerId, Seq[(BlockId, 
Long)])] = {
     assert (statuses != null)
-    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, 
Long)]]
-    for ((status, mapId) <- statuses.zipWithIndex) {
+    val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, 
Long)]]
+    for ((status, mapId) <- statuses.iterator.zipWithIndex) {
       if (status == null) {
         val errorMessage = s"Missing an output location for shuffle $shuffleId"
         logError(errorMessage)
         throw new MetadataFetchFailedException(shuffleId, startPartition, 
errorMessage)
       } else {
         for (part <- startPartition until endPartition) {
-          splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
-            ((ShuffleBlockId(shuffleId, mapId, part), 
status.getSizeForBlock(part)))
+          val size = status.getSizeForBlock(part)
+          if (size != 0) {
+            splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
+                ((ShuffleBlockId(shuffleId, mapId, part), size))
+          }
         }
       }
     }
-
-    splitsByAddress.toSeq
+    splitsByAddress.iterator
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/889f6cc1/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index dd9df74..6971efd 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -48,7 +48,9 @@ import org.apache.spark.util.io.ChunkedByteBufferOutputStream
  * @param blockManager [[BlockManager]] for reading local blocks
  * @param blocksByAddress list of blocks to fetch grouped by the 
[[BlockManagerId]].
  *                        For each block we also require the size (in bytes as 
a long field) in
- *                        order to throttle the memory usage.
+ *                        order to throttle the memory usage. Note that 
zero-sized blocks are
+ *                        already excluded, which happened in
+ *                        [[MapOutputTracker.convertMapStatuses]].
  * @param streamWrapper A function to wrap the returned input stream.
  * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at 
any given point.
  * @param maxReqsInFlight max number of remote requests to fetch blocks at any 
given point.
@@ -62,7 +64,7 @@ final class ShuffleBlockFetcherIterator(
     context: TaskContext,
     shuffleClient: ShuffleClient,
     blockManager: BlockManager,
-    blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
+    blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long)])],
     streamWrapper: (BlockId, InputStream) => InputStream,
     maxBytesInFlight: Long,
     maxReqsInFlight: Int,
@@ -74,8 +76,8 @@ final class ShuffleBlockFetcherIterator(
   import ShuffleBlockFetcherIterator._
 
   /**
-   * Total number of blocks to fetch. This can be smaller than the total 
number of blocks
-   * in [[blocksByAddress]] because we filter out zero-sized blocks in 
[[initialize]].
+   * Total number of blocks to fetch. This should be equal to the total number 
of blocks
+   * in [[blocksByAddress]] because we already filter out zero-sized blocks in 
[[blocksByAddress]].
    *
    * This should equal localBlocks.size + remoteBlocks.size.
    */
@@ -267,13 +269,16 @@ final class ShuffleBlockFetcherIterator(
     // at most maxBytesInFlight in order to limit the amount of data in flight.
     val remoteRequests = new ArrayBuffer[FetchRequest]
 
-    // Tracks total number of blocks (including zero sized blocks)
-    var totalBlocks = 0
     for ((address, blockInfos) <- blocksByAddress) {
-      totalBlocks += blockInfos.size
       if (address.executorId == blockManager.blockManagerId.executorId) {
-        // Filter out zero-sized blocks
-        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
+        blockInfos.find(_._2 <= 0) match {
+          case Some((blockId, size)) if size < 0 =>
+            throw new BlockException(blockId, "Negative block size " + size)
+          case Some((blockId, size)) if size == 0 =>
+            throw new BlockException(blockId, "Zero-sized blocks should be 
excluded.")
+          case None => // do nothing.
+        }
+        localBlocks ++= blockInfos.map(_._1)
         numBlocksToFetch += localBlocks.size
       } else {
         val iterator = blockInfos.iterator
@@ -281,14 +286,15 @@ final class ShuffleBlockFetcherIterator(
         var curBlocks = new ArrayBuffer[(BlockId, Long)]
         while (iterator.hasNext) {
           val (blockId, size) = iterator.next()
-          // Skip empty blocks
-          if (size > 0) {
+          if (size < 0) {
+            throw new BlockException(blockId, "Negative block size " + size)
+          } else if (size == 0) {
+            throw new BlockException(blockId, "Zero-sized blocks should be 
excluded.")
+          } else {
             curBlocks += ((blockId, size))
             remoteBlocks += blockId
             numBlocksToFetch += 1
             curRequestSize += size
-          } else if (size < 0) {
-            throw new BlockException(blockId, "Negative block size " + size)
           }
           if (curRequestSize >= targetRequestSize ||
               curBlocks.size >= maxBlocksInFlightPerAddress) {
@@ -306,7 +312,8 @@ final class ShuffleBlockFetcherIterator(
         }
       }
     }
-    logInfo(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks 
blocks")
+    logInfo(s"Getting $numBlocksToFetch non-empty blocks including 
${localBlocks.size}" +
+        s" local blocks and ${remoteBlocks.size} remote blocks")
     remoteRequests
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/889f6cc1/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 50b8ea7..21f481d 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -147,7 +147,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     masterTracker.registerMapOutput(10, 0, MapStatus(
       BlockManagerId("a", "hostA", 1000), Array(1000L)))
     slaveTracker.updateEpoch(masterTracker.getEpoch)
-    assert(slaveTracker.getMapSizesByExecutorId(10, 0) ===
+    assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq ===
       Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 
0, 0), size1000)))))
     assert(0 == masterTracker.getNumCachedSerializedBroadcast)
 
@@ -298,4 +298,33 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     }
   }
 
+  test("zero-sized blocks should be excluded when getMapSizesByExecutorId") {
+    val rpcEnv = createRpcEnv("test")
+    val tracker = newTrackerMaster()
+    tracker.trackerEndpoint = 
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+    tracker.registerShuffle(10, 2)
+
+    val size0 = MapStatus.decompressSize(MapStatus.compressSize(0L))
+    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 
1000),
+      Array(size0, size1000, size0, size10000)))
+    tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 
1000),
+      Array(size10000, size0, size1000, size0)))
+    assert(tracker.containsShuffle(10))
+    assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq ===
+        Seq(
+          (BlockManagerId("a", "hostA", 1000),
+              Seq((ShuffleBlockId(10, 0, 1), size1000), (ShuffleBlockId(10, 0, 
3), size10000))),
+          (BlockManagerId("b", "hostB", 1000),
+              Seq((ShuffleBlockId(10, 1, 0), size10000), (ShuffleBlockId(10, 
1, 2), size1000)))
+        )
+    )
+
+    tracker.unregisterShuffle(10)
+    tracker.stop()
+    rpcEnv.shutdown()
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/889f6cc1/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
index dba1172..2d8a83c 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
@@ -108,7 +108,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite 
with LocalSparkContext
         val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId)
         (shuffleBlockId, byteOutputStream.size().toLong)
       }
-      Seq((localBlockManagerId, shuffleBlockIdsAndSizes))
+      Seq((localBlockManagerId, shuffleBlockIdsAndSizes)).toIterator
     }
 
     // Create a mocked shuffle handle to pass into HashShuffleReader.

http://git-wip-us.apache.org/repos/asf/spark/blob/889f6cc1/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 692ae3b..cefebfa 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -99,7 +99,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite 
with PrivateMethodT
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (localBmId, localBlocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq),
       (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq)
-    )
+    ).toIterator
 
     val iterator = new ShuffleBlockFetcherIterator(
       TaskContext.empty(),
@@ -176,7 +176,7 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
     })
 
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (remoteBmId, blocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq))
+      (remoteBmId, blocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq)).toIterator
 
     val taskContext = TaskContext.empty()
     val iterator = new ShuffleBlockFetcherIterator(
@@ -244,7 +244,7 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
     })
 
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (remoteBmId, blocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq))
+      (remoteBmId, blocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq)).toIterator
 
     val taskContext = TaskContext.empty()
     val iterator = new ShuffleBlockFetcherIterator(
@@ -310,7 +310,7 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
     })
 
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (remoteBmId, blocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq))
+      (remoteBmId, blocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq)).toIterator
 
     val taskContext = TaskContext.empty()
     val iterator = new ShuffleBlockFetcherIterator(
@@ -378,7 +378,7 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (localBmId, localBlockLengths),
       (remoteBmId, remoteBlockLengths)
-    )
+    ).toIterator
 
     val taskContext = TaskContext.empty()
     val iterator = new ShuffleBlockFetcherIterator(
@@ -437,7 +437,7 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
     })
 
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (remoteBmId, blocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq))
+      (remoteBmId, blocks.keys.map(blockId => (blockId, 
1.asInstanceOf[Long])).toSeq)).toIterator
 
     val taskContext = TaskContext.empty()
     val iterator = new ShuffleBlockFetcherIterator(
@@ -495,7 +495,8 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
         }
       })
 
-    def fetchShuffleBlock(blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, 
Long)])]): Unit = {
+    def fetchShuffleBlock(
+        blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long)])]): 
Unit = {
       // Set `maxBytesInFlight` and `maxReqsInFlight` to `Int.MaxValue`, so 
that during the
       // construction of `ShuffleBlockFetcherIterator`, all requests to fetch 
remote shuffle blocks
       // are issued. The `maxReqSizeShuffleToMem` is hard-coded as 200 here.
@@ -513,14 +514,14 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
     }
 
     val blocksByAddress1 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 100L)).toSeq))
+      (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 
100L)).toSeq)).toIterator
     fetchShuffleBlock(blocksByAddress1)
     // `maxReqSizeShuffleToMem` is 200, which is greater than the block size 
100, so don't fetch
     // shuffle block to disk.
     assert(tempFileManager == null)
 
     val blocksByAddress2 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 300L)).toSeq))
+      (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 
300L)).toSeq)).toIterator
     fetchShuffleBlock(blocksByAddress2)
     // `maxReqSizeShuffleToMem` is 200, which is smaller than the block size 
300, so fetch
     // shuffle block to disk.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to