Repository: spark
Updated Branches:
  refs/heads/master 210404a56 -> 6b79bfb42


[SPARK-3613] Record only average block size in MapStatus for large stages

This changes the way we send MapStatus from executors back to the driver for 
large stages (>2000 tasks). For large stages, we no longer send one byte per 
block. Instead, we just send the average block size.

This makes large jobs (tens of thousands of tasks) much more reliable since the 
driver no longer sends a huge amount of data.

Author: Reynold Xin <[email protected]>

Closes #2470 from rxin/mapstatus and squashes the following commits:

822ff54 [Reynold Xin] Code review feedback.
3b86f56 [Reynold Xin] Added MimaExclude.
f89d182 [Reynold Xin] Fixed a bug in MapStatus
6a0401c [Reynold Xin] [SPARK-3613] Record only average block size in MapStatus 
for large stages.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b79bfb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b79bfb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b79bfb4

Branch: refs/heads/master
Commit: 6b79bfb42580b6bd4c4cd99fb521534a94150693
Parents: 210404a
Author: Reynold Xin <[email protected]>
Authored: Mon Sep 29 22:56:22 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Mon Sep 29 22:56:22 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     |  29 +----
 .../org/apache/spark/scheduler/MapStatus.scala  | 119 +++++++++++++++++--
 .../spark/shuffle/hash/HashShuffleWriter.scala  |   8 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala  |   3 +-
 .../apache/spark/MapOutputTrackerSuite.scala    |  66 ++++------
 .../spark/scheduler/DAGSchedulerSuite.scala     |   2 +-
 .../apache/spark/scheduler/MapStatusSuite.scala |  92 ++++++++++++++
 .../org/apache/spark/util/AkkaUtilsSuite.scala  |  14 +--
 project/MimaExcludes.scala                      |   5 +-
 9 files changed, 240 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6b79bfb4/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index f92189b..4cb0bd4 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -349,7 +349,6 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
 }
 
 private[spark] object MapOutputTracker {
-  private val LOG_BASE = 1.1
 
   // Serialize an array of map output locations into an efficient byte format 
so that we can send
   // it to reduce tasks. We do this by compressing the serialized bytes using 
GZIP. They will
@@ -385,34 +384,8 @@ private[spark] object MapOutputTracker {
           throw new MetadataFetchFailedException(
             shuffleId, reduceId, "Missing an output location for shuffle " + 
shuffleId)
         } else {
-          (status.location, decompressSize(status.compressedSizes(reduceId)))
+          (status.location, status.getSizeForBlock(reduceId))
         }
     }
   }
-
-  /**
-   * Compress a size in bytes to 8 bits for efficient reporting of map output 
sizes.
-   * We do this by encoding the log base 1.1 of the size as an integer, which 
can support
-   * sizes up to 35 GB with at most 10% error.
-   */
-  def compressSize(size: Long): Byte = {
-    if (size == 0) {
-      0
-    } else if (size <= 1L) {
-      1
-    } else {
-      math.min(255, math.ceil(math.log(size) / 
math.log(LOG_BASE)).toInt).toByte
-    }
-  }
-
-  /**
-   * Decompress an 8-bit encoded block size, using the reverse operation of 
compressSize.
-   */
-  def decompressSize(compressedSize: Byte): Long = {
-    if (compressedSize == 0) {
-      0
-    } else {
-      math.pow(LOG_BASE, compressedSize & 0xFF).toLong
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6b79bfb4/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index d3f63ff..e25096e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -24,22 +24,123 @@ import org.apache.spark.storage.BlockManagerId
 /**
  * Result returned by a ShuffleMapTask to a scheduler. Includes the block 
manager address that the
  * task ran on as well as the sizes of outputs for each reducer, for passing 
on to the reduce tasks.
- * The map output sizes are compressed using MapOutputTracker.compressSize.
  */
-private[spark] class MapStatus(var location: BlockManagerId, var 
compressedSizes: Array[Byte])
-  extends Externalizable {
+private[spark] sealed trait MapStatus {
+  /** Location where this task was run. */
+  def location: BlockManagerId
 
-  def this() = this(null, null)  // For deserialization only
+  /** Estimated size for the reduce block, in bytes. */
+  def getSizeForBlock(reduceId: Int): Long
+}
+
+
+private[spark] object MapStatus {
+
+  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
+    if (uncompressedSizes.length > 2000) {
+      new HighlyCompressedMapStatus(loc, uncompressedSizes)
+    } else {
+      new CompressedMapStatus(loc, uncompressedSizes)
+    }
+  }
+
+  private[this] val LOG_BASE = 1.1
+
+  /**
+   * Compress a size in bytes to 8 bits for efficient reporting of map output 
sizes.
+   * We do this by encoding the log base 1.1 of the size as an integer, which 
can support
+   * sizes up to 35 GB with at most 10% error.
+   */
+  def compressSize(size: Long): Byte = {
+    if (size == 0) {
+      0
+    } else if (size <= 1L) {
+      1
+    } else {
+      math.min(255, math.ceil(math.log(size) / 
math.log(LOG_BASE)).toInt).toByte
+    }
+  }
+
+  /**
+   * Decompress an 8-bit encoded block size, using the reverse operation of 
compressSize.
+   */
+  def decompressSize(compressedSize: Byte): Long = {
+    if (compressedSize == 0) {
+      0
+    } else {
+      math.pow(LOG_BASE, compressedSize & 0xFF).toLong
+    }
+  }
+}
+
+
+/**
+ * A [[MapStatus]] implementation that tracks the size of each block. Size for 
each block is
+ * represented using a single byte.
+ *
+ * @param loc location where the task is being executed.
+ * @param compressedSizes size of the blocks, indexed by reduce partition id.
+ */
+private[spark] class CompressedMapStatus(
+    private[this] var loc: BlockManagerId,
+    private[this] var compressedSizes: Array[Byte])
+  extends MapStatus with Externalizable {
+
+  protected def this() = this(null, null.asInstanceOf[Array[Byte]])  // For 
deserialization only
+
+  def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+    this(loc, uncompressedSizes.map(MapStatus.compressSize))
+  }
 
-  def writeExternal(out: ObjectOutput) {
-    location.writeExternal(out)
+  override def location: BlockManagerId = loc
+
+  override def getSizeForBlock(reduceId: Int): Long = {
+    MapStatus.decompressSize(compressedSizes(reduceId))
+  }
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    loc.writeExternal(out)
     out.writeInt(compressedSizes.length)
     out.write(compressedSizes)
   }
 
-  def readExternal(in: ObjectInput) {
-    location = BlockManagerId(in)
-    compressedSizes = new Array[Byte](in.readInt())
+  override def readExternal(in: ObjectInput): Unit = {
+    loc = BlockManagerId(in)
+    val len = in.readInt()
+    compressedSizes = new Array[Byte](len)
     in.readFully(compressedSizes)
   }
 }
+
+
+/**
+ * A [[MapStatus]] implementation that only stores the average size of the 
blocks.
+ *
+ * @param loc location where the task is being executed.
+ * @param avgSize average size of all the blocks
+ */
+private[spark] class HighlyCompressedMapStatus(
+    private[this] var loc: BlockManagerId,
+    private[this] var avgSize: Long)
+  extends MapStatus with Externalizable {
+
+  def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) {
+    this(loc, uncompressedSizes.sum / uncompressedSizes.length)
+  }
+
+  protected def this() = this(null, 0L)  // For deserialization only
+
+  override def location: BlockManagerId = loc
+
+  override def getSizeForBlock(reduceId: Int): Long = avgSize
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    loc.writeExternal(out)
+    out.writeLong(avgSize)
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    loc = BlockManagerId(in)
+    avgSize = in.readLong()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6b79bfb4/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala 
b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
index 4b9454d..746ed33 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala
@@ -103,13 +103,11 @@ private[spark] class HashShuffleWriter[K, V](
 
   private def commitWritesAndBuildStatus(): MapStatus = {
     // Commit the writes. Get the size of each bucket block (total block size).
-    val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
+    val sizes: Array[Long] = shuffle.writers.map { writer: BlockObjectWriter =>
       writer.commitAndClose()
-      val size = writer.fileSegment().length
-      MapOutputTracker.compressSize(size)
+      writer.fileSegment().length
     }
-
-    new MapStatus(blockManager.blockManagerId, compressedSizes)
+    MapStatus(blockManager.blockManagerId, sizes)
   }
 
   private def revertWrites(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/6b79bfb4/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 89a78d6..927481b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -70,8 +70,7 @@ private[spark] class SortShuffleWriter[K, V, C](
     val partitionLengths = sorter.writePartitionedFile(blockId, context, 
outputFile)
     shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
 
-    mapStatus = new MapStatus(blockManager.blockManagerId,
-      partitionLengths.map(MapOutputTracker.compressSize))
+    mapStatus = MapStatus(blockManager.blockManagerId, partitionLengths)
   }
 
   /** Close this writer, passing along whether the map completed */

http://git-wip-us.apache.org/repos/asf/spark/blob/6b79bfb4/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 5369169..1fef79a 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -23,32 +23,13 @@ import akka.actor._
 import akka.testkit.TestActorRef
 import org.scalatest.FunSuite
 
-import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils
 
 class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   private val conf = new SparkConf
-  test("compressSize") {
-    assert(MapOutputTracker.compressSize(0L) === 0)
-    assert(MapOutputTracker.compressSize(1L) === 1)
-    assert(MapOutputTracker.compressSize(2L) === 8)
-    assert(MapOutputTracker.compressSize(10L) === 25)
-    assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145)
-    assert((MapOutputTracker.compressSize(1000000000L) & 0xFF) === 218)
-    // This last size is bigger than we can encode in a byte, so check that we 
just return 255
-    assert((MapOutputTracker.compressSize(1000000000000000000L) & 0xFF) === 
255)
-  }
-
-  test("decompressSize") {
-    assert(MapOutputTracker.decompressSize(0) === 0)
-    for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
-      val size2 = 
MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size))
-      assert(size2 >= 0.99 * size && size2 <= 1.11 * size,
-        "size " + size + " decompressed to " + size2 + ", which is out of 
range")
-    }
-  }
 
   test("master start and stop") {
     val actorSystem = ActorSystem("test")
@@ -65,14 +46,12 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
       actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, 
conf)))
     tracker.registerShuffle(10, 2)
     assert(tracker.containsShuffle(10))
-    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
-    val compressedSize10000 = MapOutputTracker.compressSize(10000L)
-    val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
-    val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
-    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", 
"hostA", 1000),
-        Array(compressedSize1000, compressedSize10000)))
-    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", 
"hostB", 1000),
-        Array(compressedSize10000, compressedSize1000)))
+    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L))
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 
1000),
+        Array(1000L, 10000L)))
+    tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 
1000),
+        Array(10000L, 1000L)))
     val statuses = tracker.getServerStatuses(10, 0)
     assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), 
size1000),
                                   (BlockManagerId("b", "hostB", 1000), 
size10000)))
@@ -84,11 +63,11 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
     val tracker = new MapOutputTrackerMaster(conf)
     tracker.trackerActor = actorSystem.actorOf(Props(new 
MapOutputTrackerMasterActor(tracker, conf)))
     tracker.registerShuffle(10, 2)
-    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
-    val compressedSize10000 = MapOutputTracker.compressSize(10000L)
-    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", 
"hostA", 1000),
+    val compressedSize1000 = MapStatus.compressSize(1000L)
+    val compressedSize10000 = MapStatus.compressSize(10000L)
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 
1000),
       Array(compressedSize1000, compressedSize10000)))
-    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", 
"hostB", 1000),
+    tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 
1000),
       Array(compressedSize10000, compressedSize1000)))
     assert(tracker.containsShuffle(10))
     assert(tracker.getServerStatuses(10, 0).nonEmpty)
@@ -103,11 +82,11 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
     tracker.trackerActor =
       actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, 
conf)))
     tracker.registerShuffle(10, 2)
-    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
-    val compressedSize10000 = MapOutputTracker.compressSize(10000L)
-    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", 
"hostA", 1000),
+    val compressedSize1000 = MapStatus.compressSize(1000L)
+    val compressedSize10000 = MapStatus.compressSize(10000L)
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 
1000),
         Array(compressedSize1000, compressedSize1000, compressedSize1000)))
-    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", 
"hostB", 1000),
+    tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 
1000),
         Array(compressedSize10000, compressedSize1000, compressedSize1000)))
 
     // As if we had two simultaneous fetch failures
@@ -142,10 +121,9 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
 
-    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
-    val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
-    masterTracker.registerMapOutput(10, 0, new MapStatus(
-      BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    masterTracker.registerMapOutput(10, 0, MapStatus(
+      BlockManagerId("a", "hostA", 1000), Array(1000L)))
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
@@ -173,8 +151,8 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
 
     // Frame size should be ~123B, and no exception should be thrown
     masterTracker.registerShuffle(10, 1)
-    masterTracker.registerMapOutput(10, 0, new MapStatus(
-      BlockManagerId("88", "mph", 1000), Array.fill[Byte](10)(0)))
+    masterTracker.registerMapOutput(10, 0, MapStatus(
+      BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
     masterActor.receive(GetMapOutputStatuses(10))
   }
 
@@ -194,8 +172,8 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
     // being sent.
     masterTracker.registerShuffle(20, 100)
     (0 until 100).foreach { i =>
-      masterTracker.registerMapOutput(20, i, new MapStatus(
-        BlockManagerId("999", "mps", 1000), Array.fill[Byte](4000000)(0)))
+      masterTracker.registerMapOutput(20, i, new CompressedMapStatus(
+        BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
     }
     intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6b79bfb4/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index aa73469..a2e4f71 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -740,7 +740,7 @@ class DAGSchedulerSuite extends 
TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   private def makeMapStatus(host: String, reduces: Int): MapStatus =
-   new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2))
+    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(2))
 
   private def makeBlockManagerId(host: String): BlockManagerId =
     BlockManagerId("exec-" + host, host, 12345)

http://git-wip-us.apache.org/repos/asf/spark/blob/6b79bfb4/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
new file mode 100644
index 0000000..79e04f0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.storage.BlockManagerId
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.JavaSerializer
+
+
+class MapStatusSuite extends FunSuite {
+
+  test("compressSize") {
+    assert(MapStatus.compressSize(0L) === 0)
+    assert(MapStatus.compressSize(1L) === 1)
+    assert(MapStatus.compressSize(2L) === 8)
+    assert(MapStatus.compressSize(10L) === 25)
+    assert((MapStatus.compressSize(1000000L) & 0xFF) === 145)
+    assert((MapStatus.compressSize(1000000000L) & 0xFF) === 218)
+    // This last size is bigger than we can encode in a byte, so check that we 
just return 255
+    assert((MapStatus.compressSize(1000000000000000000L) & 0xFF) === 255)
+  }
+
+  test("decompressSize") {
+    assert(MapStatus.decompressSize(0) === 0)
+    for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) {
+      val size2 = MapStatus.decompressSize(MapStatus.compressSize(size))
+      assert(size2 >= 0.99 * size && size2 <= 1.11 * size,
+        "size " + size + " decompressed to " + size2 + ", which is out of 
range")
+    }
+  }
+
+  test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) 
{
+    val sizes = Array.fill[Long](2001)(150L)
+    val status = MapStatus(null, sizes)
+    assert(status.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status.getSizeForBlock(10) === 150L)
+    assert(status.getSizeForBlock(50) === 150L)
+    assert(status.getSizeForBlock(99) === 150L)
+    assert(status.getSizeForBlock(2000) === 150L)
+  }
+
+  test(classOf[HighlyCompressedMapStatus].getName + ": estimated size is 
within 10%") {
+    val sizes = Array.tabulate[Long](50) { i => i.toLong }
+    val loc = BlockManagerId("a", "b", 10)
+    val status = MapStatus(loc, sizes)
+    val ser = new JavaSerializer(new SparkConf)
+    val buf = ser.newInstance().serialize(status)
+    val status1 = ser.newInstance().deserialize[MapStatus](buf)
+    assert(status1.location == loc)
+    for (i <- 0 until sizes.length) {
+      // make sure the estimated size is within 10% of the input; note that we 
skip the very small
+      // sizes because the compression is very lossy there.
+      val estimate = status1.getSizeForBlock(i)
+      if (estimate > 100) {
+        assert(math.abs(estimate - sizes(i)) * 10 <= sizes(i),
+          s"incorrect estimated size $estimate, original was ${sizes(i)}")
+      }
+    }
+  }
+
+  test(classOf[HighlyCompressedMapStatus].getName + ": estimated size should 
be the average size") {
+    val sizes = Array.tabulate[Long](3000) { i => i.toLong }
+    val avg = sizes.sum / sizes.length
+    val loc = BlockManagerId("a", "b", 10)
+    val status = MapStatus(loc, sizes)
+    val ser = new JavaSerializer(new SparkConf)
+    val buf = ser.newInstance().serialize(status)
+    val status1 = ser.newInstance().deserialize[MapStatus](buf)
+    assert(status1.location == loc)
+    for (i <- 0 until 3000) {
+      val estimate = status1.getSizeForBlock(i)
+      assert(estimate === avg)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6b79bfb4/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index 76bf4cf..7bca171 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -106,10 +106,9 @@ class AkkaUtilsSuite extends FunSuite with 
LocalSparkContext {
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
 
-    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
-    val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
-    masterTracker.registerMapOutput(10, 0, new MapStatus(
-      BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    masterTracker.registerMapOutput(10, 0,
+      MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L)))
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
 
@@ -157,10 +156,9 @@ class AkkaUtilsSuite extends FunSuite with 
LocalSparkContext {
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
 
-    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
-    val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
-    masterTracker.registerMapOutput(10, 0, new MapStatus(
-      BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
+    val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
+    masterTracker.registerMapOutput(10, 0, MapStatus(
+      BlockManagerId("a", "hostA", 1000), Array(1000L)))
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6b79bfb4/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 1adfaa1..4076ebc 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -45,7 +45,10 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingMethodProblem](
               
"org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL1"),
             ProblemFilters.exclude[MissingMethodProblem](
-              
"org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2")
+              
"org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2"),
+            // MapStatus should be private[spark]
+            ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+              "org.apache.spark.scheduler.MapStatus")
           )
 
         case v if v.startsWith("1.1") =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to