This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 1274fdf Adjust prewarm container dynamically (#4871)
1274fdf is described below
commit 1274fdf4d4a8083042666bb47e2219fc3098991d
Author: ningyougang <[email protected]>
AuthorDate: Thu May 21 00:41:10 2020 +0800
Adjust prewarm container dynamically (#4871)
* Adjust prewarm container dynamically
Co-authored-by: ning.yougang <[email protected]>
---
ansible/files/runtimes.json | 11 +-
ansible/roles/invoker/tasks/deploy.yml | 1 +
.../org/apache/openwhisk/common/Logging.scala | 6 +
.../core/containerpool/ContainerFactory.scala | 8 +-
.../openwhisk/core/entity/ExecManifest.scala | 77 ++++++-
core/invoker/src/main/resources/application.conf | 1 +
.../core/containerpool/ContainerPool.scala | 243 +++++++++++++++++----
.../core/containerpool/ContainerProxy.scala | 47 ++--
.../openwhisk/core/invoker/InvokerReactive.scala | 2 +-
.../mesos/test/MesosContainerFactoryTest.scala | 4 +-
.../containerpool/test/ContainerPoolTests.scala | 190 ++++++++++++++--
.../containerpool/test/ContainerProxyTests.scala | 21 +-
.../core/entity/test/ExecManifestTests.scala | 152 +++++++++++--
.../openwhisk/core/entity/test/ExecTests.scala | 2 +-
14 files changed, 653 insertions(+), 112 deletions(-)
diff --git a/ansible/files/runtimes.json b/ansible/files/runtimes.json
index 34038ae..722bbfc 100644
--- a/ansible/files/runtimes.json
+++ b/ansible/files/runtimes.json
@@ -57,8 +57,15 @@
},
"stemCells": [
{
- "count": 2,
- "memory": "256 MB"
+ "initialCount": 2,
+ "memory": "256 MB",
+ "reactive": {
+ "minCount": 1,
+ "maxCount": 4,
+ "ttl": "2 minutes",
+ "threshold": 1,
+ "increment": 1
+ }
}
]
},
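For reference, the sizing rule these new reactive fields drive (implemented by getReactiveCold further down in this commit) clamps cold-start-driven demand between minCount and maxCount. A minimal Scala sketch using the example values above, illustrative only and not part of the commit:

    // minCount=1, maxCount=4, threshold=1, increment=1, as in the stemCells entry above
    def desired(coldStarts: Int, minCount: Int = 1, maxCount: Int = 4, threshold: Int = 1, increment: Int = 1): Int =
      math.min(math.max(minCount, (coldStarts / threshold) * increment), maxCount)

    desired(0) // 1 -> never below minCount
    desired(3) // 3 -> scales with cold starts seen in the last check interval
    desired(9) // 4 -> capped at maxCount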
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 62361e6..ea4ce48 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -278,6 +278,7 @@
"CONFIG_whisk_invoker_https_keystorePassword": "{{ invoker.ssl.keystore.password }}"
"CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}"
"CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}"
+ "CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}"
- name: extend invoker dns env
set_fact:
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 37591be..7f27b26 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -511,6 +511,12 @@ object LoggingMarkers {
LogMarkerToken(containerPool, "prewarmSize", counter)(MeasurementUnit.information.megabytes)
val CONTAINER_POOL_IDLES_COUNT =
LogMarkerToken(containerPool, "idlesCount", counter)(MeasurementUnit.none)
+ def CONTAINER_POOL_PREWARM_COLDSTART(memory: String, kind: String) =
+ LogMarkerToken(containerPool, "prewarmColdstart", counter, None, Map("memory" -> memory, "kind" -> kind))(
+ MeasurementUnit.none)
+ def CONTAINER_POOL_PREWARM_EXPIRED(memory: String, kind: String) =
+ LogMarkerToken(containerPool, "prewarmExpired", counter, None, Map("memory" -> memory, "kind" -> kind))(
+ MeasurementUnit.none)
val CONTAINER_POOL_IDLES_SIZE =
LogMarkerToken(containerPool, "idlesSize", counter)(MeasurementUnit.information.megabytes)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index d984f99..f2e5ab7 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -24,6 +24,7 @@ import org.apache.openwhisk.core.entity.{ByteSize, ExecManifest, ExecutableWhisk
import org.apache.openwhisk.spi.Spi
import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
import scala.math.max
case class ContainerArgsConfig(network: String,
@@ -43,11 +44,16 @@ case class ContainerArgsConfig(network: String,
}.toMap
}
-case class ContainerPoolConfig(userMemory: ByteSize, concurrentPeekFactor: Double, akkaClient: Boolean) {
+case class ContainerPoolConfig(userMemory: ByteSize,
+ concurrentPeekFactor: Double,
+ akkaClient: Boolean,
+ prewarmExpirationCheckInterval: FiniteDuration) {
require(
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
+ require(prewarmExpirationCheckInterval.toSeconds > 0, "prewarmExpirationCheckInterval must be > 0")
+
/**
* The shareFactor indicates the number of containers that would share a single core, on average.
* cpuShare is a docker option (-c) whereby a container's CPU access is limited.
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala
index fe90515..9011b1e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala
@@ -26,7 +26,10 @@ import spray.json.DefaultJsonProtocol._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.entity.Attachments._
import org.apache.openwhisk.core.entity.Attachments.Attached._
-import fastparse._, NoWhitespace._
+import fastparse._
+import NoWhitespace._
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
/**
* Reads manifest of supported runtimes from configuration file and stores
@@ -135,11 +138,37 @@ protected[core] object ExecManifest {
/**
* A stemcell configuration read from the manifest for a container image to be initialized by the container pool.
*
- * @param count the number of stemcell containers to create
+ * @param initialCount the initial number of stemcell containers to create
* @param memory the max memory this stemcell will allocate
+ * @param reactive the reactive prewarming config, which is disabled by default
*/
- protected[entity] case class StemCell(count: Int, memory: ByteSize) {
- require(count > 0, "count must be positive")
+ protected[entity] case class StemCell(initialCount: Int,
+ memory: ByteSize,
+ reactive: Option[ReactivePrewarmingConfig] = None) {
+ require(initialCount > 0, "initialCount must be positive")
+ }
+
+ /**
+ * A stemcell's reactive prewarming configuration
+ *
+ * @param minCount the minimum number of prewarmed containers to keep
+ * @param maxCount the maximum number of prewarmed containers to create
+ * @param ttl time to live of an unused prewarmed container
+ * @param threshold the number of cold starts in the previous check interval that triggers additional prewarming
+ * @param increment the number of prewarmed containers to add per threshold cold starts, up to maxCount
+ */
+ protected[core] case class ReactivePrewarmingConfig(minCount: Int,
+ maxCount: Int,
+ ttl: FiniteDuration,
+ threshold: Int,
+ increment: Int) {
+ require(
+ minCount >= 0 && minCount <= maxCount,
+ "minCount must be nonnegative and less than or equal to maxCount")
+ require(maxCount > 0, "maxCount must be positive")
+ require(ttl.toMillis > 0, "ttl must be positive")
+ require(threshold > 0, "threshold must be positive")
+ require(increment > 0 && increment <= maxCount, "increment must be positive and less than or equal to maxCount")
}
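Constructed directly, the guards above behave as follows; a small illustrative sketch mirroring the runtimes.json example (not part of the commit):

    import scala.concurrent.duration._

    // Matches the manifest entry: minCount=1, maxCount=4, ttl="2 minutes", threshold=1, increment=1
    val ok = ReactivePrewarmingConfig(minCount = 1, maxCount = 4, ttl = 2.minutes, threshold = 1, increment = 1)

    // Violates minCount <= maxCount, so the corresponding require throws IllegalArgumentException:
    // ReactivePrewarmingConfig(minCount = 5, maxCount = 4, ttl = 2.minutes, threshold = 1, increment = 1)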
/**
@@ -344,9 +373,45 @@ protected[core] object ExecManifest {
protected[entity] implicit val imageNameSerdes: RootJsonFormat[ImageName] = jsonFormat4(ImageName.apply)
- protected[entity] implicit val stemCellSerdes: RootJsonFormat[StemCell] = {
+ protected[entity] implicit val ttlSerdes: RootJsonFormat[FiniteDuration] = new RootJsonFormat[FiniteDuration] {
+ override def write(finiteDuration: FiniteDuration): JsValue = JsString(finiteDuration.toString)
+
+ override def read(value: JsValue): FiniteDuration = value match {
+ case JsString(s) =>
+ val duration = Duration(s)
+ FiniteDuration(duration.length, duration.unit)
+ case _ =>
+ deserializationError("time unit not supported. Only milliseconds, seconds, minutes, hours, days are supported")
+ }
+ }
+
+ protected[entity] implicit val reactivePrewarmingConfigSerdes: RootJsonFormat[ReactivePrewarmingConfig] = jsonFormat5(
+ ReactivePrewarmingConfig.apply)
+
+ protected[entity] implicit val stemCellSerdes = new RootJsonFormat[StemCell] {
import org.apache.openwhisk.core.entity.size.serdes
- jsonFormat2(StemCell.apply)
+ val defaultSerdes = jsonFormat3(StemCell.apply)
+ override def read(value: JsValue): StemCell = {
+ val fields = value.asJsObject.fields
+ val initialCount: Option[Int] =
+ fields
+ .get("initialCount")
+ .orElse(fields.get("count"))
+ .map(_.convertTo[Int])
+ val memory: Option[ByteSize] = fields.get("memory").map(_.convertTo[ByteSize])
+ val config = fields.get("reactive").map(_.convertTo[ReactivePrewarmingConfig])
+
+ (initialCount, memory) match {
+ case (Some(c), Some(m)) => StemCell(c, m, config)
+ case (Some(c), None) =>
+ throw new IllegalArgumentException(s"memory is required; only initialCount was provided: ${c}")
+ case (None, Some(m)) =>
+ throw new IllegalArgumentException(s"initialCount is required; only memory was provided: ${m.toString}")
+ case _ => throw new IllegalArgumentException("both initialCount and memory are required")
+ }
+ }
+
+ override def write(s: StemCell) = defaultSerdes.write(s)
}
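The custom read above keeps older manifests working: the legacy "count" field and the new "initialCount" both deserialize to the same StemCell. A hedged usage sketch, assuming the implicit serdes above are in scope:

    import spray.json._

    // Legacy manifests still parse: "count" is picked up via the orElse fallback above
    val legacy = """{"count": 2, "memory": "256 MB"}""".parseJson.convertTo[StemCell]
    // New manifests use "initialCount", optionally with a "reactive" block
    val current = """{"initialCount": 2, "memory": "256 MB"}""".parseJson.convertTo[StemCell]
    assert(legacy == current) // both yield StemCell(2, 256.MB, None)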
protected[entity] implicit val runtimeManifestSerdes: RootJsonFormat[RuntimeManifest] = jsonFormat8(RuntimeManifest)
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 63d0824..b7e6eed 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -60,6 +60,7 @@ whisk {
user-memory: 1024 m
concurrent-peek-factor: 0.5 #factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
akka-client: false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
+ prewarm-expiration-check-interval: 1 minute
}
kubernetes {
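This block is loaded via pureconfig, so the kebab-case key prewarm-expiration-check-interval binds to the new prewarmExpirationCheckInterval: FiniteDuration field of ContainerPoolConfig shown earlier, and "1 minute" parses to a FiniteDuration. A hedged sketch of that load, assuming the standard ConfigKeys.containerPool key:

    import pureconfig._
    import org.apache.openwhisk.core.ConfigKeys
    import org.apache.openwhisk.core.containerpool.ContainerPoolConfig

    // "whisk.container-pool" resolves to the block above
    val poolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)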
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 1e267fe..8c3dcb1 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -18,11 +18,12 @@
package org.apache.openwhisk.core.containerpool
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
-import org.apache.openwhisk.common.MetricEmitter
-import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
+import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
+
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration._
@@ -32,10 +33,14 @@ sealed trait WorkerState
case object Busy extends WorkerState
case object Free extends WorkerState
+case class ColdStartKey(kind: String, memory: ByteSize)
+
case class WorkerData(data: ContainerData, state: WorkerState)
case object EmitMetrics
+case object AdjustPrewarmedContainer
+
/**
* A pool managing containers to run actions on.
*
@@ -59,16 +64,15 @@ case object EmitMetrics
class ContainerPool(childFactory: ActorRefFactory => ActorRef,
feed: ActorRef,
prewarmConfig: List[PrewarmingConfig] = List.empty,
- poolConfig: ContainerPoolConfig)
+ poolConfig: ContainerPoolConfig)(implicit val logging: Logging)
extends Actor {
import ContainerPool.memoryConsumptionOf
- implicit val logging = new AkkaLogging(context.system.log)
implicit val ec = context.dispatcher
var freePool = immutable.Map.empty[ActorRef, ContainerData]
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
- var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
+ var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData]
var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
// If all memory slots are occupied and if there is currently no container to be removed, then the actions will be
// buffered here to keep order of computation.
@@ -80,7 +84,17 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
//periodically emit metrics (don't need to do this for each message!)
context.system.scheduler.schedule(30.seconds, 10.seconds, self, EmitMetrics)
- backfillPrewarms(true)
+ // Key is ColdStartKey, value is the number of cold starts in the last minute
+ var coldStartCount = immutable.Map.empty[ColdStartKey, Int]
+
+ adjustPrewarmedContainer(true, false)
+
+ // check periodically; adjust prewarmed containers (delete any unused for some time, and create any increment containers needed)
+ context.system.scheduler.schedule(
+ 2.seconds,
+ poolConfig.prewarmExpirationCheckInterval,
+ self,
+ AdjustPrewarmedContainer)
def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
val namespaceName = r.msg.user.namespace.name
@@ -131,7 +145,11 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
if (hasPoolSpaceFor(busyPool ++ freePool, r.action.limits.memory.megabytes.MB)) {
takePrewarmContainer(r.action)
.map(container => (container, "prewarmed"))
- .orElse(Some(createContainer(r.action.limits.memory.megabytes.MB), "cold"))
+ .orElse {
+ val container = Some(createContainer(r.action.limits.memory.megabytes.MB), "cold")
+ incrementColdStartCount(r.action.exec.kind, r.action.limits.memory.megabytes.MB)
+ container
+ }
} else None)
.orElse(
// Remove a container and create a new one for the given job
@@ -145,7 +163,11 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
.map(_ =>
takePrewarmContainer(r.action)
.map(container => (container, "recreatedPrewarm"))
- .getOrElse(createContainer(r.action.limits.memory.megabytes.MB), "recreated")))
+ .getOrElse {
+ val container = (createContainer(r.action.limits.memory.megabytes.MB), "recreated")
+ incrementColdStartCount(r.action.exec.kind, r.action.limits.memory.megabytes.MB)
+ container
+ }))
} else None
@@ -245,7 +267,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
prewarmedPool = prewarmedPool + (sender() -> data)
// Container got removed
- case ContainerRemoved =>
+ case ContainerRemoved(replacePrewarm) =>
// if container was in free pool, it may have been processing (but under capacity),
// so there is capacity to accept another job request
freePool.get(sender()).foreach { f =>
@@ -258,8 +280,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
processBufferOrFeed()
//in case this was a prewarm
- prewarmedPool.get(sender()).foreach { _ =>
- logging.info(this, "failed prewarm removed")
+ prewarmedPool.get(sender()).foreach { data =>
prewarmedPool = prewarmedPool - sender()
}
//in case this was a starting prewarm
@@ -268,8 +289,10 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
prewarmStartingPool = prewarmStartingPool - sender()
}
- //backfill prewarms on every ContainerRemoved, just in case
- backfillPrewarms(false) //in case a prewarm is removed due to health failure or crash
+ //backfill prewarms on every ContainerRemoved(replacePrewarm = true), just in case
+ if (replacePrewarm) {
+ adjustPrewarmedContainer(false, false) //in case a prewarm is removed due to health failure or crash
+ }
// This message is received for one of these reasons:
// 1. Container errored while resuming a warm container, could not process the job, and sent the job back
@@ -281,6 +304,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
busyPool = busyPool - sender()
case EmitMetrics =>
emitMetrics()
+
+ case AdjustPrewarmedContainer =>
+ adjustPrewarmedContainer(false, true)
}
/** Resend next item in the buffer, or trigger next item in the feed, if no items in the buffer. */
@@ -302,26 +328,32 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}
}
- /** Install prewarm containers up to the configured requirements for each kind/memory combination. */
- def backfillPrewarms(init: Boolean) = {
- prewarmConfig.foreach { config =>
- val kind = config.exec.kind
- val memory = config.memoryLimit
- val currentCount = prewarmedPool.count {
- case (_, PreWarmedData(_, `kind`, `memory`, _)) => true //done starting
- case _ => false //started but not finished starting
- }
- val startingCount = prewarmStartingPool.count(p => p._2._1 == kind && p._2._2 == memory)
- val containerCount = currentCount + startingCount
- if (containerCount < config.count) {
- logging.info(
- this,
- s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${config.count - containerCount} pre-warms to desired count: ${config.count} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
- TransactionId.invokerWarmup)
- (containerCount until config.count).foreach { _ =>
- prewarmContainer(config.exec, config.memoryLimit)
+ /** adjust prewarm containers up to the configured requirements for each kind/memory combination. */
+ def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = {
+ //fill in missing prewarms
+ ContainerPool
+ .increasePrewarms(init, scheduled, coldStartCount, prewarmConfig, prewarmedPool, prewarmStartingPool)
+ .foreach { c =>
+ val config = c._1
+ val currentCount = c._2._1
+ val desiredCount = c._2._2
+ if (currentCount < desiredCount) {
+ (currentCount until desiredCount).foreach { _ =>
+ prewarmContainer(config.exec, config.memoryLimit, config.reactive.map(_.ttl))
+ }
}
}
+ if (scheduled) {
+ //on scheduled time, remove expired prewarms
+ ContainerPool.removeExpired(prewarmConfig, prewarmedPool).foreach(_ ! Remove)
+ //on scheduled time, emit cold start counter metric with memory + kind
+ coldStartCount foreach { coldStart =>
+ val coldStartKey = coldStart._1
+ MetricEmitter.emitCounterMetric(
+ LoggingMarkers.CONTAINER_POOL_PREWARM_COLDSTART(coldStartKey.memory.toString, coldStartKey.kind))
+ }
+ // then clear coldStartCounts each time scheduled event is processed to reset counts
+ coldStartCount = immutable.Map.empty[ColdStartKey, Int]
}
}
@@ -334,10 +366,25 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}
/** Creates a new prewarmed container */
- def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize): Unit = {
+ def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize, ttl: Option[FiniteDuration]): Unit = {
val newContainer = childFactory(context)
prewarmStartingPool = prewarmStartingPool + (newContainer -> (exec.kind, memoryLimit))
- newContainer ! Start(exec, memoryLimit)
+ newContainer ! Start(exec, memoryLimit, ttl)
+ }
+
+ /** This is only for cold start statistics of prewarm configs, i.e. not blackbox or other configs. */
+ def incrementColdStartCount(kind: String, memoryLimit: ByteSize): Unit = {
+ prewarmConfig
+ .filter { config =>
+ kind == config.exec.kind && memoryLimit == config.memoryLimit
+ }
+ .foreach { _ =>
+ val coldStartKey = ColdStartKey(kind, memoryLimit)
+ coldStartCount.get(coldStartKey) match {
+ case Some(value) => coldStartCount = coldStartCount + (coldStartKey -> (value + 1))
+ case None => coldStartCount = coldStartCount + (coldStartKey -> 1)
+ }
+ }
}
/**
@@ -350,10 +397,12 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
def takePrewarmContainer(action: ExecutableWhiskAction): Option[(ActorRef, ContainerData)] = {
val kind = action.exec.kind
val memory = action.limits.memory.megabytes.MB
- prewarmedPool
+ val now = Deadline.now
+ prewarmedPool.toSeq
+ .sortBy(_._2.expires.getOrElse(now))
.find {
- case (_, PreWarmedData(_, `kind`, `memory`, _)) => true
- case _ => false
+ case (_, PreWarmedData(_, `kind`, `memory`, _, _)) => true
+ case _ => false
}
.map {
case (ref, data) =>
@@ -363,7 +412,11 @@ class ContainerPool(childFactory: ActorRefFactory =>
ActorRef,
// Create a new prewarm container
// NOTE: prewarming ignores the action code in exec, but this is dangerous as the field is accessible to the
// factory
- prewarmContainer(action.exec, memory)
+ val ttl = data.expires match {
+ case Some(value) => Some(value.time)
+ case None => None
+ }
+ prewarmContainer(action.exec, memory, ttl)
(ref, data)
}
}
@@ -499,12 +552,124 @@ object ContainerPool {
}
}
+ /**
+ * Find the expired actors in prewarmedPool
+ *
+ * @param prewarmConfig
+ * @param prewarmedPool
+ * @param logging
+ * @return a list of expired actors
+ */
+ def removeExpired(prewarmConfig: List[PrewarmingConfig], prewarmedPool: Map[ActorRef, PreWarmedData])(
+ implicit logging: Logging): List[ActorRef] = {
+ prewarmConfig.flatMap { config =>
+ val kind = config.exec.kind
+ val memory = config.memoryLimit
+ config.reactive
+ .map { _ =>
+ val expiredPrewarmedContainer = prewarmedPool
+ .filter { warmInfo =>
+ warmInfo match {
+ case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if p.isExpired() => true
+ case _ => false
+ }
+ }
+ // emit expired container counter metric with memory + kind
+ MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
+ logging.info(
+ this,
+ s"[kind: ${kind} memory: ${memory.toString}] removed ${expiredPrewarmedContainer.size} expired prewarmed container")
+ expiredPrewarmedContainer.keys
+ }
+ .getOrElse(List.empty)
+ }
+ }
+
+ /**
+ * Find the current and desired number of prewarmed containers for each kind/memory configuration
+ *
+ * @param init
+ * @param scheduled
+ * @param coldStartCount
+ * @param prewarmConfig
+ * @param prewarmedPool
+ * @param prewarmStartingPool
+ * @param logging
+ * @return a map of each prewarm config to its (current, desired) container counts
+ */
+ def increasePrewarms(init: Boolean,
+ scheduled: Boolean,
+ coldStartCount: Map[ColdStartKey, Int],
+ prewarmConfig: List[PrewarmingConfig],
+ prewarmedPool: Map[ActorRef, PreWarmedData],
+ prewarmStartingPool: Map[ActorRef, (String, ByteSize)])(
+ implicit logging: Logging): Map[PrewarmingConfig, (Int, Int)] = {
+ prewarmConfig.map { config =>
+ val kind = config.exec.kind
+ val memory = config.memoryLimit
+
+ val runningCount = prewarmedPool.count {
+ // done starting, and not expired
+ case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if !p.isExpired() => true
+ // started but not finished starting (or expired)
+ case _ => false
+ }
+ val startingCount = prewarmStartingPool.count(p => p._2._1 == kind && p._2._2 == memory)
+ val currentCount = runningCount + startingCount
+
+ // determine how many are needed
+ val desiredCount: Int =
+ if (init) config.initialCount
+ else {
+ if (scheduled) {
+ // scheduled/reactive config backfill
+ config.reactive
+ .map(c => getReactiveCold(coldStartCount, c, kind, memory).getOrElse(c.minCount)) //reactive -> desired is either cold start driven, or minCount
+ .getOrElse(config.initialCount) //not reactive -> desired is always initial count
+ } else {
+ // normal backfill after removal - make sure at least minCount or initialCount is started
+ config.reactive.map(_.minCount).getOrElse(config.initialCount)
+ }
+ }
+
+ logging.info(
+ this,
+ s"found ${currentCount} started and ${startingCount} starting; ${if (init) "initing" else "backfilling"} ${desiredCount - currentCount} pre-warms to desired count: ${desiredCount} for kind:${config.exec.kind} mem:${config.memoryLimit.toString}")(
+ TransactionId.invokerWarmup)
+ (config, (currentCount, desiredCount))
+ }.toMap
+ }
+
+ /**
+ * Get the required number of prewarmed containers based on the cold starts that happened in the previous minute
+ *
+ * @param coldStartCount
+ * @param config
+ * @param kind
+ * @param memory
+ * @return the required prewarmed container number
+ */
+ def getReactiveCold(coldStartCount: Map[ColdStartKey, Int],
+ config: ReactivePrewarmingConfig,
+ kind: String,
+ memory: ByteSize): Option[Int] = {
+ coldStartCount.get(ColdStartKey(kind, memory)).map { value =>
+ // Let's assume that threshold is `2`, increment is `1` in runtimes.json
+ // if the cold start number in the previous minute is `2`, requiredCount is `2/2 * 1 = 1`
+ // if the cold start number in the previous minute is `4`, requiredCount is `4/2 * 1 = 2`
+ math.min(math.max(config.minCount, (value / config.threshold) * config.increment), config.maxCount)
+ }
+ }
+
def props(factory: ActorRefFactory => ActorRef,
poolConfig: ContainerPoolConfig,
feed: ActorRef,
- prewarmConfig: List[PrewarmingConfig] = List.empty) =
+ prewarmConfig: List[PrewarmingConfig] = List.empty)(implicit logging: Logging) =
Props(new ContainerPool(factory, feed, prewarmConfig, poolConfig))
}
/** Contains settings needed to perform container prewarming. */
-case class PrewarmingConfig(count: Int, exec: CodeExec[_], memoryLimit: ByteSize)
+case class PrewarmingConfig(initialCount: Int,
+ exec: CodeExec[_],
+ memoryLimit: ByteSize,
+ reactive: Option[ReactivePrewarmingConfig] = None)
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 142173e..d23c0e5 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -145,12 +145,14 @@ case class MemoryData(override val memoryLimit: ByteSize, override val activeAct
case class PreWarmedData(override val container: Container,
kind: String,
override val memoryLimit: ByteSize,
- override val activeActivationCount: Int = 0)
+ override val activeActivationCount: Int = 0,
+ expires: Option[Deadline] = None)
extends ContainerStarted(container, Instant.EPOCH, memoryLimit, activeActivationCount)
with ContainerNotInUse {
override val initingState = "prewarmed"
override def nextRun(r: Run) =
WarmingData(container, r.msg.user.namespace.name, r.action, Instant.now, 1)
+ def isExpired(): Boolean = expires.exists(_.isOverdue())
}
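The ttl handling leans on scala.concurrent.duration.Deadline: when a prewarm starts with a ttl, expires is set to Some(ttl.fromNow) (see the PreWarmCompleted change below), so isExpired() reduces to isOverdue(). A minimal standalone sketch of the mechanism:

    import scala.concurrent.duration._

    val ttl = 2.minutes
    val expires: Option[Deadline] = Some(ttl.fromNow) // recorded when the prewarm is created
    def isExpired: Boolean = expires.exists(_.isOverdue()) // true once the ttl has elapsed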
/** type representing a prewarm (running, but not used) container that is being initialized (for a specific action + invocation namespace) */
@@ -193,7 +195,7 @@ case class WarmedData(override val container: Container,
}
// Events received by the actor
-case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize, ttl: Option[FiniteDuration] = None)
case class Run(action: ExecutableWhiskAction, msg: ActivationMessage, retryLogDeadline: Option[Deadline] = None)
case object Remove
case class HealthPingEnabled(enabled: Boolean)
@@ -201,7 +203,7 @@ case class HealthPingEnabled(enabled: Boolean)
// Events sent by the actor
case class NeedWork(data: ContainerData)
case object ContainerPaused
-case object ContainerRemoved // when container is destroyed
+case class ContainerRemoved(replacePrewarm: Boolean) // when container is destroyed
case object RescheduleJob // job is sent back to parent and could not be processed because container is being destroyed
case class PreWarmCompleted(data: PreWarmedData)
case class InitCompleted(data: WarmedData)
@@ -288,7 +290,8 @@ class ContainerProxy(factory: (TransactionId,
job.memoryLimit,
poolConfig.cpuShare(job.memoryLimit),
None)
- .map(container => PreWarmCompleted(PreWarmedData(container, job.exec.kind, job.memoryLimit)))
+ .map(container =>
+ PreWarmCompleted(PreWarmedData(container, job.exec.kind, job.memoryLimit, expires = job.ttl.map(_.fromNow))))
.pipeTo(self)
goto(Starting)
@@ -316,7 +319,7 @@ class ContainerProxy(factory: (TransactionId,
// the container is ready to accept an activation; register it as PreWarmed; this
// normalizes the life cycle for containers and their cleanup when activations fail
self ! PreWarmCompleted(
- PreWarmedData(container, job.action.exec.kind, job.action.limits.memory.megabytes.MB, 1))
+ PreWarmedData(container, job.action.exec.kind, job.action.limits.memory.megabytes.MB, 1, expires = None))
case Failure(t) =>
// the container did not come up cleanly, so disambiguate the failure mode and then cleanup
@@ -359,7 +362,7 @@ class ContainerProxy(factory: (TransactionId,
// container creation failed
case Event(_: FailureMessage, _) =>
- context.parent ! ContainerRemoved
+ context.parent ! ContainerRemoved(true)
stop()
case _ => delay
@@ -372,14 +375,14 @@ class ContainerProxy(factory: (TransactionId,
initializeAndRun(data.container, job)
.map(_ => RunCompleted)
.pipeTo(self)
- goto(Running) using PreWarmedData(data.container, data.kind, data.memoryLimit, 1)
+ goto(Running) using PreWarmedData(data.container, data.kind, data.memoryLimit, 1, data.expires)
- case Event(Remove, data: PreWarmedData) => destroyContainer(data)
+ case Event(Remove, data: PreWarmedData) => destroyContainer(data, false)
// prewarm container failed
case Event(_: FailureMessage, data: PreWarmedData) =>
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_PREWARM)
- destroyContainer(data)
+ destroyContainer(data, true)
}
when(Running) {
@@ -452,22 +455,22 @@ class ContainerProxy(factory: (TransactionId,
.getOrElse(data)
rescheduleJob = true
rejectBuffered()
- destroyContainer(newData)
+ destroyContainer(newData, true)
// Failed after /init (the first run failed)
case Event(_: FailureMessage, data: PreWarmedData) =>
activeCount -= 1
- destroyContainer(data)
+ destroyContainer(data, true)
// Failed for a subsequent /run
case Event(_: FailureMessage, data: WarmedData) =>
activeCount -= 1
- destroyContainer(data)
+ destroyContainer(data, true)
// Failed at getting a container for a cold-start run
case Event(_: FailureMessage, _) =>
activeCount -= 1
- context.parent ! ContainerRemoved
+ context.parent ! ContainerRemoved(true)
rejectBuffered()
stop()
@@ -490,16 +493,16 @@ class ContainerProxy(factory: (TransactionId,
data.container.suspend()(TransactionId.invokerNanny).map(_ => ContainerPaused).pipeTo(self)
goto(Pausing)
- case Event(Remove, data: WarmedData) => destroyContainer(data)
+ case Event(Remove, data: WarmedData) => destroyContainer(data, true)
// warm container failed
case Event(_: FailureMessage, data: WarmedData) =>
- destroyContainer(data)
+ destroyContainer(data, true)
}
when(Pausing) {
case Event(ContainerPaused, data: WarmedData) => goto(Paused)
- case Event(_: FailureMessage, data: WarmedData) => destroyContainer(data)
+ case Event(_: FailureMessage, data: WarmedData) => destroyContainer(data, true)
case _ => delay
}
@@ -526,7 +529,7 @@ class ContainerProxy(factory: (TransactionId,
// container is reclaimed by the pool or it has become too old
case Event(StateTimeout | Remove, data: WarmedData) =>
rescheduleJob = true // to suppress sending message to the pool and not double count
- destroyContainer(data)
+ destroyContainer(data, true)
}
when(Removing) {
@@ -534,8 +537,8 @@ class ContainerProxy(factory: (TransactionId,
// Send the job back to the pool to be rescheduled
context.parent ! job
stay
- case Event(ContainerRemoved, _) => stop()
- case Event(_: FailureMessage, _) => stop()
+ case Event(ContainerRemoved(_), _) => stop()
+ case Event(_: FailureMessage, _) => stop()
}
// Unstash all messages stashed while in intermediate state
@@ -614,10 +617,10 @@ class ContainerProxy(factory: (TransactionId,
*
* @param newData the ContainerStarted whose container will be destroyed
*/
- def destroyContainer(newData: ContainerStarted) = {
+ def destroyContainer(newData: ContainerStarted, replacePrewarm: Boolean) = {
val container = newData.container
if (!rescheduleJob) {
- context.parent ! ContainerRemoved
+ context.parent ! ContainerRemoved(replacePrewarm)
} else {
context.parent ! RescheduleJob
}
@@ -631,7 +634,7 @@ class ContainerProxy(factory: (TransactionId,
unpause
.flatMap(_ => container.destroy()(TransactionId.invokerNanny))
- .map(_ => ContainerRemoved)
+ .map(_ => ContainerRemoved(replacePrewarm))
.pipeTo(self)
goto(Removing) using newData
}
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 48b647b..d8d71d1 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -156,7 +156,7 @@ class InvokerReactive(
ExecManifest.runtimesManifest.stemcells.flatMap {
case (mf, cells) =>
cells.map { cell =>
- PrewarmingConfig(cell.count, new CodeExecAsString(mf, "", None), cell.memory)
+ PrewarmingConfig(cell.initialCount, new CodeExecAsString(mf, "", None), cell.memory, cell.reactive)
}
}.toList
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index 72f21a9..656c89c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -17,6 +17,8 @@
package org.apache.openwhisk.core.containerpool.mesos.test
+import java.util.concurrent.TimeUnit
+
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
@@ -84,7 +86,7 @@ class MesosContainerFactoryTest
}
// 80 slots, each 265MB
- val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false)
+ val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, FiniteDuration(1, TimeUnit.MINUTES))
val actionMemory = 265.MB
val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index 4efbe85..399bf88 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -18,6 +18,7 @@
package org.apache.openwhisk.core.containerpool.test
import java.time.Instant
+import java.util.concurrent.TimeUnit
import scala.collection.mutable
import scala.concurrent.duration._
@@ -33,15 +34,15 @@ import akka.actor.ActorSystem
import akka.testkit.ImplicitSender
import akka.testkit.TestKit
import akka.testkit.TestProbe
-import common.WhiskProperties
+import common.{StreamLogging, WhiskProperties}
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.entity.ExecManifest.RuntimeManifest
-import org.apache.openwhisk.core.entity.ExecManifest.ImageName
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, ReactivePrewarmingConfig, RuntimeManifest}
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.connector.MessageFeed
+import org.scalatest.concurrent.Eventually
/**
* Behavior tests for the ContainerPool
@@ -55,7 +56,9 @@ class ContainerPoolTests
with FlatSpecLike
with Matchers
with BeforeAndAfterAll
- with MockFactory {
+ with MockFactory
+ with Eventually
+ with StreamLogging {
override def afterAll = TestKit.shutdownActorSystem(system)
@@ -67,6 +70,9 @@ class ContainerPoolTests
// the values is done properly.
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
+ val ttl = FiniteDuration(500, TimeUnit.MILLISECONDS)
+ val threshold = 1
+ val increment = 1
/** Creates a `Run` message */
def createRunMessage(action: ExecutableWhiskAction, invocationNamespace: EntityName) = {
@@ -109,8 +115,8 @@ class ContainerPoolTests
val runMessageConcurrentDifferentNamespace = createRunMessage(concurrentAction, differentInvocationNamespace)
/** Helper to create PreWarmedData */
- def preWarmedData(kind: String, memoryLimit: ByteSize = memoryLimit) =
- PreWarmedData(stub[MockableContainer], kind, memoryLimit)
+ def preWarmedData(kind: String, memoryLimit: ByteSize = memoryLimit, expires: Option[Deadline] = None) =
+ PreWarmedData(stub[MockableContainer], kind, memoryLimit, expires = expires)
/** Helper to create WarmedData */
def warmedData(run: Run, lastUsed: Instant = Instant.now) = {
@@ -125,7 +131,8 @@ class ContainerPoolTests
(containers, factory)
}
- def poolConfig(userMemory: ByteSize) = ContainerPoolConfig(userMemory, 0.5, false)
+ def poolConfig(userMemory: ByteSize) =
+ ContainerPoolConfig(userMemory, 0.5, false, FiniteDuration(1, TimeUnit.MINUTES))
behavior of "ContainerPool"
@@ -388,7 +395,7 @@ class ContainerPoolTests
containers(0).send(pool, NeedWork(warmedData(runMessage)))
// container0 is deleted
- containers(0).send(pool, ContainerRemoved)
+ containers(0).send(pool, ContainerRemoved(true))
// container1 is created and used
pool ! runMessage
@@ -597,7 +604,7 @@ class ContainerPoolTests
containers(0).send(pool, RescheduleJob)
//trigger buffer processing by ContainerRemoved message
- pool ! ContainerRemoved
+ pool ! ContainerRemoved(true)
//start second run
containers(1).expectMsgPF() {
@@ -627,9 +634,9 @@ class ContainerPoolTests
containers(1).expectNoMessage(100.milliseconds)
//ContainerRemoved triggers buffer processing - if we don't prevent duplicates, this will cause the buffer head to be resent!
- pool ! ContainerRemoved
- pool ! ContainerRemoved
- pool ! ContainerRemoved
+ pool ! ContainerRemoved(true)
+ pool ! ContainerRemoved(true)
+ pool ! ContainerRemoved(true)
//complete processing of first run
containers(0).send(pool, NeedWork(warmedData(run1)))
@@ -753,14 +760,169 @@ class ContainerPoolTests
containers(1).expectMsg(Start(exec, memoryLimit))
//removing 2 prewarm containers will start 2 containers via backfill
- containers(0).send(pool, ContainerRemoved)
- containers(1).send(pool, ContainerRemoved)
+ containers(0).send(pool, ContainerRemoved(true))
+ containers(1).send(pool, ContainerRemoved(true))
containers(2).expectMsg(Start(exec, memoryLimit))
containers(3).expectMsg(Start(exec, memoryLimit))
//make sure extra prewarms are not started
containers(4).expectNoMessage(100.milliseconds)
containers(5).expectNoMessage(100.milliseconds)
}
+
+ it should "adjust prewarm containers correctly without reactive config" in {
+ val (containers, factory) = testContainers(4)
+ val feed = TestProbe()
+
+ stream.reset()
+ val prewarmExpirationCheckInterval = FiniteDuration(2, TimeUnit.SECONDS)
+ val poolConfig = ContainerPoolConfig(MemoryLimit.STD_MEMORY * 4, 0.5, false, prewarmExpirationCheckInterval)
+ val initialCount = 2
+ val pool =
+ system.actorOf(
+ ContainerPool
+ .props(factory, poolConfig, feed.ref, List(PrewarmingConfig(initialCount, exec, memoryLimit))))
+ containers(0).expectMsg(Start(exec, memoryLimit))
+ containers(1).expectMsg(Start(exec, memoryLimit))
+ containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
+ containers(1).send(pool, NeedWork(preWarmedData(exec.kind)))
+
+ // when the invoker starts, there are 0 prewarmed containers at the very beginning
+ stream.toString should include(s"found 0 started")
+
+ // the desiredCount should equal initialCount when the invoker starts
+ stream.toString should include(s"desired count: ${initialCount}")
+
+ stream.reset()
+
+ // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckInterval time
+ Thread.sleep(prewarmExpirationCheckInterval.toMillis)
+
+ // Because the prewarmed containers were already replenished, currentCount should equal initialCount
+ eventually {
+ stream.toString should include(s"found ${initialCount} started")
+ }
+ }
+
+ it should "adjust prewarm containers correctly with reactive config" in {
+ val (containers, factory) = testContainers(15)
+ val feed = TestProbe()
+
+ stream.reset()
+ val prewarmExpirationCheckInterval = FiniteDuration(2, TimeUnit.SECONDS)
+ val poolConfig = ContainerPoolConfig(MemoryLimit.STD_MEMORY * 8, 0.5, false, prewarmExpirationCheckInterval)
+ val minCount = 0
+ val initialCount = 2
+ val maxCount = 4
+ val deadline: Option[Deadline] = Some(ttl.fromNow)
+ val reactive: Option[ReactivePrewarmingConfig] =
+ Some(ReactivePrewarmingConfig(minCount, maxCount, ttl, threshold, increment))
+ val pool =
+ system.actorOf(
+ ContainerPool
+ .props(factory, poolConfig, feed.ref, List(PrewarmingConfig(initialCount, exec, memoryLimit, reactive))))
+ containers(0).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(1).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(0).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
+ containers(1).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
+
+ // when the invoker starts, there are 0 prewarmed containers at the very beginning
+ stream.toString should include(s"found 0 started")
+
+ // the desiredCount should equal initialCount when the invoker starts
+ stream.toString should include(s"desired count: ${initialCount}")
+
+ stream.reset()
+
+ // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckInterval time
+ Thread.sleep(prewarmExpirationCheckInterval.toMillis)
+
+ containers(0).expectMsg(Remove)
+ containers(1).expectMsg(Remove)
+ containers(0).send(pool, ContainerRemoved(false))
+ containers(1).send(pool, ContainerRemoved(false))
+
+ // currentCount should equal 0 because these 2 prewarmed containers are expired
+ stream.toString should include(s"found 0 started")
+
+ // the desiredCount should equal minCount because no cold start happened
+ stream.toString should include(s"desired count: ${minCount}")
+ // Previously created prewarmed containers should be removed
+ stream.toString should include(s"removed ${initialCount} expired prewarmed container")
+
+ stream.reset()
+ val action = ExecutableWhiskAction(
+ EntityPath("actionSpace"),
+ EntityName("actionName"),
+ exec,
+ limits = ActionLimits(memory = MemoryLimit(memoryLimit)))
+ val run = createRunMessage(action, invocationNamespace)
+ // 2 cold starts happened
+ pool ! run
+ pool ! run
+ containers(2).expectMsg(run)
+ containers(3).expectMsg(run)
+
+ // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckInterval time
+ Thread.sleep(prewarmExpirationCheckInterval.toMillis)
+
+ eventually {
+ // Because the expired prewarmed containers were already removed, currentCount should equal 0
+ stream.toString should include(s"found 0 started")
+ // the desiredCount should equal 2 because cold starts happened
+ stream.toString should include(s"desired count: 2")
+ }
+
+ containers(4).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(5).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(4).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
+ containers(5).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
+
+ stream.reset()
+
+ // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckInterval time
+ Thread.sleep(prewarmExpirationCheckInterval.toMillis)
+
+ containers(4).expectMsg(Remove)
+ containers(5).expectMsg(Remove)
+ containers(4).send(pool, ContainerRemoved(false))
+ containers(5).send(pool, ContainerRemoved(false))
+
+ // the previous 2 prewarmed containers were removed because they expired
+ stream.toString should include(s"removed 2 expired prewarmed container")
+
+ stream.reset()
+ // 5 cold starts happened (5 > maxCount)
+ pool ! run
+ pool ! run
+ pool ! run
+ pool ! run
+ pool ! run
+
+ containers(6).expectMsg(run)
+ containers(7).expectMsg(run)
+ containers(8).expectMsg(run)
+ containers(9).expectMsg(run)
+ containers(10).expectMsg(run)
+
+ // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler after prewarmExpirationCheckInterval time
+ Thread.sleep(prewarmExpirationCheckInterval.toMillis)
+
+ eventually {
+ // Because the expired prewarmed containers were already removed, currentCount should equal 0
+ stream.toString should include(s"found 0 started")
+ // even though the cold start number exceeds maxCount, the desiredCount can't be greater than maxCount
+ stream.toString should include(s"desired count: ${maxCount}")
+ }
+
+ containers(11).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(12).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(13).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(14).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+ containers(11).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
+ containers(12).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
+ containers(13).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
+ containers(14).send(pool, NeedWork(preWarmedData(exec.kind, expires = deadline)))
+ }
}
abstract class MockableContainer extends Container {
protected[core] val addr: ContainerAddress = ContainerAddress("nohost")
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 693f600..46e334c 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -27,6 +27,7 @@ import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestKit, TestProbe
import akka.util.ByteString
import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction, WhiskProperties}
import java.time.temporal.ChronoUnit
+import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import akka.io.Tcp.{Close, CommandFailed, Connect, Connected}
@@ -156,7 +157,7 @@ class ContainerProxyTests
/** Expect a NeedWork message with prewarmed data */
def expectPreWarmed(kind: String) = expectMsgPF() {
- case NeedWork(PreWarmedData(_, kind, memoryLimit, _)) => true
+ case NeedWork(PreWarmedData(_, kind, memoryLimit, _, _)) => true
}
/** Expect a NeedWork message with warmed data */
@@ -274,7 +275,7 @@ class ContainerProxyTests
(transid: TransactionId, activation: WhiskActivation, isBlockingActivation: Boolean, context: UserContext) =>
Future.successful(())
}
- val poolConfig = ContainerPoolConfig(2.MB, 0.5, false)
+ val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, FiniteDuration(1, TimeUnit.MINUTES))
def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))
@@ -1025,7 +1026,7 @@ class ContainerProxyTests
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
- expectMsg(ContainerRemoved)
+ expectMsg(ContainerRemoved(true))
awaitAssert {
factory.calls should have size 1
@@ -1070,7 +1071,7 @@ class ContainerProxyTests
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
- expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
+ expectMsg(ContainerRemoved(true)) // The message is sent as soon as the container decides to destroy itself
expectMsg(Transition(machine, Running, Removing))
awaitAssert {
@@ -1123,7 +1124,7 @@ class ContainerProxyTests
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
- expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
+ expectMsg(ContainerRemoved(true)) // The message is sent as soon as the container decides to destroy itself
expectMsg(Transition(machine, Running, Removing))
awaitAssert {
@@ -1162,7 +1163,7 @@ class ContainerProxyTests
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
- expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
+ expectMsg(ContainerRemoved(true)) // The message is sent as soon as the container decides to destroy itself
expectMsg(Transition(machine, Running, Removing))
awaitAssert {
@@ -1200,7 +1201,7 @@ class ContainerProxyTests
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
- expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
+ expectMsg(ContainerRemoved(true)) // The message is sent as soon as the container decides to destroy itself
expectMsg(Transition(machine, Running, Removing))
awaitAssert {
@@ -1386,7 +1387,7 @@ class ContainerProxyTests
preWarm(machine)
//expect failure after healthchecks fail
- expectMsg(ContainerRemoved)
+ expectMsg(ContainerRemoved(true))
expectMsg(Transition(machine, Started, Removing))
awaitAssert {
@@ -1426,7 +1427,7 @@ class ContainerProxyTests
run(machine, Uninitialized)
timeout(machine) // times out Ready state so container suspends
expectMsg(Transition(machine, Ready, Pausing))
- expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
+ expectMsg(ContainerRemoved(true)) // The message is sent as soon as the container decides to destroy itself
expectMsg(Transition(machine, Pausing, Removing))
awaitAssert {
@@ -1483,7 +1484,7 @@ class ContainerProxyTests
expectMsg(Transition(machine, Running, Ready))
// Remove the container after the transaction finished
- expectMsg(ContainerRemoved)
+ expectMsg(ContainerRemoved(true))
expectMsg(Transition(machine, Ready, Removing))
awaitAssert {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecManifestTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecManifestTests.scala
index 605b046..874bd11 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecManifestTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecManifestTests.scala
@@ -17,6 +17,8 @@
package org.apache.openwhisk.core.entity.test
+import java.util.concurrent.TimeUnit
+
import common.{StreamLogging, WskActorSystem}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -28,6 +30,7 @@ import org.apache.openwhisk.core.entity.ExecManifest._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.entity.ByteSize
+import scala.concurrent.duration.FiniteDuration
import scala.util.Success
@RunWith(classOf[JUnitRunner])
@@ -123,7 +126,7 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
runtimes.resolveDefaultRuntime("p1").get.image.resolveImageName() shouldBe "ppp/???:ttt"
runtimes.resolveDefaultRuntime("q1").get.image.resolveImageName() shouldBe "rrr/???:ttt"
runtimes.resolveDefaultRuntime("s1").get.image.resolveImageName() shouldBe "???"
- runtimes.resolveDefaultRuntime("s1").get.stemCells.get(0).count shouldBe 2
+ runtimes.resolveDefaultRuntime("s1").get.stemCells.get(0).initialCount shouldBe 2
runtimes.resolveDefaultRuntime("s1").get.stemCells.get(0).memory shouldBe 256.MB
}
@@ -237,7 +240,7 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
it should "de/serialize stem cell configuration" in {
val cell = StemCell(3, 128.MB)
- val cellAsJson = JsObject("count" -> JsNumber(3), "memory" -> JsString("128 MB"))
+ val cellAsJson = JsObject("initialCount" -> JsNumber(3), "memory" -> JsString("128 MB"))
stemCellSerdes.write(cell) shouldBe cellAsJson
stemCellSerdes.read(cellAsJson) shouldBe cell
@@ -250,19 +253,19 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
}
an[IllegalArgumentException] shouldBe thrownBy {
- val cellAsJson = JsObject("count" -> JsNumber(0), "memory" -> JsString("128 MB"))
+ val cellAsJson = JsObject("initialCount" -> JsNumber(0), "memory" -> JsString("128 MB"))
stemCellSerdes.read(cellAsJson)
}
the[IllegalArgumentException] thrownBy {
- val cellAsJson = JsObject("count" -> JsNumber(1), "memory" -> JsString("128"))
+ val cellAsJson = JsObject("initialCount" -> JsNumber(1), "memory" -> JsString("128"))
stemCellSerdes.read(cellAsJson)
} should have message {
ByteSize.formatError
}
}
- it should "parse manifest from JSON string" in {
+ it should "parse manifest without reactive from JSON string" in {
val json = """
|{ "runtimes": {
| "nodef": [
@@ -273,7 +276,7 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
| "name": "nodejsaction"
| },
| "stemCells": [{
- | "count": 1,
+ | "initialCount": 1,
| "memory": "128 MB"
| }]
| }, {
@@ -283,10 +286,10 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
| "name": "nodejsaction"
| },
| "stemCells": [{
- | "count": 1,
+ | "initialCount": 1,
| "memory": "128 MB"
| }, {
- | "count": 1,
+ | "initialCount": 1,
| "memory": "256 MB"
| }]
| }
@@ -297,7 +300,7 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
| "name": "pythonaction"
| },
| "stemCells": [{
- | "count": 2,
+ | "initialCount": 2,
| "memory": "256 MB"
| }]
| }],
@@ -318,7 +321,7 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
|}
|""".stripMargin.parseJson.asJsObject
- val js6 = RuntimeManifest(
+ val js10 = RuntimeManifest(
"nodejs:10",
ImageName("nodejsaction"),
deprecated = Some(true),
@@ -336,7 +339,7 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
mf shouldBe {
Runtimes(
Set(
- RuntimeFamily("nodef", Set(js6, js8)),
+ RuntimeFamily("nodef", Set(js10, js8)),
RuntimeFamily("pythonf", Set(py)),
RuntimeFamily("swiftf", Set(sw)),
RuntimeFamily("phpf", Set(ph))),
@@ -344,17 +347,136 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
None)
}
- def stemCellFactory(m: RuntimeManifest, cells: List[StemCell]) = cells.map { c =>
- (m.kind, m.image, c.count, c.memory)
+ mf.stemcells.flatMap {
+ case (m, cells) =>
+ cells.map { c =>
+ (m.kind, m.image, c.initialCount, c.memory)
+ }
+ }.toList should contain theSameElementsAs List(
+ (js10.kind, js10.image, 1, 128.MB),
+ (js8.kind, js8.image, 1, 128.MB),
+ (js8.kind, js8.image, 1, 256.MB),
+ (py.kind, py.image, 2, 256.MB))
+ }
+
+ it should "parse manifest with reactive from JSON string" in {
+ val json = """
+ |{ "runtimes": {
+ | "nodef": [
+ | {
+ | "kind": "nodejs:10",
+ | "deprecated": true,
+ | "image": {
+ | "name": "nodejsaction"
+ | },
+ | "stemCells": [{
+ | "initialCount": 1,
+ | "memory": "128 MB",
+ | "reactive": {
+ | "minCount": 1,
+ | "maxCount": 4,
+ | "ttl": "2 minutes",
+ | "threshold": 1,
+ | "increment": 1
+ | }
+ | }]
+ | }, {
+ | "kind": "nodejs:8",
+ | "default": true,
+ | "image": {
+ | "name": "nodejsaction"
+ | },
+ | "stemCells": [{
+ | "initialCount": 1,
+ | "memory": "128 MB",
+ | "reactive": {
+ | "minCount": 1,
+ | "maxCount": 4,
+ | "ttl": "2 minutes",
+ | "threshold": 1,
+ | "increment": 1
+ | }
+ | }, {
+ | "initialCount": 1,
+ | "memory": "256 MB",
+ | "reactive": {
+ | "minCount": 1,
+ | "maxCount": 4,
+ | "ttl": "2 minutes",
+ | "threshold": 1,
+ | "increment": 1
+ | }
+ | }]
+ | }
+ | ],
+ | "pythonf": [{
+ | "kind": "python",
+ | "image": {
+ | "name": "pythonaction"
+ | },
+ | "stemCells": [{
+ | "initialCount": 2,
+ | "memory": "256 MB",
+ | "reactive": {
+ | "minCount": 1,
+ | "maxCount": 4,
+ | "ttl": "2 minutes",
+ | "threshold": 1,
+ | "increment": 1
+ | }
+ | }]
+ | }],
+ | "swiftf": [{
+ | "kind": "swift",
+ | "image": {
+ | "name": "swiftaction"
+ | },
+ | "stemCells": []
+ | }],
+ | "phpf": [{
+ | "kind": "php",
+ | "image": {
+ | "name": "phpaction"
+ | }
+ | }]
+ | }
+ |}
+ |""".stripMargin.parseJson.asJsObject
+
+ val reactive = Some(ReactivePrewarmingConfig(1, 4, FiniteDuration(2, TimeUnit.MINUTES), 1, 1))
+ val js10 = RuntimeManifest(
+ "nodejs:10",
+ ImageName("nodejsaction"),
+ deprecated = Some(true),
+ stemCells = Some(List(StemCell(1, 128.MB, reactive))))
+ val js8 = RuntimeManifest(
+ "nodejs:8",
+ ImageName("nodejsaction"),
+ default = Some(true),
+ stemCells = Some(List(StemCell(1, 128.MB, reactive), StemCell(1, 256.MB, reactive))))
+ val py = RuntimeManifest("python", ImageName("pythonaction"), stemCells = Some(List(StemCell(2, 256.MB, reactive))))
+ val sw = RuntimeManifest("swift", ImageName("swiftaction"), stemCells = Some(List.empty))
+ val ph = RuntimeManifest("php", ImageName("phpaction"))
+ val mf = ExecManifest.runtimes(json, RuntimeManifestConfig()).get
+
+ mf shouldBe {
+ Runtimes(
+ Set(
+ RuntimeFamily("nodef", Set(js10, js8)),
+ RuntimeFamily("pythonf", Set(py)),
+ RuntimeFamily("swiftf", Set(sw)),
+ RuntimeFamily("phpf", Set(ph))),
+ Set.empty,
+ None)
}
mf.stemcells.flatMap {
case (m, cells) =>
cells.map { c =>
- (m.kind, m.image, c.count, c.memory)
+ (m.kind, m.image, c.initialCount, c.memory)
}
}.toList should contain theSameElementsAs List(
- (js6.kind, js6.image, 1, 128.MB),
+ (js10.kind, js10.image, 1, 128.MB),
(js8.kind, js8.image, 1, 128.MB),
(js8.kind, js8.image, 1, 256.MB),
(py.kind, py.image, 2, 256.MB))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecTests.scala
index cd133c0..0714f52 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecTests.scala
@@ -152,7 +152,7 @@ class ExecTests extends FlatSpec with Matchers with StreamLogging with BeforeAnd
| },
| "deprecated": false,
| "stemCells": [{
- | "count": 2,
+ | "initialCount": 2,
| "memory": "256 MB"
| }]
| }