Repository: spark Updated Branches: refs/heads/branch-1.0 64316af5a -> 67bffd3c7
[SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead amounts. SPARK-1112: This is a more conservative version of #1132 that doesn't change around the actor system initialization on the executor. Instead we just directly read the current frame size limit from the ActorSystem. SPARK-2156: This uses the same fix as in #1132. Author: Patrick Wendell <[email protected]> Closes #1172 from pwendell/akka-10-fix and squashes the following commits: d56297e [Patrick Wendell] Set limit in LocalBackend to preserve test expectations 9f5ed19 [Patrick Wendell] [SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead amounts. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67bffd3c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67bffd3c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67bffd3c Branch: refs/heads/branch-1.0 Commit: 67bffd3c7ee8e9e3395e714e470459f09d19e66d Parents: 64316af Author: Patrick Wendell <[email protected]> Authored: Sun Jun 22 19:31:15 2014 -0700 Committer: Aaron Davidson <[email protected]> Committed: Sun Jun 22 19:31:15 2014 -0700 ---------------------------------------------------------------------- .../executor/CoarseGrainedExecutorBackend.scala | 8 ++++++-- .../org/apache/spark/executor/Executor.scala | 8 +++----- .../apache/spark/executor/ExecutorBackend.scala | 3 +++ .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../spark/scheduler/local/LocalBackend.scala | 6 +++++- .../scala/org/apache/spark/util/AkkaUtils.scala | 3 +++ .../apache/spark/MapOutputTrackerSuite.scala | 21 +++++++++++--------- 7 files changed, 33 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala ---------------------------------------------------------------------- 
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 2279d77..70c1f4c 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -34,7 +34,8 @@ private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, executorId: String, hostPort: String, - cores: Int) + cores: Int, + actorSystem: ActorSystem) extends Actor with ExecutorBackend with Logging { @@ -94,6 +95,9 @@ private[spark] class CoarseGrainedExecutorBackend( override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { driver ! StatusUpdate(executorId, taskId, state, data) } + + override def akkaFrameSize() = actorSystem.settings.config.getBytes( + "akka.remote.netty.tcp.maximum-frame-size") } private[spark] object CoarseGrainedExecutorBackend { @@ -113,7 +117,7 @@ private[spark] object CoarseGrainedExecutorBackend { val sparkHostPort = hostname + ":" + boundPort actorSystem.actorOf( Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, - sparkHostPort, cores), + sparkHostPort, cores, actorSystem), name = "Executor") workerUrl.foreach { url => http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index baee7a2..214a8c8 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -97,10 +97,6 @@ private[spark] class Executor( private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) - // Akka's message frame 
size. If task result is bigger than this, we use the block manager - // to send the result back. - private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Start worker thread pool val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker") @@ -211,8 +207,10 @@ private[spark] class Executor( task.metrics.getOrElse(null)) val serializedDirectResult = ser.serialize(directResult) logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit) + val serializedResult = { - if (serializedDirectResult.limit >= akkaFrameSize - 1024) { + if (serializedDirectResult.limit >= execBackend.akkaFrameSize() - + AkkaUtils.reservedSizeBytes) { logInfo("Storing result for " + taskId + " in local BlockManager") val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala index 3d34960..43a15cd 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala @@ -26,4 +26,7 @@ import org.apache.spark.TaskState.TaskState */ private[spark] trait ExecutorBackend { def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) + + // Exists as a work around for SPARK-1112. This only exists in branch-1.x of Spark. 
+ def akkaFrameSize(): Long = Long.MaxValue } http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e47a060..08bc218 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -143,7 +143,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A for (task <- tasks.flatten) { val ser = SparkEnv.get.closureSerializer.newInstance() val serializedTask = ser.serialize(task) - if (serializedTask.limit >= akkaFrameSize - 1024) { + if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { val taskSetId = scheduler.taskIdToTaskSetId(task.taskId) scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => try { http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 43f0e18..a98c891 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -20,11 +20,11 @@ package org.apache.spark.scheduler.local import java.nio.ByteBuffer import akka.actor.{Actor, ActorRef, Props} - import org.apache.spark.{Logging, SparkEnv, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.{Executor, 
ExecutorBackend} import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer} +import org.apache.spark.util.AkkaUtils private case class ReviveOffers() @@ -106,4 +106,8 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) { localActor ! StatusUpdate(taskId, state, serializedData) } + + // This limit is calculated only to preserve expected behavior in tests. In reality, since this + // backend sends messages over the existing actor system, there is no need to enforce a limit. + override def akkaFrameSize() = AkkaUtils.maxFrameSizeBytes(scheduler.sc.getConf) } http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index a8d12bb..9930c71 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -121,4 +121,7 @@ private[spark] object AkkaUtils extends Logging { def maxFrameSizeBytes(conf: SparkConf): Int = { conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024 } + + /** Space reserved for extra data in an Akka message besides serialized task or task result. 
*/ + val reservedSizeBytes = 200 * 1024 } http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 6b2571c..1b64d49 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -182,7 +182,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("remote fetch exceeds akka frame size") { val newConf = new SparkConf - newConf.set("spark.akka.frameSize", "1") newConf.set("spark.akka.askTimeout", "1") // Fail fast val masterTracker = new MapOutputTrackerMaster(conf) @@ -191,14 +190,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem) val masterActor = actorRef.underlyingActor - // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception. - // Note that the size is hand-selected here because map output statuses are compressed before - // being sent. - masterTracker.registerShuffle(20, 100) - (0 until 100).foreach { i => - masterTracker.registerMapOutput(20, i, new MapStatus( - BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0))) + // Frame size should be 2 * the configured frame size, and MapOutputTrackerMasterActor should + // throw exception. + val shuffleId = 20 + val numMaps = 2 + val data = new Array[Byte](AkkaUtils.maxFrameSizeBytes(conf)) + val random = new java.util.Random(0) + random.nextBytes(data) // Make it hard to compress. 
+ masterTracker.registerShuffle(shuffleId, numMaps) + (0 until numMaps).foreach { i => + masterTracker.registerMapOutput(shuffleId, i, new MapStatus( + BlockManagerId("999", "mps", 1000, 0), data)) } - intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) } + intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(shuffleId)) } } }
