Repository: spark
Updated Branches:
refs/heads/master 4a4c3dc9c -> 2e139eed3
[SPARK-17931] Eliminate unnecessary task (de) serialization
In the existing code, there are three layers of serialization
involved in sending a task from the scheduler to an executor:
- A Task object is serialized
- The Task object is copied to a byte buffer that also
contains serialized information about any additional JARs,
files, and Properties needed for the task to execute. This
byte buffer is stored as the member variable serializedTask
in the TaskDescription class.
- The TaskDescription is serialized (in addition to the serialized
task + JARs, the TaskDescription class contains the task ID and
other metadata) and sent in a LaunchTask message.
While it *is* necessary to have two layers of serialization, so that
the JAR, file, and Property info can be deserialized prior to
deserializing the Task object, the third layer of serialization is
unnecessary. This commit eliminates a layer of serialization by moving
the JARs, files, and Properties into the TaskDescription class.
This commit also serializes the Properties manually (by traversing the map),
as is done with the JARs and files, which reduces the final serialized size.
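To make the size claim concrete, here is a minimal, self-contained Scala sketch. It is not Spark code, and the object PropertiesEncodingSketch and its methods are hypothetical. It encodes the same java.util.Properties two ways: with Java's default serialization, which the old Task.serializeWithDependencies used via Utils.serialize, and by writing an entry count followed by writeUTF key/value pairs, similar in spirit to the new TaskDescription.encode. Exact Java-serialized sizes vary by JDK, so only the relative ordering matters here.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream, ObjectOutputStream}
import java.util.Properties

// Hypothetical helper (not part of Spark) comparing two encodings of the
// same java.util.Properties object.
object PropertiesEncodingSketch {

  // Java default serialization: besides the entries, the output carries class
  // descriptors for Properties and its superclass Hashtable.
  def javaSerialize(props: Properties): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(props)
    out.close()
    bytes.toByteArray
  }

  // Manual encoding: an Int entry count, then each key and value written with
  // writeUTF (a 2-byte length followed by modified UTF-8 bytes).
  // stringPropertyNames() also includes defaults; the example below has none.
  def manualEncode(props: Properties): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    val names = props.stringPropertyNames()
    out.writeInt(names.size())
    val it = names.iterator()
    while (it.hasNext) {
      val key = it.next()
      out.writeUTF(key)
      out.writeUTF(props.getProperty(key))
    }
    out.close()
    bytes.toByteArray
  }
}
```

With two short ASCII entries such as ("spark.job.description", "demo") and ("spark.scheduler.pool", "default"), the manual form is 64 bytes: a 4-byte count plus, for each of the four strings, a 2-byte length and its bytes. The Java-serialized form is strictly larger because it also carries the class descriptors.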
Tested with unit tests.
This is a simpler alternative to the approach proposed in #15505.
shivaram and I benchmarked this change and #15505 on a cluster of 20
m2.4xlarge EC2 machines (160 cores). We ran ~30 trials of code [1] (a very
simple job with 10K tasks per stage) and measured the average time per stage:
- Before this change: 2490 ms
- With this change: 2345 ms (~6% improvement over the baseline)
- With witgo's approach in #15505: 2046 ms (~18% improvement over the baseline)
The reason that #15505 has a more significant improvement is that it also moves
the serialization from the TaskSchedulerImpl thread to the
CoarseGrainedSchedulerBackend thread. I added that functionality on top of this
change, and got almost the same improvement [1] as #15505 (average of 2103 ms).
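The percentages above follow directly from the average per-stage times. A small illustrative sketch (the object name StageTimeImprovement is hypothetical) reproduces them, including the figure for the combined change, which works out to roughly 15.5%:

```scala
// Reproduces the relative improvements quoted above from the reported
// average per-stage times, in milliseconds. Illustrative only.
object StageTimeImprovement {
  val baselineMs: Double = 2490.0

  // Percentage reduction in average stage time relative to the baseline.
  def improvementPct(newMs: Double): Double =
    (baselineMs - newMs) / baselineMs * 100.0
}

// improvementPct(2345.0) is about 5.8  (this change)
// improvementPct(2046.0) is about 17.8 (#15505)
// improvementPct(2103.0) is about 15.5 (this change + moved serialization)
```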
I think we should decouple these two changes, both so that we have a record of
the improvement from each individual change, and because this change is
more about simplifying the code base (the improvement is negligible) while the
other is about performance improvement. The plan, currently, is to merge this
PR and then merge the remaining part of #15505 that moves serialization.
[1] The reason the improvement wasn't quite as good as with #15505 when we ran
the benchmarks is almost certainly because, at the point when we ran the
benchmarks, I hadn't updated the code to manually serialize the Properties
(instead the code was using Java's default serialization for the Properties
object, whereas #15505 manually serialized the Properties). This PR has since
been updated to manually serialize the Properties, just like the other maps.
Author: Kay Ousterhout
Closes #16053 from kayousterhout/SPARK-17931.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e139eed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e139eed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e139eed
Branch: refs/heads/master
Commit: 2e139eed3194c7b8814ff6cf007d4e8a874c1e4d
Parents: 4a4c3dc
Author: Kay Ousterhout
Authored: Fri Jan 6 10:48:00 2017 -0600
Committer: Imran Rashid
Committed: Fri Jan 6 10:48:08 2017 -0600
----------------------------------------------------------------------
.../executor/CoarseGrainedExecutorBackend.scala | 5 +-
.../org/apache/spark/executor/Executor.scala | 39 +++----
.../scala/org/apache/spark/scheduler/Task.scala | 86 ---------------
.../spark/scheduler/TaskDescription.scala | 107 +++++++++++++++++--
.../apache/spark/scheduler/TaskSetManager.scala | 15 ++-
.../cluster/CoarseGrainedSchedulerBackend.scala | 7 +-
.../scheduler/local/LocalSchedulerBackend.scala | 3 +-
.../apache/spark/executor/ExecutorSuite.scala | 26 +++--
.../spark/scheduler/TaskDescriptionSuite.scala | 69 ++++++++++++
.../spark/executor/MesosExecutorBackend.scala | 9 +-
.../MesosFineGrainedSchedulerBackend.scala | 2 +-
.../cluster/mesos/MesosTaskLaunchData.scala | 51 ---------
.../MesosFineGrainedSchedulerBackendSuite.scala | 23 +++-
.../mesos/MesosTaskLaunchDataSuite.scala | 36 -------
14 files changed, 241 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 92a2790..4a38560 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -92,10 +92,9 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
- val taskDesc = ser.deserialize[TaskDescription](data.value)
+ val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
- executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
- taskDesc.name, taskDesc.serializedTask)
+ executor.launchTask(this, taskDesc)
}
case KillTask(taskId, _, interruptThread) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3346f6d..789198f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.control.NonFatal
import org.apache.spark._
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rpc.RpcTimeout
-import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTaskResult, Task}
+import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util._
@@ -148,15 +148,9 @@ private[spark] class Executor(
startDriverHeartbeater()
- def launchTask(
- context: ExecutorBackend,
- taskId: Long,
- attemptNumber: Int,
- taskName: String,
- serializedTask: ByteBuffer): Unit = {
- val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
- serializedTask)
- runningTasks.put(taskId, tr)
+ def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
+ val tr = new TaskRunner(context, taskDescription)
+ runningTasks.put(taskDescription.taskId, tr)
threadPool.execute(tr)
}
@@ -212,13 +206,12 @@ private[spark] class Executor(
class TaskRunner(
execBackend: ExecutorBackend,
- val taskId: Long,
- val attemptNumber: Int,
- taskName: String,
- serializedTask: ByteBuffer)
+ private val taskDescription: TaskDescription)
extends Runnable {
+ val taskId = taskDescription.taskId
val threadName = s"Executor task launch worker for task $taskId"
+ private val taskName = taskDescription.name
/** Whether this task has been killed. */
@volatile private var killed = false
@@ -287,16 +280,14 @@ private[spark] class Executor(
startGCTime = computeTotalGcTime()
try {
- val (taskFiles, taskJars, taskProps, taskBytes) =
- Task.deserializeWithDependencies(serializedTask)
-
// Must be set before updateDependencies() is called, in case fetching dependencies
// requires access to properties contained within (e.g. for access control).
- Executor.taskDeserializationProps.set(taskProps)
+ Executor.taskDeserializationProps.set(taskDescription.properties)
- updateDependencies(taskFiles, taskJars)
- task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
- task.localProperties = taskProps
+ updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
+ task = ser.deserialize[Task[Any]](
+ taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
+ task.localProperties = taskDescription.properties
task.setTaskMemoryManager(taskMemoryManager)
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
@@ -321,7 +312,7 @@ private[spark] class Executor(
val value = try {
val res = task.run(
taskAttemptId = taskId,
- attemptNumber = attemptNumber,
+ attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
@@ -637,7 +628,7 @@ private[spark] class Executor(
* Download any missing dependencies if we receive a new set of files and JARs from the
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
- private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
+ private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]) {
lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
synchronized {
// Fetch missing dependencies
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 5becca6..51976f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -215,89 +215,3 @@ private[spark] abstract class Task[T](
}
}
}
-
-/**
- * Handles transmission of tasks and their dependencies, because this can be slightly tricky. We
- * need to send the list of JARs and files added to the SparkContext with each task to ensure that
- * worker nodes find out about it, but we can't make it part of the Task because the user's code in
- * the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
- * first writing out its dependencies.
- */
-private[spark] object Task {
- /**
- * Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
- */
- def serializeWithDependencies(
- task: Task[_],
- currentFiles: mutable.Map[String, Long],
- currentJars: mutable.Map[String, Long],
- serializer: SerializerInstance)
- : ByteBuffer = {
-
- val out = new ByteBufferOutputStream(4096)
- val dataOut = new DataOutputStream(out)
-
- // Write currentFiles
- dataOut.writeInt(currentFiles.size)
- for ((name, timestamp) <- currentFiles) {
- dataOut.writeUTF(name)
- dataOut.writeLong(timestamp)
- }
-
- // Write currentJars
- dataOut.writeInt(currentJars.size)
- for ((name, timestamp) <- currentJars) {
- dataOut.writeUTF(name)
- dataOut.writeLong(timestamp)
- }
-
- // Write the task properties separately so it is available before full task deserialization.
- val propBytes = Utils.serialize(task.localProperties)
- dataOut.writeInt(propBytes.length)
- dataOut.write(propBytes)
-
- // Write the task itself and finish
- dataOut.flush()
- val taskBytes = serializer.serialize(task)
- Utils.writeByteBuffer(taskBytes, out)
- out.close()
- out.toByteBuffer
- }
-
- /**
- * Deserialize the list of dependencies in a task serialized with serializeWithDependencies,
- * and return the task itself as a serialized ByteBuffer. The caller can then update its
- * ClassLoaders and deserialize the task.
- *
- * @return (taskFiles, taskJars, taskProps, taskBytes)
- */
- def deserializeWithDependencies(serializedTask: ByteBuffer)
- : (HashMap[String, Long], HashMap[String, Long], Properties, ByteBuffer) = {
-
- val in = new ByteBufferInputStream(serializedTask)
- val dataIn = new DataInputStream(in)
-
- // Read task's files
- val taskFiles = new HashMap[String, Long]()
- val numFiles = dataIn.readInt()
- for (i <- 0 until numFiles) {
- taskFiles(dataIn.readUTF()) = dataIn.readLong()
- }
-
- // Read task's JARs
- val taskJars = new HashMap[String, Long]()
- val numJars = dataIn.readInt()
- for (i <- 0 until numJars) {
- taskJars(dataIn.readUTF()) = dataIn.readLong()
- }
-
- val propLength = dataIn.readInt()
- val propBytes = new Array[Byte](propLength)
- dataIn.readFully(propBytes, 0, propLength)
- val taskProps = Utils.deserialize[Properties](propBytes)
-
- // Create a sub-buffer for the rest of the data, which is the serialized Task object
- val subBuffer = serializedTask.slice() // ByteBufferInputStream will have read just up to task
- (taskFiles, taskJars, taskProps, subBuffer)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index 45c742c..78aa5c4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -17,13 +17,31 @@
package org.apache.spark.scheduler
+import java.io.{DataInputStream, DataOutputStream}
import java.nio.ByteBuffer
+import java.util.Properties
-import org.apache.spark.util.SerializableBuffer
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, Map}
+
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
/**
* Description of a task that gets passed onto executors to be executed, usually created by
* `TaskSetManager.resourceOffer`.
+ *
+ * TaskDescriptions and the associated Task need to be serialized carefully for two reasons:
+ *
+ * (1) When a TaskDescription is received by an Executor, the Executor needs to first get the
+ * list of JARs and files and add these to the classpath, and set the properties, before
+ * deserializing the Task object (serializedTask). This is why the Properties are included
+ * in the TaskDescription, even though they're also in the serialized task.
+ * (2) Because a TaskDescription is serialized and sent to an executor for each task, efficient
+ * serialization (both in terms of serialization time and serialized buffer size) is
+ * important. For this reason, we serialize TaskDescriptions ourselves with the
+ * TaskDescription.encode and TaskDescription.decode methods. This results in a smaller
+ * serialized size because it avoids serializing unnecessary fields in the Map objects
+ * (which can introduce significant overhead when the maps are small).
*/
private[spark] class TaskDescription(
val taskId: Long,
@@ -31,13 +49,88 @@ private[spark] class TaskDescription(
val executorId: String,
val name: String,
val index: Int, // Index within this task's TaskSet
- _serializedTask: ByteBuffer)
- extends Serializable {
+ val addedFiles: Map[String, Long],
+ val addedJars: Map[String, Long],
+ val properties: Properties,
+ val serializedTask: ByteBuffer) {
+
+ override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
+}
- // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
- private val buffer = new SerializableBuffer(_serializedTask)
+private[spark] object TaskDescription {
+ private def serializeStringLongMap(map: Map[String, Long], dataOut: DataOutputStream): Unit = {
+ dataOut.writeInt(map.size)
+ for ((key, value) <- map) {
+ dataOut.writeUTF(key)
+ dataOut.writeLong(value)
+ }
+ }
- def serializedTask: ByteBuffer = buffer.value
+ def encode(taskDescription: TaskDescription): ByteBuffer = {
+ val bytesOut = new ByteBufferOutputStream(4096)
+ val dataOut = new DataOutputStream(bytesOut)
- override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
+ dataOut.writeLong(taskDescription.taskId)
+ dataOut.writeInt(taskDescription.attemptNumber)
+ dataOut.writeUTF(taskDescription.executorId)
+ dataOut.writeUTF(taskDescription.name)
+ dataOut.writeInt(taskDescription.index)
+
+ // Write files.
+ serializeStringLongMap(taskDescription.addedFiles, dataOut)
+
+ // Write jars.
+ serializeStringLongMap(taskDescription.addedJars, dataOut)
+
+ // Write properties.
+ dataOut.writeInt(taskDescription.properties.size())
+ taskDescription.properties.asScala.foreach { case (key, value) =>
+ dataOut.writeUTF(key)
+ dataOut.writeUTF(value)
+ }
+
+ // Write the task. The task is already serialized, so write it directly to the byte buffer.
+ Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut)
+
+ dataOut.close()
+ bytesOut.close()
+ bytesOut.toByteBuffer
+ }
+
+ private def deserializeStringLongMap(dataIn: DataInputStream): HashMap[String, Long] = {
+ val map = new HashMap[String, Long]()
+ val mapSize = dataIn.readInt()
+ for (i <- 0 until mapSize) {
+ map(dataIn.readUTF()) = dataIn.readLong()
+ }
+ map
+ }
+
+ def decode(byteBuffer: ByteBuffer): TaskDescription = {
+ val dataIn = new DataInputStream(new ByteBufferInputStream(byteBuffer))
+ val taskId = dataIn.readLong()
+ val attemptNumber = dataIn.readInt()
+ val executorId = dataIn.readUTF()
+ val name = dataIn.readUTF()
+ val index = dataIn.readInt()
+
+ // Read files.
+ val taskFiles = deserializeStringLongMap(dataIn)
+
+ // Read jars.
+ val taskJars = deserializeStringLongMap(dataIn)
+
+ // Read properties.
+ val properties = new Properties()
+ val numProperties = dataIn.readInt()
+ for (i <- 0 until numProperties) {
+ properties.setProperty(dataIn.readUTF(), dataIn.readUTF())
+ }
+
+ // Create a sub-buffer for the serialized task into its own buffer (to be deserialized later).
+ val serializedTask = byteBuffer.slice()
+
+ new TaskDescription(taskId, attemptNumber, executorId, name, index, taskFiles, taskJars,
+ properties, serializedTask)
+ }
}
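The encode/decode pair above relies on one detail worth spelling out: decode reads the metadata prefix through a stream that advances the ByteBuffer's position, so the later slice() yields exactly the serialized task without copying it. The following is a minimal, self-contained sketch of that layout under simplifying assumptions. It uses a single String -> Long map instead of files, JARs, and Properties, and the class PositionAdvancingInputStream and object MiniTaskCodec are hypothetical stand-ins, not Spark's ByteBufferInputStream or TaskDescription.

```scala
import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream, InputStream}
import java.nio.ByteBuffer
import scala.collection.mutable

// Minimal InputStream over a ByteBuffer that advances the buffer's position as
// it reads (a hypothetical stand-in, not Spark's ByteBufferInputStream).
class PositionAdvancingInputStream(buffer: ByteBuffer) extends InputStream {
  override def read(): Int =
    if (buffer.hasRemaining) buffer.get() & 0xFF else -1

  override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
    if (length == 0) 0
    else if (!buffer.hasRemaining) -1
    else {
      val n = math.min(length, buffer.remaining)
      buffer.get(dest, offset, n)
      n
    }
  }
}

// Simplified, hypothetical layout: task ID, one length-prefixed
// String -> Long map, then the already-serialized task bytes verbatim.
object MiniTaskCodec {
  def encode(taskId: Long, jars: mutable.Map[String, Long], task: ByteBuffer): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(taskId)
    out.writeInt(jars.size)
    for ((name, timestamp) <- jars) {
      out.writeUTF(name)
      out.writeLong(timestamp)
    }
    // Copy the task bytes without moving the caller's buffer position.
    val dup = task.duplicate()
    val taskBytes = new Array[Byte](dup.remaining)
    dup.get(taskBytes)
    out.write(taskBytes)
    out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  def decode(buffer: ByteBuffer): (Long, mutable.HashMap[String, Long], ByteBuffer) = {
    // DataInputStream does not read ahead, so after these reads the buffer's
    // position sits exactly at the first byte of the serialized task.
    val in = new DataInputStream(new PositionAdvancingInputStream(buffer))
    val taskId = in.readLong()
    val jars = new mutable.HashMap[String, Long]()
    val numJars = in.readInt()
    for (_ <- 0 until numJars) {
      jars(in.readUTF()) = in.readLong()
    }
    // slice() shares the remaining bytes with the original buffer (no copy).
    (taskId, jars, buffer.slice())
  }
}
```

Only the metadata prefix is parsed eagerly; the task bytes stay opaque until the executor has fetched dependencies and set properties, which is the ordering requirement described in the TaskDescription scaladoc.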
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3756c21..c7ff13c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -446,9 +446,8 @@ private[spark] class TaskSetManager(
lastLaunchTime = curTime
}
// Serialize and return the task
- val startTime = clock.getTimeMillis()
val serializedTask: ByteBuffer = try {
- Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+ ser.serialize(task)
} catch {
// If the task cannot be serialized, then there's no point to re-attempt the task,
// as it will always fail. So just abort the whole task-set.
@@ -475,8 +474,16 @@ private[spark] class TaskSetManager(
s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
sched.dagScheduler.taskStarted(task, info)
- new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
- taskName, index, serializedTask)
+ new TaskDescription(
+ taskId,
+ attemptNum,
+ execId,
+ taskName,
+ index,
+ sched.sc.addedFiles,
+ sched.sc.addedJars,
+ task.localProperties,
+ serializedTask)
}
} else {
None
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 3452487..31575c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -98,11 +98,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors that have been lost, but for which we don't yet know the real exit reason.
protected val executorsPendingLossReason = new HashSet[String]
- // If this DriverEndpoint is changed to support multiple threads,
- // then this may need to be changed so that we don't share the serializer
- // instance across threads
- private val ser = SparkEnv.get.closureSerializer.newInstance()
-
protected val addressToExecutorId = new HashMap[RpcAddress, String]
private val reviveThread =
@@ -249,7 +244,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
- val serializedTask = ser.serialize(task)
+ val serializedTask = TaskDescription.encode(task)
if (serializedTask.limit >= maxRpcMessageSize) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index 7a73e8e..625f998 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -84,8 +84,7 @@ private[spark] class LocalEndpoint(
val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
for (task <- scheduler.resourceOffers(offers).flatten) {
freeCores -= scheduler.CPUS_PER_TASK
- executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
- task.name, task.serializedTask)
+ executor.launchTask(executorBackend, task)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 742500d..f94baaa 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -18,9 +18,10 @@
package org.apache.spark.executor
import java.nio.ByteBuffer
+import java.util.Properties
import java.util.concurrent.CountDownLatch
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.Map
import org.mockito.Matchers._
import org.mockito.Mockito.{mock, when}
@@ -32,7 +33,7 @@ import org.apache.spark.TaskState.TaskState
import org.apache.spark.memory.MemoryManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.{FakeTask, Task}
+import org.apache.spark.scheduler.{FakeTask, TaskDescription}
import org.apache.spark.serializer.JavaSerializer
class ExecutorSuite extends SparkFunSuite {
@@ -52,13 +53,18 @@ class ExecutorSuite extends SparkFunSuite {
when(mockEnv.memoryManager).thenReturn(mockMemoryManager)
when(mockEnv.closureSerializer).thenReturn(serializer)
val fakeTaskMetrics =
serializer.newInstance().serialize(TaskMetrics.registered).array()
-
- val serializedTask =
- Task.serializeWithDependencies(
- new FakeTask(0, 0, Nil, fakeTaskMetrics),
- HashMap[String, Long](),
- HashMap[String, Long](),
- serializer.newInstance())
+ val serializedTask = serializer.newInstance().serialize(
+ new FakeTask(0, 0, Nil, fakeTaskMetrics))
+ val taskDescription = new TaskDescription(
+ taskId = 0,
+ attemptNumber = 0,
+ executorId = "",
+ name = "",
+ index = 0,
+ addedFiles = Map[String, Long](),
+ addedJars = Map[String, Long](),
+ properties = new Properties,
+ serializedTask)
// we use latches to force the program to run in this order:
// +-----------------------------+---------------------------------------+
@@ -108,7 +114,7 @@ class ExecutorSuite extends SparkFunSuite {
try {
executor = new Executor("id", "localhost", mockEnv, userClassPath = Nil, isLocal = true)
// the task will be launched in a dedicated worker thread
- executor.launchTask(mockExecutorBackend, 0, 0, "", serializedTask)
+ executor.launchTask(mockExecutorBackend, taskDescription)
executorSuiteHelper.latch1.await()
// we know the task will be started, but not yet deserialized, because of the latches we
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
new file mode 100644
index 0000000..9f1fe05
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.nio.ByteBuffer
+import java.util.Properties
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkFunSuite
+
+class TaskDescriptionSuite extends SparkFunSuite {
+ test("encoding and then decoding a TaskDescription results in the same TaskDescription") {
+ val originalFiles = new HashMap[String, Long]()
+ originalFiles.put("fileUrl1", 1824)
+ originalFiles.put("fileUrl2", 2)
+
+ val originalJars = new HashMap[String, Long]()
+ originalJars.put("jar1", 3)
+
+ val originalProperties = new Properties()
+ originalProperties.put("property1", "18")
+ originalProperties.put("property2", "test value")
+
+ // Create a dummy byte buffer for the task.
+ val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
+
+ val originalTaskDescription = new TaskDescription(
+ taskId = 1520589,
+ attemptNumber = 2,
+ executorId = "testExecutor",
+ name = "task for test",
+ index = 19,
+ originalFiles,
+ originalJars,
+ originalProperties,
+ taskBuffer
+ )
+
+ val serializedTaskDescription = TaskDescription.encode(originalTaskDescription)
+ val decodedTaskDescription = TaskDescription.decode(serializedTaskDescription)
+
+ // Make sure that all of the fields in the decoded task description match the original.
+ assert(decodedTaskDescription.taskId === originalTaskDescription.taskId)
+ assert(decodedTaskDescription.attemptNumber === originalTaskDescription.attemptNumber)
+ assert(decodedTaskDescription.executorId === originalTaskDescription.executorId)
+ assert(decodedTaskDescription.name === originalTaskDescription.name)
+ assert(decodedTaskDescription.index === originalTaskDescription.index)
+ assert(decodedTaskDescription.addedFiles.equals(originalFiles))
+ assert(decodedTaskDescription.addedJars.equals(originalJars))
+ assert(decodedTaskDescription.properties.equals(originalTaskDescription.properties))
+ assert(decodedTaskDescription.serializedTask.equals(taskBuffer))
+ }
+}
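One subtlety in the test above: the decoded serializedTask is a slice of the larger encoded buffer, yet it compares equal to taskBuffer. That works because java.nio.ByteBuffer.equals compares only the remaining elements (from position to limit), not the backing array, capacity, or array offset. A minimal sketch showing this, where ByteBufferEqualityDemo is a hypothetical name rather than Spark code:

```scala
import java.nio.ByteBuffer

// Hypothetical demo (not Spark code): ByteBuffer.equals compares only the
// remaining bytes, from position to limit, not the backing array or offset.
object ByteBufferEqualityDemo {
  // Builds a 7-byte buffer, skips a pretend 3-byte header, and slices off the
  // last four bytes, much like decode() slicing off the serialized task.
  def sliceAfterHeader(): ByteBuffer = {
    val whole = ByteBuffer.wrap(Array[Byte](9, 9, 9, 1, 2, 3, 4))
    whole.position(3)
    whole.slice()
  }
}
```

The slice still shares the 7-byte backing array (at array offset 3), but it equals ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)). Reading from either buffer before comparing (for example with get()) moves its position and therefore changes the result of equals.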
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index ee9149c..b252539 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -29,7 +29,8 @@ import org.apache.spark.{SparkConf, SparkEnv, TaskState}
import org.apache.spark.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerUtils, MesosTaskLaunchData}
+import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.cluster.mesos.MesosSchedulerUtils
import org.apache.spark.util.Utils
private[spark] class MesosExecutorBackend
@@ -84,14 +85,12 @@ private[spark] class MesosExecutorBackend
}
override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
- val taskId = taskInfo.getTaskId.getValue.toLong
- val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData)
+ val taskDescription = TaskDescription.decode(taskInfo.getData.asReadOnlyByteBuffer())
if (executor == null) {
logError("Received launchTask but executor was null")
} else {
SparkHadoopUtil.get.runAsSparkUser { () =>
- executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber,
- taskInfo.getName, taskData.serializedTask)
+ executor.launchTask(this, taskDescription)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 779ffb5..7e56191 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -351,7 +351,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setExecutor(executorInfo)
.setName(task.name)
.addAllResources(cpuResources.asJava)
- .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
+ .setData(ByteString.copyFrom(TaskDescription.encode(task)))
.build()
(taskInfo, finalResources.asJava)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
deleted file mode 100644
index 8370b61..0000000
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-
-import org.apache.mesos.protobuf.ByteString
-
-import org.apache.spark.internal.Logging
-
-/**
- * Wrapper for serializing the data sent when launching Mesos tasks.
- */
-private[spark] case class MesosTaskLaunchData(
- serializedTask: ByteBuffer,
- attemptNumber: Int) extends Logging {
-
- def toByteString: ByteString = {
- val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit)
- dataBuffer.putInt(attemptNumber)
- dataBuffer.put(serializedTask)
- dataBuffer.rewind
- logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]")
- ByteString.copyFrom(dataBuffer)
- }
-}
-
-private[spark] object MesosTaskLaunchData extends Logging {
- def fromByteString(byteString: ByteString): MesosTaskLaunchData = {
- val byteBuffer = byteString.asReadOnlyByteBuffer()
- logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]")
- val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes
- val serializedTask = byteBuffer.slice() // subsequence starting at the current position
- MesosTaskLaunchData(serializedTask, attemptNumber)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 1d7a86f..4ee85b9 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
import java.util.Arrays
import java.util.Collection
import java.util.Collections
+import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -246,7 +247,16 @@ class MesosFineGrainedSchedulerBackendSuite
mesosOffers.get(2).getHostname,
(minCpu - backend.mesosExecutorCores).toInt
)
- val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+ val taskDesc = new TaskDescription(
+ taskId = 1L,
+ attemptNumber = 0,
+ executorId = "s1",
+ name = "n1",
+ index = 0,
+ addedFiles = mutable.Map.empty[String, Long],
+ addedJars = mutable.Map.empty[String, Long],
+ properties = new Properties(),
+ ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
@@ -345,7 +355,16 @@ class MesosFineGrainedSchedulerBackendSuite
2 // Deducting 1 for executor
)
- val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+ val taskDesc = new TaskDescription(
+ taskId = 1L,
+ attemptNumber = 0,
+ executorId = "s1",
+ name = "n1",
+ index = 0,
+ addedFiles = mutable.Map.empty[String, Long],
+ addedJars = mutable.Map.empty[String, Long],
+ properties = new Properties(),
+ ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
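The updated tests construct TaskDescription with an explicit properties = new Properties(), because the properties now travel inside the task description. A rough sketch of writing a java.util.Properties object by traversing its entries, rather than Java-serializing the whole object, might look like the following. PropertiesCodec is a hypothetical name, and the count-then-pairs layout is an assumption, not Spark's actual encoding.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.util.Properties
import scala.collection.JavaConverters._

// Hypothetical codec: writes an entry count followed by key/value pairs.
object PropertiesCodec {
  def encode(props: Properties): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    // stringPropertyNames() returns keys whose key and value are both Strings
    // (defaults included); any other entries are skipped in this sketch.
    val keys = props.stringPropertyNames().asScala.toSeq
    out.writeInt(keys.size)
    keys.foreach { key =>
      // writeUTF limits each encoded string to 65535 bytes.
      out.writeUTF(key)
      out.writeUTF(props.getProperty(key))
    }
    out.flush()
    bytes.toByteArray
  }

  // Rebuild the Properties by reading the pairs back in the written order.
  def decode(data: Array[Byte]): Properties = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val props = new Properties()
    val count = in.readInt()
    (0 until count).foreach { _ =>
      val key = in.readUTF()
      props.setProperty(key, in.readUTF())
    }
    props
  }
}
```

Writing only the string pairs avoids the class metadata that Java serialization adds to every object graph, which is the kind of size reduction the commit message attributes to serializing Properties manually.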
http://git-wip-us.apache.org/repos/asf/spark/blob/2e139eed/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
deleted file mode 100644
index 5a81bb3..0000000
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.SparkFunSuite
-
-class MesosTaskLaunchDataSuite extends SparkFunSuite {
- test("serialize and deserialize data must be same") {
- val serializedTask = ByteBuffer.allocate(40)
- (Range(100, 110).map(serializedTask.putInt(_)))
- serializedTask.rewind
- val attemptNumber = 100
- val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
- serializedTask.rewind
- val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
- assert(mesosTaskLaunchData.attemptNumber == attemptNumber)
- assert(mesosTaskLaunchData.serializedTask.equals(serializedTask))
- }
-}