This is an automated email from the ASF dual-hosted git repository.
dgrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new da21c9f Track activation counts in ContainerPool (not ContainerProxy)
(#4186)
da21c9f is described below
commit da21c9fe49b2ae72c95b6866b30d984c65253724
Author: tysonnorris <[email protected]>
AuthorDate: Mon Feb 11 11:45:20 2019 -0800
Track activation counts in ContainerPool (not ContainerProxy) (#4186)
Track activation counts in ContainerPool (not ContainerProxy) so that
Prewarm/cold -> Warm can be tracked as Warming and receive activations to avoid
spawning extra containers
---
.../openwhisk/core/containerpool/Container.scala | 4 +
.../core/containerpool/ContainerPool.scala | 82 ++++++----
.../core/containerpool/ContainerProxy.scala | 171 ++++++++++++++++-----
.../containerpool/test/ContainerPoolTests.scala | 164 ++++++++++++++++++++
.../containerpool/test/ContainerProxyTests.scala | 48 ++----
.../openwhisk/core/limits/ConcurrencyTests.scala | 2 +-
6 files changed, 368 insertions(+), 103 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index 354ec38..1da3e2f 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -223,6 +223,10 @@ trait Container {
private def closeConnections(toClose: Option[ContainerClient]): Future[Unit]
= {
toClose.map(_.close()).getOrElse(Future.successful(()))
}
+
+ /** This is so that we can easily log the container id during
ContainerPool.logContainerStart().
+ * Null check is here since some tests use stub[Container] so id is null
during those tests. */
+ override def toString() = if (id == null) "no-container-id" else id.toString
}
/** Indicates a general error with the container */
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
index 81c5427..390eea8 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala
@@ -22,7 +22,6 @@ import org.apache.openwhisk.common.{AkkaLogging,
LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
-
import scala.collection.immutable
import scala.concurrent.duration._
import scala.util.Try
@@ -79,7 +78,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}
}
- def logContainerStart(r: Run, containerState: String, activeActivations:
Int): Unit = {
+ def logContainerStart(r: Run, containerState: String, activeActivations:
Int, container: Option[Container]): Unit = {
val namespaceName = r.msg.user.namespace.name
val actionName = r.action.name.name
val maxConcurrent = r.action.limits.concurrency.maxConcurrent
@@ -88,7 +87,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
r.msg.transid.mark(
this,
LoggingMarkers.INVOKER_CONTAINER_START(containerState),
- s"containerStart containerState: $containerState ($activeActivations of
max $maxConcurrent) action: $actionName namespace: $namespaceName activationId:
$activationId",
+ s"containerStart containerState: $containerState container: $container
activations: $activeActivations of max $maxConcurrent action: $actionName
namespace: $namespaceName activationId: $activationId",
akka.event.Logging.InfoLevel)
}
@@ -112,9 +111,9 @@ class ContainerPool(childFactory: ActorRefFactory =>
ActorRef,
// Schedule a job to a warm container
ContainerPool
.schedule(r.action, r.msg.user.namespace.name, freePool)
- .map(container => (container, "warm"))
+ .map(container => (container, container._2.initingState))
//warmed, warming, and warmingCold always know their state
.orElse(
- // There was no warm container. Try to take a prewarm
container or a cold container.
+ // There was no warm/warming/warmingCold container. Try to
take a prewarm container or a cold container.
// Is there enough space to create a new container or do other
containers have to be removed?
if (hasPoolSpaceFor(busyPool ++ freePool,
r.action.limits.memory.megabytes.MB)) {
@@ -140,15 +139,26 @@ class ContainerPool(childFactory: ActorRefFactory =>
ActorRef,
createdContainer match {
case Some(((actor, data), containerState)) =>
+ //increment active count before storing in pool map
+ val newData = data.nextRun(r)
+ val container = newData.getContainer
+
+ if (newData.activeActivationCount < 1) {
+ logging.error(this, s"invalid activation count < 1 ${newData}")
+ }
+
//only move to busyPool if max reached
- if (data.activeActivationCount + 1 >=
r.action.limits.concurrency.maxConcurrent) {
+ if (!newData.hasCapacity()) {
if (r.action.limits.concurrency.maxConcurrent > 1) {
logging.info(
this,
- s"container for ${r.action} is now busy with
${data.activeActivationCount + 1} activations")
+ s"container ${container} is now busy with
${newData.activeActivationCount} activations")
}
- busyPool = busyPool + (actor -> data)
+ busyPool = busyPool + (actor -> newData)
freePool = freePool - actor
+ } else {
+ //update freePool to track counts
+ freePool = freePool + (actor -> newData)
}
// Remove the action that get's executed now from the buffer and
execute the next one afterwards.
if (isResentFromBuffer) {
@@ -159,7 +169,7 @@ class ContainerPool(childFactory: ActorRefFactory =>
ActorRef,
runBuffer.dequeueOption.foreach { case (run, _) => self ! run }
}
actor ! r // forwards the run request to the container
- logContainerStart(r, containerState, data.activeActivationCount)
+ logContainerStart(r, containerState,
newData.activeActivationCount, container)
case None =>
// this can also happen if createContainer fails to start a new
container, or
// if a job is rescheduled but the container it was allocated to
has not yet destroyed itself
@@ -193,34 +203,27 @@ class ContainerPool(childFactory: ActorRefFactory =>
ActorRef,
}
// Container is free to take more work
- case NeedWork(data: WarmedData) =>
+ case NeedWork(warmData: WarmedData) =>
feed ! MessageFeed.Processed
- if (data.activeActivationCount <
data.action.limits.concurrency.maxConcurrent) {
+ val oldData = freePool.get(sender()).getOrElse(busyPool(sender()))
+ val newData = warmData.copy(activeActivationCount =
oldData.activeActivationCount - 1)
+ if (newData.activeActivationCount < 0) {
+ logging.error(this, s"invalid activation count after warming < 1
${newData}")
+ }
+ if (newData.hasCapacity()) {
//remove from busy pool (may already not be there), put back into free
pool (to update activation counts)
- freePool = freePool + (sender() -> data)
+ freePool = freePool + (sender() -> newData)
if (busyPool.contains(sender())) {
busyPool = busyPool - sender()
- if (data.action.limits.concurrency.maxConcurrent > 1) {
+ if (newData.action.limits.concurrency.maxConcurrent > 1) {
logging.info(
this,
- s"container for ${data.action} is no longer busy with
${data.activeActivationCount} activations")
+ s"concurrent container ${newData.container} is no longer busy
with ${newData.activeActivationCount} activations")
}
}
} else {
- //update freePool IFF it was previously PreWarmedData (it is still
free, but now has WarmedData)
- //otherwise update busyPool to reflect the updated activation counts
- freePool.get(sender()) match {
- case Some(_: PreWarmedData) =>
- freePool = freePool + (sender() -> data)
- case None =>
- if (data.action.limits.concurrency.maxConcurrent > 1) {
- logging.info(
- this,
- s"container for ${data.action} is now busy with
${data.activeActivationCount} activations")
- }
- busyPool = busyPool + (sender() -> data)
- case _ => //was free+WarmedData - do nothing
- }
+ busyPool = busyPool + (sender() -> newData)
+ freePool = freePool - sender()
}
// Container is prewarmed and ready to take work
@@ -342,12 +345,23 @@ object ContainerPool {
protected[containerpool] def schedule[A](action: ExecutableWhiskAction,
invocationNamespace: EntityName,
idles: Map[A, ContainerData]):
Option[(A, ContainerData)] = {
- idles.find {
- case (_, WarmedData(_, `invocationNamespace`, `action`, _,
activeActivationCount))
- if activeActivationCount < action.limits.concurrency.maxConcurrent =>
- true
- case _ => false
- }
+ idles
+ .find {
+ case (_, c @ WarmedData(_, `invocationNamespace`, `action`, _, _)) if
c.hasCapacity() => true
+ case _
=> false
+ }
+ .orElse {
+ idles.find {
+ case (_, c @ WarmingData(_, `invocationNamespace`, `action`, _, _))
if c.hasCapacity() => true
+ case _
=> false
+ }
+ }
+ .orElse {
+ idles.find {
+ case (_, c @ WarmingColdData(`invocationNamespace`, `action`, _, _))
if c.hasCapacity() => true
+ case _
=> false
+ }
+ }
}
/**
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index 0fdea5e..7f45cc2 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -18,13 +18,11 @@
package org.apache.openwhisk.core.containerpool
import java.time.Instant
-
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{FSM, Props, Stash}
import akka.event.Logging.InfoLevel
import akka.pattern.pipe
import pureconfig.loadConfigOrThrow
-
import scala.collection.immutable
import spray.json.DefaultJsonProtocol._
import spray.json._
@@ -38,7 +36,6 @@ import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck
import org.apache.openwhisk.http.Messages
-
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}
@@ -55,26 +52,115 @@ case object Paused extends ContainerState
case object Removing extends ContainerState
// Data
-sealed abstract class ContainerData(val lastUsed: Instant,
- val memoryLimit: ByteSize,
- val activeActivationCount: Int = 0)
-case class NoData() extends ContainerData(Instant.EPOCH, 0.B)
-case class MemoryData(override val memoryLimit: ByteSize) extends
ContainerData(Instant.EPOCH, memoryLimit)
-case class PreWarmedData(container: Container,
+/** Base data type */
+sealed abstract class ContainerData(val lastUsed: Instant, val memoryLimit:
ByteSize, val activeActivationCount: Int) {
+
+ /** When ContainerProxy in this state is scheduled, it may result in a new
state (ContainerData)*/
+ def nextRun(r: Run): ContainerData
+
+ /**
+ * Return Some(container) (for ContainerStarted instances) or None(for
ContainerNotStarted instances)
+ * Useful for cases where all ContainerData instances are handled, vs cases
where only ContainerStarted
+ * instances are handled */
+ def getContainer: Option[Container]
+
+ /** String to indicate the state of this container after scheduling */
+ val initingState: String
+
+ /** Inidicates whether this container can service additional activations */
+ def hasCapacity(): Boolean
+}
+
+/** abstract type to indicate an unstarted container */
+sealed abstract class ContainerNotStarted(override val lastUsed: Instant,
+ override val memoryLimit: ByteSize,
+ override val activeActivationCount:
Int)
+ extends ContainerData(lastUsed, memoryLimit, activeActivationCount) {
+ override def getContainer = None
+ override val initingState = "cold"
+}
+
+/** abstract type to indicate a started container */
+sealed abstract class ContainerStarted(val container: Container,
+ override val lastUsed: Instant,
+ override val memoryLimit: ByteSize,
+ override val activeActivationCount: Int)
+ extends ContainerData(lastUsed, memoryLimit, activeActivationCount) {
+ override def getContainer = Some(container)
+}
+
+/** trait representing a container that is in use and (potentially) usable by
subsequent or concurrent activations */
+sealed abstract trait ContainerInUse {
+ val activeActivationCount: Int
+ val action: ExecutableWhiskAction
+ def hasCapacity() =
+ activeActivationCount < action.limits.concurrency.maxConcurrent
+}
+
+/** trait representing a container that is NOT in use and is usable by
subsequent activation(s) */
+sealed abstract trait ContainerNotInUse {
+ def hasCapacity() = true
+}
+
+/** type representing a cold (not running) container */
+case class NoData(override val activeActivationCount: Int = 0)
+ extends ContainerNotStarted(Instant.EPOCH, 0.B, activeActivationCount)
+ with ContainerNotInUse {
+ override def nextRun(r: Run) = WarmingColdData(r.msg.user.namespace.name,
r.action, Instant.now, 1)
+}
+
+/** type representing a cold (not running) container with specific memory
allocation */
+case class MemoryData(override val memoryLimit: ByteSize, override val
activeActivationCount: Int = 0)
+ extends ContainerNotStarted(Instant.EPOCH, memoryLimit,
activeActivationCount)
+ with ContainerNotInUse {
+ override def nextRun(r: Run) = WarmingColdData(r.msg.user.namespace.name,
r.action, Instant.now, 1)
+}
+
+/** type representing a prewarmed (running, but unused) container (with a
specific memory allocation) */
+case class PreWarmedData(override val container: Container,
kind: String,
override val memoryLimit: ByteSize,
override val activeActivationCount: Int = 0)
- extends ContainerData(Instant.EPOCH, memoryLimit, activeActivationCount)
-case class WarmedData(container: Container,
+ extends ContainerStarted(container, Instant.EPOCH, memoryLimit,
activeActivationCount)
+ with ContainerNotInUse {
+ override val initingState = "prewarmed"
+ override def nextRun(r: Run) =
+ WarmingData(container, r.msg.user.namespace.name, r.action, Instant.now, 1)
+}
+
+/** type representing a prewarm (running, but not used) container that is
being initialized (for a specific action + invocation namespace) */
+case class WarmingData(override val container: Container,
+ invocationNamespace: EntityName,
+ action: ExecutableWhiskAction,
+ override val lastUsed: Instant,
+ override val activeActivationCount: Int = 0)
+ extends ContainerStarted(container, lastUsed,
action.limits.memory.megabytes.MB, activeActivationCount)
+ with ContainerInUse {
+ override val initingState = "warming"
+ override def nextRun(r: Run) = copy(activeActivationCount =
activeActivationCount + 1)
+}
+
+/** type representing a cold (not yet running) container that is being
initialized (for a specific action + invocation namespace) */
+case class WarmingColdData(invocationNamespace: EntityName,
+ action: ExecutableWhiskAction,
+ override val lastUsed: Instant,
+ override val activeActivationCount: Int = 0)
+ extends ContainerNotStarted(lastUsed, action.limits.memory.megabytes.MB,
activeActivationCount)
+ with ContainerInUse {
+ override val initingState = "warmingCold"
+ override def nextRun(r: Run) = copy(activeActivationCount =
activeActivationCount + 1)
+}
+
+/** type representing a warm container that has already been in use (for a
specific action + invocation namespace) */
+case class WarmedData(override val container: Container,
invocationNamespace: EntityName,
action: ExecutableWhiskAction,
override val lastUsed: Instant,
override val activeActivationCount: Int = 0)
- extends ContainerData(lastUsed, action.limits.memory.megabytes.MB) {
- def incrementActive: WarmedData =
- WarmedData(container, invocationNamespace, action, Instant.now,
activeActivationCount + 1)
- def decrementActive: WarmedData =
- WarmedData(container, invocationNamespace, action, Instant.now,
activeActivationCount - 1)
+ extends ContainerStarted(container, lastUsed,
action.limits.memory.megabytes.MB, activeActivationCount)
+ with ContainerInUse {
+ override val initingState = "warmed"
+ override def nextRun(r: Run) = copy(activeActivationCount =
activeActivationCount + 1)
}
// Events received by the actor
@@ -142,6 +228,9 @@ class ContainerProxy(
implicit val logging = new AkkaLogging(context.system.log)
var rescheduleJob = false // true iff actor receives a job but cannot
process it because actor will destroy itself
var runBuffer = immutable.Queue.empty[Run] //does not retain order, but does
manage jobs that would have pushed past action concurrency limit
+
+ //keep a separate count to avoid confusion with
ContainerState.activeActivationCount that is tracked/modified only in
ContainerPool
+ var activeCount = 0;
startWith(Uninitialized, NoData())
when(Uninitialized) {
@@ -162,7 +251,7 @@ class ContainerProxy(
// cold start (no container to reuse or available stem cell container)
case Event(job: Run, _) =>
implicit val transid = job.msg.transid
-
+ activeCount += 1
// create a new container
val container = factory(
job.msg.transid,
@@ -233,6 +322,7 @@ class ContainerProxy(
when(Started) {
case Event(job: Run, data: PreWarmedData) =>
implicit val transid = job.msg.transid
+ activeCount += 1
initializeAndRun(data.container, job)
.map(_ => RunCompleted)
.pipeTo(self)
@@ -248,43 +338,52 @@ class ContainerProxy(
// Init was successful
case Event(completed: InitCompleted, _: PreWarmedData) =>
- //in case concurrency supported, multiple runs can begin as soon as init
is complete
- context.parent ! NeedWork(completed.data)
stay using completed.data
+ // Init was successful
+ case Event(data: WarmedData, _: PreWarmedData) =>
+ //in case concurrency supported, multiple runs can begin as soon as init
is complete
+ context.parent ! NeedWork(data)
+ stay using data
+
// Run was successful
- case Event(RunCompleted, s: WarmedData) =>
- val newData = s.decrementActive
+ case Event(RunCompleted, data: WarmedData) =>
+ activeCount -= 1
//if there are items in runbuffer, process them if there is capacity,
and stay; otherwise if we have any pending activations, also stay
- if (requestWork(newData) || newData.activeActivationCount > 0) {
- stay using newData
+ if (requestWork(data) || activeCount > 0) {
+ stay using data
} else {
- goto(Ready) using newData
+ goto(Ready) using data
}
case Event(job: Run, data: WarmedData)
- if stateData.activeActivationCount >=
data.action.limits.concurrency.maxConcurrent && !rescheduleJob => //if we are
over concurrency limit, and not a failure on resume
+ if activeCount >= data.action.limits.concurrency.maxConcurrent &&
!rescheduleJob => //if we are over concurrency limit, and not a failure on
resume
+ logging.warn(this, s"buffering for container ${data.container};
${activeCount} activations in flight")
runBuffer = runBuffer.enqueue(job)
stay()
case Event(job: Run, data: WarmedData)
- if stateData.activeActivationCount <
data.action.limits.concurrency.maxConcurrent && !rescheduleJob => //if there
was a delay, and not a failure on resume, skip the run
-
+ if activeCount < data.action.limits.concurrency.maxConcurrent &&
!rescheduleJob => //if there was a delay, and not a failure on resume, skip the
run
+ activeCount += 1
implicit val transid = job.msg.transid
- val newData = data.incrementActive
initializeAndRun(data.container, job)
.map(_ => RunCompleted)
.pipeTo(self)
- stay() using newData
+ stay() using data
// Failed after /init (the first run failed)
- case Event(_: FailureMessage, data: PreWarmedData) =>
destroyContainer(data.container)
+ case Event(_: FailureMessage, data: PreWarmedData) =>
+ activeCount -= 1
+ destroyContainer(data.container)
// Failed for a subsequent /run
- case Event(_: FailureMessage, data: WarmedData) =>
destroyContainer(data.container)
+ case Event(_: FailureMessage, data: WarmedData) =>
+ activeCount -= 1
+ destroyContainer(data.container)
// Failed at getting a container for a cold-start run
case Event(_: FailureMessage, _) =>
+ activeCount -= 1
context.parent ! ContainerRemoved
rejectBuffered()
stop()
@@ -295,13 +394,13 @@ class ContainerProxy(
when(Ready, stateTimeout = pauseGrace) {
case Event(job: Run, data: WarmedData) =>
implicit val transid = job.msg.transid
- val newData = data.incrementActive
+ activeCount += 1
initializeAndRun(data.container, job)
.map(_ => RunCompleted)
.pipeTo(self)
- goto(Running) using newData
+ goto(Running) using data
// pause grace timed out
case Event(StateTimeout, data: WarmedData) =>
@@ -320,7 +419,7 @@ class ContainerProxy(
when(Paused, stateTimeout = unusedTimeout) {
case Event(job: Run, data: WarmedData) =>
implicit val transid = job.msg.transid
- val newData = data.incrementActive
+ activeCount += 1
data.container
.resume()
@@ -336,7 +435,7 @@ class ContainerProxy(
.map(_ => RunCompleted)
.pipeTo(self)
- goto(Running) using newData
+ goto(Running) using data
// container is reclaimed by the pool or it has become too old
case Event(StateTimeout | Remove, data: WarmedData) =>
@@ -366,7 +465,7 @@ class ContainerProxy(
/** Either process runbuffer or signal parent to send work; return true if
runbuffer is being processed */
def requestWork(newData: WarmedData): Boolean = {
//if there is concurrency capacity, process runbuffer, or signal NeedWork
- if (newData.activeActivationCount <
newData.action.limits.concurrency.maxConcurrent) {
+ if (activeCount < newData.action.limits.concurrency.maxConcurrent) {
runBuffer.dequeueOption match {
case Some((run, q)) =>
runBuffer = q
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index 5e65e4c..431b3f2 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -86,6 +86,12 @@ class ContainerPoolTests
val invocationNamespace = EntityName("invocationSpace")
val differentInvocationNamespace = EntityName("invocationSpace2")
val action = ExecutableWhiskAction(EntityPath("actionSpace"),
EntityName("actionName"), exec)
+ val concurrencyEnabled =
Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+ val concurrentAction = ExecutableWhiskAction(
+ EntityPath("actionSpace"),
+ EntityName("actionName"),
+ exec,
+ limits = ActionLimits(concurrency = ConcurrencyLimit(if
(concurrencyEnabled) 3 else 1)))
val differentAction = action.copy(name = EntityName("actionName2"))
val largeAction =
action.copy(
@@ -98,6 +104,8 @@ class ContainerPoolTests
val runMessageDifferentVersion =
createRunMessage(action.copy().revision(DocRevision("v2")), invocationNamespace)
val runMessageDifferentNamespace = createRunMessage(action,
differentInvocationNamespace)
val runMessageDifferentEverything = createRunMessage(differentAction,
differentInvocationNamespace)
+ val runMessageConcurrent = createRunMessage(concurrentAction,
invocationNamespace)
+ val runMessageConcurrentDifferentNamespace =
createRunMessage(concurrentAction, differentInvocationNamespace)
/** Helper to create PreWarmedData */
def preWarmedData(kind: String, memoryLimit: ByteSize = memoryLimit) =
@@ -471,6 +479,84 @@ class ContainerPoolTests
containers(4).send(pool, NeedWork(warmedData()))
feed.expectMsg(MessageFeed.Processed)
}
+
+ it should "increase activation counts when scheduling to containers whose
actions support concurrency" in {
+ assume(concurrencyEnabled)
+ val (containers, factory) = testContainers(2)
+ val feed = TestProbe()
+
+ val pool = system.actorOf(ContainerPool.props(factory,
poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
+
+ // container0 is created and used
+ pool ! runMessageConcurrent
+ containers(0).expectMsg(runMessageConcurrent)
+
+ // container0 is reused
+ pool ! runMessageConcurrent
+ containers(0).expectMsg(runMessageConcurrent)
+
+ // container0 is reused
+ pool ! runMessageConcurrent
+ containers(0).expectMsg(runMessageConcurrent)
+
+ // container1 is created and used (these concurrent containers are
configured with max 3 concurrent activations)
+ pool ! runMessageConcurrent
+ containers(1).expectMsg(runMessageConcurrent)
+ }
+
+ it should "schedule concurrent activations to different containers for
different namespaces" in {
+ assume(concurrencyEnabled)
+ val (containers, factory) = testContainers(2)
+ val feed = TestProbe()
+
+ val pool = system.actorOf(ContainerPool.props(factory,
poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
+
+ // container0 is created and used
+ pool ! runMessageConcurrent
+ containers(0).expectMsg(runMessageConcurrent)
+
+ // container1 is created and used
+ pool ! runMessageConcurrentDifferentNamespace
+ containers(1).expectMsg(runMessageConcurrentDifferentNamespace)
+ }
+
+ it should "decrease activation counts when receiving NeedWork for actions
that support concurrency" in {
+ assume(concurrencyEnabled)
+ val (containers, factory) = testContainers(2)
+ val feed = TestProbe()
+
+ val pool = system.actorOf(ContainerPool.props(factory,
poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
+
+ // container0 is created and used
+ pool ! runMessageConcurrent
+ containers(0).expectMsg(runMessageConcurrent)
+
+ // container0 is reused
+ pool ! runMessageConcurrent
+ containers(0).expectMsg(runMessageConcurrent)
+
+ // container0 is reused
+ pool ! runMessageConcurrent
+ containers(0).expectMsg(runMessageConcurrent)
+
+ // container1 is created and used (these concurrent containers are
configured with max 3 concurrent activations)
+ pool ! runMessageConcurrent
+ containers(1).expectMsg(runMessageConcurrent)
+
+ // container1 is reused
+ pool ! runMessageConcurrent
+ containers(1).expectMsg(runMessageConcurrent)
+
+ // container1 is reused
+ pool ! runMessageConcurrent
+ containers(1).expectMsg(runMessageConcurrent)
+
+ containers(0).send(pool, NeedWork(warmedData(action = concurrentAction)))
+
+ // container0 is reused (since active count decreased)
+ pool ! runMessageConcurrent
+ containers(0).expectMsg(runMessageConcurrent)
+ }
}
/**
@@ -497,6 +583,20 @@ class ContainerPoolObjectTests extends FlatSpec with
Matchers with MockFactory {
active: Int = 0) =
WarmedData(stub[Container], EntityName(namespace), action, lastUsed,
active)
+ /** Helper to create WarmingData with sensible defaults */
+ def warmingData(action: ExecutableWhiskAction = createAction(),
+ namespace: String = standardNamespace.asString,
+ lastUsed: Instant = Instant.now,
+ active: Int = 0) =
+ WarmingData(stub[Container], EntityName(namespace), action, lastUsed,
active)
+
+ /** Helper to create WarmingData with sensible defaults */
+ def warmingColdData(action: ExecutableWhiskAction = createAction(),
+ namespace: String = standardNamespace.asString,
+ lastUsed: Instant = Instant.now,
+ active: Int = 0) =
+ WarmingColdData(EntityName(namespace), action, lastUsed, active)
+
/** Helper to create PreWarmedData with sensible defaults */
def preWarmedData(kind: String = "anyKind") = PreWarmedData(stub[Container],
kind, 256.MB)
@@ -588,6 +688,70 @@ class ContainerPoolObjectTests extends FlatSpec with
Matchers with MockFactory {
}
+ it should "use a warming when active activation count < maxconcurrent" in {
+ val concurrencyEnabled =
Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+ val maxConcurrent = if (concurrencyEnabled) 25 else 1
+
+ val action = createAction(limits = ActionLimits(concurrency =
ConcurrencyLimit(maxConcurrent)))
+ val data = warmingData(active = maxConcurrent - 1, action = action)
+ val pool = Map('warming -> data)
+ ContainerPool.schedule(data.action, data.invocationNamespace, pool)
shouldBe Some('warming, data)
+
+ val data2 = warmedData(active = maxConcurrent - 1, action = action)
+ val pool2 = pool ++ Map('warm -> data2)
+
+ ContainerPool.schedule(data2.action, data2.invocationNamespace, pool2)
shouldBe Some('warm, data2)
+ }
+
+ it should "prefer warm to warming when active activation count <
maxconcurrent" in {
+ val concurrencyEnabled =
Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+ val maxConcurrent = if (concurrencyEnabled) 25 else 1
+
+ val action = createAction(limits = ActionLimits(concurrency =
ConcurrencyLimit(maxConcurrent)))
+ val data = warmingColdData(active = maxConcurrent - 1, action = action)
+ val data2 = warmedData(active = maxConcurrent - 1, action = action)
+ val pool = Map('warming -> data, 'warm -> data2)
+ ContainerPool.schedule(data.action, data.invocationNamespace, pool)
shouldBe Some('warm, data2)
+ }
+
+ it should "use a warmingCold when active activation count < maxconcurrent"
in {
+ val concurrencyEnabled =
Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+ val maxConcurrent = if (concurrencyEnabled) 25 else 1
+
+ val action = createAction(limits = ActionLimits(concurrency =
ConcurrencyLimit(maxConcurrent)))
+ val data = warmingColdData(active = maxConcurrent - 1, action = action)
+ val pool = Map('warmingCold -> data)
+ ContainerPool.schedule(data.action, data.invocationNamespace, pool)
shouldBe Some('warmingCold, data)
+
+ //after scheduling, the pool will update with new data to set active =
maxConcurrent
+ val data2 = warmingColdData(active = maxConcurrent, action = action)
+ val pool2 = Map('warmingCold -> data2)
+
+ ContainerPool.schedule(data2.action, data2.invocationNamespace, pool2)
shouldBe None
+ }
+
+ it should "prefer warm to warmingCold when active activation count <
maxconcurrent" in {
+ val concurrencyEnabled =
Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+ val maxConcurrent = if (concurrencyEnabled) 25 else 1
+
+ val action = createAction(limits = ActionLimits(concurrency =
ConcurrencyLimit(maxConcurrent)))
+ val data = warmingColdData(active = maxConcurrent - 1, action = action)
+ val data2 = warmedData(active = maxConcurrent - 1, action = action)
+ val pool = Map('warmingCold -> data, 'warm -> data2)
+ ContainerPool.schedule(data.action, data.invocationNamespace, pool)
shouldBe Some('warm, data2)
+ }
+
+ it should "prefer warming to warmingCold when active activation count <
maxconcurrent" in {
+ val concurrencyEnabled =
Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean)
+ val maxConcurrent = if (concurrencyEnabled) 25 else 1
+
+ val action = createAction(limits = ActionLimits(concurrency =
ConcurrencyLimit(maxConcurrent)))
+ val data = warmingColdData(active = maxConcurrent - 1, action = action)
+ val data2 = warmingData(active = maxConcurrent - 1, action = action)
+ val pool = Map('warmingCold -> data, 'warming -> data2)
+ ContainerPool.schedule(data.action, data.invocationNamespace, pool)
shouldBe Some('warming, data2)
+ }
+
behavior of "ContainerPool remove()"
it should "not provide a container if pool is empty" in {
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 0477fcc..b51832b 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -127,13 +127,10 @@ class ContainerProxyTests
}
/** Run the common action on the state-machine, assumes good cases */
- def run(machine: ActorRef, currentState: ContainerState, expectInit: Boolean
= true) = {
+ def run(machine: ActorRef, currentState: ContainerState) = {
machine ! Run(action, message)
expectMsg(Transition(machine, currentState, Running))
- if (expectInit) {
- expectWarmed(invocationNamespace.name, action, 1)
- }
- expectWarmed(invocationNamespace.name, action, 0)
+ expectWarmed(invocationNamespace.name, action)
expectMsg(Transition(machine, Running, Ready))
}
@@ -143,16 +140,10 @@ class ContainerProxyTests
}
/** Expect a NeedWork message with warmed data */
- def expectWarmed(namespace: String, action: ExecutableWhiskAction, count: Int) = {
+ def expectWarmed(namespace: String, action: ExecutableWhiskAction) = {
val test = EntityName(namespace)
expectMsgPF() {
- case a @ NeedWork(WarmedData(_, `test`, `action`, _, _)) if a.data.activeActivationCount == count => //matched, otherwise will fail
- }
- }
- def expectAnyWarmed(namespace: String, action: ExecutableWhiskAction) = {
- val test = EntityName(namespace)
- expectMsgPF() {
- case a @ NeedWork(WarmedData(_, `test`, `action`, _, _)) =>
+ case a @ NeedWork(WarmedData(_, `test`, `action`, _, _)) => //matched, otherwise will fail
}
}
@@ -257,7 +248,7 @@ class ContainerProxyTests
registerCallback(machine)
preWarm(machine)
- run(machine, Started, true)
+ run(machine, Started)
// Timeout causes the container to pause
timeout(machine)
@@ -303,7 +294,7 @@ class ContainerProxyTests
run(machine, Started)
// Note that there are no intermediate state changes
- run(machine, Ready, false)
+ run(machine, Ready)
awaitAssert {
factory.calls should have size 1
@@ -359,7 +350,7 @@ class ContainerProxyTests
run(machine, Started)
timeout(machine)
expectPause(machine)
- run(machine, Paused, false)
+ run(machine, Paused)
awaitAssert {
factory.calls should have size 1
@@ -400,7 +391,7 @@ class ContainerProxyTests
poolConfig,
pauseGrace = pauseGrace))
registerCallback(machine)
- run(machine, Uninitialized, true)
+ run(machine, Uninitialized)
awaitAssert {
factory.calls should have size 1
@@ -443,8 +434,7 @@ class ContainerProxyTests
machine ! Run(noLogsAction, message)
expectMsg(Transition(machine, Uninitialized, Running))
- expectWarmed(invocationNamespace.name, noLogsAction, 1)
- expectWarmed(invocationNamespace.name, noLogsAction, 0)
+ expectWarmed(invocationNamespace.name, noLogsAction)
expectMsg(Transition(machine, Running, Ready))
awaitAssert {
@@ -505,17 +495,16 @@ class ContainerProxyTests
//complete the init
initPromise.success(initInterval)
- expectWarmed(invocationNamespace.name, concurrentAction, 1) //when init completes
//complete the first run
runPromises(0).success(runInterval, ActivationResponse.success())
- expectWarmed(invocationNamespace.name, concurrentAction, 0) //when first completes (count is 0 since stashed not counted)
+ expectWarmed(invocationNamespace.name, concurrentAction) //when first completes (count is 0 since stashed not counted)
expectMsg(Transition(machine, Running, Ready)) //wait for first to complete to skip the delay step that can only reliably be tested in single threaded
expectMsg(Transition(machine, Ready, Running)) //when second starts (after delay...)
//complete the second run
runPromises(1).success(runInterval, ActivationResponse.success())
- expectWarmed(invocationNamespace.name, concurrentAction, 0) //when second completes
+ expectWarmed(invocationNamespace.name, concurrentAction) //when second completes
//go back to ready after first and second runs are complete
expectMsg(Transition(machine, Running, Ready))
@@ -536,13 +525,12 @@ class ContainerProxyTests
//complete the fifth run (request new work, 1 active remain)
runPromises(4).success(runInterval, ActivationResponse.success())
- expectWarmed(invocationNamespace.name, concurrentAction, 1) //when fifth completes
+ expectWarmed(invocationNamespace.name, concurrentAction) //when fifth completes
//complete the sixth run (request new work 0 active remain)
runPromises(5).success(runInterval, ActivationResponse.success())
- //expectWarmed(invocationNamespace.name, concurrentAction, 1) //when sixth completes
- expectWarmed(invocationNamespace.name, concurrentAction, 0) //when sixth completes
+ expectWarmed(invocationNamespace.name, concurrentAction) //when sixth completes
// back to ready
expectMsg(Transition(machine, Running, Ready))
@@ -611,7 +599,7 @@ class ContainerProxyTests
// Note that there are no intermediate state changes
//second one will succeed
- run(machine, Ready, false)
+ run(machine, Ready)
//With exception of the error on first run, the assertions should be the same as in
// `run an action and continue with a next run without pausing the container`
@@ -763,7 +751,6 @@ class ContainerProxyTests
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
- expectWarmed(invocationNamespace.name, action, 1)
expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
expectMsg(Transition(machine, Running, Removing))
@@ -802,7 +789,6 @@ class ContainerProxyTests
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
- expectWarmed(invocationNamespace.name, action, 1)
expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
expectMsg(Transition(machine, Running, Removing))
@@ -840,7 +826,6 @@ class ContainerProxyTests
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
- expectWarmed(invocationNamespace.name, action, 1)
expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself
expectMsg(Transition(machine, Running, Removing))
@@ -978,8 +963,7 @@ class ContainerProxyTests
// Finish /init, note that /run and log-collecting happens nonetheless
initPromise.success(Interval.zero)
- expectWarmed(invocationNamespace.name, action, 1)
- expectWarmed(invocationNamespace.name, action, 0)
+ expectWarmed(invocationNamespace.name, action)
expectMsg(Transition(machine, Running, Ready))
// Remove the container after the transaction finished
@@ -1062,7 +1046,7 @@ class ContainerProxyTests
class TestContainer(initPromise: Option[Promise[Interval]] = None,
runPromises: Seq[Promise[(Interval, ActivationResponse)]] = Seq.empty)
extends Container {
- protected val id = ContainerId("testcontainer")
+ protected[core] val id = ContainerId("testcontainer")
protected val addr = ContainerAddress("0.0.0.0")
protected implicit val logging: Logging = log
protected implicit val ec: ExecutionContext = system.dispatcher
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala
index 458f1a2..eb0e110 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/limits/ConcurrencyTests.scala
@@ -137,7 +137,7 @@ class ConcurrencyTests extends TestHelpers with WskTestHelpers with WskActorSyst
}
//none of the actions will complete till the requestCount is reached
- Await.result(Future.sequence(runs), 30.seconds).foreach { run =>
+ Await.result(Future.sequence(runs), 50.seconds).foreach { run =>
withActivation(wsk.activation, run) { response =>
val logs = response.logs.get
withClue(logs) { logs.size shouldBe 0 }