This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 1274fdf  Adjust prewarm container dynamically (#4871)
1274fdf is described below

commit 1274fdf4d4a8083042666bb47e2219fc3098991d
Author: ningyougang <[email protected]>
AuthorDate: Thu May 21 00:41:10 2020 +0800

    Adjust prewarm container dynamically (#4871)
    
    * Adjust prewarm container dynamically
    
    Co-authored-by: ning.yougang <[email protected]>
---
 ansible/files/runtimes.json                        |  11 +-
 ansible/roles/invoker/tasks/deploy.yml             |   1 +
 .../org/apache/openwhisk/common/Logging.scala      |   6 +
 .../core/containerpool/ContainerFactory.scala      |   8 +-
 .../openwhisk/core/entity/ExecManifest.scala       |  77 ++++++-
 core/invoker/src/main/resources/application.conf   |   1 +
 .../core/containerpool/ContainerPool.scala         | 243 +++++++++++++++++----
 .../core/containerpool/ContainerProxy.scala        |  47 ++--
 .../openwhisk/core/invoker/InvokerReactive.scala   |   2 +-
 .../mesos/test/MesosContainerFactoryTest.scala     |   4 +-
 .../containerpool/test/ContainerPoolTests.scala    | 190 ++++++++++++++--
 .../containerpool/test/ContainerProxyTests.scala   |  21 +-
 .../core/entity/test/ExecManifestTests.scala       | 152 +++++++++++--
 .../openwhisk/core/entity/test/ExecTests.scala     |   2 +-
 14 files changed, 653 insertions(+), 112 deletions(-)

diff --git a/ansible/files/runtimes.json b/ansible/files/runtimes.json
index 34038ae..722bbfc 100644
--- a/ansible/files/runtimes.json
+++ b/ansible/files/runtimes.json
@@ -57,8 +57,15 @@
                 },
                 "stemCells": [
                     {
-                        "count": 2,
-                        "memory": "256 MB"
+                        "initialCount": 2,
+                        "memory": "256 MB",
+                        "reactive": {
+                            "minCount": 1,
+                            "maxCount": 4,
+                            "ttl": "2 minutes",
+                            "threshold": 1,
+                            "increment": 1
+                        }
                     }
                 ]
             },
diff --git a/ansible/roles/invoker/tasks/deploy.yml 
b/ansible/roles/invoker/tasks/deploy.yml
index 62361e6..ea4ce48 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -278,6 +278,7 @@
       "CONFIG_whisk_invoker_https_keystorePassword": "{{ 
invoker.ssl.keystore.password }}"
       "CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor 
}}"
       "CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}"
+      "CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ 
container_pool_prewarm_expirationCheckInterval | default('1 minute') }}"
 
 - name: extend invoker dns env
   set_fact:
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 37591be..7f27b26 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -511,6 +511,12 @@ object LoggingMarkers {
     LogMarkerToken(containerPool, "prewarmSize", 
counter)(MeasurementUnit.information.megabytes)
   val CONTAINER_POOL_IDLES_COUNT =
     LogMarkerToken(containerPool, "idlesCount", counter)(MeasurementUnit.none)
+  def CONTAINER_POOL_PREWARM_COLDSTART(memory: String, kind: String) =
+    LogMarkerToken(containerPool, "prewarmColdstart", counter, None, 
Map("memory" -> memory, "kind" -> kind))(
+      MeasurementUnit.none)
+  def CONTAINER_POOL_PREWARM_EXPIRED(memory: String, kind: String) =
+    LogMarkerToken(containerPool, "prewarmExpired", counter, None, 
Map("memory" -> memory, "kind" -> kind))(
+      MeasurementUnit.none)
   val CONTAINER_POOL_IDLES_SIZE =
     LogMarkerToken(containerPool, "idlesSize", 
counter)(MeasurementUnit.information.megabytes)
 
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index d984f99..f2e5ab7 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -24,6 +24,7 @@ import org.apache.openwhisk.core.entity.{ByteSize, 
ExecManifest, ExecutableWhisk
 import org.apache.openwhisk.spi.Spi
 
 import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
 import scala.math.max
 
 case class ContainerArgsConfig(network: String,
@@ -43,11 +44,16 @@ case class ContainerArgsConfig(network: String,
     }.toMap
 }
 
-case class ContainerPoolConfig(userMemory: ByteSize, concurrentPeekFactor: 
Double, akkaClient: Boolean) {
+case class ContainerPoolConfig(userMemory: ByteSize,
+                               concurrentPeekFactor: Double,
+                               akkaClient: Boolean,
+                               prewarmExpirationCheckInterval: FiniteDuration) 
{
   require(
     concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
     s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
 
+  require(prewarmExpirationCheckInterval.toSeconds > 0, 
"prewarmExpirationCheckInterval must be > 0")
+
   /**
    * The shareFactor indicates the number of containers that would share a 
single core, on average.
    * cpuShare is a docker option (-c) whereby a container's CPU access is 
limited.
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala
index fe90515..9011b1e 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala
@@ -26,7 +26,10 @@ import spray.json.DefaultJsonProtocol._
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.core.entity.Attachments._
 import org.apache.openwhisk.core.entity.Attachments.Attached._
-import fastparse._, NoWhitespace._
+import fastparse._
+import NoWhitespace._
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
 
 /**
  * Reads manifest of supported runtimes from configuration file and stores
@@ -135,11 +138,37 @@ protected[core] object ExecManifest {
   /**
    * A stemcell configuration read from the manifest for a container image to 
be initialized by the container pool.
    *
-   * @param count  the number of stemcell containers to create
+   * @param initialCount  the initial number of stemcell containers to create
    * @param memory the max memory this stemcell will allocate
+   * @param reactive the reactive prewarming prewarmed config, which is 
disabled by default
    */
-  protected[entity] case class StemCell(count: Int, memory: ByteSize) {
-    require(count > 0, "count must be positive")
+  protected[entity] case class StemCell(initialCount: Int,
+                                        memory: ByteSize,
+                                        reactive: 
Option[ReactivePrewarmingConfig] = None) {
+    require(initialCount > 0, "initialCount must be positive")
+  }
+
+  /**
+   * A stemcell's ReactivePrewarmingConfig configuration
+   *
+   * @param minCount the max number of stemcell containers to exist
+   * @param maxCount  the max number of stemcell containers to create
+   * @param ttl time to live of the prewarmed container
+   * @param threshold the executed activation number of cold start in previous 
one minute
+   * @param increment increase per increment prewarmed number under per 
threshold activations
+   */
+  protected[core] case class ReactivePrewarmingConfig(minCount: Int,
+                                                      maxCount: Int,
+                                                      ttl: FiniteDuration,
+                                                      threshold: Int,
+                                                      increment: Int) {
+    require(
+      minCount >= 0 && minCount <= maxCount,
+      "minCount must be be greater than 0 and less than or equal to maxCount")
+    require(maxCount > 0, "maxCount must be positive")
+    require(ttl.toMillis > 0, "ttl must be positive")
+    require(threshold > 0, "threshold must be positive")
+    require(increment > 0 && increment <= maxCount, "increment must be 
positive and less than or equal to maxCount")
   }
 
   /**
@@ -344,9 +373,45 @@ protected[core] object ExecManifest {
 
   protected[entity] implicit val imageNameSerdes: RootJsonFormat[ImageName] = 
jsonFormat4(ImageName.apply)
 
-  protected[entity] implicit val stemCellSerdes: RootJsonFormat[StemCell] = {
+  protected[entity] implicit val ttlSerdes: RootJsonFormat[FiniteDuration] = 
new RootJsonFormat[FiniteDuration] {
+    override def write(finiteDuration: FiniteDuration): JsValue = 
JsString(finiteDuration.toString)
+
+    override def read(value: JsValue): FiniteDuration = value match {
+      case JsString(s) =>
+        val duration = Duration(s)
+        FiniteDuration(duration.length, duration.unit)
+      case _ =>
+        deserializationError("time unit not supported. Only milliseconds, 
seconds, minutes, hours, days are supported")
+    }
+  }
+
+  protected[entity] implicit val reactivePrewarmingConfigSerdes: 
RootJsonFormat[ReactivePrewarmingConfig] = jsonFormat5(
+    ReactivePrewarmingConfig.apply)
+
+  protected[entity] implicit val stemCellSerdes = new RootJsonFormat[StemCell] 
{
     import org.apache.openwhisk.core.entity.size.serdes
-    jsonFormat2(StemCell.apply)
+    val defaultSerdes = jsonFormat3(StemCell.apply)
+    override def read(value: JsValue): StemCell = {
+      val fields = value.asJsObject.fields
+      val initialCount: Option[Int] =
+        fields
+          .get("initialCount")
+          .orElse(fields.get("count"))
+          .map(_.convertTo[Int])
+      val memory: Option[ByteSize] = 
fields.get("memory").map(_.convertTo[ByteSize])
+      val config = 
fields.get("reactive").map(_.convertTo[ReactivePrewarmingConfig])
+
+      (initialCount, memory) match {
+        case (Some(c), Some(m)) => StemCell(c, m, config)
+        case (Some(c), None) =>
+          throw new IllegalArgumentException(s"memory is required, just 
provide initialCount: ${c}")
+        case (None, Some(m)) =>
+          throw new IllegalArgumentException(s"initialCount is required, just 
provide memory: ${m.toString}")
+        case _ => throw new IllegalArgumentException("both initialCount and 
memory are required")
+      }
+    }
+
+    override def write(s: StemCell) = defaultSerdes.write(s)
   }
 
   protected[entity] implicit val runtimeManifestSerdes: 
RootJsonFormat[RuntimeManifest] = jsonFormat8(RuntimeManifest)
diff --git a/core/invoker/src/main/resources/application.conf 
b/core/invoker/src/main/resources/application.conf
index 63d0824..b7e6eed 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -60,6 +60,7 @@ whisk {
     user-memory: 1024 m
     concurrent-peek-factor: 0.5 #factor used to limit message peeking: 0 < 
factor <= 1.0 - larger number improves concurrent processing, but increases 
risk of message loss during invoker crash
     akka-client:  false # if true, use PoolingContainerClient for HTTP from 
invoker to action container (otherwise use ApacheBlockingContainerClient)
+    prewarm-expiration-check-interval: 1 minute
   }
 
   kubernetes {
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 1e267fe..8c3dcb1 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -18,11 +18,12 @@
 package org.apache.openwhisk.core.containerpool
 
 import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
-import org.apache.openwhisk.common.MetricEmitter
-import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, 
TransactionId}
 import org.apache.openwhisk.core.connector.MessageFeed
+import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
+
 import scala.annotation.tailrec
 import scala.collection.immutable
 import scala.concurrent.duration._
@@ -32,10 +33,14 @@ sealed trait WorkerState
 case object Busy extends WorkerState
 case object Free extends WorkerState
 
+case class ColdStartKey(kind: String, memory: ByteSize)
+
 case class WorkerData(data: ContainerData, state: WorkerState)
 
 case object EmitMetrics
 
+case object AdjustPrewarmedContainer
+
 /**
  * A pool managing containers to run actions on.
  *
@@ -59,16 +64,15 @@ case object EmitMetrics
 class ContainerPool(childFactory: ActorRefFactory => ActorRef,
                     feed: ActorRef,
                     prewarmConfig: List[PrewarmingConfig] = List.empty,
-                    poolConfig: ContainerPoolConfig)
+                    poolConfig: ContainerPoolConfig)(implicit val logging: 
Logging)
     extends Actor {
   import ContainerPool.memoryConsumptionOf
 
-  implicit val logging = new AkkaLogging(context.system.log)
   implicit val ec = context.dispatcher
 
   var freePool = immutable.Map.empty[ActorRef, ContainerData]
   var busyPool = immutable.Map.empty[ActorRef, ContainerData]
-  var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
+  var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData]
   var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
   // If all memory slots are occupied and if there is currently no container 
to be removed, than the actions will be
   // buffered here to keep order of computation.
@@ -80,7 +84,17 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
   //periodically emit metrics (don't need to do this for each message!)
   context.system.scheduler.schedule(30.seconds, 10.seconds, self, EmitMetrics)
 
-  backfillPrewarms(true)
+  // Key is ColdStartKey, value is the number of cold Start in minute
+  var coldStartCount = immutable.Map.empty[ColdStartKey, Int]
+
+  adjustPrewarmedContainer(true, false)
+
+  // check periodically, adjust prewarmed container(delete if unused for some 
time and create some increment containers)
+  context.system.scheduler.schedule(
+    2.seconds,
+    poolConfig.prewarmExpirationCheckInterval,
+    self,
+    AdjustPrewarmedContainer)
 
   def logContainerStart(r: Run, containerState: String, activeActivations: 
Int, container: Option[Container]): Unit = {
     val namespaceName = r.msg.user.namespace.name
@@ -131,7 +145,11 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
                 if (hasPoolSpaceFor(busyPool ++ freePool, 
r.action.limits.memory.megabytes.MB)) {
                   takePrewarmContainer(r.action)
                     .map(container => (container, "prewarmed"))
-                    
.orElse(Some(createContainer(r.action.limits.memory.megabytes.MB), "cold"))
+                    .orElse {
+                      val container = 
Some(createContainer(r.action.limits.memory.megabytes.MB), "cold")
+                      incrementColdStartCount(r.action.exec.kind, 
r.action.limits.memory.megabytes.MB)
+                      container
+                    }
                 } else None)
               .orElse(
                 // Remove a container and create a new one for the given job
@@ -145,7 +163,11 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
                   .map(_ =>
                     takePrewarmContainer(r.action)
                       .map(container => (container, "recreatedPrewarm"))
-                      
.getOrElse(createContainer(r.action.limits.memory.megabytes.MB), "recreated")))
+                      .getOrElse {
+                        val container = 
(createContainer(r.action.limits.memory.megabytes.MB), "recreated")
+                        incrementColdStartCount(r.action.exec.kind, 
r.action.limits.memory.megabytes.MB)
+                        container
+                    }))
 
           } else None
 
@@ -245,7 +267,7 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
       prewarmedPool = prewarmedPool + (sender() -> data)
 
     // Container got removed
-    case ContainerRemoved =>
+    case ContainerRemoved(replacePrewarm) =>
       // if container was in free pool, it may have been processing (but under 
capacity),
       // so there is capacity to accept another job request
       freePool.get(sender()).foreach { f =>
@@ -258,8 +280,7 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
       processBufferOrFeed()
 
       //in case this was a prewarm
-      prewarmedPool.get(sender()).foreach { _ =>
-        logging.info(this, "failed prewarm removed")
+      prewarmedPool.get(sender()).foreach { data =>
         prewarmedPool = prewarmedPool - sender()
       }
       //in case this was a starting prewarm
@@ -268,8 +289,10 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
         prewarmStartingPool = prewarmStartingPool - sender()
       }
 
-      //backfill prewarms on every ContainerRemoved, just in case
-      backfillPrewarms(false) //in case a prewarm is removed due to health 
failure or crash
+      //backfill prewarms on every ContainerRemoved(replacePrewarm = true), 
just in case
+      if (replacePrewarm) {
+        adjustPrewarmedContainer(false, false) //in case a prewarm is removed 
due to health failure or crash
+      }
 
     // This message is received for one of these reasons:
     // 1. Container errored while resuming a warm container, could not process 
the job, and sent the job back
@@ -281,6 +304,9 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
       busyPool = busyPool - sender()
     case EmitMetrics =>
       emitMetrics()
+
+    case AdjustPrewarmedContainer =>
+      adjustPrewarmedContainer(false, true)
   }
 
   /** Resend next item in the buffer, or trigger next item in the feed, if no 
items in the buffer. */
@@ -302,26 +328,32 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
     }
   }
 
-  /** Install prewarm containers up to the configured requirements for each 
kind/memory combination. */
-  def backfillPrewarms(init: Boolean) = {
-    prewarmConfig.foreach { config =>
-      val kind = config.exec.kind
-      val memory = config.memoryLimit
-      val currentCount = prewarmedPool.count {
-        case (_, PreWarmedData(_, `kind`, `memory`, _)) => true //done starting
-        case _                                          => false //started but 
not finished starting
-      }
-      val startingCount = prewarmStartingPool.count(p => p._2._1 == kind && 
p._2._2 == memory)
-      val containerCount = currentCount + startingCount
-      if (containerCount < config.count) {
-        logging.info(
-          this,
-          s"found ${currentCount} started and ${startingCount} starting; ${if 
(init) "initing" else "backfilling"} ${config.count - containerCount} pre-warms 
to desired count: ${config.count} for kind:${config.exec.kind} 
mem:${config.memoryLimit.toString}")(
-          TransactionId.invokerWarmup)
-        (containerCount until config.count).foreach { _ =>
-          prewarmContainer(config.exec, config.memoryLimit)
+  /** adjust prewarm containers up to the configured requirements for each 
kind/memory combination. */
+  def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = {
+    //fill in missing prewarms
+    ContainerPool
+      .increasePrewarms(init, scheduled, coldStartCount, prewarmConfig, 
prewarmedPool, prewarmStartingPool)
+      .foreach { c =>
+        val config = c._1
+        val currentCount = c._2._1
+        val desiredCount = c._2._2
+        if (currentCount < desiredCount) {
+          (currentCount until desiredCount).foreach { _ =>
+            prewarmContainer(config.exec, config.memoryLimit, 
config.reactive.map(_.ttl))
+          }
         }
       }
+    if (scheduled) {
+      //on scheduled time, remove expired prewarms
+      ContainerPool.removeExpired(prewarmConfig, prewarmedPool).foreach(_ ! 
Remove)
+      //on scheduled time, emit cold start counter metric with memory + kind
+      coldStartCount foreach { coldStart =>
+        val coldStartKey = coldStart._1
+        MetricEmitter.emitCounterMetric(
+          
LoggingMarkers.CONTAINER_POOL_PREWARM_COLDSTART(coldStartKey.memory.toString, 
coldStartKey.kind))
+      }
+      //   then clear coldStartCounts each time scheduled event is processed 
to reset counts
+      coldStartCount = immutable.Map.empty[ColdStartKey, Int]
     }
   }
 
@@ -334,10 +366,25 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
   }
 
   /** Creates a new prewarmed container */
-  def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize): Unit = {
+  def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize, ttl: 
Option[FiniteDuration]): Unit = {
     val newContainer = childFactory(context)
     prewarmStartingPool = prewarmStartingPool + (newContainer -> (exec.kind, 
memoryLimit))
-    newContainer ! Start(exec, memoryLimit)
+    newContainer ! Start(exec, memoryLimit, ttl)
+  }
+
+  /** this is only for cold start statistics of prewarm configs, e.g. not 
blackbox or other configs. */
+  def incrementColdStartCount(kind: String, memoryLimit: ByteSize): Unit = {
+    prewarmConfig
+      .filter { config =>
+        kind == config.exec.kind && memoryLimit == config.memoryLimit
+      }
+      .foreach { _ =>
+        val coldStartKey = ColdStartKey(kind, memoryLimit)
+        coldStartCount.get(coldStartKey) match {
+          case Some(value) => coldStartCount = coldStartCount + (coldStartKey 
-> (value + 1))
+          case None        => coldStartCount = coldStartCount + (coldStartKey 
-> 1)
+        }
+      }
   }
 
   /**
@@ -350,10 +397,12 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
   def takePrewarmContainer(action: ExecutableWhiskAction): Option[(ActorRef, 
ContainerData)] = {
     val kind = action.exec.kind
     val memory = action.limits.memory.megabytes.MB
-    prewarmedPool
+    val now = Deadline.now
+    prewarmedPool.toSeq
+      .sortBy(_._2.expires.getOrElse(now))
       .find {
-        case (_, PreWarmedData(_, `kind`, `memory`, _)) => true
-        case _                                          => false
+        case (_, PreWarmedData(_, `kind`, `memory`, _, _)) => true
+        case _                                             => false
       }
       .map {
         case (ref, data) =>
@@ -363,7 +412,11 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
           // Create a new prewarm container
           // NOTE: prewarming ignores the action code in exec, but this is 
dangerous as the field is accessible to the
           // factory
-          prewarmContainer(action.exec, memory)
+          val ttl = data.expires match {
+            case Some(value) => Some(value.time)
+            case None        => None
+          }
+          prewarmContainer(action.exec, memory, ttl)
           (ref, data)
       }
   }
@@ -499,12 +552,124 @@ object ContainerPool {
     }
   }
 
+  /**
+   * Find the expired actor in prewarmedPool
+   *
+   * @param prewarmConfig
+   * @param prewarmedPool
+   * @param logging
+   * @return a list of expired actor
+   */
+  def removeExpired(prewarmConfig: List[PrewarmingConfig], prewarmedPool: 
Map[ActorRef, PreWarmedData])(
+    implicit logging: Logging): List[ActorRef] = {
+    prewarmConfig.flatMap { config =>
+      val kind = config.exec.kind
+      val memory = config.memoryLimit
+      config.reactive
+        .map { _ =>
+          val expiredPrewarmedContainer = prewarmedPool
+            .filter { warmInfo =>
+              warmInfo match {
+                case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if 
p.isExpired() => true
+                case _                                                         
         => false
+              }
+            }
+          // emit expired container counter metric with memory + kind
+          
MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString,
 kind))
+          logging.info(
+            this,
+            s"[kind: ${kind} memory: ${memory.toString}] removed 
${expiredPrewarmedContainer.size} expired prewarmed container")
+          expiredPrewarmedContainer.keys
+        }
+        .getOrElse(List.empty)
+    }
+  }
+
+  /**
+   * Find the increased number for the prewarmed kind
+   *
+   * @param init
+   * @param scheduled
+   * @param coldStartCount
+   * @param prewarmConfig
+   * @param prewarmedPool
+   * @param prewarmStartingPool
+   * @param logging
+   * @return the current number and increased number for the kind in the Map
+   */
+  def increasePrewarms(init: Boolean,
+                       scheduled: Boolean,
+                       coldStartCount: Map[ColdStartKey, Int],
+                       prewarmConfig: List[PrewarmingConfig],
+                       prewarmedPool: Map[ActorRef, PreWarmedData],
+                       prewarmStartingPool: Map[ActorRef, (String, ByteSize)])(
+    implicit logging: Logging): Map[PrewarmingConfig, (Int, Int)] = {
+    prewarmConfig.map { config =>
+      val kind = config.exec.kind
+      val memory = config.memoryLimit
+
+      val runningCount = prewarmedPool.count {
+        // done starting, and not expired
+        case (_, p @ PreWarmedData(_, `kind`, `memory`, _, _)) if 
!p.isExpired() => true
+        // started but not finished starting (or expired)
+        case _ => false
+      }
+      val startingCount = prewarmStartingPool.count(p => p._2._1 == kind && 
p._2._2 == memory)
+      val currentCount = runningCount + startingCount
+
+      // determine how many are needed
+      val desiredCount: Int =
+        if (init) config.initialCount
+        else {
+          if (scheduled) {
+            // scheduled/reactive config backfill
+            config.reactive
+              .map(c => getReactiveCold(coldStartCount, c, kind, 
memory).getOrElse(c.minCount)) //reactive -> desired is either cold start 
driven, or minCount
+              .getOrElse(config.initialCount) //not reactive -> desired is 
always initial count
+          } else {
+            // normal backfill after removal - make sure at least minCount or 
initialCount is started
+            config.reactive.map(_.minCount).getOrElse(config.initialCount)
+          }
+        }
+
+      logging.info(
+        this,
+        s"found ${currentCount} started and ${startingCount} starting; ${if 
(init) "initing" else "backfilling"} ${desiredCount - currentCount} pre-warms 
to desired count: ${desiredCount} for kind:${config.exec.kind} 
mem:${config.memoryLimit.toString}")(
+        TransactionId.invokerWarmup)
+      (config, (currentCount, desiredCount))
+    }.toMap
+  }
+
+  /**
+   * Get the required prewarmed container number according to the cold start 
happened in previous minute
+   *
+   * @param coldStartCount
+   * @param config
+   * @param kind
+   * @param memory
+   * @return the required prewarmed container number
+   */
+  def getReactiveCold(coldStartCount: Map[ColdStartKey, Int],
+                      config: ReactivePrewarmingConfig,
+                      kind: String,
+                      memory: ByteSize): Option[Int] = {
+    coldStartCount.get(ColdStartKey(kind, memory)).map { value =>
+      // Let's assume that threshold is `2`, increment is `1` in runtimes.json
+      // if cold start number in previous minute is `2`, requireCount is `2/2 
* 1 = 1`
+      // if cold start number in previous minute is `4`, requireCount is `4/2 
* 1 = 2`
+      math.min(math.max(config.minCount, (value / config.threshold) * 
config.increment), config.maxCount)
+    }
+  }
+
   def props(factory: ActorRefFactory => ActorRef,
             poolConfig: ContainerPoolConfig,
             feed: ActorRef,
-            prewarmConfig: List[PrewarmingConfig] = List.empty) =
+            prewarmConfig: List[PrewarmingConfig] = List.empty)(implicit 
logging: Logging) =
     Props(new ContainerPool(factory, feed, prewarmConfig, poolConfig))
 }
 
 /** Contains settings needed to perform container prewarming. */
-case class PrewarmingConfig(count: Int, exec: CodeExec[_], memoryLimit: 
ByteSize)
+case class PrewarmingConfig(initialCount: Int,
+                            exec: CodeExec[_],
+                            memoryLimit: ByteSize,
+                            reactive: Option[ReactivePrewarmingConfig] = None)
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 142173e..d23c0e5 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -145,12 +145,14 @@ case class MemoryData(override val memoryLimit: ByteSize, 
override val activeAct
 case class PreWarmedData(override val container: Container,
                          kind: String,
                          override val memoryLimit: ByteSize,
-                         override val activeActivationCount: Int = 0)
+                         override val activeActivationCount: Int = 0,
+                         expires: Option[Deadline] = None)
     extends ContainerStarted(container, Instant.EPOCH, memoryLimit, 
activeActivationCount)
     with ContainerNotInUse {
   override val initingState = "prewarmed"
   override def nextRun(r: Run) =
     WarmingData(container, r.msg.user.namespace.name, r.action, Instant.now, 1)
+  def isExpired(): Boolean = expires.exists(_.isOverdue())
 }
 
 /** type representing a prewarm (running, but not used) container that is 
being initialized (for a specific action + invocation namespace) */
@@ -193,7 +195,7 @@ case class WarmedData(override val container: Container,
 }
 
 // Events received by the actor
-case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize, ttl: 
Option[FiniteDuration] = None)
 case class Run(action: ExecutableWhiskAction, msg: ActivationMessage, 
retryLogDeadline: Option[Deadline] = None)
 case object Remove
 case class HealthPingEnabled(enabled: Boolean)
@@ -201,7 +203,7 @@ case class HealthPingEnabled(enabled: Boolean)
 // Events sent by the actor
 case class NeedWork(data: ContainerData)
 case object ContainerPaused
-case object ContainerRemoved // when container is destroyed
+case class ContainerRemoved(replacePrewarm: Boolean) // when container is 
destroyed
 case object RescheduleJob // job is sent back to parent and could not be 
processed because container is being destroyed
 case class PreWarmCompleted(data: PreWarmedData)
 case class InitCompleted(data: WarmedData)
@@ -288,7 +290,8 @@ class ContainerProxy(factory: (TransactionId,
         job.memoryLimit,
         poolConfig.cpuShare(job.memoryLimit),
         None)
-        .map(container => PreWarmCompleted(PreWarmedData(container, 
job.exec.kind, job.memoryLimit)))
+        .map(container =>
+          PreWarmCompleted(PreWarmedData(container, job.exec.kind, 
job.memoryLimit, expires = job.ttl.map(_.fromNow))))
         .pipeTo(self)
 
       goto(Starting)
@@ -316,7 +319,7 @@ class ContainerProxy(factory: (TransactionId,
             // the container is ready to accept an activation; register it as 
PreWarmed; this
             // normalizes the life cycle for containers and their cleanup when 
activations fail
             self ! PreWarmCompleted(
-              PreWarmedData(container, job.action.exec.kind, 
job.action.limits.memory.megabytes.MB, 1))
+              PreWarmedData(container, job.action.exec.kind, 
job.action.limits.memory.megabytes.MB, 1, expires = None))
 
           case Failure(t) =>
             // the container did not come up cleanly, so disambiguate the 
failure mode and then cleanup
@@ -359,7 +362,7 @@ class ContainerProxy(factory: (TransactionId,
 
     // container creation failed
     case Event(_: FailureMessage, _) =>
-      context.parent ! ContainerRemoved
+      context.parent ! ContainerRemoved(true)
       stop()
 
     case _ => delay
@@ -372,14 +375,14 @@ class ContainerProxy(factory: (TransactionId,
       initializeAndRun(data.container, job)
         .map(_ => RunCompleted)
         .pipeTo(self)
-      goto(Running) using PreWarmedData(data.container, data.kind, 
data.memoryLimit, 1)
+      goto(Running) using PreWarmedData(data.container, data.kind, 
data.memoryLimit, 1, data.expires)
 
-    case Event(Remove, data: PreWarmedData) => destroyContainer(data)
+    case Event(Remove, data: PreWarmedData) => destroyContainer(data, false)
 
     // prewarm container failed
     case Event(_: FailureMessage, data: PreWarmedData) =>
       
MetricEmitter.emitCounterMetric(LoggingMarkers.INVOKER_CONTAINER_HEALTH_FAILED_PREWARM)
-      destroyContainer(data)
+      destroyContainer(data, true)
   }
 
   when(Running) {
@@ -452,22 +455,22 @@ class ContainerProxy(factory: (TransactionId,
         .getOrElse(data)
       rescheduleJob = true
       rejectBuffered()
-      destroyContainer(newData)
+      destroyContainer(newData, true)
 
     // Failed after /init (the first run failed)
     case Event(_: FailureMessage, data: PreWarmedData) =>
       activeCount -= 1
-      destroyContainer(data)
+      destroyContainer(data, true)
 
     // Failed for a subsequent /run
     case Event(_: FailureMessage, data: WarmedData) =>
       activeCount -= 1
-      destroyContainer(data)
+      destroyContainer(data, true)
 
     // Failed at getting a container for a cold-start run
     case Event(_: FailureMessage, _) =>
       activeCount -= 1
-      context.parent ! ContainerRemoved
+      context.parent ! ContainerRemoved(true)
       rejectBuffered()
       stop()
 
@@ -490,16 +493,16 @@ class ContainerProxy(factory: (TransactionId,
       data.container.suspend()(TransactionId.invokerNanny).map(_ => 
ContainerPaused).pipeTo(self)
       goto(Pausing)
 
-    case Event(Remove, data: WarmedData) => destroyContainer(data)
+    case Event(Remove, data: WarmedData) => destroyContainer(data, true)
 
     // warm container failed
     case Event(_: FailureMessage, data: WarmedData) =>
-      destroyContainer(data)
+      destroyContainer(data, true)
   }
 
   when(Pausing) {
     case Event(ContainerPaused, data: WarmedData)   => goto(Paused)
-    case Event(_: FailureMessage, data: WarmedData) => destroyContainer(data)
+    case Event(_: FailureMessage, data: WarmedData) => destroyContainer(data, 
true)
     case _                                          => delay
   }
 
@@ -526,7 +529,7 @@ class ContainerProxy(factory: (TransactionId,
     // container is reclaimed by the pool or it has become too old
     case Event(StateTimeout | Remove, data: WarmedData) =>
       rescheduleJob = true // to supress sending message to the pool and not 
double count
-      destroyContainer(data)
+      destroyContainer(data, true)
   }
 
   when(Removing) {
@@ -534,8 +537,8 @@ class ContainerProxy(factory: (TransactionId,
       // Send the job back to the pool to be rescheduled
       context.parent ! job
       stay
-    case Event(ContainerRemoved, _)  => stop()
-    case Event(_: FailureMessage, _) => stop()
+    case Event(ContainerRemoved(_), _) => stop()
+    case Event(_: FailureMessage, _)   => stop()
   }
 
   // Unstash all messages stashed while in intermediate state
@@ -614,10 +617,10 @@ class ContainerProxy(factory: (TransactionId,
    *
    * @param newData the ContainerStarted which container will be destroyed
    */
-  def destroyContainer(newData: ContainerStarted) = {
+  def destroyContainer(newData: ContainerStarted, replacePrewarm: Boolean) = {
     val container = newData.container
     if (!rescheduleJob) {
-      context.parent ! ContainerRemoved
+      context.parent ! ContainerRemoved(replacePrewarm)
     } else {
       context.parent ! RescheduleJob
     }
@@ -631,7 +634,7 @@ class ContainerProxy(factory: (TransactionId,
 
     unpause
       .flatMap(_ => container.destroy()(TransactionId.invokerNanny))
-      .map(_ => ContainerRemoved)
+      .map(_ => ContainerRemoved(replacePrewarm))
       .pipeTo(self)
     goto(Removing) using newData
   }
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index 48b647b..d8d71d1 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -156,7 +156,7 @@ class InvokerReactive(
     ExecManifest.runtimesManifest.stemcells.flatMap {
       case (mf, cells) =>
         cells.map { cell =>
-          PrewarmingConfig(cell.count, new CodeExecAsString(mf, "", None), 
cell.memory)
+          PrewarmingConfig(cell.initialCount, new CodeExecAsString(mf, "", 
None), cell.memory, cell.reactive)
         }
     }.toList
   }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
index 72f21a9..656c89c 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala
@@ -17,6 +17,8 @@
 
 package org.apache.openwhisk.core.containerpool.mesos.test
 
+import java.util.concurrent.TimeUnit
+
 import akka.actor.ActorSystem
 import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.Sink
@@ -84,7 +86,7 @@ class MesosContainerFactoryTest
   }
 
   // 80 slots, each 265MB
-  val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false)
+  val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, FiniteDuration(1, 
TimeUnit.MINUTES))
   val actionMemory = 265.MB
   val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0
 
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index 4efbe85..399bf88 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -18,6 +18,7 @@
 package org.apache.openwhisk.core.containerpool.test
 
 import java.time.Instant
+import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 import scala.concurrent.duration._
@@ -33,15 +34,15 @@ import akka.actor.ActorSystem
 import akka.testkit.ImplicitSender
 import akka.testkit.TestKit
 import akka.testkit.TestProbe
-import common.WhiskProperties
+import common.{StreamLogging, WhiskProperties}
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.core.connector.ActivationMessage
 import org.apache.openwhisk.core.containerpool._
 import org.apache.openwhisk.core.entity._
-import org.apache.openwhisk.core.entity.ExecManifest.RuntimeManifest
-import org.apache.openwhisk.core.entity.ExecManifest.ImageName
+import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, 
ReactivePrewarmingConfig, RuntimeManifest}
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.connector.MessageFeed
+import org.scalatest.concurrent.Eventually
 
 /**
  * Behavior tests for the ContainerPool
@@ -55,7 +56,9 @@ class ContainerPoolTests
     with FlatSpecLike
     with Matchers
     with BeforeAndAfterAll
-    with MockFactory {
+    with MockFactory
+    with Eventually
+    with StreamLogging {
 
   override def afterAll = TestKit.shutdownActorSystem(system)
 
@@ -67,6 +70,9 @@ class ContainerPoolTests
   // the values is done properly.
   val exec = CodeExecAsString(RuntimeManifest("actionKind", 
ImageName("testImage")), "testCode", None)
   val memoryLimit = 256.MB
+  val ttl = FiniteDuration(500, TimeUnit.MILLISECONDS)
+  val threshold = 1
+  val increment = 1
 
   /** Creates a `Run` message */
   def createRunMessage(action: ExecutableWhiskAction, invocationNamespace: 
EntityName) = {
@@ -109,8 +115,8 @@ class ContainerPoolTests
   val runMessageConcurrentDifferentNamespace = 
createRunMessage(concurrentAction, differentInvocationNamespace)
 
   /** Helper to create PreWarmedData */
-  def preWarmedData(kind: String, memoryLimit: ByteSize = memoryLimit) =
-    PreWarmedData(stub[MockableContainer], kind, memoryLimit)
+  def preWarmedData(kind: String, memoryLimit: ByteSize = memoryLimit, 
expires: Option[Deadline] = None) =
+    PreWarmedData(stub[MockableContainer], kind, memoryLimit, expires = 
expires)
 
   /** Helper to create WarmedData */
   def warmedData(run: Run, lastUsed: Instant = Instant.now) = {
@@ -125,7 +131,8 @@ class ContainerPoolTests
     (containers, factory)
   }
 
-  def poolConfig(userMemory: ByteSize) = ContainerPoolConfig(userMemory, 0.5, 
false)
+  def poolConfig(userMemory: ByteSize) =
+    ContainerPoolConfig(userMemory, 0.5, false, FiniteDuration(1, 
TimeUnit.MINUTES))
 
   behavior of "ContainerPool"
 
@@ -388,7 +395,7 @@ class ContainerPoolTests
     containers(0).send(pool, NeedWork(warmedData(runMessage)))
 
     // container0 is deleted
-    containers(0).send(pool, ContainerRemoved)
+    containers(0).send(pool, ContainerRemoved(true))
 
     // container1 is created and used
     pool ! runMessage
@@ -597,7 +604,7 @@ class ContainerPoolTests
     containers(0).send(pool, RescheduleJob)
 
     //trigger buffer processing by ContainerRemoved message
-    pool ! ContainerRemoved
+    pool ! ContainerRemoved(true)
 
     //start second run
     containers(1).expectMsgPF() {
@@ -627,9 +634,9 @@ class ContainerPoolTests
     containers(1).expectNoMessage(100.milliseconds)
 
     //ContainerRemoved triggers buffer processing - if we don't prevent 
duplicates, this will cause the buffer head to be resent!
-    pool ! ContainerRemoved
-    pool ! ContainerRemoved
-    pool ! ContainerRemoved
+    pool ! ContainerRemoved(true)
+    pool ! ContainerRemoved(true)
+    pool ! ContainerRemoved(true)
 
     //complete processing of first run
     containers(0).send(pool, NeedWork(warmedData(run1)))
@@ -753,14 +760,169 @@ class ContainerPoolTests
     containers(1).expectMsg(Start(exec, memoryLimit))
 
     //removing 2 prewarm containers will start 2 containers via backfill
-    containers(0).send(pool, ContainerRemoved)
-    containers(1).send(pool, ContainerRemoved)
+    containers(0).send(pool, ContainerRemoved(true))
+    containers(1).send(pool, ContainerRemoved(true))
     containers(2).expectMsg(Start(exec, memoryLimit))
     containers(3).expectMsg(Start(exec, memoryLimit))
     //make sure extra prewarms are not started
     containers(4).expectNoMessage(100.milliseconds)
     containers(5).expectNoMessage(100.milliseconds)
   }
+
+  it should "adjust prewarm container run well without reactive config" in {
+    val (containers, factory) = testContainers(4)
+    val feed = TestProbe()
+
+    stream.reset()
+    val prewarmExpirationCheckIntervel = FiniteDuration(2, TimeUnit.SECONDS)
+    val poolConfig = ContainerPoolConfig(MemoryLimit.STD_MEMORY * 4, 0.5, 
false, prewarmExpirationCheckIntervel)
+    val initialCount = 2
+    val pool =
+      system.actorOf(
+        ContainerPool
+          .props(factory, poolConfig, feed.ref, 
List(PrewarmingConfig(initialCount, exec, memoryLimit))))
+    containers(0).expectMsg(Start(exec, memoryLimit))
+    containers(1).expectMsg(Start(exec, memoryLimit))
+    containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
+    containers(1).send(pool, NeedWork(preWarmedData(exec.kind)))
+
+    // when invoker starts, include 0 prewarm container at the very beginning
+    stream.toString should include(s"found 0 started")
+
+    // the desiredCount should equal with initialCount when invoker starts
+    stream.toString should include(s"desired count: ${initialCount}")
+
+    stream.reset()
+
+    // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler 
after prewarmExpirationCheckIntervel time
+    Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+    // Because already supplemented the prewarmed container, so currentCount 
should equal with initialCount
+    eventually {
+      stream.toString should include(s"found ${initialCount} started")
+    }
+  }
+
+  it should "adjust prewarm container run well with reactive config" in {
+    val (containers, factory) = testContainers(15)
+    val feed = TestProbe()
+
+    stream.reset()
+    val prewarmExpirationCheckIntervel = FiniteDuration(2, TimeUnit.SECONDS)
+    val poolConfig = ContainerPoolConfig(MemoryLimit.STD_MEMORY * 8, 0.5, 
false, prewarmExpirationCheckIntervel)
+    val minCount = 0
+    val initialCount = 2
+    val maxCount = 4
+    val deadline: Option[Deadline] = Some(ttl.fromNow)
+    val reactive: Option[ReactivePrewarmingConfig] =
+      Some(ReactivePrewarmingConfig(minCount, maxCount, ttl, threshold, 
increment))
+    val pool =
+      system.actorOf(
+        ContainerPool
+          .props(factory, poolConfig, feed.ref, 
List(PrewarmingConfig(initialCount, exec, memoryLimit, reactive))))
+    containers(0).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(1).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(0).send(pool, NeedWork(preWarmedData(exec.kind, expires = 
deadline)))
+    containers(1).send(pool, NeedWork(preWarmedData(exec.kind, expires = 
deadline)))
+
+    // when invoker starts, include 0 prewarm container at the very beginning
+    stream.toString should include(s"found 0 started")
+
+    // the desiredCount should equal with initialCount when invoker starts
+    stream.toString should include(s"desired count: ${initialCount}")
+
+    stream.reset()
+
+    // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler 
after prewarmExpirationCheckIntervel time
+    Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+    containers(0).expectMsg(Remove)
+    containers(1).expectMsg(Remove)
+    containers(0).send(pool, ContainerRemoved(false))
+    containers(1).send(pool, ContainerRemoved(false))
+
+    // currentCount should equal with 0 due to these 2 prewarmed containers 
are expired
+    stream.toString should include(s"found 0 started")
+
+    // the desiredCount should equal with minCount because cold start didn't 
happen
+    stream.toString should include(s"desired count: ${minCount}")
+    // Previously created prewarmed containers should be removed
+    stream.toString should include(s"removed ${initialCount} expired prewarmed 
container")
+
+    stream.reset()
+    val action = ExecutableWhiskAction(
+      EntityPath("actionSpace"),
+      EntityName("actionName"),
+      exec,
+      limits = ActionLimits(memory = MemoryLimit(memoryLimit)))
+    val run = createRunMessage(action, invocationNamespace)
+    // 2 code start happened
+    pool ! run
+    pool ! run
+    containers(2).expectMsg(run)
+    containers(3).expectMsg(run)
+
+    // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler 
after prewarmExpirationCheckIntervel time
+    Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+    eventually {
+      // Because already removed expired prewarmed containrs, so currentCount 
should equal with 0
+      stream.toString should include(s"found 0 started")
+      // the desiredCount should equal with 2 due to cold start happened
+      stream.toString should include(s"desired count: 2")
+    }
+
+    containers(4).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(5).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(4).send(pool, NeedWork(preWarmedData(exec.kind, expires = 
deadline)))
+    containers(5).send(pool, NeedWork(preWarmedData(exec.kind, expires = 
deadline)))
+
+    stream.reset()
+
+    // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler 
after prewarmExpirationCheckIntervel time
+    Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+    containers(4).expectMsg(Remove)
+    containers(5).expectMsg(Remove)
+    containers(4).send(pool, ContainerRemoved(false))
+    containers(5).send(pool, ContainerRemoved(false))
+
+    // removed previous 2 prewarmed container due to expired
+    stream.toString should include(s"removed 2 expired prewarmed container")
+
+    stream.reset()
+    // 5 code start happened(5 > maxCount)
+    pool ! run
+    pool ! run
+    pool ! run
+    pool ! run
+    pool ! run
+
+    containers(6).expectMsg(run)
+    containers(7).expectMsg(run)
+    containers(8).expectMsg(run)
+    containers(9).expectMsg(run)
+    containers(10).expectMsg(run)
+
+    // Make sure AdjustPrewarmedContainer is sent by ContainerPool's scheduler 
after prewarmExpirationCheckIntervel time
+    Thread.sleep(prewarmExpirationCheckIntervel.toMillis)
+
+    eventually {
+      // Because already removed expired prewarmed containrs, so currentCount 
should equal with 0
+      stream.toString should include(s"found 0 started")
+      // in spite of the cold start number > maxCount, but the desiredCount 
can't be greater than maxCount
+      stream.toString should include(s"desired count: ${maxCount}")
+    }
+
+    containers(11).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(12).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(13).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(14).expectMsg(Start(exec, memoryLimit, Some(ttl)))
+    containers(11).send(pool, NeedWork(preWarmedData(exec.kind, expires = 
deadline)))
+    containers(12).send(pool, NeedWork(preWarmedData(exec.kind, expires = 
deadline)))
+    containers(13).send(pool, NeedWork(preWarmedData(exec.kind, expires = 
deadline)))
+    containers(14).send(pool, NeedWork(preWarmedData(exec.kind, expires = 
deadline)))
+  }
 }
 abstract class MockableContainer extends Container {
   protected[core] val addr: ContainerAddress = ContainerAddress("nohost")
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 693f600..46e334c 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -27,6 +27,7 @@ import akka.testkit.{CallingThreadDispatcher, ImplicitSender, 
TestKit, TestProbe
 import akka.util.ByteString
 import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction, 
WhiskProperties}
 import java.time.temporal.ChronoUnit
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 
 import akka.io.Tcp.{Close, CommandFailed, Connect, Connected}
@@ -156,7 +157,7 @@ class ContainerProxyTests
 
   /** Expect a NeedWork message with prewarmed data */
   def expectPreWarmed(kind: String) = expectMsgPF() {
-    case NeedWork(PreWarmedData(_, kind, memoryLimit, _)) => true
+    case NeedWork(PreWarmedData(_, kind, memoryLimit, _, _)) => true
   }
 
   /** Expect a NeedWork message with warmed data */
@@ -274,7 +275,7 @@ class ContainerProxyTests
     (transid: TransactionId, activation: WhiskActivation, 
isBlockingActivation: Boolean, context: UserContext) =>
       Future.successful(())
   }
-  val poolConfig = ContainerPoolConfig(2.MB, 0.5, false)
+  val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, FiniteDuration(1, 
TimeUnit.MINUTES))
   def healthchecksConfig(enabled: Boolean = false) = 
ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
   val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))
 
@@ -1025,7 +1026,7 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
-    expectMsg(ContainerRemoved)
+    expectMsg(ContainerRemoved(true))
 
     awaitAssert {
       factory.calls should have size 1
@@ -1070,7 +1071,7 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
-    expectMsg(ContainerRemoved) // The message is sent as soon as the 
container decides to destroy itself
+    expectMsg(ContainerRemoved(true)) // The message is sent as soon as the 
container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
     awaitAssert {
@@ -1123,7 +1124,7 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
-    expectMsg(ContainerRemoved) // The message is sent as soon as the 
container decides to destroy itself
+    expectMsg(ContainerRemoved(true)) // The message is sent as soon as the 
container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
     awaitAssert {
@@ -1162,7 +1163,7 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
-    expectMsg(ContainerRemoved) // The message is sent as soon as the 
container decides to destroy itself
+    expectMsg(ContainerRemoved(true)) // The message is sent as soon as the 
container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
     awaitAssert {
@@ -1200,7 +1201,7 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
-    expectMsg(ContainerRemoved) // The message is sent as soon as the 
container decides to destroy itself
+    expectMsg(ContainerRemoved(true)) // The message is sent as soon as the 
container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
     awaitAssert {
@@ -1386,7 +1387,7 @@ class ContainerProxyTests
     preWarm(machine)
 
     //expect failure after healthchecks fail
-    expectMsg(ContainerRemoved)
+    expectMsg(ContainerRemoved(true))
     expectMsg(Transition(machine, Started, Removing))
 
     awaitAssert {
@@ -1426,7 +1427,7 @@ class ContainerProxyTests
     run(machine, Uninitialized)
     timeout(machine) // times out Ready state so container suspends
     expectMsg(Transition(machine, Ready, Pausing))
-    expectMsg(ContainerRemoved) // The message is sent as soon as the 
container decides to destroy itself
+    expectMsg(ContainerRemoved(true)) // The message is sent as soon as the 
container decides to destroy itself
     expectMsg(Transition(machine, Pausing, Removing))
 
     awaitAssert {
@@ -1483,7 +1484,7 @@ class ContainerProxyTests
     expectMsg(Transition(machine, Running, Ready))
 
     // Remove the container after the transaction finished
-    expectMsg(ContainerRemoved)
+    expectMsg(ContainerRemoved(true))
     expectMsg(Transition(machine, Ready, Removing))
 
     awaitAssert {
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecManifestTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecManifestTests.scala
index 605b046..874bd11 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecManifestTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecManifestTests.scala
@@ -17,6 +17,8 @@
 
 package org.apache.openwhisk.core.entity.test
 
+import java.util.concurrent.TimeUnit
+
 import common.{StreamLogging, WskActorSystem}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -28,6 +30,7 @@ import org.apache.openwhisk.core.entity.ExecManifest._
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.entity.ByteSize
 
+import scala.concurrent.duration.FiniteDuration
 import scala.util.Success
 
 @RunWith(classOf[JUnitRunner])
@@ -123,7 +126,7 @@ class ExecManifestTests extends FlatSpec with 
WskActorSystem with StreamLogging
     runtimes.resolveDefaultRuntime("p1").get.image.resolveImageName() shouldBe 
"ppp/???:ttt"
     runtimes.resolveDefaultRuntime("q1").get.image.resolveImageName() shouldBe 
"rrr/???:ttt"
     runtimes.resolveDefaultRuntime("s1").get.image.resolveImageName() shouldBe 
"???"
-    runtimes.resolveDefaultRuntime("s1").get.stemCells.get(0).count shouldBe 2
+    runtimes.resolveDefaultRuntime("s1").get.stemCells.get(0).initialCount 
shouldBe 2
     runtimes.resolveDefaultRuntime("s1").get.stemCells.get(0).memory shouldBe 
256.MB
   }
 
@@ -237,7 +240,7 @@ class ExecManifestTests extends FlatSpec with 
WskActorSystem with StreamLogging
 
   it should "de/serialize stem cell configuration" in {
     val cell = StemCell(3, 128.MB)
-    val cellAsJson = JsObject("count" -> JsNumber(3), "memory" -> 
JsString("128 MB"))
+    val cellAsJson = JsObject("initialCount" -> JsNumber(3), "memory" -> 
JsString("128 MB"))
     stemCellSerdes.write(cell) shouldBe cellAsJson
     stemCellSerdes.read(cellAsJson) shouldBe cell
 
@@ -250,19 +253,19 @@ class ExecManifestTests extends FlatSpec with 
WskActorSystem with StreamLogging
     }
 
     an[IllegalArgumentException] shouldBe thrownBy {
-      val cellAsJson = JsObject("count" -> JsNumber(0), "memory" -> 
JsString("128 MB"))
+      val cellAsJson = JsObject("initialCount" -> JsNumber(0), "memory" -> 
JsString("128 MB"))
       stemCellSerdes.read(cellAsJson)
     }
 
     the[IllegalArgumentException] thrownBy {
-      val cellAsJson = JsObject("count" -> JsNumber(1), "memory" -> 
JsString("128"))
+      val cellAsJson = JsObject("initialCount" -> JsNumber(1), "memory" -> 
JsString("128"))
       stemCellSerdes.read(cellAsJson)
     } should have message {
       ByteSize.formatError
     }
   }
 
-  it should "parse manifest from JSON string" in {
+  it should "parse manifest without reactive from JSON string" in {
     val json = """
                  |{ "runtimes": {
                  |    "nodef": [
@@ -273,7 +276,7 @@ class ExecManifestTests extends FlatSpec with 
WskActorSystem with StreamLogging
                  |          "name": "nodejsaction"
                  |        },
                  |        "stemCells": [{
-                 |          "count": 1,
+                 |          "initialCount": 1,
                  |          "memory": "128 MB"
                  |        }]
                  |      }, {
@@ -283,10 +286,10 @@ class ExecManifestTests extends FlatSpec with 
WskActorSystem with StreamLogging
                  |          "name": "nodejsaction"
                  |        },
                  |        "stemCells": [{
-                 |          "count": 1,
+                 |          "initialCount": 1,
                  |          "memory": "128 MB"
                  |        }, {
-                 |          "count": 1,
+                 |          "initialCount": 1,
                  |          "memory": "256 MB"
                  |        }]
                  |      }
@@ -297,7 +300,7 @@ class ExecManifestTests extends FlatSpec with 
WskActorSystem with StreamLogging
                  |        "name": "pythonaction"
                  |      },
                  |      "stemCells": [{
-                 |        "count": 2,
+                 |        "initialCount": 2,
                  |        "memory": "256 MB"
                  |      }]
                  |    }],
@@ -318,7 +321,7 @@ class ExecManifestTests extends FlatSpec with 
WskActorSystem with StreamLogging
                  |}
                  |""".stripMargin.parseJson.asJsObject
 
-    val js6 = RuntimeManifest(
+    val js10 = RuntimeManifest(
       "nodejs:10",
       ImageName("nodejsaction"),
       deprecated = Some(true),
@@ -336,7 +339,7 @@ class ExecManifestTests extends FlatSpec with 
WskActorSystem with StreamLogging
     mf shouldBe {
       Runtimes(
         Set(
-          RuntimeFamily("nodef", Set(js6, js8)),
+          RuntimeFamily("nodef", Set(js10, js8)),
           RuntimeFamily("pythonf", Set(py)),
           RuntimeFamily("swiftf", Set(sw)),
           RuntimeFamily("phpf", Set(ph))),
@@ -344,17 +347,136 @@ class ExecManifestTests extends FlatSpec with 
WskActorSystem with StreamLogging
         None)
     }
 
-    def stemCellFactory(m: RuntimeManifest, cells: List[StemCell]) = cells.map 
{ c =>
-      (m.kind, m.image, c.count, c.memory)
+    mf.stemcells.flatMap {
+      case (m, cells) =>
+        cells.map { c =>
+          (m.kind, m.image, c.initialCount, c.memory)
+        }
+    }.toList should contain theSameElementsAs List(
+      (js10.kind, js10.image, 1, 128.MB),
+      (js8.kind, js8.image, 1, 128.MB),
+      (js8.kind, js8.image, 1, 256.MB),
+      (py.kind, py.image, 2, 256.MB))
+  }
+
+  it should "parse manifest with reactive from JSON string" in {
+    val json = """
+                 |{ "runtimes": {
+                 |    "nodef": [
+                 |      {
+                 |        "kind": "nodejs:10",
+                 |        "deprecated": true,
+                 |        "image": {
+                 |          "name": "nodejsaction"
+                 |        },
+                 |        "stemCells": [{
+                 |          "initialCount": 1,
+                 |          "memory": "128 MB",
+                 |          "reactive": {
+                 |            "minCount": 1,
+                 |            "maxCount": 4,
+                 |            "ttl": "2 minutes",
+                 |            "threshold": 1,
+                 |            "increment": 1
+                 |           }
+                 |        }]
+                 |      }, {
+                 |        "kind": "nodejs:8",
+                 |        "default": true,
+                 |        "image": {
+                 |          "name": "nodejsaction"
+                 |        },
+                 |        "stemCells": [{
+                 |          "initialCount": 1,
+                 |          "memory": "128 MB",
+                 |          "reactive": {
+                 |            "minCount": 1,
+                 |            "maxCount": 4,
+                 |            "ttl": "2 minutes",
+                 |            "threshold": 1,
+                 |            "increment": 1
+                 |          }
+                 |        }, {
+                 |          "initialCount": 1,
+                 |          "memory": "256 MB",
+                 |          "reactive": {
+                 |            "minCount": 1,
+                 |            "maxCount": 4,
+                 |            "ttl": "2 minutes",
+                 |            "threshold": 1,
+                 |            "increment": 1
+                 |           }
+                 |        }]
+                 |      }
+                 |    ],
+                 |    "pythonf": [{
+                 |      "kind": "python",
+                 |      "image": {
+                 |        "name": "pythonaction"
+                 |      },
+                 |      "stemCells": [{
+                 |        "initialCount": 2,
+                 |        "memory": "256 MB",
+                 |        "reactive": {
+                 |           "minCount": 1,
+                 |           "maxCount": 4,
+                 |           "ttl": "2 minutes",
+                 |           "threshold": 1,
+                 |           "increment": 1
+                 |          }
+                 |      }]
+                 |    }],
+                 |    "swiftf": [{
+                 |      "kind": "swift",
+                 |      "image": {
+                 |        "name": "swiftaction"
+                 |      },
+                 |      "stemCells": []
+                 |    }],
+                 |    "phpf": [{
+                 |      "kind": "php",
+                 |      "image": {
+                 |        "name": "phpaction"
+                 |      }
+                 |    }]
+                 |  }
+                 |}
+                 |""".stripMargin.parseJson.asJsObject
+
+    val reactive = Some(ReactivePrewarmingConfig(1, 4, FiniteDuration(2, 
TimeUnit.MINUTES), 1, 1))
+    val js10 = RuntimeManifest(
+      "nodejs:10",
+      ImageName("nodejsaction"),
+      deprecated = Some(true),
+      stemCells = Some(List(StemCell(1, 128.MB, reactive))))
+    val js8 = RuntimeManifest(
+      "nodejs:8",
+      ImageName("nodejsaction"),
+      default = Some(true),
+      stemCells = Some(List(StemCell(1, 128.MB, reactive), StemCell(1, 256.MB, 
reactive))))
+    val py = RuntimeManifest("python", ImageName("pythonaction"), stemCells = 
Some(List(StemCell(2, 256.MB, reactive))))
+    val sw = RuntimeManifest("swift", ImageName("swiftaction"), stemCells = 
Some(List.empty))
+    val ph = RuntimeManifest("php", ImageName("phpaction"))
+    val mf = ExecManifest.runtimes(json, RuntimeManifestConfig()).get
+
+    mf shouldBe {
+      Runtimes(
+        Set(
+          RuntimeFamily("nodef", Set(js10, js8)),
+          RuntimeFamily("pythonf", Set(py)),
+          RuntimeFamily("swiftf", Set(sw)),
+          RuntimeFamily("phpf", Set(ph))),
+        Set.empty,
+        None)
     }
 
     mf.stemcells.flatMap {
       case (m, cells) =>
         cells.map { c =>
-          (m.kind, m.image, c.count, c.memory)
+          (m.kind, m.image, c.initialCount, c.memory)
         }
     }.toList should contain theSameElementsAs List(
-      (js6.kind, js6.image, 1, 128.MB),
+      (js10.kind, js10.image, 1, 128.MB),
       (js8.kind, js8.image, 1, 128.MB),
       (js8.kind, js8.image, 1, 256.MB),
       (py.kind, py.image, 2, 256.MB))
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecTests.scala 
b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecTests.scala
index cd133c0..0714f52 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/entity/test/ExecTests.scala
@@ -152,7 +152,7 @@ class ExecTests extends FlatSpec with Matchers with 
StreamLogging with BeforeAnd
         |        },
         |        "deprecated": false,
         |        "stemCells": [{
-        |          "count": 2,
+        |          "initialCount": 2,
         |          "memory": "256 MB"
         |        }]
         |      }

Reply via email to