Repository: spark
Updated Branches:
  refs/heads/branch-1.0 64316af5a -> 67bffd3c7


[SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead 
amounts.

SPARK-1112: This is a more conservative version of #1132 that doesn't change
around the actor system initialization on the executor. Instead we just
directly read the current frame size limit from the ActorSystem.

SPARK-2156: This uses the same fixe as in #1132.

Author: Patrick Wendell <[email protected]>

Closes #1172 from pwendell/akka-10-fix and squashes the following commits:

d56297e [Patrick Wendell] Set limit in LocalBackend to preserve test 
expectations
9f5ed19 [Patrick Wendell] [SPARK-1112, 2156] (1.0 edition) Use correct akka 
frame size and overhead amounts.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67bffd3c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67bffd3c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67bffd3c

Branch: refs/heads/branch-1.0
Commit: 67bffd3c7ee8e9e3395e714e470459f09d19e66d
Parents: 64316af
Author: Patrick Wendell <[email protected]>
Authored: Sun Jun 22 19:31:15 2014 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Sun Jun 22 19:31:15 2014 -0700

----------------------------------------------------------------------
 .../executor/CoarseGrainedExecutorBackend.scala |  8 ++++++--
 .../org/apache/spark/executor/Executor.scala    |  8 +++-----
 .../apache/spark/executor/ExecutorBackend.scala |  3 +++
 .../cluster/CoarseGrainedSchedulerBackend.scala |  2 +-
 .../spark/scheduler/local/LocalBackend.scala    |  6 +++++-
 .../scala/org/apache/spark/util/AkkaUtils.scala |  3 +++
 .../apache/spark/MapOutputTrackerSuite.scala    | 21 +++++++++++---------
 7 files changed, 33 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 2279d77..70c1f4c 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -34,7 +34,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
     hostPort: String,
-    cores: Int)
+    cores: Int,
+    actorSystem: ActorSystem)
   extends Actor
   with ExecutorBackend
   with Logging {
@@ -94,6 +95,9 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
     driver ! StatusUpdate(executorId, taskId, state, data)
   }
+
+  override def akkaFrameSize() = actorSystem.settings.config.getBytes(
+    "akka.remote.netty.tcp.maximum-frame-size")
 }
 
 private[spark] object CoarseGrainedExecutorBackend {
@@ -113,7 +117,7 @@ private[spark] object CoarseGrainedExecutorBackend {
         val sparkHostPort = hostname + ":" + boundPort
         actorSystem.actorOf(
           Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
-            sparkHostPort, cores),
+            sparkHostPort, cores, actorSystem),
           name = "Executor")
         workerUrl.foreach {
           url =>

http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index baee7a2..214a8c8 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -97,10 +97,6 @@ private[spark] class Executor(
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
 
-  // Akka's message frame size. If task result is bigger than this, we use the 
block manager
-  // to send the result back.
-  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch 
worker")
 
@@ -211,8 +207,10 @@ private[spark] class Executor(
           task.metrics.getOrElse(null))
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + 
serializedDirectResult.limit)
+
         val serializedResult = {
-          if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
+          if (serializedDirectResult.limit >= execBackend.akkaFrameSize() -
+              AkkaUtils.reservedSizeBytes) {
             logInfo("Storing result for " + taskId + " in local BlockManager")
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(

http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala 
b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
index 3d34960..43a15cd 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala
@@ -26,4 +26,7 @@ import org.apache.spark.TaskState.TaskState
  */
 private[spark] trait ExecutorBackend {
   def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
+
+  // Exists as a work around for SPARK-1112. This only exists in branch-1.x of 
Spark.
+  def akkaFrameSize(): Long = Long.MaxValue
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e47a060..08bc218 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -143,7 +143,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
       for (task <- tasks.flatten) {
         val ser = SparkEnv.get.closureSerializer.newInstance()
         val serializedTask = ser.serialize(task)
-        if (serializedTask.limit >= akkaFrameSize - 1024) {
+        if (serializedTask.limit >= akkaFrameSize - 
AkkaUtils.reservedSizeBytes) {
           val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
           scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
             try {

http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 43f0e18..a98c891 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -20,11 +20,11 @@ package org.apache.spark.scheduler.local
 import java.nio.ByteBuffer
 
 import akka.actor.{Actor, ActorRef, Props}
-
 import org.apache.spark.{Logging, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, 
WorkerOffer}
+import org.apache.spark.util.AkkaUtils
 
 private case class ReviveOffers()
 
@@ -106,4 +106,8 @@ private[spark] class LocalBackend(scheduler: 
TaskSchedulerImpl, val totalCores:
   override def statusUpdate(taskId: Long, state: TaskState, serializedData: 
ByteBuffer) {
     localActor ! StatusUpdate(taskId, state, serializedData)
   }
+
+  // This limit is calculated only to preserve expected behavior in tests. In 
reality, since this
+  // backend sends messages over the existing actor system, there is no need 
to enforce a limit.
+  override def akkaFrameSize() = 
AkkaUtils.maxFrameSizeBytes(scheduler.sc.getConf)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala 
b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index a8d12bb..9930c71 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -121,4 +121,7 @@ private[spark] object AkkaUtils extends Logging {
   def maxFrameSizeBytes(conf: SparkConf): Int = {
     conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
   }
+
+  /** Space reserved for extra data in an Akka message besides serialized task 
or task result. */
+  val reservedSizeBytes = 200 * 1024
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 6b2571c..1b64d49 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -182,7 +182,6 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
 
   test("remote fetch exceeds akka frame size") {
     val newConf = new SparkConf
-    newConf.set("spark.akka.frameSize", "1")
     newConf.set("spark.akka.askTimeout", "1") // Fail fast
 
     val masterTracker = new MapOutputTrackerMaster(conf)
@@ -191,14 +190,18 @@ class MapOutputTrackerSuite extends FunSuite with 
LocalSparkContext {
       new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
-    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should 
throw exception.
-    // Note that the size is hand-selected here because map output statuses 
are compressed before
-    // being sent.
-    masterTracker.registerShuffle(20, 100)
-    (0 until 100).foreach { i =>
-      masterTracker.registerMapOutput(20, i, new MapStatus(
-        BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
+    // Frame size should be 2 * the configured frame size, and 
MapOutputTrackerMasterActor should
+    // throw exception.
+    val shuffleId = 20
+    val numMaps = 2
+    val data = new Array[Byte](AkkaUtils.maxFrameSizeBytes(conf))
+    val random = new java.util.Random(0)
+    random.nextBytes(data) // Make it hard to compress.
+    masterTracker.registerShuffle(shuffleId, numMaps)
+    (0 until numMaps).foreach { i =>
+      masterTracker.registerMapOutput(shuffleId, i, new MapStatus(
+        BlockManagerId("999", "mps", 1000, 0), data))
     }
-    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+    intercept[SparkException] { 
masterActor.receive(GetMapOutputStatuses(shuffleId)) }
   }
 }

Reply via email to