Updated Branches:
  refs/heads/master 5bcfd7981 -> 576c4a4c5

SPARK-1033. Ask for cores in Yarn container requests


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/3e85b87d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/3e85b87d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/3e85b87d

Branch: refs/heads/master
Commit: 3e85b87d9033e6d9a2634f7598abc3acee77486f
Parents: 792d908
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Sun Jan 19 10:16:25 2014 -0800
Committer: Sandy Ryza <sa...@cloudera.com>
Committed: Mon Jan 20 14:42:32 2014 -0800

----------------------------------------------------------------------
 docs/running-on-yarn.md                                     | 2 +-
 .../apache/spark/deploy/yarn/YarnAllocationHandler.scala    | 9 +++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3e85b87d/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 3bd6264..fb8a043 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -133,7 +133,7 @@ See [Building Spark with Maven](building-with-maven.html) 
for instructions on ho
 
 # Important Notes
 
-- We do not requesting container resources based on the number of cores. Thus 
the numbers of cores given via command line arguments cannot be guaranteed.
+- Before Hadoop 2.2, YARN does not support cores in container resource 
requests. Thus, when running against an earlier version, the numbers of cores 
given via command line arguments cannot be guaranteed.
 - The local directories used for spark will be the local directories 
configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the 
user specifies spark.local.dir, it will be ignored.
 - The --files and --archives options support specifying file names with the # 
similar to Hadoop. For example you can specify: --files 
localtest.txt#appSees.txt and this will upload the file you have locally named 
localtest.txt into HDFS but this will be linked to by the name appSees.txt and 
your application should use the name as appSees.txt to reference it when 
running on YARN.
 - The --addJars option allows the SparkContext.addJar function to work if you 
are using it with local files. It does not need to be used if you are using it 
with HDFS, HTTP, HTTPS, or FTP files.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3e85b87d/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 738ff98..f53c130 100644
--- 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -102,7 +102,8 @@ private[yarn] class YarnAllocationHandler(
   def getNumWorkersFailed: Int = numWorkersFailed.intValue
 
   def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (workerMemory + 
YarnAllocationHandler.MEMORY_OVERHEAD)
+    (container.getResource.getMemory >= (workerMemory + 
YarnAllocationHandler.MEMORY_OVERHEAD)
+      && container.getResource.getVirtualCores >= workerCores)
   }
 
   def releaseContainer(container: Container) {
@@ -532,15 +533,15 @@ private[yarn] class YarnAllocationHandler(
       priority: Int
     ): ArrayBuffer[ContainerRequest] = {
 
-    val memoryResource = Records.newRecord(classOf[Resource])
-    memoryResource.setMemory(workerMemory + 
YarnAllocationHandler.MEMORY_OVERHEAD)
+    val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+    val resource = Resource.newInstance(memoryRequest, workerCores)
 
     val prioritySetting = Records.newRecord(classOf[Priority])
     prioritySetting.setPriority(priority)
 
     val requests = new ArrayBuffer[ContainerRequest]()
     for (i <- 0 until numWorkers) {
-      requests += new ContainerRequest(memoryResource, hosts, racks, 
prioritySetting)
+      requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
     }
     requests
   }

Reply via email to the mailing list.