This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c27a650a Add optional cpu limit to spawned action containers (#5443)
0c27a650a is described below

commit 0c27a650ab6073e131e5c74002465e93cf4d8621
Author: Quinten Parker <[email protected]>
AuthorDate: Fri Sep 29 06:27:59 2023 -0700

    Add optional cpu limit to spawned action containers (#5443)
    
    * Add optional cpu limit to spawned action containers
    
    * conf
    
    * formatting
    
    * Update core/invoker/src/main/resources/application.conf
    
    Co-authored-by: Dominic Kim <[email protected]>
    
    * formatting
    
    * formatting
    
    * ansible
    
    ---------
    
    Co-authored-by: Dominic Kim <[email protected]>
---
 ansible/group_vars/all                             |  1 +
 ansible/roles/invoker/tasks/deploy.yml             |  1 +
 .../core/containerpool/ContainerFactory.scala      | 20 ++++-
 .../openwhisk/core/yarn/YARNContainerFactory.scala |  3 +-
 core/invoker/src/main/resources/application.conf   |  2 +
 .../core/containerpool/ContainerProxy.scala        |  4 +
 .../containerpool/docker/DockerContainer.scala     |  2 +
 .../docker/DockerContainerFactory.scala            | 15 ++--
 .../docker/StandaloneDockerContainerFactory.scala  | 16 ++--
 .../kubernetes/KubernetesContainerFactory.scala    | 14 ++--
 .../v2/FunctionPullingContainerProxy.scala         |  4 +
 tests/dat/actions/zippedaction/package-lock.json   | 18 +++--
 tests/src/test/scala/common/LoggedFunction.scala   | 12 +++
 .../docker/test/DockerContainerFactoryTests.scala  | 86 +++++++++++++++++++++-
 .../docker/test/DockerContainerTests.scala         |  3 +
 .../test/ContainerPoolConfigTests.scala            | 77 +++++++++++++++++++
 .../containerpool/test/ContainerProxyTests.scala   | 11 ++-
 .../test/FunctionPullingContainerPoolTests.scala   |  2 +
 .../test/FunctionPullingContainerProxyTests.scala  | 13 +++-
 .../yarn/test/YARNContainerFactoryTests.scala      | 30 +++++---
 20 files changed, 290 insertions(+), 44 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 072e4aebb..7b80ec6a0 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -209,6 +209,7 @@ invoker:
   heap: "{{ invoker_heap | default('2g') }}"
   arguments: "{{ invoker_arguments | default('') }}"
   userMemory: "{{ invoker_user_memory | default('2048m') }}"
+  userCpus: "{{ invoker_user_cpus | default() }}"
   # Specify if it is allowed to deploy more than 1 invoker on a single machine.
   allowMultipleInstances: "{{ invoker_allow_multiple_instances | 
default(false) }}"
   # Specify if it should use docker-runc or docker to pause/unpause containers
diff --git a/ansible/roles/invoker/tasks/deploy.yml 
b/ansible/roles/invoker/tasks/deploy.yml
index a2089052c..fba7bf94b 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -258,6 +258,7 @@
       "CONFIG_whisk_containerFactory_containerArgs_network": "{{ 
invoker_container_network_name | default('bridge') }}"
       "INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | 
default()}}"
       "CONFIG_whisk_containerPool_userMemory": "{{ 
hostvars[groups['invokers'][invoker_index | int]].user_memory | 
default(invoker.userMemory) }}"
+      "CONFIG_whisk_containerPool_userCpus": "{{ invoker.userCpus | default() 
}}"
       "CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | 
default() }}"
       "CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc | 
default(false) | lower }}"
       "WHISK_LOGS_DIR": "{{ whisk_logs_dir }}"
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index e2cba1331..a7272d0a2 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -25,7 +25,7 @@ import org.apache.openwhisk.spi.Spi
 
 import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
-import scala.math.max
+import scala.math.{max, round}
 
 case class ContainerArgsConfig(network: String,
                                dnsServers: Seq[String] = Seq.empty,
@@ -55,6 +55,7 @@ case class ContainerPoolConfig(userMemory: ByteSize,
                                prewarmPromotion: Boolean,
                                memorySyncInterval: FiniteDuration,
                                batchDeletionSize: Int,
+                               userCpus: Option[Double] = None,
                                prewarmContainerCreationConfig: 
Option[PrewarmContainerCreationConfig] = None) {
   require(
     concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
@@ -62,6 +63,7 @@ case class ContainerPoolConfig(userMemory: ByteSize,
 
   require(prewarmExpirationCheckInterval.toSeconds > 0, 
"prewarmExpirationCheckInterval must be > 0")
   require(batchDeletionSize > 0, "batch deletion size must be > 0")
+  require(userCpus.forall(_ > 0), "userCpus must be > 0")
 
   /**
    * The shareFactor indicates the number of containers that would share a 
single core, on average.
@@ -73,6 +75,16 @@ case class ContainerPoolConfig(userMemory: ByteSize,
   // Grant more CPU to a container if it allocates more memory.
   def cpuShare(reservedMemory: ByteSize) =
     max((totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt, 2) 
// The minimum allowed cpu-shares is 2
+
+  private val minContainerCpus = 0.01 // The minimum cpus allowed by docker is 
0.01
+  private val roundingMultiplier = 100000
+  def cpuLimit(reservedMemory: ByteSize): Option[Double] = {
+    userCpus.map(c => {
+      val containerCpus = c / (userMemory.toBytes / reservedMemory.toBytes)
+      val roundedContainerCpus = round(containerCpus * 
roundingMultiplier).toDouble / roundingMultiplier // Only use decimal precision 
of 5
+      max(roundedContainerCpus, minContainerCpus)
+    })
+  }
 }
 
 case class PrewarmContainerCreationConfig(maxConcurrent: Int, creationDelay: 
FiniteDuration) {
@@ -116,8 +128,9 @@ trait ContainerFactory {
     userProvidedImage: Boolean,
     memory: ByteSize,
     cpuShares: Int,
+    cpuLimit: Option[Double],
     action: Option[ExecutableWhiskAction])(implicit config: WhiskConfig, 
logging: Logging): Future[Container] = {
-    createContainer(tid, name, actionImage, userProvidedImage, memory, 
cpuShares)
+    createContainer(tid, name, actionImage, userProvidedImage, memory, 
cpuShares, cpuLimit)
   }
 
   def createContainer(tid: TransactionId,
@@ -125,7 +138,8 @@ trait ContainerFactory {
                       actionImage: ExecManifest.ImageName,
                       userProvidedImage: Boolean,
                       memory: ByteSize,
-                      cpuShares: Int)(implicit config: WhiskConfig, logging: 
Logging): Future[Container]
+                      cpuShares: Int,
+                      cpuLimit: Option[Double])(implicit config: WhiskConfig, 
logging: Logging): Future[Container]
 
   /** perform any initialization */
   def init(): Unit
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
index e8fc86b12..d27df8109 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
@@ -123,7 +123,8 @@ class YARNContainerFactory(actorSystem: ActorSystem,
     actionImage: ExecManifest.ImageName,
     unuseduserProvidedImage: Boolean,
     unusedmemory: ByteSize,
-    unusedcpuShares: Int)(implicit config: WhiskConfig, logging: Logging): 
Future[Container] = {
+    unusedcpuShares: Int,
+    unusedcpuLimit: Option[Double])(implicit config: WhiskConfig, logging: 
Logging): Future[Container] = {
     implicit val timeout: Timeout = 
Timeout(containerStartTimeoutMS.milliseconds)
 
     //First send the create command to YARN, then with a different actor, wait 
for the container to be ready
diff --git a/core/invoker/src/main/resources/application.conf 
b/core/invoker/src/main/resources/application.conf
index 6fca2210f..f04dfdbac 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -71,6 +71,8 @@ whisk {
     prewarm-promotion: false # if true, action can take prewarm container 
which has bigger memory
     memory-sync-interval: 1 second # period to sync memory info to etcd
     batch-deletion-size: 10 # batch size for removing containers when disable 
invoker, too big value may cause docker/k8s overload
+    # optional setting to specify the total allocatable cpus for all action 
containers, each container will get a fraction of this proportional to its 
allocated memory to limit the cpu
+    # user-cpus: 1
   }
 
   kubernetes {
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index b5548b3b6..8261d49da 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -250,6 +250,7 @@ class ContainerProxy(factory: (TransactionId,
                                Boolean,
                                ByteSize,
                                Int,
+                               Option[Double],
                                Option[ExecutableWhiskAction]) => 
Future[Container],
                      sendActiveAck: ActiveAck,
                      storeActivation: (TransactionId, WhiskActivation, 
Boolean, UserContext) => Future[Any],
@@ -288,6 +289,7 @@ class ContainerProxy(factory: (TransactionId,
         job.exec.pull,
         job.memoryLimit,
         poolConfig.cpuShare(job.memoryLimit),
+        poolConfig.cpuLimit(job.memoryLimit),
         None)
         .map(container =>
           PreWarmCompleted(PreWarmedData(container, job.exec.kind, 
job.memoryLimit, expires = job.ttl.map(_.fromNow))))
@@ -307,6 +309,7 @@ class ContainerProxy(factory: (TransactionId,
         job.action.exec.pull,
         job.action.limits.memory.megabytes.MB,
         poolConfig.cpuShare(job.action.limits.memory.megabytes.MB),
+        poolConfig.cpuLimit(job.action.limits.memory.megabytes.MB),
         Some(job.action))
 
       // container factory will either yield a new container ready to execute 
the action, or
@@ -978,6 +981,7 @@ object ContainerProxy {
                       Boolean,
                       ByteSize,
                       Int,
+                      Option[Double],
                       Option[ExecutableWhiskAction]) => Future[Container],
             ack: ActiveAck,
             store: (TransactionId, WhiskActivation, Boolean, UserContext) => 
Future[Any],
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
index d624ba30a..378000b52 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
@@ -63,6 +63,7 @@ object DockerContainer {
              registryConfig: Option[RuntimesRegistryConfig] = None,
              memory: ByteSize = 256.MB,
              cpuShares: Int = 0,
+             cpuLimit: Option[Double] = None,
              environment: Map[String, String] = Map.empty,
              network: String = "bridge",
              dnsServers: Seq[String] = Seq.empty,
@@ -101,6 +102,7 @@ object DockerContainer {
       dnsSearch.flatMap(d => Seq("--dns-search", d)) ++
       dnsOptions.flatMap(d => Seq(dnsOptString, d)) ++
       name.map(n => Seq("--name", n)).getOrElse(Seq.empty) ++
+      cpuLimit.map(c => Seq("--cpus", c.toString)).getOrElse(Seq.empty) ++
       params
 
     val registryConfigUrl = registryConfig.map(_.url).getOrElse("")
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala
index b52479186..c4e1f8eff 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -57,12 +57,14 @@ class DockerContainerFactory(instance: InvokerInstanceId,
     extends ContainerFactory {
 
   /** Create a container using docker cli */
-  override def createContainer(tid: TransactionId,
-                               name: String,
-                               actionImage: ExecManifest.ImageName,
-                               userProvidedImage: Boolean,
-                               memory: ByteSize,
-                               cpuShares: Int)(implicit config: WhiskConfig, 
logging: Logging): Future[Container] = {
+  override def createContainer(
+    tid: TransactionId,
+    name: String,
+    actionImage: ExecManifest.ImageName,
+    userProvidedImage: Boolean,
+    memory: ByteSize,
+    cpuShares: Int,
+    cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): 
Future[Container] = {
     val registryConfig =
       ContainerFactory.resolveRegistryConfig(userProvidedImage, 
runtimesRegistryConfig, userImagesRegistryConfig)
     val image = if (userProvidedImage) Left(actionImage) else 
Right(actionImage)
@@ -72,6 +74,7 @@ class DockerContainerFactory(instance: InvokerInstanceId,
       registryConfig = Some(registryConfig),
       memory = memory,
       cpuShares = cpuShares,
+      cpuLimit = cpuLimit,
       environment = Map("__OW_API_HOST" -> config.wskApiHost) ++ 
containerArgsConfig.extraEnvVarMap,
       network = containerArgsConfig.network,
       dnsServers = containerArgsConfig.dnsServers,
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala
index b3102ccdd..0251da7fe 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala
@@ -61,12 +61,14 @@ class StandaloneDockerContainerFactory(instance: 
InvokerInstanceId, parameters:
   private val pulledImages = new TrieMap[String, Boolean]()
   private val factoryConfig = 
loadConfigOrThrow[StandaloneDockerConfig](ConfigKeys.standaloneDockerContainerFactory)
 
-  override def createContainer(tid: TransactionId,
-                               name: String,
-                               actionImage: ExecManifest.ImageName,
-                               userProvidedImage: Boolean,
-                               memory: ByteSize,
-                               cpuShares: Int)(implicit config: WhiskConfig, 
logging: Logging): Future[Container] = {
+  override def createContainer(
+    tid: TransactionId,
+    name: String,
+    actionImage: ExecManifest.ImageName,
+    userProvidedImage: Boolean,
+    memory: ByteSize,
+    cpuShares: Int,
+    cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): 
Future[Container] = {
 
     //For standalone server usage we would also want to pull the OpenWhisk 
provided image so as to ensure if
     //local setup does not have the image then it pulls it down
@@ -84,7 +86,7 @@ class StandaloneDockerContainerFactory(instance: 
InvokerInstanceId, parameters:
         }
       } else Future.successful(true)
 
-    pulled.flatMap(_ => super.createContainer(tid, name, actionImage, 
userProvidedImage, memory, cpuShares))
+    pulled.flatMap(_ => super.createContainer(tid, name, actionImage, 
userProvidedImage, memory, cpuShares, cpuLimit))
   }
 
   override def init(): Unit = {
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
index 8292eba1e..d627627d5 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -67,12 +67,14 @@ class KubernetesContainerFactory(
     Await.ready(cleaning, 
KubernetesContainerFactoryProvider.runtimeDeleteTimeout)
   }
 
-  override def createContainer(tid: TransactionId,
-                               name: String,
-                               actionImage: ImageName,
-                               userProvidedImage: Boolean,
-                               memory: ByteSize,
-                               cpuShares: Int)(implicit config: WhiskConfig, 
logging: Logging): Future[Container] = {
+  override def createContainer(
+    tid: TransactionId,
+    name: String,
+    actionImage: ImageName,
+    userProvidedImage: Boolean,
+    memory: ByteSize,
+    cpuShares: Int,
+    cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging): 
Future[Container] = {
     val image = actionImage.resolveImageName(Some(
       ContainerFactory.resolveRegistryConfig(userProvidedImage, 
runtimesRegistryConfig, userImagesRegistryConfig).url))
 
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index c5fa5a6f2..b0fa73f35 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -185,6 +185,7 @@ class FunctionPullingContainerProxy(
             Boolean,
             ByteSize,
             Int,
+            Option[Double],
             Option[ExecutableWhiskAction]) => Future[Container],
   entityStore: ArtifactStore[WhiskEntity],
   namespaceBlacklist: NamespaceBlacklist,
@@ -240,6 +241,7 @@ class FunctionPullingContainerProxy(
         job.exec.pull,
         job.memoryLimit,
         poolConfig.cpuShare(job.memoryLimit),
+        poolConfig.cpuLimit(job.memoryLimit),
         None)
         .map(container => PreWarmData(container, job.exec.kind, 
job.memoryLimit, expires = job.ttl.map(_.fromNow)))
         .pipeTo(self)
@@ -254,6 +256,7 @@ class FunctionPullingContainerProxy(
         job.action.exec.pull,
         job.action.limits.memory.megabytes.MB,
         poolConfig.cpuShare(job.action.limits.memory.megabytes.MB),
+        poolConfig.cpuLimit(job.action.limits.memory.megabytes.MB),
         None)
         .andThen {
           case Failure(t) =>
@@ -1274,6 +1277,7 @@ object FunctionPullingContainerProxy {
                       Boolean,
                       ByteSize,
                       Int,
+                      Option[Double],
                       Option[ExecutableWhiskAction]) => Future[Container],
             entityStore: ArtifactStore[WhiskEntity],
             namespaceBlacklist: NamespaceBlacklist,
diff --git a/tests/dat/actions/zippedaction/package-lock.json 
b/tests/dat/actions/zippedaction/package-lock.json
index 4bbddd42a..8f602b8f8 100644
--- a/tests/dat/actions/zippedaction/package-lock.json
+++ b/tests/dat/actions/zippedaction/package-lock.json
@@ -1,18 +1,26 @@
 {
   "name": "test-action",
   "version": "1.0.0",
-  "lockfileVersion": 1,
+  "lockfileVersion": 3,
   "requires": true,
-  "dependencies": {
-    "prog-quote": {
+  "packages": {
+    "": {
+      "name": "test-action",
+      "version": "1.0.0",
+      "license": "Apache 2.0",
+      "dependencies": {
+        "prog-quote": "2.0.0"
+      }
+    },
+    "node_modules/prog-quote": {
       "version": "2.0.0",
       "resolved": 
"https://registry.npmjs.org/prog-quote/-/prog-quote-2.0.0.tgz",
       "integrity": "sha1-TLBMeosV/zu/kxMQxCsBzSjcMB0=",
-      "requires": {
+      "dependencies": {
         "random-js": "1.0.8"
       }
     },
-    "random-js": {
+    "node_modules/random-js": {
       "version": "1.0.8",
       "resolved": "https://registry.npmjs.org/random-js/-/random-js-1.0.8.tgz",
       "integrity": "sha1-lo/WiabyXWwKrHZig94vaIycGQo="
diff --git a/tests/src/test/scala/common/LoggedFunction.scala 
b/tests/src/test/scala/common/LoggedFunction.scala
index b5bd8960d..65b4effad 100644
--- a/tests/src/test/scala/common/LoggedFunction.scala
+++ b/tests/src/test/scala/common/LoggedFunction.scala
@@ -92,6 +92,16 @@ class LoggedFunction7[A1, A2, A3, A4, A5, A6, A7, B](body: 
(A1, A2, A3, A4, A5,
   }
 }
 
+class LoggedFunction8[A1, A2, A3, A4, A5, A6, A7, A8, B](body: (A1, A2, A3, 
A4, A5, A6, A7, A8) => B)
+    extends Function8[A1, A2, A3, A4, A5, A6, A7, A8, B] {
+  val calls = mutable.Buffer[(A1, A2, A3, A4, A5, A6, A7, A8)]()
+
+  override def apply(v1: A1, v2: A2, v3: A3, v4: A4, v5: A5, v6: A6, v7: A7, 
v8: A8): B = {
+    calls += ((v1, v2, v3, v4, v5, v6, v7, v8))
+    body(v1, v2, v3, v4, v5, v6, v7, v8)
+  }
+}
+
 class SynchronizedLoggedFunction1[A1, B](body: A1 => B) extends Function1[A1, 
B] {
   val calls = mutable.Buffer[A1]()
 
@@ -157,6 +167,8 @@ object LoggedFunction {
     new LoggedFunction6[A1, A2, A3, A4, A5, A6, B](body)
   def apply[A1, A2, A3, A4, A5, A6, A7, B](body: (A1, A2, A3, A4, A5, A6, A7) 
=> B) =
     new LoggedFunction7[A1, A2, A3, A4, A5, A6, A7, B](body)
+  def apply[A1, A2, A3, A4, A5, A6, A7, A8, B](body: (A1, A2, A3, A4, A5, A6, 
A7, A8) => B) =
+    new LoggedFunction8[A1, A2, A3, A4, A5, A6, A7, A8, B](body)
 }
 
 object SynchronizedLoggedFunction {
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
index baacd95c8..feebd2f05 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
@@ -140,7 +140,91 @@ class DockerContainerFactoryTests
         userImagesRegistryConfig,
         DockerContainerFactoryConfig(true))(actorSystem, executionContext, 
logging, dockerApiStub, mock[RuncApi])
 
-    val cf = factory.createContainer(tid, "testContainer", image, false, 
10.MB, 32)
+    val cf = factory.createContainer(tid, "testContainer", image, false, 
10.MB, 32, None)
+
+    val c = Await.result(cf, 5000.milliseconds)
+
+    Await.result(c.destroy(), 500.milliseconds)
+
+  }
+
+  it should "set the docker run args with cpu limit when provided" in {
+
+    val image = ExecManifest.runtimesManifest.manifests("nodejs:20").image
+
+    implicit val tid = TransactionId.testing
+    val dockerApiStub = mock[DockerApiWithFileAccess]
+    //setup run expectation
+    (dockerApiStub
+      .run(_: String, _: Seq[String])(_: TransactionId))
+      .expects(
+        image.resolveImageName(Some(runtimesRegistryConfig.url)),
+        List(
+          "--cpu-shares",
+          "32", //should be calculated as 1024/(numcore * sharefactor) via 
ContainerFactory.cpuShare
+          "--memory",
+          "10m",
+          "--memory-swap",
+          "10m",
+          "--network",
+          "net1",
+          "-e",
+          "__OW_API_HOST=",
+          "-e",
+          "k1=v1",
+          "-e",
+          "k2=v2",
+          "-e",
+          "k3=",
+          "--dns",
+          "dns1",
+          "--dns",
+          "dns2",
+          "--name",
+          "testContainer",
+          "--cpus",
+          "0.5",
+          "--extra1",
+          "e1",
+          "--extra1",
+          "e2",
+          "--extra2",
+          "e3",
+          "--extra2",
+          "e4"),
+        *)
+      .returning(Future.successful { ContainerId("fakecontainerid") })
+    //setup inspect expectation
+    (dockerApiStub
+      .inspectIPAddress(_: ContainerId, _: String)(_: TransactionId))
+      .expects(ContainerId("fakecontainerid"), "net1", *)
+      .returning(Future.successful { ContainerAddress("1.2.3.4", 1234) })
+    //setup rm expectation
+    (dockerApiStub
+      .rm(_: ContainerId)(_: TransactionId))
+      .expects(ContainerId("fakecontainerid"), *)
+      .returning(Future.successful(()))
+    //setup clientVersion expectation
+    (dockerApiStub.clientVersion _)
+      .expects()
+      .returning("mock_test_client")
+
+    val factory =
+      new DockerContainerFactory(
+        InvokerInstanceId(0, userMemory = defaultUserMemory),
+        Map.empty,
+        ContainerArgsConfig(
+          "net1",
+          Seq("dns1", "dns2"),
+          Seq.empty,
+          Seq.empty,
+          Seq("k1=v1", "k2=v2", "k3"),
+          Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4"))),
+        runtimesRegistryConfig,
+        userImagesRegistryConfig,
+        DockerContainerFactoryConfig(true))(actorSystem, executionContext, 
logging, dockerApiStub, mock[RuncApi])
+
+    val cf = factory.createContainer(tid, "testContainer", image, false, 
10.MB, 32, Some(0.5))
 
     val c = Await.result(cf, 5000.milliseconds)
 
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
index dbfb85883..25a63da55 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -137,6 +137,7 @@ class DockerContainerTests
     val image = "image"
     val memory = 128.MB
     val cpuShares = 1
+    val cpuLimit = 0.5
     val environment = Map("test" -> "hi")
     val network = "testwork"
     val name = "myContainer"
@@ -145,6 +146,7 @@ class DockerContainerTests
       image = Right(ImageName(image)),
       memory = memory,
       cpuShares = cpuShares,
+      cpuLimit = Some(cpuLimit),
       environment = environment,
       network = network,
       name = Some(name),
@@ -171,6 +173,7 @@ class DockerContainerTests
     args should contain inOrder ("--cpu-shares", cpuShares.toString)
     args should contain inOrder ("--network", network)
     args should contain inOrder ("--name", name)
+    args should contain inOrder ("--cpus", cpuLimit.toString)
 
     // Assert proper environment passing
     args should contain allOf ("-e", "test=hi")
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolConfigTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolConfigTests.scala
new file mode 100644
index 000000000..23e3a738e
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolConfigTests.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.test
+
+import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
+import org.apache.openwhisk.core.entity.ByteSize
+import org.apache.openwhisk.core.entity.size.SizeInt
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+
+import scala.concurrent.duration.DurationInt
+
+@RunWith(classOf[JUnitRunner])
+class ContainerPoolConfigTests extends FlatSpec with Matchers {
+
+  def createPoolConfig(userMemory: ByteSize, userCpus: Option[Double] = None): 
ContainerPoolConfig = {
+    ContainerPoolConfig(userMemory, 0.5, false, 2.second, 10.seconds, None, 1, 
3, false, 1.second, 10, userCpus)
+  }
+
+  it should "calculate container cpu shares" in {
+    val (userMemory, memoryLimit) = (2.GB, 256.MB)
+    val poolConfig = createPoolConfig(userMemory)
+    poolConfig.cpuShare(memoryLimit) shouldBe 128
+  }
+
+  it should "use min cpu shares when calculated container cpu shares is too 
low" in {
+    val (userMemory, memoryLimit) = (1024.MB, 1.MB)
+    val poolConfig = createPoolConfig(userMemory)
+    poolConfig.cpuShare(memoryLimit) shouldBe 2 // calculated shares would be 
1, but min is 2
+  }
+
+  it should "calculate container cpu limit" in {
+    val (userMemory, memoryLimit, userCpus) = (2.GB, 256.MB, 2.0)
+    val poolConfig = createPoolConfig(userMemory, Some(userCpus))
+    poolConfig.cpuLimit(memoryLimit) shouldBe Some(0.25)
+  }
+
+  it should "correctly round container cpu limit" in {
+    val (userMemory, memoryLimit, userCpus) = (768.MB, 256.MB, 2.0)
+    val poolConfig = createPoolConfig(userMemory, Some(userCpus))
+    poolConfig.cpuLimit(memoryLimit) shouldBe Some(0.66667) // calculated 
limit is 0.666..., rounded to 0.66667
+  }
+
+  it should "use min container cpu limit when calculated limit is too low" in {
+    val (userMemory, memoryLimit, userCpus) = (1024.MB, 1.MB, 1.0)
+    val poolConfig = createPoolConfig(userMemory, Some(userCpus))
+    poolConfig.cpuLimit(memoryLimit) shouldBe Some(0.01) // calculated limit 
is 0.001, but min is 0.01
+  }
+
+  it should "return None for container cpu limit when userCpus is not set" in {
+    val (userMemory, memoryLimit) = (2.GB, 256.MB)
+    val poolConfig = createPoolConfig(userMemory)
+    poolConfig.cpuLimit(memoryLimit) shouldBe None
+  }
+
+  it should "require userCpus to be greater than 0" in {
+    assertThrows[IllegalArgumentException] {
+      createPoolConfig(2.GB, Some(-1.0))
+    }
+  }
+}
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 03a0e1f1e..e29eb15e5 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -238,7 +238,14 @@ class ContainerProxyTests
 
   /** Creates an inspectable factory */
   def createFactory(response: Future[Container]) = LoggedFunction {
-    (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _: 
Int, _: Option[ExecutableWhiskAction]) =>
+    (_: TransactionId,
+     _: String,
+     _: ImageName,
+     _: Boolean,
+     _: ByteSize,
+     _: Int,
+     _: Option[Double],
+     _: Option[ExecutableWhiskAction]) =>
       response
   }
 
@@ -332,7 +339,7 @@ class ContainerProxyTests
     preWarm(machine)
 
     factory.calls should have size 1
-    val (tid, name, _, _, memory, cpuShares, _) = factory.calls(0)
+    val (tid, name, _, _, memory, cpuShares, _, _) = factory.calls(0)
     tid shouldBe TransactionId.invokerWarmup
     name should fullyMatch regex """wskmyname\d+_\d+_prewarm_actionKind"""
     memory shouldBe memoryLimit
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
index 1aa85ad9e..714a8f9b1 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -201,6 +201,7 @@ class FunctionPullingContainerPoolTests
       prewarmPromotion,
       memorySyncInterval,
       batchDeletionSize,
+      Some(2),
       prewarmContainerCreationConfig)
 
   def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
@@ -460,6 +461,7 @@ class FunctionPullingContainerPoolTests
       false,
       FiniteDuration(10, TimeUnit.SECONDS),
       10,
+      Some(2),
       prewarmContainerCreationConfig)
 
     val pool = system.actorOf(
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index 36e1416fd..3200aeed9 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -215,7 +215,14 @@ class FunctionPullingContainerProxyTests
 
   /** Creates an inspectable factory */
   def createFactory(response: Future[Container]) = LoggedFunction {
-    (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _: Int, _: Option[ExecutableWhiskAction]) =>
+    (_: TransactionId,
+     _: String,
+     _: ImageName,
+     _: Boolean,
+     _: ByteSize,
+     _: Int,
+     _: Option[Double],
+     _: Option[ExecutableWhiskAction]) =>
       response
   }
 
@@ -396,7 +403,7 @@ class FunctionPullingContainerProxyTests
     preWarm(machine, probe)
 
     factory.calls should have size 1
-    val (tid, name, _, _, memory, _, _) = factory.calls(0)
+    val (tid, name, _, _, memory, _, _, _) = factory.calls(0)
     tid shouldBe TransactionId.invokerWarmup
     name should fullyMatch regex """wskmyname\d+_\d+_prewarm_actionKind"""
     memory shouldBe memoryLimit
@@ -586,7 +593,7 @@ class FunctionPullingContainerProxyTests
       case RequestActivation(Some(_), None) => true
     }
 
-    val (tid, name, _, _, memory, _, _) = factory.calls(0)
+    val (tid, name, _, _, memory, _, _, _) = factory.calls(0)
     tid shouldBe TransactionId.invokerColdstart
     name should fullyMatch regex """wskmyname\d+_\d+_actionSpace_actionName"""
     memory shouldBe memoryLimit
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala
index a77ca0d5b..56196ed4c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala
@@ -176,7 +176,8 @@ class YARNContainerFactoryTests
       imageToCreate,
       unuseduserProvidedImage = true,
       ByteSize(256, SizeUnits.MB),
-      1)
+      1,
+      None)
 
     Await.result(containerFuture, 60.seconds)
 
@@ -231,7 +232,8 @@ class YARNContainerFactoryTests
       imageNotToDelete,
       unuseduserProvidedImage = true,
       ByteSize(256, SizeUnits.MB),
-      1)
+      1,
+      None)
 
     val containerFuture2 = factory.createContainer(
       TransactionId.testing,
@@ -239,7 +241,8 @@ class YARNContainerFactoryTests
       imageToDelete,
       unuseduserProvidedImage = true,
       ByteSize(256, SizeUnits.MB),
-      1)
+      1,
+      None)
 
     val containerFuture3 = factory.createContainer(
       TransactionId.testing,
@@ -247,7 +250,8 @@ class YARNContainerFactoryTests
       imageToDelete,
       unuseduserProvidedImage = true,
       ByteSize(256, SizeUnits.MB),
-      1)
+      1,
+      None)
 
     val containerFuture4 = factory.createContainer(
       TransactionId.testing,
@@ -255,7 +259,8 @@ class YARNContainerFactoryTests
       imageToDelete,
       unuseduserProvidedImage = true,
       ByteSize(256, SizeUnits.MB),
-      1)
+      1,
+      None)
 
     val container1 = Await.result(containerFuture1, 30.seconds)
     val container2 = Await.result(containerFuture2, 30.seconds)
@@ -349,7 +354,8 @@ class YARNContainerFactoryTests
       images(0),
       unuseduserProvidedImage = true,
       ByteSize(256, SizeUnits.MB),
-      1)
+      1,
+      None)
 
     val container2Future = factory.createContainer(
       TransactionId.testing,
@@ -357,7 +363,8 @@ class YARNContainerFactoryTests
       images(1),
       unuseduserProvidedImage = true,
       ByteSize(256, SizeUnits.MB),
-      1)
+      1,
+      None)
 
     val container3Future = factory.createContainer(
       TransactionId.testing,
@@ -365,7 +372,8 @@ class YARNContainerFactoryTests
       images(0),
       unuseduserProvidedImage = true,
       ByteSize(256, SizeUnits.MB),
-      1)
+      1,
+      None)
 
     Await.result(container1Future, 30.seconds)
     val container2 = Await.result(container2Future, 30.seconds)
@@ -463,14 +471,16 @@ class YARNContainerFactoryTests
       imageToCreate,
       unuseduserProvidedImage = true,
       ByteSize(256, SizeUnits.MB),
-      1)
+      1,
+      None)
     val containerFuture1 = factory1.createContainer(
       TransactionId.testing,
       "name",
       imageToCreate,
       unuseduserProvidedImage = true,
       ByteSize(256, SizeUnits.MB),
-      1)
+      1,
+      None)
 
     Await.result(containerFuture0, 60.seconds)
     Await.result(containerFuture1, 60.seconds)


Reply via email to