This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 0c27a650a Add optional cpu limit to spawned action containers (#5443)
0c27a650a is described below
commit 0c27a650ab6073e131e5c74002465e93cf4d8621
Author: Quinten Parker <[email protected]>
AuthorDate: Fri Sep 29 06:27:59 2023 -0700
Add optional cpu limit to spawned action containers (#5443)
* Add optional cpu limit to spawned action containers
* conf
* formatting
* Update core/invoker/src/main/resources/application.conf
Co-authored-by: Dominic Kim <[email protected]>
* formatting
* formatting
* ansible
---------
Co-authored-by: Dominic Kim <[email protected]>
---
ansible/group_vars/all | 1 +
ansible/roles/invoker/tasks/deploy.yml | 1 +
.../core/containerpool/ContainerFactory.scala | 20 ++++-
.../openwhisk/core/yarn/YARNContainerFactory.scala | 3 +-
core/invoker/src/main/resources/application.conf | 2 +
.../core/containerpool/ContainerProxy.scala | 4 +
.../containerpool/docker/DockerContainer.scala | 2 +
.../docker/DockerContainerFactory.scala | 15 ++--
.../docker/StandaloneDockerContainerFactory.scala | 16 ++--
.../kubernetes/KubernetesContainerFactory.scala | 14 ++--
.../v2/FunctionPullingContainerProxy.scala | 4 +
tests/dat/actions/zippedaction/package-lock.json | 18 +++--
tests/src/test/scala/common/LoggedFunction.scala | 12 +++
.../docker/test/DockerContainerFactoryTests.scala | 86 +++++++++++++++++++++-
.../docker/test/DockerContainerTests.scala | 3 +
.../test/ContainerPoolConfigTests.scala | 77 +++++++++++++++++++
.../containerpool/test/ContainerProxyTests.scala | 11 ++-
.../test/FunctionPullingContainerPoolTests.scala | 2 +
.../test/FunctionPullingContainerProxyTests.scala | 13 +++-
.../yarn/test/YARNContainerFactoryTests.scala | 30 +++++---
20 files changed, 290 insertions(+), 44 deletions(-)
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 072e4aebb..7b80ec6a0 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -209,6 +209,7 @@ invoker:
heap: "{{ invoker_heap | default('2g') }}"
arguments: "{{ invoker_arguments | default('') }}"
userMemory: "{{ invoker_user_memory | default('2048m') }}"
+ userCpus: "{{ invoker_user_cpus | default() }}"
# Specify if it is allowed to deploy more than 1 invoker on a single machine.
allowMultipleInstances: "{{ invoker_allow_multiple_instances |
default(false) }}"
# Specify if it should use docker-runc or docker to pause/unpause containers
diff --git a/ansible/roles/invoker/tasks/deploy.yml
b/ansible/roles/invoker/tasks/deploy.yml
index a2089052c..fba7bf94b 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -258,6 +258,7 @@
"CONFIG_whisk_containerFactory_containerArgs_network": "{{
invoker_container_network_name | default('bridge') }}"
"INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name |
default()}}"
"CONFIG_whisk_containerPool_userMemory": "{{
hostvars[groups['invokers'][invoker_index | int]].user_memory |
default(invoker.userMemory) }}"
+ "CONFIG_whisk_containerPool_userCpus": "{{ invoker.userCpus | default()
}}"
"CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs |
default() }}"
"CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc |
default(false) | lower }}"
"WHISK_LOGS_DIR": "{{ whisk_logs_dir }}"
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index e2cba1331..a7272d0a2 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -25,7 +25,7 @@ import org.apache.openwhisk.spi.Spi
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
-import scala.math.max
+import scala.math.{max, round}
case class ContainerArgsConfig(network: String,
dnsServers: Seq[String] = Seq.empty,
@@ -55,6 +55,7 @@ case class ContainerPoolConfig(userMemory: ByteSize,
prewarmPromotion: Boolean,
memorySyncInterval: FiniteDuration,
batchDeletionSize: Int,
+ userCpus: Option[Double] = None,
prewarmContainerCreationConfig:
Option[PrewarmContainerCreationConfig] = None) {
require(
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
@@ -62,6 +63,7 @@ case class ContainerPoolConfig(userMemory: ByteSize,
require(prewarmExpirationCheckInterval.toSeconds > 0,
"prewarmExpirationCheckInterval must be > 0")
require(batchDeletionSize > 0, "batch deletion size must be > 0")
+ require(userCpus.forall(_ > 0), "userCpus must be > 0")
/**
* The shareFactor indicates the number of containers that would share a
single core, on average.
@@ -73,6 +75,16 @@ case class ContainerPoolConfig(userMemory: ByteSize,
// Grant more CPU to a container if it allocates more memory.
def cpuShare(reservedMemory: ByteSize) =
max((totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt, 2)
// The minimum allowed cpu-shares is 2
+
+ private val minContainerCpus = 0.01 // The minimum cpus allowed by docker is
0.01
+ private val roundingMultiplier = 100000
+ def cpuLimit(reservedMemory: ByteSize): Option[Double] = {
+ userCpus.map(c => {
+ val containerCpus = c / (userMemory.toBytes / reservedMemory.toBytes)
+ val roundedContainerCpus = round(containerCpus *
roundingMultiplier).toDouble / roundingMultiplier // Only use decimal precision
of 5
+ max(roundedContainerCpus, minContainerCpus)
+ })
+ }
}
case class PrewarmContainerCreationConfig(maxConcurrent: Int, creationDelay:
FiniteDuration) {
@@ -116,8 +128,9 @@ trait ContainerFactory {
userProvidedImage: Boolean,
memory: ByteSize,
cpuShares: Int,
+ cpuLimit: Option[Double],
action: Option[ExecutableWhiskAction])(implicit config: WhiskConfig,
logging: Logging): Future[Container] = {
- createContainer(tid, name, actionImage, userProvidedImage, memory,
cpuShares)
+ createContainer(tid, name, actionImage, userProvidedImage, memory,
cpuShares, cpuLimit)
}
def createContainer(tid: TransactionId,
@@ -125,7 +138,8 @@ trait ContainerFactory {
actionImage: ExecManifest.ImageName,
userProvidedImage: Boolean,
memory: ByteSize,
- cpuShares: Int)(implicit config: WhiskConfig, logging:
Logging): Future[Container]
+ cpuShares: Int,
+ cpuLimit: Option[Double])(implicit config: WhiskConfig,
logging: Logging): Future[Container]
/** perform any initialization */
def init(): Unit
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
index e8fc86b12..d27df8109 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/yarn/YARNContainerFactory.scala
@@ -123,7 +123,8 @@ class YARNContainerFactory(actorSystem: ActorSystem,
actionImage: ExecManifest.ImageName,
unuseduserProvidedImage: Boolean,
unusedmemory: ByteSize,
- unusedcpuShares: Int)(implicit config: WhiskConfig, logging: Logging):
Future[Container] = {
+ unusedcpuShares: Int,
+ unusedcpuLimit: Option[Double])(implicit config: WhiskConfig, logging:
Logging): Future[Container] = {
implicit val timeout: Timeout =
Timeout(containerStartTimeoutMS.milliseconds)
//First send the create command to YARN, then with a different actor, wait
for the container to be ready
diff --git a/core/invoker/src/main/resources/application.conf
b/core/invoker/src/main/resources/application.conf
index 6fca2210f..f04dfdbac 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -71,6 +71,8 @@ whisk {
prewarm-promotion: false # if true, action can take prewarm container
which has bigger memory
memory-sync-interval: 1 second # period to sync memory info to etcd
batch-deletion-size: 10 # batch size for removing containers when disable
invoker, too big value may cause docker/k8s overload
+ # optional setting to specify the total allocatable cpus for all action
containers; each container's cpu limit is a fraction of this total, proportional
to its allocated memory
+ # user-cpus: 1
}
kubernetes {
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index b5548b3b6..8261d49da 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -250,6 +250,7 @@ class ContainerProxy(factory: (TransactionId,
Boolean,
ByteSize,
Int,
+ Option[Double],
Option[ExecutableWhiskAction]) =>
Future[Container],
sendActiveAck: ActiveAck,
storeActivation: (TransactionId, WhiskActivation,
Boolean, UserContext) => Future[Any],
@@ -288,6 +289,7 @@ class ContainerProxy(factory: (TransactionId,
job.exec.pull,
job.memoryLimit,
poolConfig.cpuShare(job.memoryLimit),
+ poolConfig.cpuLimit(job.memoryLimit),
None)
.map(container =>
PreWarmCompleted(PreWarmedData(container, job.exec.kind,
job.memoryLimit, expires = job.ttl.map(_.fromNow))))
@@ -307,6 +309,7 @@ class ContainerProxy(factory: (TransactionId,
job.action.exec.pull,
job.action.limits.memory.megabytes.MB,
poolConfig.cpuShare(job.action.limits.memory.megabytes.MB),
+ poolConfig.cpuLimit(job.action.limits.memory.megabytes.MB),
Some(job.action))
// container factory will either yield a new container ready to execute
the action, or
@@ -978,6 +981,7 @@ object ContainerProxy {
Boolean,
ByteSize,
Int,
+ Option[Double],
Option[ExecutableWhiskAction]) => Future[Container],
ack: ActiveAck,
store: (TransactionId, WhiskActivation, Boolean, UserContext) =>
Future[Any],
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
index d624ba30a..378000b52 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
@@ -63,6 +63,7 @@ object DockerContainer {
registryConfig: Option[RuntimesRegistryConfig] = None,
memory: ByteSize = 256.MB,
cpuShares: Int = 0,
+ cpuLimit: Option[Double] = None,
environment: Map[String, String] = Map.empty,
network: String = "bridge",
dnsServers: Seq[String] = Seq.empty,
@@ -101,6 +102,7 @@ object DockerContainer {
dnsSearch.flatMap(d => Seq("--dns-search", d)) ++
dnsOptions.flatMap(d => Seq(dnsOptString, d)) ++
name.map(n => Seq("--name", n)).getOrElse(Seq.empty) ++
+ cpuLimit.map(c => Seq("--cpus", c.toString)).getOrElse(Seq.empty) ++
params
val registryConfigUrl = registryConfig.map(_.url).getOrElse("")
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala
index b52479186..c4e1f8eff 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainerFactory.scala
@@ -57,12 +57,14 @@ class DockerContainerFactory(instance: InvokerInstanceId,
extends ContainerFactory {
/** Create a container using docker cli */
- override def createContainer(tid: TransactionId,
- name: String,
- actionImage: ExecManifest.ImageName,
- userProvidedImage: Boolean,
- memory: ByteSize,
- cpuShares: Int)(implicit config: WhiskConfig,
logging: Logging): Future[Container] = {
+ override def createContainer(
+ tid: TransactionId,
+ name: String,
+ actionImage: ExecManifest.ImageName,
+ userProvidedImage: Boolean,
+ memory: ByteSize,
+ cpuShares: Int,
+ cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging):
Future[Container] = {
val registryConfig =
ContainerFactory.resolveRegistryConfig(userProvidedImage,
runtimesRegistryConfig, userImagesRegistryConfig)
val image = if (userProvidedImage) Left(actionImage) else
Right(actionImage)
@@ -72,6 +74,7 @@ class DockerContainerFactory(instance: InvokerInstanceId,
registryConfig = Some(registryConfig),
memory = memory,
cpuShares = cpuShares,
+ cpuLimit = cpuLimit,
environment = Map("__OW_API_HOST" -> config.wskApiHost) ++
containerArgsConfig.extraEnvVarMap,
network = containerArgsConfig.network,
dnsServers = containerArgsConfig.dnsServers,
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala
index b3102ccdd..0251da7fe 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/StandaloneDockerContainerFactory.scala
@@ -61,12 +61,14 @@ class StandaloneDockerContainerFactory(instance:
InvokerInstanceId, parameters:
private val pulledImages = new TrieMap[String, Boolean]()
private val factoryConfig =
loadConfigOrThrow[StandaloneDockerConfig](ConfigKeys.standaloneDockerContainerFactory)
- override def createContainer(tid: TransactionId,
- name: String,
- actionImage: ExecManifest.ImageName,
- userProvidedImage: Boolean,
- memory: ByteSize,
- cpuShares: Int)(implicit config: WhiskConfig,
logging: Logging): Future[Container] = {
+ override def createContainer(
+ tid: TransactionId,
+ name: String,
+ actionImage: ExecManifest.ImageName,
+ userProvidedImage: Boolean,
+ memory: ByteSize,
+ cpuShares: Int,
+ cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging):
Future[Container] = {
//For standalone server usage we would also want to pull the OpenWhisk
provided image so as to ensure if
//local setup does not have the image then it pulls it down
@@ -84,7 +86,7 @@ class StandaloneDockerContainerFactory(instance:
InvokerInstanceId, parameters:
}
} else Future.successful(true)
- pulled.flatMap(_ => super.createContainer(tid, name, actionImage,
userProvidedImage, memory, cpuShares))
+ pulled.flatMap(_ => super.createContainer(tid, name, actionImage,
userProvidedImage, memory, cpuShares, cpuLimit))
}
override def init(): Unit = {
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
index 8292eba1e..d627627d5 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -67,12 +67,14 @@ class KubernetesContainerFactory(
Await.ready(cleaning,
KubernetesContainerFactoryProvider.runtimeDeleteTimeout)
}
- override def createContainer(tid: TransactionId,
- name: String,
- actionImage: ImageName,
- userProvidedImage: Boolean,
- memory: ByteSize,
- cpuShares: Int)(implicit config: WhiskConfig,
logging: Logging): Future[Container] = {
+ override def createContainer(
+ tid: TransactionId,
+ name: String,
+ actionImage: ImageName,
+ userProvidedImage: Boolean,
+ memory: ByteSize,
+ cpuShares: Int,
+ cpuLimit: Option[Double])(implicit config: WhiskConfig, logging: Logging):
Future[Container] = {
val image = actionImage.resolveImageName(Some(
ContainerFactory.resolveRegistryConfig(userProvidedImage,
runtimesRegistryConfig, userImagesRegistryConfig).url))
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index c5fa5a6f2..b0fa73f35 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -185,6 +185,7 @@ class FunctionPullingContainerProxy(
Boolean,
ByteSize,
Int,
+ Option[Double],
Option[ExecutableWhiskAction]) => Future[Container],
entityStore: ArtifactStore[WhiskEntity],
namespaceBlacklist: NamespaceBlacklist,
@@ -240,6 +241,7 @@ class FunctionPullingContainerProxy(
job.exec.pull,
job.memoryLimit,
poolConfig.cpuShare(job.memoryLimit),
+ poolConfig.cpuLimit(job.memoryLimit),
None)
.map(container => PreWarmData(container, job.exec.kind,
job.memoryLimit, expires = job.ttl.map(_.fromNow)))
.pipeTo(self)
@@ -254,6 +256,7 @@ class FunctionPullingContainerProxy(
job.action.exec.pull,
job.action.limits.memory.megabytes.MB,
poolConfig.cpuShare(job.action.limits.memory.megabytes.MB),
+ poolConfig.cpuLimit(job.action.limits.memory.megabytes.MB),
None)
.andThen {
case Failure(t) =>
@@ -1274,6 +1277,7 @@ object FunctionPullingContainerProxy {
Boolean,
ByteSize,
Int,
+ Option[Double],
Option[ExecutableWhiskAction]) => Future[Container],
entityStore: ArtifactStore[WhiskEntity],
namespaceBlacklist: NamespaceBlacklist,
diff --git a/tests/dat/actions/zippedaction/package-lock.json
b/tests/dat/actions/zippedaction/package-lock.json
index 4bbddd42a..8f602b8f8 100644
--- a/tests/dat/actions/zippedaction/package-lock.json
+++ b/tests/dat/actions/zippedaction/package-lock.json
@@ -1,18 +1,26 @@
{
"name": "test-action",
"version": "1.0.0",
- "lockfileVersion": 1,
+ "lockfileVersion": 3,
"requires": true,
- "dependencies": {
- "prog-quote": {
+ "packages": {
+ "": {
+ "name": "test-action",
+ "version": "1.0.0",
+ "license": "Apache 2.0",
+ "dependencies": {
+ "prog-quote": "2.0.0"
+ }
+ },
+ "node_modules/prog-quote": {
"version": "2.0.0",
"resolved":
"https://registry.npmjs.org/prog-quote/-/prog-quote-2.0.0.tgz",
"integrity": "sha1-TLBMeosV/zu/kxMQxCsBzSjcMB0=",
- "requires": {
+ "dependencies": {
"random-js": "1.0.8"
}
},
- "random-js": {
+ "node_modules/random-js": {
"version": "1.0.8",
"resolved": "https://registry.npmjs.org/random-js/-/random-js-1.0.8.tgz",
"integrity": "sha1-lo/WiabyXWwKrHZig94vaIycGQo="
diff --git a/tests/src/test/scala/common/LoggedFunction.scala
b/tests/src/test/scala/common/LoggedFunction.scala
index b5bd8960d..65b4effad 100644
--- a/tests/src/test/scala/common/LoggedFunction.scala
+++ b/tests/src/test/scala/common/LoggedFunction.scala
@@ -92,6 +92,16 @@ class LoggedFunction7[A1, A2, A3, A4, A5, A6, A7, B](body:
(A1, A2, A3, A4, A5,
}
}
+class LoggedFunction8[A1, A2, A3, A4, A5, A6, A7, A8, B](body: (A1, A2, A3,
A4, A5, A6, A7, A8) => B)
+ extends Function8[A1, A2, A3, A4, A5, A6, A7, A8, B] {
+ val calls = mutable.Buffer[(A1, A2, A3, A4, A5, A6, A7, A8)]()
+
+ override def apply(v1: A1, v2: A2, v3: A3, v4: A4, v5: A5, v6: A6, v7: A7,
v8: A8): B = {
+ calls += ((v1, v2, v3, v4, v5, v6, v7, v8))
+ body(v1, v2, v3, v4, v5, v6, v7, v8)
+ }
+}
+
class SynchronizedLoggedFunction1[A1, B](body: A1 => B) extends Function1[A1,
B] {
val calls = mutable.Buffer[A1]()
@@ -157,6 +167,8 @@ object LoggedFunction {
new LoggedFunction6[A1, A2, A3, A4, A5, A6, B](body)
def apply[A1, A2, A3, A4, A5, A6, A7, B](body: (A1, A2, A3, A4, A5, A6, A7)
=> B) =
new LoggedFunction7[A1, A2, A3, A4, A5, A6, A7, B](body)
+ def apply[A1, A2, A3, A4, A5, A6, A7, A8, B](body: (A1, A2, A3, A4, A5, A6,
A7, A8) => B) =
+ new LoggedFunction8[A1, A2, A3, A4, A5, A6, A7, A8, B](body)
}
object SynchronizedLoggedFunction {
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
index baacd95c8..feebd2f05 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala
@@ -140,7 +140,91 @@ class DockerContainerFactoryTests
userImagesRegistryConfig,
DockerContainerFactoryConfig(true))(actorSystem, executionContext,
logging, dockerApiStub, mock[RuncApi])
- val cf = factory.createContainer(tid, "testContainer", image, false,
10.MB, 32)
+ val cf = factory.createContainer(tid, "testContainer", image, false,
10.MB, 32, None)
+
+ val c = Await.result(cf, 5000.milliseconds)
+
+ Await.result(c.destroy(), 500.milliseconds)
+
+ }
+
+ it should "set the docker run args with cpu limit when provided" in {
+
+ val image = ExecManifest.runtimesManifest.manifests("nodejs:20").image
+
+ implicit val tid = TransactionId.testing
+ val dockerApiStub = mock[DockerApiWithFileAccess]
+ //setup run expectation
+ (dockerApiStub
+ .run(_: String, _: Seq[String])(_: TransactionId))
+ .expects(
+ image.resolveImageName(Some(runtimesRegistryConfig.url)),
+ List(
+ "--cpu-shares",
+ "32", //should be calculated as 1024/(numcore * sharefactor) via
ContainerFactory.cpuShare
+ "--memory",
+ "10m",
+ "--memory-swap",
+ "10m",
+ "--network",
+ "net1",
+ "-e",
+ "__OW_API_HOST=",
+ "-e",
+ "k1=v1",
+ "-e",
+ "k2=v2",
+ "-e",
+ "k3=",
+ "--dns",
+ "dns1",
+ "--dns",
+ "dns2",
+ "--name",
+ "testContainer",
+ "--cpus",
+ "0.5",
+ "--extra1",
+ "e1",
+ "--extra1",
+ "e2",
+ "--extra2",
+ "e3",
+ "--extra2",
+ "e4"),
+ *)
+ .returning(Future.successful { ContainerId("fakecontainerid") })
+ //setup inspect expectation
+ (dockerApiStub
+ .inspectIPAddress(_: ContainerId, _: String)(_: TransactionId))
+ .expects(ContainerId("fakecontainerid"), "net1", *)
+ .returning(Future.successful { ContainerAddress("1.2.3.4", 1234) })
+ //setup rm expectation
+ (dockerApiStub
+ .rm(_: ContainerId)(_: TransactionId))
+ .expects(ContainerId("fakecontainerid"), *)
+ .returning(Future.successful(()))
+ //setup clientVersion expectation
+ (dockerApiStub.clientVersion _)
+ .expects()
+ .returning("mock_test_client")
+
+ val factory =
+ new DockerContainerFactory(
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ Map.empty,
+ ContainerArgsConfig(
+ "net1",
+ Seq("dns1", "dns2"),
+ Seq.empty,
+ Seq.empty,
+ Seq("k1=v1", "k2=v2", "k3"),
+ Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4"))),
+ runtimesRegistryConfig,
+ userImagesRegistryConfig,
+ DockerContainerFactoryConfig(true))(actorSystem, executionContext,
logging, dockerApiStub, mock[RuncApi])
+
+ val cf = factory.createContainer(tid, "testContainer", image, false,
10.MB, 32, Some(0.5))
val c = Await.result(cf, 5000.milliseconds)
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
index dbfb85883..25a63da55 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -137,6 +137,7 @@ class DockerContainerTests
val image = "image"
val memory = 128.MB
val cpuShares = 1
+ val cpuLimit = 0.5
val environment = Map("test" -> "hi")
val network = "testwork"
val name = "myContainer"
@@ -145,6 +146,7 @@ class DockerContainerTests
image = Right(ImageName(image)),
memory = memory,
cpuShares = cpuShares,
+ cpuLimit = Some(cpuLimit),
environment = environment,
network = network,
name = Some(name),
@@ -171,6 +173,7 @@ class DockerContainerTests
args should contain inOrder ("--cpu-shares", cpuShares.toString)
args should contain inOrder ("--network", network)
args should contain inOrder ("--name", name)
+ args should contain inOrder ("--cpus", cpuLimit.toString)
// Assert proper environment passing
args should contain allOf ("-e", "test=hi")
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolConfigTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolConfigTests.scala
new file mode 100644
index 000000000..23e3a738e
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolConfigTests.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.containerpool.test
+
+import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
+import org.apache.openwhisk.core.entity.ByteSize
+import org.apache.openwhisk.core.entity.size.SizeInt
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+
+import scala.concurrent.duration.DurationInt
+
+@RunWith(classOf[JUnitRunner])
+class ContainerPoolConfigTests extends FlatSpec with Matchers {
+
+ def createPoolConfig(userMemory: ByteSize, userCpus: Option[Double] = None):
ContainerPoolConfig = {
+ ContainerPoolConfig(userMemory, 0.5, false, 2.second, 10.seconds, None, 1,
3, false, 1.second, 10, userCpus)
+ }
+
+ it should "calculate container cpu shares" in {
+ val (userMemory, memoryLimit) = (2.GB, 256.MB)
+ val poolConfig = createPoolConfig(userMemory)
+ poolConfig.cpuShare(memoryLimit) shouldBe 128
+ }
+
+ it should "use min cpu shares when calculated container cpu shares is too
low" in {
+ val (userMemory, memoryLimit) = (1024.MB, 1.MB)
+ val poolConfig = createPoolConfig(userMemory)
+ poolConfig.cpuShare(memoryLimit) shouldBe 2 // calculated shares would be
1, but min is 2
+ }
+
+ it should "calculate container cpu limit" in {
+ val (userMemory, memoryLimit, userCpus) = (2.GB, 256.MB, 2.0)
+ val poolConfig = createPoolConfig(userMemory, Some(userCpus))
+ poolConfig.cpuLimit(memoryLimit) shouldBe Some(0.25)
+ }
+
+ it should "correctly round container cpu limit" in {
+ val (userMemory, memoryLimit, userCpus) = (768.MB, 256.MB, 2.0)
+ val poolConfig = createPoolConfig(userMemory, Some(userCpus))
+ poolConfig.cpuLimit(memoryLimit) shouldBe Some(0.66667) // calculated
limit is 0.666..., rounded to 0.66667
+ }
+
+ it should "use min container cpu limit when calculated limit is too low" in {
+ val (userMemory, memoryLimit, userCpus) = (1024.MB, 1.MB, 1.0)
+ val poolConfig = createPoolConfig(userMemory, Some(userCpus))
+ poolConfig.cpuLimit(memoryLimit) shouldBe Some(0.01) // calculated limit
is 0.001, but min is 0.01
+ }
+
+ it should "return None for container cpu limit when userCpus is not set" in {
+ val (userMemory, memoryLimit) = (2.GB, 256.MB)
+ val poolConfig = createPoolConfig(userMemory)
+ poolConfig.cpuLimit(memoryLimit) shouldBe None
+ }
+
+ it should "require userCpus to be greater than 0" in {
+ assertThrows[IllegalArgumentException] {
+ createPoolConfig(2.GB, Some(-1.0))
+ }
+ }
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 03a0e1f1e..e29eb15e5 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -238,7 +238,14 @@ class ContainerProxyTests
/** Creates an inspectable factory */
def createFactory(response: Future[Container]) = LoggedFunction {
- (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _:
Int, _: Option[ExecutableWhiskAction]) =>
+ (_: TransactionId,
+ _: String,
+ _: ImageName,
+ _: Boolean,
+ _: ByteSize,
+ _: Int,
+ _: Option[Double],
+ _: Option[ExecutableWhiskAction]) =>
response
}
@@ -332,7 +339,7 @@ class ContainerProxyTests
preWarm(machine)
factory.calls should have size 1
- val (tid, name, _, _, memory, cpuShares, _) = factory.calls(0)
+ val (tid, name, _, _, memory, cpuShares, _, _) = factory.calls(0)
tid shouldBe TransactionId.invokerWarmup
name should fullyMatch regex """wskmyname\d+_\d+_prewarm_actionKind"""
memory shouldBe memoryLimit
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
index 1aa85ad9e..714a8f9b1 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -201,6 +201,7 @@ class FunctionPullingContainerPoolTests
prewarmPromotion,
memorySyncInterval,
batchDeletionSize,
+ Some(2),
prewarmContainerCreationConfig)
def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId:
SchedulerInstanceId,
@@ -460,6 +461,7 @@ class FunctionPullingContainerPoolTests
false,
FiniteDuration(10, TimeUnit.SECONDS),
10,
+ Some(2),
prewarmContainerCreationConfig)
val pool = system.actorOf(
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index 36e1416fd..3200aeed9 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -215,7 +215,14 @@ class FunctionPullingContainerProxyTests
/** Creates an inspectable factory */
def createFactory(response: Future[Container]) = LoggedFunction {
- (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _:
Int, _: Option[ExecutableWhiskAction]) =>
+ (_: TransactionId,
+ _: String,
+ _: ImageName,
+ _: Boolean,
+ _: ByteSize,
+ _: Int,
+ _: Option[Double],
+ _: Option[ExecutableWhiskAction]) =>
response
}
@@ -396,7 +403,7 @@ class FunctionPullingContainerProxyTests
preWarm(machine, probe)
factory.calls should have size 1
- val (tid, name, _, _, memory, _, _) = factory.calls(0)
+ val (tid, name, _, _, memory, _, _, _) = factory.calls(0)
tid shouldBe TransactionId.invokerWarmup
name should fullyMatch regex """wskmyname\d+_\d+_prewarm_actionKind"""
memory shouldBe memoryLimit
@@ -586,7 +593,7 @@ class FunctionPullingContainerProxyTests
case RequestActivation(Some(_), None) => true
}
- val (tid, name, _, _, memory, _, _) = factory.calls(0)
+ val (tid, name, _, _, memory, _, _, _) = factory.calls(0)
tid shouldBe TransactionId.invokerColdstart
name should fullyMatch regex """wskmyname\d+_\d+_actionSpace_actionName"""
memory shouldBe memoryLimit
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala
index a77ca0d5b..56196ed4c 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/yarn/test/YARNContainerFactoryTests.scala
@@ -176,7 +176,8 @@ class YARNContainerFactoryTests
imageToCreate,
unuseduserProvidedImage = true,
ByteSize(256, SizeUnits.MB),
- 1)
+ 1,
+ None)
Await.result(containerFuture, 60.seconds)
@@ -231,7 +232,8 @@ class YARNContainerFactoryTests
imageNotToDelete,
unuseduserProvidedImage = true,
ByteSize(256, SizeUnits.MB),
- 1)
+ 1,
+ None)
val containerFuture2 = factory.createContainer(
TransactionId.testing,
@@ -239,7 +241,8 @@ class YARNContainerFactoryTests
imageToDelete,
unuseduserProvidedImage = true,
ByteSize(256, SizeUnits.MB),
- 1)
+ 1,
+ None)
val containerFuture3 = factory.createContainer(
TransactionId.testing,
@@ -247,7 +250,8 @@ class YARNContainerFactoryTests
imageToDelete,
unuseduserProvidedImage = true,
ByteSize(256, SizeUnits.MB),
- 1)
+ 1,
+ None)
val containerFuture4 = factory.createContainer(
TransactionId.testing,
@@ -255,7 +259,8 @@ class YARNContainerFactoryTests
imageToDelete,
unuseduserProvidedImage = true,
ByteSize(256, SizeUnits.MB),
- 1)
+ 1,
+ None)
val container1 = Await.result(containerFuture1, 30.seconds)
val container2 = Await.result(containerFuture2, 30.seconds)
@@ -349,7 +354,8 @@ class YARNContainerFactoryTests
images(0),
unuseduserProvidedImage = true,
ByteSize(256, SizeUnits.MB),
- 1)
+ 1,
+ None)
val container2Future = factory.createContainer(
TransactionId.testing,
@@ -357,7 +363,8 @@ class YARNContainerFactoryTests
images(1),
unuseduserProvidedImage = true,
ByteSize(256, SizeUnits.MB),
- 1)
+ 1,
+ None)
val container3Future = factory.createContainer(
TransactionId.testing,
@@ -365,7 +372,8 @@ class YARNContainerFactoryTests
images(0),
unuseduserProvidedImage = true,
ByteSize(256, SizeUnits.MB),
- 1)
+ 1,
+ None)
Await.result(container1Future, 30.seconds)
val container2 = Await.result(container2Future, 30.seconds)
@@ -463,14 +471,16 @@ class YARNContainerFactoryTests
imageToCreate,
unuseduserProvidedImage = true,
ByteSize(256, SizeUnits.MB),
- 1)
+ 1,
+ None)
val containerFuture1 = factory1.createContainer(
TransactionId.testing,
"name",
imageToCreate,
unuseduserProvidedImage = true,
ByteSize(256, SizeUnits.MB),
- 1)
+ 1,
+ None)
Await.result(containerFuture0, 60.seconds)
Await.result(containerFuture1, 60.seconds)