This is an automated email from the ASF dual-hosted git repository.

dgrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new da21c9f  Track activation counts in ContainerPool (not ContainerProxy) 
(#4186)
da21c9f is described below

commit da21c9fe49b2ae72c95b6866b30d984c65253724
Author: tysonnorris <[email protected]>
AuthorDate: Mon Feb 11 11:45:20 2019 -0800

    Track activation counts in ContainerPool (not ContainerProxy) (#4186)
    
    Track activation counts in ContainerPool (not ContainerProxy) so that 
Prewarm/cold -> Warm can be tracked as Warming and receive activations to avoid 
spawning extra containers
---
 .../openwhisk/core/containerpool/Container.scala   |   4 +
 .../core/containerpool/ContainerPool.scala         |  82 ++++++----
 .../core/containerpool/ContainerProxy.scala        | 171 ++++++++++++++++-----
 .../containerpool/test/ContainerPoolTests.scala    | 164 ++++++++++++++++++++
 .../containerpool/test/ContainerProxyTests.scala   |  48 ++----
 .../openwhisk/core/limits/ConcurrencyTests.scala   |   2 +-
 6 files changed, 368 insertions(+), 103 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index 354ec38..1da3e2f 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -223,6 +223,10 @@ trait Container {
   private def closeConnections(toClose: Option[ContainerClient]): Future[Unit] 
= {
     toClose.map(_.close()).getOrElse(Future.successful(()))
   }
+
+  /** This is so that we can easily log the container id during 
ContainerPool.logContainerStart().
+   *  Null check is here since some tests use stub[Container] so id is null 
during those tests. */
+  override def toString() = if (id == null) "no-container-id" else id.toString
 }
 
 /** Indicates a general error with the container */
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 81c5427..390eea8 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -22,7 +22,6 @@ import org.apache.openwhisk.common.{AkkaLogging, 
LoggingMarkers, TransactionId}
 import org.apache.openwhisk.core.connector.MessageFeed
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
-
 import scala.collection.immutable
 import scala.concurrent.duration._
 import scala.util.Try
@@ -79,7 +78,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     }
   }
 
-  def logContainerStart(r: Run, containerState: String, activeActivations: 
Int): Unit = {
+  def logContainerStart(r: Run, containerState: String, activeActivations: 
Int, container: Option[Container]): Unit = {
     val namespaceName = r.msg.user.namespace.name
     val actionName = r.action.name.name
     val maxConcurrent = r.action.limits.concurrency.maxConcurrent
@@ -88,7 +87,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     r.msg.transid.mark(
       this,
       LoggingMarkers.INVOKER_CONTAINER_START(containerState),
-      s"containerStart containerState: $containerState ($activeActivations of 
max $maxConcurrent) action: $actionName namespace: $namespaceName activationId: 
$activationId",
+      s"containerStart containerState: $containerState container: $container 
activations: $activeActivations of max $maxConcurrent action: $actionName 
namespace: $namespaceName activationId: $activationId",
       akka.event.Logging.InfoLevel)
   }
 
@@ -112,9 +111,9 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
             // Schedule a job to a warm container
             ContainerPool
               .schedule(r.action, r.msg.user.namespace.name, freePool)
-              .map(container => (container, "warm"))
+              .map(container => (container, container._2.initingState)) 
//warmed, warming, and warmingCold always know their state
               .orElse(
-                // There was no warm container. Try to take a prewarm 
container or a cold container.
+                // There was no warm/warming/warmingCold container. Try to 
take a prewarm container or a cold container.
 
                 // Is there enough space to create a new container or do other 
containers have to be removed?
                 if (hasPoolSpaceFor(busyPool ++ freePool, 
r.action.limits.memory.megabytes.MB)) {
@@ -140,15 +139,26 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
 
         createdContainer match {
           case Some(((actor, data), containerState)) =>
+            //increment active count before storing in pool map
+            val newData = data.nextRun(r)
+            val container = newData.getContainer
+
+            if (newData.activeActivationCount < 1) {
+              logging.error(this, s"invalid activation count < 1 ${newData}")
+            }
+
             //only move to busyPool if max reached
-            if (data.activeActivationCount + 1 >= 
r.action.limits.concurrency.maxConcurrent) {
+            if (!newData.hasCapacity()) {
               if (r.action.limits.concurrency.maxConcurrent > 1) {
                 logging.info(
                   this,
-                  s"container for ${r.action} is now busy with 
${data.activeActivationCount + 1} activations")
+                  s"container ${container} is now busy with 
${newData.activeActivationCount} activations")
               }
-              busyPool = busyPool + (actor -> data)
+              busyPool = busyPool + (actor -> newData)
               freePool = freePool - actor
+            } else {
+              //update freePool to track counts
+              freePool = freePool + (actor -> newData)
             }
             // Remove the action that get's executed now from the buffer and 
execute the next one afterwards.
             if (isResentFromBuffer) {
@@ -159,7 +169,7 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
               runBuffer.dequeueOption.foreach { case (run, _) => self ! run }
             }
             actor ! r // forwards the run request to the container
-            logContainerStart(r, containerState, data.activeActivationCount)
+            logContainerStart(r, containerState, 
newData.activeActivationCount, container)
           case None =>
             // this can also happen if createContainer fails to start a new 
container, or
             // if a job is rescheduled but the container it was allocated to 
has not yet destroyed itself
@@ -193,34 +203,27 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef,
       }
 
     // Container is free to take more work
-    case NeedWork(data: WarmedData) =>
+    case NeedWork(warmData: WarmedData) =>
       feed ! MessageFeed.Processed
-      if (data.activeActivationCount < 
data.action.limits.concurrency.maxConcurrent) {
+      val oldData = freePool.get(sender()).getOrElse(busyPool(sender()))
+      val newData = warmData.copy(activeActivationCount = 
oldData.activeActivationCount - 1)
+      if (newData.activeActivationCount < 0) {
+        logging.error(this, s"invalid activation count after warming < 1 
${newData}")
+      }
+      if (newData.hasCapacity()) {
         //remove from busy pool (may already not be there), put back into free 
pool (to update activation counts)
-        freePool = freePool + (sender() -> data)
+        freePool = freePool + (sender() -> newData)
         if (busyPool.contains(sender())) {
           busyPool = busyPool - sender()
-          if (data.action.limits.concurrency.maxConcurrent > 1) {
+          if (newData.action.limits.concurrency.maxConcurrent > 1) {
             logging.info(
               this,
-              s"container for ${data.action} is no longer busy with 
${data.activeActivationCount} activations")
+              s"concurrent container ${newData.container} is no longer busy 
with ${newData.activeActivationCount} activations")
           }
         }
       } else {
-        //update freePool IFF it was previously PreWarmedData (it is still 
free, but now has WarmedData)
-        //otherwise update busyPool to reflect the updated activation counts
-        freePool.get(sender()) match {
-          case Some(_: PreWarmedData) =>
-            freePool = freePool + (sender() -> data)
-          case None =>
-            if (data.action.limits.concurrency.maxConcurrent > 1) {
-              logging.info(
-                this,
-                s"container for ${data.action} is now busy with 
${data.activeActivationCount} activations")
-            }
-            busyPool = busyPool + (sender() -> data)
-          case _ => //was free+WarmedData - do nothing
-        }
+        busyPool = busyPool + (sender() -> newData)
+        freePool = freePool - sender()
       }
 
     // Container is prewarmed and ready to take work
@@ -342,12 +345,23 @@ object ContainerPool {
   protected[containerpool] def schedule[A](action: ExecutableWhiskAction,
                                            invocationNamespace: EntityName,
                                            idles: Map[A, ContainerData]): 
Option[(A, ContainerData)] = {
-    idles.find {
-      case (_, WarmedData(_, `invocationNamespace`, `action`, _, 
activeActivationCount))
-          if activeActivationCount < action.limits.concurrency.maxConcurrent =>
-        true
-      case _ => false
-    }
+    idles
+      .find {
+        case (_, c @ WarmedData(_, `invocationNamespace`, `action`, _, _)) if 
c.hasCapacity() => true
+        case _                                                                 
               => false
+      }
+      .orElse {
+        idles.find {
+          case (_, c @ WarmingData(_, `invocationNamespace`, `action`, _, _)) 
if c.hasCapacity() => true
+          case _                                                               
                  => false
+        }
+      }
+      .orElse {
+        idles.find {
+          case (_, c @ WarmingColdData(`invocationNamespace`, `action`, _, _)) 
if c.hasCapacity() => true
+          case _                                                               
                   => false
+        }
+      }
   }
 
   /**
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 0fdea5e..7f45cc2 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -18,13 +18,11 @@
 package org.apache.openwhisk.core.containerpool
 
 import java.time.Instant
-
 import akka.actor.Status.{Failure => FailureMessage}
 import akka.actor.{FSM, Props, Stash}
 import akka.event.Logging.InfoLevel
 import akka.pattern.pipe
 import pureconfig.loadConfigOrThrow
-
 import scala.collection.immutable
 import spray.json.DefaultJsonProtocol._
 import spray.json._
@@ -38,7 +36,6 @@ import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck
 import org.apache.openwhisk.http.Messages
-
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
@@ -55,26 +52,115 @@ case object Paused extends ContainerState
 case object Removing extends ContainerState
 
 // Data
-sealed abstract class ContainerData(val lastUsed: Instant,
-                                    val memoryLimit: ByteSize,
-                                    val activeActivationCount: Int = 0)
-case class NoData() extends ContainerData(Instant.EPOCH, 0.B)
-case class MemoryData(override val memoryLimit: ByteSize) extends 
ContainerData(Instant.EPOCH, memoryLimit)
-case class PreWarmedData(container: Container,
+/** Base data type */
+sealed abstract class ContainerData(val lastUsed: Instant, val memoryLimit: 
ByteSize, val activeActivationCount: Int) {
+
+  /** When ContainerProxy in this state is scheduled, it may result in a new 
state (ContainerData)*/
+  def nextRun(r: Run): ContainerData
+
+  /**
+   *  Return Some(container) (for ContainerStarted instances) or None(for 
ContainerNotStarted instances)
+   *  Useful for cases where all ContainerData instances are handled, vs cases 
where only ContainerStarted
+   *  instances are handled */
+  def getContainer: Option[Container]
+
+  /** String to indicate the state of this container after scheduling */
+  val initingState: String
+
+  /** Indicates whether this container can service additional activations */
+  def hasCapacity(): Boolean
+}
+
+/** abstract type to indicate an unstarted container */
+sealed abstract class ContainerNotStarted(override val lastUsed: Instant,
+                                          override val memoryLimit: ByteSize,
+                                          override val activeActivationCount: 
Int)
+    extends ContainerData(lastUsed, memoryLimit, activeActivationCount) {
+  override def getContainer = None
+  override val initingState = "cold"
+}
+
+/** abstract type to indicate a started container */
+sealed abstract class ContainerStarted(val container: Container,
+                                       override val lastUsed: Instant,
+                                       override val memoryLimit: ByteSize,
+                                       override val activeActivationCount: Int)
+    extends ContainerData(lastUsed, memoryLimit, activeActivationCount) {
+  override def getContainer = Some(container)
+}
+
+/** trait representing a container that is in use and (potentially) usable by 
subsequent or concurrent activations */
+sealed abstract trait ContainerInUse {
+  val activeActivationCount: Int
+  val action: ExecutableWhiskAction
+  def hasCapacity() =
+    activeActivationCount < action.limits.concurrency.maxConcurrent
+}
+
+/** trait representing a container that is NOT in use and is usable by 
subsequent activation(s) */
+sealed abstract trait ContainerNotInUse {
+  def hasCapacity() = true
+}
+
+/** type representing a cold (not running) container */
+case class NoData(override val activeActivationCount: Int = 0)
+    extends ContainerNotStarted(Instant.EPOCH, 0.B, activeActivationCount)
+    with ContainerNotInUse {
+  override def nextRun(r: Run) = WarmingColdData(r.msg.user.namespace.name, 
r.action, Instant.now, 1)
+}
+
+/** type representing a cold (not running) container with specific memory 
allocation */
+case class MemoryData(override val memoryLimit: ByteSize, override val 
activeActivationCount: Int = 0)
+    extends ContainerNotStarted(Instant.EPOCH, memoryLimit, 
activeActivationCount)
+    with ContainerNotInUse {
+  override def nextRun(r: Run) = WarmingColdData(r.msg.user.namespace.name, 
r.action, Instant.now, 1)
+}
+
+/** type representing a prewarmed (running, but unused) container (with a 
specific memory allocation) */
+case class PreWarmedData(override val container: Container,
                          kind: String,
                          override val memoryLimit: ByteSize,
                          override val activeActivationCount: Int = 0)
-    extends ContainerData(Instant.EPOCH, memoryLimit, activeActivationCount)
-case class WarmedData(container: Container,
+    extends ContainerStarted(container, Instant.EPOCH, memoryLimit, 
activeActivationCount)
+    with ContainerNotInUse {
+  override val initingState = "prewarmed"
+  override def nextRun(r: Run) =
+    WarmingData(container, r.msg.user.namespace.name, r.action, Instant.now, 1)
+}
+
+/** type representing a prewarm (running, but not used) container that is 
being initialized (for a specific action + invocation namespace) */
+case class WarmingData(override val container: Container,
+                       invocationNamespace: EntityName,
+                       action: ExecutableWhiskAction,
+                       override val lastUsed: Instant,
+                       override val activeActivationCount: Int = 0)
+    extends ContainerStarted(container, lastUsed, 
action.limits.memory.megabytes.MB, activeActivationCount)
+    with ContainerInUse {
+  override val initingState = "warming"
+  override def nextRun(r: Run) = copy(activeActivationCount = 
activeActivationCount + 1)
+}
+
+/** type representing a cold (not yet running) container that is being 
initialized (for a specific action + invocation namespace) */
+case class WarmingColdData(invocationNamespace: EntityName,
+                           action: ExecutableWhiskAction,
+                           override val lastUsed: Instant,
+                           override val activeActivationCount: Int = 0)
+    extends ContainerNotStarted(lastUsed, action.limits.memory.megabytes.MB, 
activeActivationCount)
+    with ContainerInUse {
+  override val initingState = "warmingCold"
+  override def nextRun(r: Run) = copy(activeActivationCount = 
activeActivationCount + 1)
+}
+
+/** type representing a warm container that has already been in use (for a 
specific action + invocation namespace) */
+case class WarmedData(override val container: Container,
                       invocationNamespace: EntityName,
                       action: ExecutableWhiskAction,
                       override val lastUsed: Instant,
                       override val activeActivationCount: Int = 0)
-    extends ContainerData(lastUsed, action.limits.memory.megabytes.MB) {
-  def incrementActive: WarmedData =
-    WarmedData(container, invocationNamespace, action, Instant.now, 
activeActivationCount + 1)
-  def decrementActive: WarmedData =
-    WarmedData(container, invocationNamespace, action, Instant.now, 
activeActivationCount - 1)
+    extends ContainerStarted(container, lastUsed, 
action.limits.memory.megabytes.MB, activeActivationCount)
+    with ContainerInUse {
+  override val initingState = "warmed"
+  override def nextRun(r: Run) = copy(activeActivationCount = 
activeActivationCount + 1)
 }
 
 // Events received by the actor
@@ -142,6 +228,9 @@ class ContainerProxy(
   implicit val logging = new AkkaLogging(context.system.log)
   var rescheduleJob = false // true iff actor receives a job but cannot 
process it because actor will destroy itself
   var runBuffer = immutable.Queue.empty[Run] //does not retain order, but does 
manage jobs that would have pushed past action concurrency limit
+
+  //keep a separate count to avoid confusion with 
ContainerData.activeActivationCount that is tracked/modified only in 
ContainerPool
+  var activeCount = 0;
   startWith(Uninitialized, NoData())
 
   when(Uninitialized) {
@@ -162,7 +251,7 @@ class ContainerProxy(
     // cold start (no container to reuse or available stem cell container)
     case Event(job: Run, _) =>
       implicit val transid = job.msg.transid
-
+      activeCount += 1
       // create a new container
       val container = factory(
         job.msg.transid,
@@ -233,6 +322,7 @@ class ContainerProxy(
   when(Started) {
     case Event(job: Run, data: PreWarmedData) =>
       implicit val transid = job.msg.transid
+      activeCount += 1
       initializeAndRun(data.container, job)
         .map(_ => RunCompleted)
         .pipeTo(self)
@@ -248,43 +338,52 @@ class ContainerProxy(
 
     // Init was successful
     case Event(completed: InitCompleted, _: PreWarmedData) =>
-      //in case concurrency supported, multiple runs can begin as soon as init 
is complete
-      context.parent ! NeedWork(completed.data)
       stay using completed.data
 
+    // Init was successful
+    case Event(data: WarmedData, _: PreWarmedData) =>
+      //in case concurrency supported, multiple runs can begin as soon as init 
is complete
+      context.parent ! NeedWork(data)
+      stay using data
+
     // Run was successful
-    case Event(RunCompleted, s: WarmedData) =>
-      val newData = s.decrementActive
+    case Event(RunCompleted, data: WarmedData) =>
+      activeCount -= 1
 
       //if there are items in runbuffer, process them if there is capacity, 
and stay; otherwise if we have any pending activations, also stay
-      if (requestWork(newData) || newData.activeActivationCount > 0) {
-        stay using newData
+      if (requestWork(data) || activeCount > 0) {
+        stay using data
       } else {
-        goto(Ready) using newData
+        goto(Ready) using data
       }
     case Event(job: Run, data: WarmedData)
-        if stateData.activeActivationCount >= 
data.action.limits.concurrency.maxConcurrent && !rescheduleJob => //if we are 
over concurrency limit, and not a failure on resume
+        if activeCount >= data.action.limits.concurrency.maxConcurrent && 
!rescheduleJob => //if we are over concurrency limit, and not a failure on 
resume
+      logging.warn(this, s"buffering for container ${data.container}; 
${activeCount} activations in flight")
       runBuffer = runBuffer.enqueue(job)
       stay()
     case Event(job: Run, data: WarmedData)
-        if stateData.activeActivationCount < 
data.action.limits.concurrency.maxConcurrent && !rescheduleJob => //if there 
was a delay, and not a failure on resume, skip the run
-
+        if activeCount < data.action.limits.concurrency.maxConcurrent && 
!rescheduleJob => //if there was a delay, and not a failure on resume, skip the 
run
+      activeCount += 1
       implicit val transid = job.msg.transid
-      val newData = data.incrementActive
 
       initializeAndRun(data.container, job)
         .map(_ => RunCompleted)
         .pipeTo(self)
-      stay() using newData
+      stay() using data
 
     // Failed after /init (the first run failed)
-    case Event(_: FailureMessage, data: PreWarmedData) => 
destroyContainer(data.container)
+    case Event(_: FailureMessage, data: PreWarmedData) =>
+      activeCount -= 1
+      destroyContainer(data.container)
 
     // Failed for a subsequent /run
-    case Event(_: FailureMessage, data: WarmedData) => 
destroyContainer(data.container)
+    case Event(_: FailureMessage, data: WarmedData) =>
+      activeCount -= 1
+      destroyContainer(data.container)
 
     // Failed at getting a container for a cold-start run
     case Event(_: FailureMessage, _) =>
+      activeCount -= 1
       context.parent ! ContainerRemoved
       rejectBuffered()
       stop()
@@ -295,13 +394,13 @@ class ContainerProxy(
   when(Ready, stateTimeout = pauseGrace) {
     case Event(job: Run, data: WarmedData) =>
       implicit val transid = job.msg.transid
-      val newData = data.incrementActive
+      activeCount += 1
 
       initializeAndRun(data.container, job)
         .map(_ => RunCompleted)
         .pipeTo(self)
 
-      goto(Running) using newData
+      goto(Running) using data
 
     // pause grace timed out
     case Event(StateTimeout, data: WarmedData) =>
@@ -320,7 +419,7 @@ class ContainerProxy(
   when(Paused, stateTimeout = unusedTimeout) {
     case Event(job: Run, data: WarmedData) =>
       implicit val transid = job.msg.transid
-      val newData = data.incrementActive
+      activeCount += 1
 
       data.container
         .resume()
@@ -336,7 +435,7 @@ class ContainerProxy(
         .map(_ => RunCompleted)
         .pipeTo(self)
 
-      goto(Running) using newData
+      goto(Running) using data
 
     // container is reclaimed by the pool or it has become too old
     case Event(StateTimeout | Remove, data: WarmedData) =>
@@ -366,7 +465,7 @@ class ContainerProxy(
   /** Either process runbuffer or signal parent to send work; return true if 
runbuffer is being processed */
   def requestWork(newData: WarmedData): Boolean = {
     //if there is concurrency capacity, process runbuffer, or signal NeedWork
-    if (newData.activeActivationCount < 
newData.action.limits.concurrency.maxConcurrent) {
+    if (activeCount < newData.action.limits.concurrency.maxConcurrent) {
       runBuffer.dequeueOption match {
         case Some((run, q)) =>
           runBuffer = q
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index 5e65e4c..431b3f2 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -86,6 +86,12 @@ class ContainerPoolTests
   val invocationNamespace = EntityName("invocationSpace")
   val differentInvocationNamespace = EntityName("invocationSpace2")
   val action = ExecutableWhiskAction(EntityPath("actionSpace"), 
EntityName("actionName"), exec)
+  val concurrencyEnabled = 
Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+  val concurrentAction = ExecutableWhiskAction(
+    EntityPath("actionSpace"),
+    EntityName("actionName"),
+    exec,
+    limits = ActionLimits(concurrency = ConcurrencyLimit(if 
(concurrencyEnabled) 3 else 1)))
   val differentAction = action.copy(name = EntityName("actionName2"))
   val largeAction =
     action.copy(
@@ -98,6 +104,8 @@ class ContainerPoolTests
   val runMessageDifferentVersion = 
createRunMessage(action.copy().revision(DocRevision("v2")), invocationNamespace)
   val runMessageDifferentNamespace = createRunMessage(action, 
differentInvocationNamespace)
   val runMessageDifferentEverything = createRunMessage(differentAction, 
differentInvocationNamespace)
+  val runMessageConcurrent = createRunMessage(concurrentAction, 
invocationNamespace)
+  val runMessageConcurrentDifferentNamespace = 
createRunMessage(concurrentAction, differentInvocationNamespace)
 
   /** Helper to create PreWarmedData */
   def preWarmedData(kind: String, memoryLimit: ByteSize = memoryLimit) =
@@ -471,6 +479,84 @@ class ContainerPoolTests
     containers(4).send(pool, NeedWork(warmedData()))
     feed.expectMsg(MessageFeed.Processed)
   }
+
+  it should "increase activation counts when scheduling to containers whose 
actions support concurrency" in {
+    assume(concurrencyEnabled)
+    val (containers, factory) = testContainers(2)
+    val feed = TestProbe()
+
+    val pool = system.actorOf(ContainerPool.props(factory, 
poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
+
+    // container0 is created and used
+    pool ! runMessageConcurrent
+    containers(0).expectMsg(runMessageConcurrent)
+
+    // container0 is reused
+    pool ! runMessageConcurrent
+    containers(0).expectMsg(runMessageConcurrent)
+
+    // container0 is reused
+    pool ! runMessageConcurrent
+    containers(0).expectMsg(runMessageConcurrent)
+
+    // container1 is created and used (these concurrent containers are 
configured with max 3 concurrent activations)
+    pool ! runMessageConcurrent
+    containers(1).expectMsg(runMessageConcurrent)
+  }
+
+  it should "schedule concurrent activations to different containers for 
different namespaces" in {
+    assume(concurrencyEnabled)
+    val (containers, factory) = testContainers(2)
+    val feed = TestProbe()
+
+    val pool = system.actorOf(ContainerPool.props(factory, 
poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
+
+    // container0 is created and used
+    pool ! runMessageConcurrent
+    containers(0).expectMsg(runMessageConcurrent)
+
+    // container1 is created and used
+    pool ! runMessageConcurrentDifferentNamespace
+    containers(1).expectMsg(runMessageConcurrentDifferentNamespace)
+  }
+
+  it should "decrease activation counts when receiving NeedWork for actions 
that support concurrency" in {
+    assume(concurrencyEnabled)
+    val (containers, factory) = testContainers(2)
+    val feed = TestProbe()
+
+    val pool = system.actorOf(ContainerPool.props(factory, 
poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
+
+    // container0 is created and used
+    pool ! runMessageConcurrent
+    containers(0).expectMsg(runMessageConcurrent)
+
+    // container0 is reused
+    pool ! runMessageConcurrent
+    containers(0).expectMsg(runMessageConcurrent)
+
+    // container0 is reused
+    pool ! runMessageConcurrent
+    containers(0).expectMsg(runMessageConcurrent)
+
+    // container1 is created and used (these concurrent containers are 
configured with max 3 concurrent activations)
+    pool ! runMessageConcurrent
+    containers(1).expectMsg(runMessageConcurrent)
+
+    // container1 is reused
+    pool ! runMessageConcurrent
+    containers(1).expectMsg(runMessageConcurrent)
+
+    // container1 is reused
+    pool ! runMessageConcurrent
+    containers(1).expectMsg(runMessageConcurrent)
+
+    containers(0).send(pool, NeedWork(warmedData(action = concurrentAction)))
+
+    // container0 is reused (since active count decreased)
+    pool ! runMessageConcurrent
+    containers(0).expectMsg(runMessageConcurrent)
+  }
 }
 
 /**
@@ -497,6 +583,20 @@ class ContainerPoolObjectTests extends FlatSpec with 
Matchers with MockFactory {
                  active: Int = 0) =
     WarmedData(stub[Container], EntityName(namespace), action, lastUsed, 
active)
 
+  /** Helper to create WarmingData with sensible defaults */
+  def warmingData(action: ExecutableWhiskAction = createAction(),
+                  namespace: String = standardNamespace.asString,
+                  lastUsed: Instant = Instant.now,
+                  active: Int = 0) =
+    WarmingData(stub[Container], EntityName(namespace), action, lastUsed, 
active)
+
+  /** Helper to create WarmingColdData with sensible defaults */
+  def warmingColdData(action: ExecutableWhiskAction = createAction(),
+                      namespace: String = standardNamespace.asString,
+                      lastUsed: Instant = Instant.now,
+                      active: Int = 0) =
+    WarmingColdData(EntityName(namespace), action, lastUsed, active)
+
   /** Helper to create PreWarmedData with sensible defaults */
   def preWarmedData(kind: String = "anyKind") = PreWarmedData(stub[Container], 
kind, 256.MB)
 
@@ -588,6 +688,70 @@ class ContainerPoolObjectTests extends FlatSpec with 
Matchers with MockFactory {
 
   }
 
+  it should "use a warming when active activation count < maxconcurrent" in {
+    val concurrencyEnabled = 
Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+    val maxConcurrent = if (concurrencyEnabled) 25 else 1
+
+    val action = createAction(limits = ActionLimits(concurrency = 
ConcurrencyLimit(maxConcurrent)))
+    val data = warmingData(active = maxConcurrent - 1, action = action)
+    val pool = Map('warming -> data)
+    ContainerPool.schedule(data.action, data.invocationNamespace, pool) 
shouldBe Some('warming, data)
+
+    val data2 = warmedData(active = maxConcurrent - 1, action = action)
+    val pool2 = pool ++ Map('warm -> data2)
+
+    ContainerPool.schedule(data2.action, data2.invocationNamespace, pool2) 
shouldBe Some('warm, data2)
+  }
+
+  it should "prefer warm to warming when active activation count < 
maxconcurrent" in {
+    val concurrencyEnabled = 
Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+    val maxConcurrent = if (concurrencyEnabled) 25 else 1
+
+    val action = createAction(limits = ActionLimits(concurrency = 
ConcurrencyLimit(maxConcurrent)))
+    val data = warmingColdData(active = maxConcurrent - 1, action = action)
+    val data2 = warmedData(active = maxConcurrent - 1, action = action)
+    val pool = Map('warming -> data, 'warm -> data2)
+    ContainerPool.schedule(data.action, data.invocationNamespace, pool) 
shouldBe Some('warm, data2)
+  }
+
+  it should "use a warmingCold when active activation count < maxconcurrent" 
in {
+    val concurrencyEnabled = 
Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+    val maxConcurrent = if (concurrencyEnabled) 25 else 1
+
+    val action = createAction(limits = ActionLimits(concurrency = 
ConcurrencyLimit(maxConcurrent)))
+    val data = warmingColdData(active = maxConcurrent - 1, action = action)
+    val pool = Map('warmingCold -> data)
+    ContainerPool.schedule(data.action, data.invocationNamespace, pool) 
shouldBe Some('warmingCold, data)
+
+    //after scheduling, the pool will update with new data to set active = 
maxConcurrent
+    val data2 = warmingColdData(active = maxConcurrent, action = action)
+    val pool2 = Map('warmingCold -> data2)
+
+    ContainerPool.schedule(data2.action, data2.invocationNamespace, pool2) 
shouldBe None
+  }
+
+  it should "prefer warm to warmingCold when active activation count < 
maxconcurrent" in {
+    val concurrencyEnabled = 
Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+    val maxConcurrent = if (concurrencyEnabled) 25 else 1
+
+    val action = createAction(limits = ActionLimits(concurrency = 
ConcurrencyLimit(maxConcurrent)))
+    val data = warmingColdData(active = maxConcurrent - 1, action = action)
+    val data2 = warmedData(active = maxConcurrent - 1, action = action)
+    val pool = Map('warmingCold -> data, 'warm -> data2)
+    ContainerPool.schedule(data.action, data.invocationNamespace, pool) 
shouldBe Some('warm, data2)
+  }
+
+  it should "prefer warming to warmingCold when active activation count < 
maxconcurrent" in {
+    val concurrencyEnabled = 
Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+    val maxConcurrent = if (concurrencyEnabled) 25 else 1
+
+    val action = createAction(limits = ActionLimits(concurrency = 
ConcurrencyLimit(maxConcurrent)))
+    val data = warmingColdData(active = maxConcurrent - 1, action = action)
+    val data2 = warmingData(active = maxConcurrent - 1, action = action)
+    val pool = Map('warmingCold -> data, 'warming -> data2)
+    ContainerPool.schedule(data.action, data.invocationNamespace, pool) 
shouldBe Some('warming, data2)
+  }
+
   behavior of "ContainerPool remove()"
 
   it should "not provide a container if pool is empty" in {
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 0477fcc..b51832b 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -127,13 +127,10 @@ class ContainerProxyTests
   }
 
   /** Run the common action on the state-machine, assumes good cases */
-  def run(machine: ActorRef, currentState: ContainerState, expectInit: Boolean 
= true) = {
+  def run(machine: ActorRef, currentState: ContainerState) = {
     machine ! Run(action, message)
     expectMsg(Transition(machine, currentState, Running))
-    if (expectInit) {
-      expectWarmed(invocationNamespace.name, action, 1)
-    }
-    expectWarmed(invocationNamespace.name, action, 0)
+    expectWarmed(invocationNamespace.name, action)
     expectMsg(Transition(machine, Running, Ready))
   }
 
@@ -143,16 +140,10 @@ class ContainerProxyTests
   }
 
   /** Expect a NeedWork message with warmed data */
-  def expectWarmed(namespace: String, action: ExecutableWhiskAction, count: 
Int) = {
+  def expectWarmed(namespace: String, action: ExecutableWhiskAction) = {
     val test = EntityName(namespace)
     expectMsgPF() {
-      case a @ NeedWork(WarmedData(_, `test`, `action`, _, _)) if 
a.data.activeActivationCount == count => //matched, otherwise will fail
-    }
-  }
-  def expectAnyWarmed(namespace: String, action: ExecutableWhiskAction) = {
-    val test = EntityName(namespace)
-    expectMsgPF() {
-      case a @ NeedWork(WarmedData(_, `test`, `action`, _, _)) =>
+      case a @ NeedWork(WarmedData(_, `test`, `action`, _, _)) => //matched, 
otherwise will fail
     }
   }
 
@@ -257,7 +248,7 @@ class ContainerProxyTests
     registerCallback(machine)
 
     preWarm(machine)
-    run(machine, Started, true)
+    run(machine, Started)
 
     // Timeout causes the container to pause
     timeout(machine)
@@ -303,7 +294,7 @@ class ContainerProxyTests
 
     run(machine, Started)
     // Note that there are no intermediate state changes
-    run(machine, Ready, false)
+    run(machine, Ready)
 
     awaitAssert {
       factory.calls should have size 1
@@ -359,7 +350,7 @@ class ContainerProxyTests
     run(machine, Started)
     timeout(machine)
     expectPause(machine)
-    run(machine, Paused, false)
+    run(machine, Paused)
 
     awaitAssert {
       factory.calls should have size 1
@@ -400,7 +391,7 @@ class ContainerProxyTests
             poolConfig,
             pauseGrace = pauseGrace))
     registerCallback(machine)
-    run(machine, Uninitialized, true)
+    run(machine, Uninitialized)
 
     awaitAssert {
       factory.calls should have size 1
@@ -443,8 +434,7 @@ class ContainerProxyTests
 
     machine ! Run(noLogsAction, message)
     expectMsg(Transition(machine, Uninitialized, Running))
-    expectWarmed(invocationNamespace.name, noLogsAction, 1)
-    expectWarmed(invocationNamespace.name, noLogsAction, 0)
+    expectWarmed(invocationNamespace.name, noLogsAction)
     expectMsg(Transition(machine, Running, Ready))
 
     awaitAssert {
@@ -505,17 +495,16 @@ class ContainerProxyTests
 
     //complete the init
     initPromise.success(initInterval)
-    expectWarmed(invocationNamespace.name, concurrentAction, 1) //when init 
completes
 
     //complete the first run
     runPromises(0).success(runInterval, ActivationResponse.success())
-    expectWarmed(invocationNamespace.name, concurrentAction, 0) //when first 
completes (count is 0 since stashed not counted)
+    expectWarmed(invocationNamespace.name, concurrentAction) //when first 
completes (active count is tracked by ContainerPool, so it is not asserted here)
     expectMsg(Transition(machine, Running, Ready)) //wait for first to 
complete to skip the delay step that can only reliably be tested in single 
threaded
     expectMsg(Transition(machine, Ready, Running)) //when second starts (after 
delay...)
 
     //complete the second run
     runPromises(1).success(runInterval, ActivationResponse.success())
-    expectWarmed(invocationNamespace.name, concurrentAction, 0) //when second 
completes
+    expectWarmed(invocationNamespace.name, concurrentAction) //when second 
completes
 
     //go back to ready after first and second runs are complete
     expectMsg(Transition(machine, Running, Ready))
@@ -536,13 +525,12 @@ class ContainerProxyTests
 
     //complete the fifth run (request new work, 1 active remain)
     runPromises(4).success(runInterval, ActivationResponse.success())
-    expectWarmed(invocationNamespace.name, concurrentAction, 1) //when fifth 
completes
+    expectWarmed(invocationNamespace.name, concurrentAction) //when fifth 
completes
 
     //complete the sixth run (request new work 0 active remain)
     runPromises(5).success(runInterval, ActivationResponse.success())
 
-    //expectWarmed(invocationNamespace.name, concurrentAction, 1) //when sixth 
completes
-    expectWarmed(invocationNamespace.name, concurrentAction, 0) //when sixth 
completes
+    expectWarmed(invocationNamespace.name, concurrentAction) //when sixth 
completes
 
     // back to ready
     expectMsg(Transition(machine, Running, Ready))
@@ -611,7 +599,7 @@ class ContainerProxyTests
 
     // Note that there are no intermediate state changes
     //second one will succeed
-    run(machine, Ready, false)
+    run(machine, Ready)
 
     //With exception of the error on first run, the assertions should be the 
same as in
     //         `run an action and continue with a next run without pausing the 
container`
@@ -763,7 +751,6 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
-    expectWarmed(invocationNamespace.name, action, 1)
     expectMsg(ContainerRemoved) // The message is sent as soon as the 
container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
@@ -802,7 +789,6 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
-    expectWarmed(invocationNamespace.name, action, 1)
     expectMsg(ContainerRemoved) // The message is sent as soon as the 
container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
@@ -840,7 +826,6 @@ class ContainerProxyTests
     registerCallback(machine)
     machine ! Run(action, message)
     expectMsg(Transition(machine, Uninitialized, Running))
-    expectWarmed(invocationNamespace.name, action, 1)
     expectMsg(ContainerRemoved) // The message is sent as soon as the 
container decides to destroy itself
     expectMsg(Transition(machine, Running, Removing))
 
@@ -978,8 +963,7 @@ class ContainerProxyTests
 
     // Finish /init, note that /run and log-collecting happens nonetheless
     initPromise.success(Interval.zero)
-    expectWarmed(invocationNamespace.name, action, 1)
-    expectWarmed(invocationNamespace.name, action, 0)
+    expectWarmed(invocationNamespace.name, action)
     expectMsg(Transition(machine, Running, Ready))
 
     // Remove the container after the transaction finished
@@ -1062,7 +1046,7 @@ class ContainerProxyTests
   class TestContainer(initPromise: Option[Promise[Interval]] = None,
                       runPromises: Seq[Promise[(Interval, 
ActivationResponse)]] = Seq.empty)
       extends Container {
-    protected val id = ContainerId("testcontainer")
+    protected[core] val id = ContainerId("testcontainer")
     protected val addr = ContainerAddress("0.0.0.0")
     protected implicit val logging: Logging = log
     protected implicit val ec: ExecutionContext = system.dispatcher
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala 
b/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala
index 458f1a2..eb0e110 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala
@@ -137,7 +137,7 @@ class ConcurrencyTests extends TestHelpers with 
WskTestHelpers with WskActorSyst
       }
 
       //none of the actions will complete till the requestCount is reached
-      Await.result(Future.sequence(runs), 30.seconds).foreach { run =>
+      Await.result(Future.sequence(runs), 50.seconds).foreach { run =>
         withActivation(wsk.activation, run) { response =>
           val logs = response.logs.get
           withClue(logs) { logs.size shouldBe 0 }

Reply via email to