rabbah closed pull request #3232: Making prewarm kind (and count) configurable
URL: https://github.com/apache/incubator-openwhisk/pull/3232
 
 
   

This pull request was merged from a forked repository.
Because GitHub does not show the original diff of a fork-based (foreign)
pull request once it has been merged, the diff is reproduced below for
the sake of provenance:

diff --git a/common/scala/src/main/scala/whisk/core/entity/ExecManifest.scala b/common/scala/src/main/scala/whisk/core/entity/ExecManifest.scala
index 668ef609f6..3acb49022a 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ExecManifest.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ExecManifest.scala
@@ -122,6 +122,7 @@ protected[core] object ExecManifest {
    * @param requireMain true iff main entry point is not optional
    * @param sentinelledLogs true iff the runtime generates stdout/stderr log 
sentinels after an activation
    * @param image optional image name, otherwise inferred via fixed mapping 
(remove colons and append 'action')
+   * @param stemCells optional list of stemCells to be initialized by invoker 
per kind
    */
   protected[core] case class RuntimeManifest(kind: String,
                                              image: ImageName,
@@ -129,7 +130,8 @@ protected[core] object ExecManifest {
                                              default: Option[Boolean] = None,
                                              attached: Option[Attached] = None,
                                              requireMain: Option[Boolean] = 
None,
-                                             sentinelledLogs: Option[Boolean] 
= None) {
+                                             sentinelledLogs: Option[Boolean] 
= None,
+                                             stemCells: Option[List[StemCell]] 
= None) {
 
     protected[entity] def toJsonSummary = {
       JsObject(
@@ -138,10 +140,16 @@ protected[core] object ExecManifest {
         "deprecated" -> deprecated.getOrElse(false).toJson,
         "default" -> default.getOrElse(false).toJson,
         "attached" -> attached.isDefined.toJson,
-        "requireMain" -> requireMain.getOrElse(false).toJson)
+        "requireMain" -> requireMain.getOrElse(false).toJson,
+        "stemCells" -> stemCells.getOrElse(List()).toJson)
     }
   }
 
+  /**
+   * A stemcell for a container image to be initialized by Invoker
+   */
+  protected[core] case class StemCell(count: Int, memory: String)
+
   /**
    * An image name for an action refers to the container image canonically as
    * "prefix/name[:tag]" e.g., "openwhisk/python3action:latest".
@@ -285,6 +293,7 @@ protected[core] object ExecManifest {
     private val defaultSplitter = "([a-z0-9]+):default".r
   }
 
+  protected[entity] implicit val stemCellSerdes = jsonFormat2(StemCell.apply)
   protected[entity] implicit val imageNameSerdes = jsonFormat3(ImageName.apply)
-  protected[entity] implicit val runtimeManifestSerdes = 
jsonFormat7(RuntimeManifest)
+  protected[entity] implicit val runtimeManifestSerdes = 
jsonFormat8(RuntimeManifest)
 }
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index 183556942e..d39e78a49d 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -60,7 +60,7 @@ case class WorkerData(data: ContainerData, state: WorkerState)
  */
 class ContainerPool(childFactory: ActorRefFactory => ActorRef,
                     feed: ActorRef,
-                    prewarmConfig: Option[PrewarmingConfig] = None,
+                    prewarmConfigs: Option[List[PrewarmingConfig]] = None,
                     poolConfig: ContainerPoolConfig)
     extends Actor {
   implicit val logging = new AkkaLogging(context.system.log)
@@ -70,7 +70,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
   var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
   val logMessageInterval = 10.seconds
 
-  prewarmConfig.foreach { config =>
+  prewarmConfigs.getOrElse(List()).foreach { config =>
     logging.info(this, s"pre-warming ${config.count} ${config.exec.kind} 
containers")(TransactionId.invokerWarmup)
     (1 to config.count).foreach { _ =>
       prewarmContainer(config.exec, config.memoryLimit)
@@ -204,26 +204,25 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
    * @param kind the kind you want to invoke
    * @return the container iff found
    */
-  def takePrewarmContainer(action: ExecutableWhiskAction): Option[(ActorRef, 
ContainerData)] =
-    prewarmConfig.flatMap { config =>
-      val kind = action.exec.kind
-      val memory = action.limits.memory.megabytes.MB
-      prewarmedPool
-        .find {
-          case (_, PreWarmedData(_, `kind`, `memory`)) => true
-          case _                                       => false
-        }
-        .map {
-          case (ref, data) =>
-            // Move the container to the usual pool
-            freePool = freePool + (ref -> data)
-            prewarmedPool = prewarmedPool - ref
-            // Create a new prewarm container
-            prewarmContainer(config.exec, config.memoryLimit)
+  def takePrewarmContainer(action: ExecutableWhiskAction): Option[(ActorRef, 
ContainerData)] = {
+    val kind = action.exec.kind
+    val memory = action.limits.memory.megabytes.MB
+    prewarmedPool
+      .find {
+        case (_, PreWarmedData(_, `kind`, `memory`)) => true
+        case _                                       => false
+      }
+      .map {
+        case (ref, data) =>
+          // Move the container to the usual pool
+          freePool = freePool + (ref -> data)
+          prewarmedPool = prewarmedPool - ref
+          // Create a new prewarm container
+          prewarmContainer(action.exec, memory)
 
-            (ref, data)
-        }
-    }
+          (ref, data)
+      }
+  }
 
   /** Removes a container and updates state accordingly. */
   def removeContainer(toDelete: ActorRef) = {
@@ -282,8 +281,8 @@ object ContainerPool {
   def props(factory: ActorRefFactory => ActorRef,
             poolConfig: ContainerPoolConfig,
             feed: ActorRef,
-            prewarmConfig: Option[PrewarmingConfig] = None) =
-    Props(new ContainerPool(factory, feed, prewarmConfig, poolConfig))
+            prewarmConfigs: Option[List[PrewarmingConfig]] = None) =
+    Props(new ContainerPool(factory, feed, prewarmConfigs, poolConfig))
 }
 
 /** Contains settings needed to perform container prewarming */
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index b132dd815f..ac71edf09a 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -33,7 +33,6 @@ import whisk.core.containerpool._
 import whisk.core.containerpool.logging.LogStoreProvider
 import whisk.core.database._
 import whisk.core.entity._
-import whisk.core.entity.size._
 import whisk.http.Messages
 import whisk.spi.SpiLoader
 
@@ -173,14 +172,20 @@ class InvokerReactive(
       ContainerProxy
         .props(containerFactory.createContainer, ack, store, 
logsProvider.collectLogs, instance, poolConfig))
 
-  private val prewarmKind = "nodejs:6"
-  private val prewarmExec = ExecManifest.runtimesManifest
-    .resolveDefaultRuntime(prewarmKind)
-    .map(manifest => CodeExecAsString(manifest, "", None))
-    .get
+  val runtimes = ExecManifest.runtimesManifest
+  val prewarmingConfigs =
+    for ((kind, rm) <- runtimes.manifests; stemCell <- 
rm.stemCells.getOrElse(List()) if stemCell.count > 0) yield {
+      val prewarmExec = ExecManifest.runtimesManifest
+        .resolveDefaultRuntime(kind)
+        .map { manifest =>
+          new CodeExecAsString(manifest, "", None)
+        }
+        .get
+      (PrewarmingConfig(stemCell.count, prewarmExec, 
ByteSize.fromString(stemCell.memory)))
+    }
 
-  private val pool = actorSystem.actorOf(
-    ContainerPool.props(childFactory, poolConfig, activationFeed, 
Some(PrewarmingConfig(2, prewarmExec, 256.MB))))
+  private val pool =
+    actorSystem.actorOf(ContainerPool.props(childFactory, poolConfig, 
activationFeed, Some(prewarmingConfigs.toList)))
 
   /** Is called when an ActivationMessage is read from Kafka */
   def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index b61e6f8db0..45a809b62a 100644
--- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -240,7 +240,8 @@ class ContainerPoolTests
 
     val pool =
       system.actorOf(
-        ContainerPool.props(factory, ContainerPoolConfig(0, 0), feed.ref, 
Some(PrewarmingConfig(1, exec, memoryLimit))))
+        ContainerPool
+          .props(factory, ContainerPoolConfig(0, 0), feed.ref, 
Some(List(PrewarmingConfig(1, exec, memoryLimit)))))
     containers(0).expectMsg(Start(exec, memoryLimit))
   }
 
@@ -250,7 +251,8 @@ class ContainerPoolTests
 
     val pool =
       system.actorOf(
-        ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref, 
Some(PrewarmingConfig(1, exec, memoryLimit))))
+        ContainerPool
+          .props(factory, ContainerPoolConfig(1, 1), feed.ref, 
Some(List(PrewarmingConfig(1, exec, memoryLimit)))))
     containers(0).expectMsg(Start(exec, memoryLimit))
     containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
     pool ! runMessage
@@ -265,7 +267,11 @@ class ContainerPoolTests
 
     val pool = system.actorOf(
       ContainerPool
-        .props(factory, ContainerPoolConfig(1, 1), feed.ref, 
Some(PrewarmingConfig(1, alternativeExec, memoryLimit))))
+        .props(
+          factory,
+          ContainerPoolConfig(1, 1),
+          feed.ref,
+          Some(List(PrewarmingConfig(1, alternativeExec, memoryLimit)))))
     containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 
was prewarmed
     containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind)))
     pool ! runMessage
@@ -281,7 +287,7 @@ class ContainerPoolTests
     val pool =
       system.actorOf(
         ContainerPool
-          .props(factory, ContainerPoolConfig(1, 1), feed.ref, 
Some(PrewarmingConfig(1, exec, alternativeLimit))))
+          .props(factory, ContainerPoolConfig(1, 1), feed.ref, 
Some(List(PrewarmingConfig(1, exec, alternativeLimit)))))
     containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was 
prewarmed
     containers(0).send(pool, NeedWork(preWarmedData(exec.kind, 
alternativeLimit)))
     pool ! runMessage
diff --git a/tests/src/test/scala/whisk/core/entity/test/ExecManifestTests.scala b/tests/src/test/scala/whisk/core/entity/test/ExecManifestTests.scala
index 71f02379e9..cb89a15db3 100644
--- a/tests/src/test/scala/whisk/core/entity/test/ExecManifestTests.scala
+++ b/tests/src/test/scala/whisk/core/entity/test/ExecManifestTests.scala
@@ -63,10 +63,11 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
     val k1 = RuntimeManifest("k1", ImageName("???"))
     val k2 = RuntimeManifest("k2", ImageName("???"), default = Some(true))
     val p1 = RuntimeManifest("p1", ImageName("???"))
-    val mf = manifestFactory(JsObject("ks" -> Set(k1, k2).toJson, "p1" -> 
Set(p1).toJson))
+    val s1 = RuntimeManifest("s1", ImageName("???"), stemCells = 
Some(List(StemCell(2, "256M"))))
+    val mf = manifestFactory(JsObject("ks" -> Set(k1, k2).toJson, "p1" -> 
Set(p1).toJson, "s1" -> Set(s1).toJson))
     val runtimes = ExecManifest.runtimes(mf, RuntimeManifestConfig()).get
 
-    Seq("k1", "k2", "p1").foreach {
+    Seq("k1", "k2", "p1", "s1").foreach {
       runtimes.knownContainerRuntimes.contains(_) shouldBe true
     }
 
@@ -75,9 +76,11 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
     runtimes.resolveDefaultRuntime("k1") shouldBe Some(k1)
     runtimes.resolveDefaultRuntime("k2") shouldBe Some(k2)
     runtimes.resolveDefaultRuntime("p1") shouldBe Some(p1)
+    runtimes.resolveDefaultRuntime("s1") shouldBe Some(s1)
 
     runtimes.resolveDefaultRuntime("ks:default") shouldBe Some(k2)
     runtimes.resolveDefaultRuntime("p1:default") shouldBe Some(p1)
+    runtimes.resolveDefaultRuntime("s1:default") shouldBe Some(s1)
   }
 
   it should "read a valid configuration without default prefix, default tag" 
in {
@@ -85,9 +88,15 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
     val i2 = RuntimeManifest("i2", ImageName("???", Some("ppp")), default = 
Some(true))
     val j1 = RuntimeManifest("j1", ImageName("???", Some("ppp"), Some("ttt")))
     val k1 = RuntimeManifest("k1", ImageName("???", None, Some("ttt")))
+    val s1 = RuntimeManifest("s1", ImageName("???"), stemCells = 
Some(List(StemCell(2, "256M"))))
 
     val mf =
-      JsObject("runtimes" -> JsObject("is" -> Set(i1, i2).toJson, "js" -> 
Set(j1).toJson, "ks" -> Set(k1).toJson))
+      JsObject(
+        "runtimes" -> JsObject(
+          "is" -> Set(i1, i2).toJson,
+          "js" -> Set(j1).toJson,
+          "ks" -> Set(k1).toJson,
+          "ss" -> Set(s1).toJson))
     val rmc = RuntimeManifestConfig(defaultImagePrefix = Some("pre"), 
defaultImageTag = Some("test"))
     val runtimes = ExecManifest.runtimes(mf, rmc).get
 
@@ -95,6 +104,9 @@ class ExecManifestTests extends FlatSpec with WskActorSystem with StreamLogging
     runtimes.resolveDefaultRuntime("i2").get.image.publicImageName shouldBe 
"ppp/???:test"
     runtimes.resolveDefaultRuntime("j1").get.image.publicImageName shouldBe 
"ppp/???:ttt"
     runtimes.resolveDefaultRuntime("k1").get.image.publicImageName shouldBe 
"pre/???:ttt"
+    runtimes.resolveDefaultRuntime("s1").get.image.publicImageName shouldBe 
"pre/???:test"
+    runtimes.resolveDefaultRuntime("s1").get.stemCells.get(0).count shouldBe 2
+    runtimes.resolveDefaultRuntime("s1").get.stemCells.get(0).memory shouldBe 
"256M"
   }
 
   it should "read a valid configuration with blackbox images but without 
default prefix or tag" in {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to