markusthoemmes closed pull request #3663: Support SPI for ContainerPool and
ContainerProxy
URL: https://github.com/apache/incubator-openwhisk/pull/3663
This is a PR merged from a forked repository.
Because GitHub hides the original diff once a pull request from a fork
is merged, the full diff is reproduced below for the sake of provenance:
diff --git a/common/scala/src/main/resources/reference.conf
b/common/scala/src/main/resources/reference.conf
index 35cf4f09a5..249ca24a25 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -7,4 +7,6 @@ whisk.spi {
ContainerFactoryProvider =
whisk.core.containerpool.docker.DockerContainerFactoryProvider
LogStoreProvider =
whisk.core.containerpool.logging.DockerToActivationLogStoreProvider
LoadBalancerProvider = whisk.core.loadBalancer.ShardingContainerPoolBalancer
+ ContainerPoolProvider = whisk.core.containerpool.DefaultContainerPool
+ ContainerProxyProvider = whisk.core.containerpool.DefaultContainerProxy
}
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index d26ebdc2f9..8ec3aa5a65 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -17,268 +17,40 @@
package whisk.core.containerpool
-import scala.collection.immutable
-import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
-import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
+import akka.actor.{ActorRef, ActorRefFactory, Props}
+import whisk.common.TransactionId
+import whisk.core.entity.ExecManifest.ImageName
import whisk.core.entity._
-import whisk.core.entity.size._
-import whisk.core.connector.MessageFeed
+import whisk.spi.Spi
-import scala.concurrent.duration._
+import scala.concurrent.Future
-sealed trait WorkerState
-case object Busy extends WorkerState
-case object Free extends WorkerState
-
-case class WorkerData(data: ContainerData, state: WorkerState)
-
-/**
- * A pool managing containers to run actions on.
- *
- * This pool fulfills the other half of the ContainerProxy contract. Only
- * one job (either Start or Run) is sent to a child-actor at any given
- * time. The pool then waits for a response of that container, indicating
- * the container is done with the job. Only then will the pool send another
- * request to that container.
- *
- * Upon actor creation, the pool will start to prewarm containers according
- * to the provided prewarmConfig, iff set. Those containers will **not** be
- * part of the poolsize calculation, which is capped by the poolSize parameter.
- * Prewarm containers are only used, if they have matching arguments
- * (kind, memory) and there is space in the pool.
- *
- * @param childFactory method to create new container proxy actor
- * @param feed actor to request more work from
- * @param prewarmConfig optional settings for container prewarming
- * @param poolConfig config for the ContainerPool
- */
-class ContainerPool(childFactory: ActorRefFactory => ActorRef,
- feed: ActorRef,
- prewarmConfig: List[PrewarmingConfig] = List.empty,
- poolConfig: ContainerPoolConfig)
- extends Actor {
- implicit val logging = new AkkaLogging(context.system.log)
-
- var freePool = immutable.Map.empty[ActorRef, ContainerData]
- var busyPool = immutable.Map.empty[ActorRef, ContainerData]
- var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
- val logMessageInterval = 10.seconds
-
- prewarmConfig.foreach { config =>
- logging.info(this, s"pre-warming ${config.count} ${config.exec.kind}
${config.memoryLimit.toString}")(
- TransactionId.invokerWarmup)
- (1 to config.count).foreach { _ =>
- prewarmContainer(config.exec, config.memoryLimit)
- }
- }
-
- def logContainerStart(r: Run, containerState: String): Unit = {
- val namespaceName = r.msg.user.namespace.name
- val actionName = r.action.name.name
- val activationId = r.msg.activationId.toString
-
- r.msg.transid.mark(
- this,
- LoggingMarkers.INVOKER_CONTAINER_START(containerState),
- s"containerStart containerState: $containerState action: $actionName
namespace: $namespaceName activationId: $activationId",
- akka.event.Logging.InfoLevel)
- }
-
- def receive: Receive = {
- // A job to run on a container
- //
- // Run messages are received either via the feed or from child containers
which cannot process
- // their requests and send them back to the pool for rescheduling (this
may happen if "docker" operations
- // fail for example, or a container has aged and was destroying itself
when a new request was assigned)
- case r: Run =>
- val createdContainer = if (busyPool.size <
poolConfig.maxActiveContainers) {
-
- // Schedule a job to a warm container
- ContainerPool
- .schedule(r.action, r.msg.user.namespace, freePool)
- .map(container => {
- (container, "warm")
- })
- .orElse {
- if (busyPool.size + freePool.size <
poolConfig.maxActiveContainers) {
- takePrewarmContainer(r.action)
- .map(container => {
- (container, "prewarmed")
- })
- .orElse {
- Some(createContainer(), "cold")
- }
- } else None
- }
- .orElse {
- // Remove a container and create a new one for the given job
- ContainerPool.remove(freePool).map { toDelete =>
- removeContainer(toDelete)
- takePrewarmContainer(r.action)
- .map(container => {
- (container, "recreated")
- })
- .getOrElse {
- (createContainer(), "recreated")
- }
- }
- }
- } else None
-
- createdContainer match {
- case Some(((actor, data), containerState)) =>
- busyPool = busyPool + (actor -> data)
- freePool = freePool - actor
- actor ! r // forwards the run request to the container
- logContainerStart(r, containerState)
- case None =>
- // this can also happen if createContainer fails to start a new
container, or
- // if a job is rescheduled but the container it was allocated to has
not yet destroyed itself
- // (and a new container would over commit the pool)
- val isErrorLogged =
r.retryLogDeadline.map(_.isOverdue).getOrElse(true)
- val retryLogDeadline = if (isErrorLogged) {
- logging.error(
- this,
- s"Rescheduling Run message, too many message in the pool,
freePoolSize: ${freePool.size}, " +
- s"busyPoolSize: ${busyPool.size}, maxActiveContainers
${poolConfig.maxActiveContainers}, " +
- s"userNamespace: ${r.msg.user.namespace}, action:
${r.action}")(r.msg.transid)
- Some(logMessageInterval.fromNow)
- } else {
- r.retryLogDeadline
- }
- self ! Run(r.action, r.msg, retryLogDeadline)
- }
-
- // Container is free to take more work
- case NeedWork(data: WarmedData) =>
- freePool = freePool + (sender() -> data)
- busyPool.get(sender()).foreach { _ =>
- busyPool = busyPool - sender()
- feed ! MessageFeed.Processed
- }
-
- // Container is prewarmed and ready to take work
- case NeedWork(data: PreWarmedData) =>
- prewarmedPool = prewarmedPool + (sender() -> data)
-
- // Container got removed
- case ContainerRemoved =>
- freePool = freePool - sender()
- busyPool.get(sender()).foreach { _ =>
- busyPool = busyPool - sender()
- // container was busy, so there is capacity to accept another job
request
- feed ! MessageFeed.Processed
- }
-
- // This message is received for one of these reasons:
- // 1. Container errored while resuming a warm container, could not process
the job, and sent the job back
- // 2. The container aged, is destroying itself, and was assigned a job
which it had to send back
- // 3. The container aged and is destroying itself
- // Update the free/busy lists but no message is sent to the feed since
there is no change in capacity yet
- case RescheduleJob =>
- freePool = freePool - sender()
- busyPool = busyPool - sender()
- }
+trait ContainerPool {
/** Creates a new container and updates state accordingly. */
- def createContainer(): (ActorRef, ContainerData) = {
- val ref = childFactory(context)
- val data = NoData()
- freePool = freePool + (ref -> data)
- ref -> data
- }
+ def createContainer(): (ActorRef, ContainerData)
/** Creates a new prewarmed container */
- def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize) =
- childFactory(context) ! Start(exec, memoryLimit)
-
- /**
- * Takes a prewarm container out of the prewarmed pool
- * iff a container with a matching kind is found.
- *
- * @param kind the kind you want to invoke
- * @return the container iff found
- */
- def takePrewarmContainer(action: ExecutableWhiskAction): Option[(ActorRef,
ContainerData)] = {
- val kind = action.exec.kind
- val memory = action.limits.memory.megabytes.MB
- prewarmedPool
- .find {
- case (_, PreWarmedData(_, `kind`, `memory`)) => true
- case _ => false
- }
- .map {
- case (ref, data) =>
- // Move the container to the usual pool
- freePool = freePool + (ref -> data)
- prewarmedPool = prewarmedPool - ref
- // Create a new prewarm container
- // NOTE: prewarming ignores the action code in exec, but this is
dangerous as the field is accessible to the factory
- prewarmContainer(action.exec, memory)
- (ref, data)
- }
- }
+ def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize)
/** Removes a container and updates state accordingly. */
- def removeContainer(toDelete: ActorRef) = {
- toDelete ! Remove
- freePool = freePool - toDelete
- busyPool = busyPool - toDelete
- }
-}
-
-object ContainerPool {
-
- /**
- * Finds the best container for a given job to run on.
- *
- * Selects an arbitrary warm container from the passed pool of idle
containers
- * that matches the action and the invocation namespace. The implementation
uses
- * matching such that structural equality of action and the invocation
namespace
- * is required.
- * Returns None iff no matching container is in the idle pool.
- * Does not consider pre-warmed containers.
- *
- * @param action the action to run
- * @param invocationNamespace the namespace, that wants to run the action
- * @param idles a map of idle containers, awaiting work
- * @return a container if one found
- */
- protected[containerpool] def schedule[A](action: ExecutableWhiskAction,
- invocationNamespace: EntityName,
- idles: Map[A, ContainerData]):
Option[(A, ContainerData)] = {
- idles.find {
- case (_, WarmedData(_, `invocationNamespace`, `action`, _)) => true
- case _ => false
- }
- }
+ def removeContainer(toDelete: ActorRef)
- /**
- * Finds the oldest previously used container to remove to make space for
the job passed to run.
- *
- * NOTE: This method is never called to remove an action that is in the pool
already,
- * since this would be picked up earlier in the scheduler and the container
reused.
- *
- * @param pool a map of all free containers in the pool
- * @return a container to be removed iff found
- */
- protected[containerpool] def remove[A](pool: Map[A, ContainerData]):
Option[A] = {
- val freeContainers = pool.collect {
- case (ref, w: WarmedData) => ref -> w
- }
+}
- if (freeContainers.nonEmpty) {
- val (ref, _) = freeContainers.minBy(_._2.lastUsed)
- Some(ref)
- } else None
- }
+trait ContainerPoolProvider extends Spi {
+ def getContainerProxyFactory(
+ factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) =>
Future[Container],
+ ack: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) =>
Future[Any],
+ store: (TransactionId, WhiskActivation) => Future[Any],
+ collectLogs: (TransactionId, Identity, WhiskActivation, Container,
ExecutableWhiskAction) => Future[ActivationLogs],
+ instance: InstanceId,
+ poolConfig: ContainerPoolConfig): ActorRefFactory => ActorRef
def props(factory: ActorRefFactory => ActorRef,
poolConfig: ContainerPoolConfig,
feed: ActorRef,
- prewarmConfig: List[PrewarmingConfig] = List.empty) =
- Props(new ContainerPool(factory, feed, prewarmConfig, poolConfig))
-}
+ prewarmConfig: List[PrewarmingConfig] = List.empty): Props
-/** Contains settings needed to perform container prewarming. */
-case class PrewarmingConfig(count: Int, exec: CodeExec[_], memoryLimit:
ByteSize)
+ def requiredProperties: Map[String, String]
+}
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index b75ad72a7e..06f3e718c8 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -17,501 +17,16 @@
package whisk.core.containerpool
-import java.time.Instant
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.util.Success
-import scala.util.Failure
-import akka.actor.FSM
-import akka.actor.Props
-import akka.actor.Stash
-import akka.actor.Status.{Failure => FailureMessage}
-import akka.pattern.pipe
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import whisk.common.{AkkaLogging, Counter, LoggingMarkers, TransactionId}
-import whisk.core.connector.ActivationMessage
-import whisk.core.containerpool.logging.LogCollectingException
+import whisk.common.TransactionId
import whisk.core.entity._
-import whisk.core.entity.size._
-import whisk.core.entity.ExecManifest.ImageName
-import whisk.http.Messages
-import akka.event.Logging.InfoLevel
-import pureconfig.loadConfigOrThrow
-import whisk.core.ConfigKeys
-
-// States
-sealed trait ContainerState
-case object Uninitialized extends ContainerState
-case object Starting extends ContainerState
-case object Started extends ContainerState
-case object Running extends ContainerState
-case object Ready extends ContainerState
-case object Pausing extends ContainerState
-case object Paused extends ContainerState
-case object Removing extends ContainerState
-
-// Data
-sealed abstract class ContainerData(val lastUsed: Instant)
-case class NoData() extends ContainerData(Instant.EPOCH)
-case class PreWarmedData(container: Container, kind: String, memoryLimit:
ByteSize) extends ContainerData(Instant.EPOCH)
-case class WarmedData(container: Container,
- invocationNamespace: EntityName,
- action: ExecutableWhiskAction,
- override val lastUsed: Instant)
- extends ContainerData(lastUsed)
-
-// Events received by the actor
-case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
-case class Run(action: ExecutableWhiskAction, msg: ActivationMessage,
retryLogDeadline: Option[Deadline] = None)
-case object Remove
-
-// Events sent by the actor
-case class NeedWork(data: ContainerData)
-case object ContainerPaused
-case object ContainerRemoved // when container is destroyed
-case object RescheduleJob // job is sent back to parent and could not be
processed because container is being destroyed
-
-/**
- * A proxy that wraps a Container. It is used to keep track of the lifecycle
- * of a container and to guarantee a contract between the client of the
container
- * and the container itself.
- *
- * The contract is as follows:
- * 1. Only one job is to be sent to the ContainerProxy at one time.
ContainerProxy
- * will delay all further jobs until a previous job has finished.
- * 2. The next job can be sent to the ContainerProxy after it indicates
available
- * capacity by sending NeedWork to its parent.
- * 3. A Remove message can be sent at any point in time. Like multiple jobs
though,
- * it will be delayed until the currently running job finishes.
- *
- * @constructor
- * @param factory a function generating a Container
- * @param sendActiveAck a function sending the activation via active ack
- * @param storeActivation a function storing the activation in a persistent
store
- * @param unusedTimeout time after which the container is automatically thrown
away
- * @param pauseGrace time to wait for new work before pausing the container
- */
-class ContainerProxy(
- factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) =>
Future[Container],
- sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID)
=> Future[Any],
- storeActivation: (TransactionId, WhiskActivation) => Future[Any],
- collectLogs: (TransactionId, Identity, WhiskActivation, Container,
ExecutableWhiskAction) => Future[ActivationLogs],
- instance: InstanceId,
- poolConfig: ContainerPoolConfig,
- unusedTimeout: FiniteDuration,
- pauseGrace: FiniteDuration)
- extends FSM[ContainerState, ContainerData]
- with Stash {
- implicit val ec = context.system.dispatcher
- implicit val logging = new AkkaLogging(context.system.log)
- var rescheduleJob = false // true iff actor receives a job but cannot
process it because actor will destroy itself
-
- startWith(Uninitialized, NoData())
-
- when(Uninitialized) {
- // pre warm a container (creates a stem cell container)
- case Event(job: Start, _) =>
- factory(
- TransactionId.invokerWarmup,
- ContainerProxy.containerName(instance, "prewarm", job.exec.kind),
- job.exec.image,
- job.exec.pull,
- job.memoryLimit,
- poolConfig.cpuShare)
- .map(container => PreWarmedData(container, job.exec.kind,
job.memoryLimit))
- .pipeTo(self)
-
- goto(Starting)
-
- // cold start (no container to reuse or available stem cell container)
- case Event(job: Run, _) =>
- implicit val transid = job.msg.transid
-
- // create a new container
- val container = factory(
- job.msg.transid,
- ContainerProxy.containerName(instance, job.msg.user.namespace.name,
job.action.name.name),
- job.action.exec.image,
- job.action.exec.pull,
- job.action.limits.memory.megabytes.MB,
- poolConfig.cpuShare)
-
- // container factory will either yield a new container ready to execute
the action, or
- // starting up the container failed; for the latter, it's either an
internal error starting
- // a container or a docker action that is not conforming to the required
action API
- container
- .andThen {
- case Success(container) =>
- // the container is ready to accept an activation; register it as
PreWarmed; this
- // normalizes the life cycle for containers and their cleanup when
activations fail
- self ! PreWarmedData(container, job.action.exec.kind,
job.action.limits.memory.megabytes.MB)
-
- case Failure(t) =>
- // the container did not come up cleanly, so disambiguate the
failure mode and then cleanup
- // the failure is either the system fault, or for docker actions,
the application/developer fault
- val response = t match {
- case WhiskContainerStartupError(msg) =>
ActivationResponse.whiskError(msg)
- case BlackboxStartupError(msg) =>
ActivationResponse.applicationError(msg)
- case _ =>
ActivationResponse.whiskError(Messages.resourceProvisionError)
- }
- // construct an appropriate activation and record it in the
datastore,
- // also update the feed and active ack; the container cleanup is
queued
- // implicitly via a FailureMessage which will be processed later
when the state
- // transitions to Running
- val activation = ContainerProxy.constructWhiskActivation(job,
None, Interval.zero, response)
- sendActiveAck(transid, activation, job.msg.blocking,
job.msg.rootControllerIndex, job.msg.user.authkey.uuid)
- storeActivation(transid, activation)
- }
- .flatMap { container =>
- // now attempt to inject the user code and run the action
- initializeAndRun(container, job)
- .map(_ => WarmedData(container, job.msg.user.namespace,
job.action, Instant.now))
- }
- .pipeTo(self)
-
- goto(Running)
- }
-
- when(Starting) {
- // container was successfully obtained
- case Event(data: PreWarmedData, _) =>
- context.parent ! NeedWork(data)
- goto(Started) using data
-
- // container creation failed
- case Event(_: FailureMessage, _) =>
- context.parent ! ContainerRemoved
- stop()
-
- case _ => delay
- }
-
- when(Started) {
- case Event(job: Run, data: PreWarmedData) =>
- implicit val transid = job.msg.transid
- initializeAndRun(data.container, job)
- .map(_ => WarmedData(data.container, job.msg.user.namespace,
job.action, Instant.now))
- .pipeTo(self)
-
- goto(Running)
-
- case Event(Remove, data: PreWarmedData) => destroyContainer(data.container)
- }
-
- when(Running) {
- // Intermediate state, we were able to start a container
- // and we keep it in case we need to destroy it.
- case Event(data: PreWarmedData, _) => stay using data
-
- // Run was successful
- case Event(data: WarmedData, _) =>
- context.parent ! NeedWork(data)
- goto(Ready) using data
-
- // Failed after /init (the first run failed)
- case Event(_: FailureMessage, data: PreWarmedData) =>
destroyContainer(data.container)
-
- // Failed for a subsequent /run
- case Event(_: FailureMessage, data: WarmedData) =>
destroyContainer(data.container)
-
- // Failed at getting a container for a cold-start run
- case Event(_: FailureMessage, _) =>
- context.parent ! ContainerRemoved
- stop()
-
- case _ => delay
- }
-
- when(Ready, stateTimeout = pauseGrace) {
- case Event(job: Run, data: WarmedData) =>
- implicit val transid = job.msg.transid
- initializeAndRun(data.container, job)
- .map(_ => WarmedData(data.container, job.msg.user.namespace,
job.action, Instant.now))
- .pipeTo(self)
-
- goto(Running)
-
- // pause grace timed out
- case Event(StateTimeout, data: WarmedData) =>
- data.container.suspend()(TransactionId.invokerNanny).map(_ =>
ContainerPaused).pipeTo(self)
- goto(Pausing)
-
- case Event(Remove, data: WarmedData) => destroyContainer(data.container)
- }
- when(Pausing) {
- case Event(ContainerPaused, data: WarmedData) => goto(Paused)
- case Event(_: FailureMessage, data: WarmedData) =>
destroyContainer(data.container)
- case _ => delay
- }
-
- when(Paused, stateTimeout = unusedTimeout) {
- case Event(job: Run, data: WarmedData) =>
- implicit val transid = job.msg.transid
- data.container
- .resume()
- .andThen {
- // Sending the message to self on a failure will cause the message
- // to ultimately be sent back to the parent (which will retry it)
- // when container removal is done.
- case Failure(_) =>
- rescheduleJob = true
- self ! job
- }
- .flatMap(_ => initializeAndRun(data.container, job))
- .map(_ => WarmedData(data.container, job.msg.user.namespace,
job.action, Instant.now))
- .pipeTo(self)
-
- goto(Running)
-
- // container is reclaimed by the pool or it has become too old
- case Event(StateTimeout | Remove, data: WarmedData) =>
- rescheduleJob = true // to supress sending message to the pool and not
double count
- destroyContainer(data.container)
- }
-
- when(Removing) {
- case Event(job: Run, _) =>
- // Send the job back to the pool to be rescheduled
- context.parent ! job
- stay
- case Event(ContainerRemoved, _) => stop()
- case Event(_: FailureMessage, _) => stop()
- }
-
- // Unstash all messages stashed while in intermediate state
- onTransition {
- case _ -> Started => unstashAll()
- case _ -> Ready => unstashAll()
- case _ -> Paused => unstashAll()
- case _ -> Removing => unstashAll()
- }
-
- initialize()
-
- /** Delays all incoming messages until unstashAll() is called */
- def delay = {
- stash()
- stay
- }
-
- /**
- * Destroys the container after unpausing it if needed. Can be used
- * as a state progression as it goes to Removing.
- *
- * @param container the container to destroy
- */
- def destroyContainer(container: Container) = {
- if (!rescheduleJob) {
- context.parent ! ContainerRemoved
- } else {
- context.parent ! RescheduleJob
- }
-
- val unpause = stateName match {
- case Paused => container.resume()(TransactionId.invokerNanny)
- case _ => Future.successful(())
- }
-
- unpause
- .flatMap(_ => container.destroy()(TransactionId.invokerNanny))
- .map(_ => ContainerRemoved)
- .pipeTo(self)
-
- goto(Removing)
- }
-
- /**
- * Runs the job, initialize first if necessary.
- * Completes the job by:
- * 1. sending an activate ack,
- * 2. fetching the logs for the run,
- * 3. indicating the resource is free to the parent pool,
- * 4. recording the result to the data store
- *
- * @param container the container to run the job on
- * @param job the job to run
- * @return a future completing after logs have been collected and
- * added to the WhiskActivation
- */
- def initializeAndRun(container: Container, job: Run)(implicit tid:
TransactionId): Future[WhiskActivation] = {
- val actionTimeout = job.action.limits.timeout.duration
-
- // Only initialize iff we haven't yet warmed the container
- val initialize = stateData match {
- case data: WarmedData => Future.successful(None)
- case _ =>
container.initialize(job.action.containerInitializer,
actionTimeout).map(Some(_))
- }
-
- val activation: Future[WhiskActivation] = initialize
- .flatMap { initInterval =>
- val parameters = job.msg.content getOrElse JsObject()
-
- val environment = JsObject(
- "api_key" -> job.msg.user.authkey.compact.toJson,
- "namespace" -> job.msg.user.namespace.toJson,
- "action_name" -> job.msg.action.qualifiedNameWithLeadingSlash.toJson,
- "activation_id" -> job.msg.activationId.toString.toJson,
- // compute deadline on invoker side avoids discrepancies inside
container
- // but potentially under-estimates actual deadline
- "deadline" -> (Instant.now.toEpochMilli +
actionTimeout.toMillis).toString.toJson)
-
- container.run(parameters, environment,
actionTimeout)(job.msg.transid).map {
- case (runInterval, response) =>
- val initRunInterval = initInterval
- .map(i =>
Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end))
- .getOrElse(runInterval)
- ContainerProxy.constructWhiskActivation(job, initInterval,
initRunInterval, response)
- }
- }
- .recover {
- case InitializationError(interval, response) =>
- ContainerProxy.constructWhiskActivation(job, Some(interval),
interval, response)
- case t =>
- // Actually, this should never happen - but we want to make sure to
not miss a problem
- logging.error(this, s"caught unexpected error while running
activation: ${t}")
- ContainerProxy.constructWhiskActivation(
- job,
- None,
- Interval.zero,
- ActivationResponse.whiskError(Messages.abnormalRun))
- }
-
- // Sending active ack. Entirely asynchronous and not waited upon.
- activation.foreach(sendActiveAck(tid, _, job.msg.blocking,
job.msg.rootControllerIndex, job.msg.user.authkey.uuid))
-
- // Adds logs to the raw activation.
- val activationWithLogs: Future[Either[ActivationLogReadingError,
WhiskActivation]] = activation
- .flatMap { activation =>
- // Skips log collection entirely, if the limit is set to 0
- if (job.action.limits.logs.asMegaBytes == 0.MB) {
- Future.successful(Right(activation))
- } else {
- val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS,
logLevel = InfoLevel)
- collectLogs(tid, job.msg.user, activation, container, job.action)
- .andThen {
- case Success(_) => tid.finished(this, start)
- case Failure(t) => tid.failed(this, start, s"reading logs
failed: $t")
- }
- .map(logs => Right(activation.withLogs(logs)))
- .recover {
- case LogCollectingException(logs) =>
- Left(ActivationLogReadingError(activation.withLogs(logs)))
- case _ =>
-
Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
- }
- }
- }
-
- // Storing the record. Entirely asynchronous and not waited upon.
- activationWithLogs.map(_.fold(_.activation,
identity)).foreach(storeActivation(tid, _))
-
- // Disambiguate activation errors and transform the Either into a
failed/successful Future respectively.
- activationWithLogs.flatMap {
- case Right(act) if !act.response.isSuccess =>
Future.failed(ActivationUnsuccessfulError(act))
- case Left(error) => Future.failed(error)
- case Right(act) => Future.successful(act)
- }
- }
-}
-
-final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration,
pauseGrace: FiniteDuration)
-
-object ContainerProxy {
- def props(
- factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) =>
Future[Container],
- ack: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) =>
Future[Any],
- store: (TransactionId, WhiskActivation) => Future[Any],
- collectLogs: (TransactionId, Identity, WhiskActivation, Container,
ExecutableWhiskAction) => Future[ActivationLogs],
- instance: InstanceId,
- poolConfig: ContainerPoolConfig,
- unusedTimeout: FiniteDuration = timeouts.idleContainer,
- pauseGrace: FiniteDuration = timeouts.pauseGrace) =
- Props(new ContainerProxy(factory, ack, store, collectLogs, instance,
poolConfig, unusedTimeout, pauseGrace))
-
- // Needs to be thread-safe as it's used by multiple proxies concurrently.
- private val containerCount = new Counter
-
- val timeouts =
loadConfigOrThrow[ContainerProxyTimeoutConfig](ConfigKeys.containerProxyTimeouts)
-
- /**
- * Generates a unique container name.
- *
- * @param prefix the container name's prefix
- * @param suffix the container name's suffix
- * @return a unique container name
- */
- def containerName(instance: InstanceId, prefix: String, suffix: String):
String = {
- def isAllowed(c: Char): Boolean = c.isLetterOrDigit || c == '_'
-
- val sanitizedPrefix = prefix.filter(isAllowed)
- val sanitizedSuffix = suffix.filter(isAllowed)
-
-
s"${ContainerFactory.containerNamePrefix(instance)}_${containerCount.next()}_${sanitizedPrefix}_${sanitizedSuffix}"
- }
-
- /**
- * Creates a WhiskActivation ready to be sent via active ack.
- *
- * @param job the job that was executed
- * @param interval the time it took to execute the job
- * @param response the response to return to the user
- * @return a WhiskActivation to be sent to the user
- */
- def constructWhiskActivation(job: Run,
- initInterval: Option[Interval],
- totalInterval: Interval,
- response: ActivationResponse) = {
- val causedBy = Some {
- if (job.msg.causedBySequence) {
- Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))
- } else {
- // emit the internal system hold time as the 'wait' time, but only for
non-sequence
- // actions, since the transid start time for a sequence does not
correspond
- // with a specific component of the activation but the entire sequence;
- // it will require some work to generate a new transaction id for a
sequence
- // component - however, because the trace of activations is recorded
in the parent
- // sequence, a client can determine the queue time for sequences that
way
- val end = initInterval.map(_.start).getOrElse(totalInterval.start)
- Parameters(
- WhiskActivation.waitTimeAnnotation,
- Interval(job.msg.transid.meta.start, end).duration.toMillis.toJson)
- }
- }
+import scala.concurrent.Future
- val initTime = {
- initInterval.map(initTime =>
Parameters(WhiskActivation.initTimeAnnotation,
initTime.duration.toMillis.toJson))
- }
+protected[containerpool] trait ContainerProxy {
- WhiskActivation(
- activationId = job.msg.activationId,
- namespace = job.msg.user.namespace.toPath,
- subject = job.msg.user.subject,
- cause = job.msg.cause,
- name = job.action.name,
- version = job.action.version,
- start = totalInterval.start,
- end = totalInterval.end,
- duration = Some(totalInterval.duration.toMillis),
- response = response,
- annotations = {
- Parameters(WhiskActivation.limitsAnnotation, job.action.limits.toJson)
++
- Parameters(WhiskActivation.pathAnnotation,
JsString(job.action.fullyQualifiedName(false).asString)) ++
- Parameters(WhiskActivation.kindAnnotation,
JsString(job.action.exec.kind)) ++
- causedBy ++ initTime
- })
- }
-}
+ /** Initialize the container and run codes in it */
+ def initializeAndRun(container: Container, job: Run)(implicit tid:
TransactionId): Future[WhiskActivation]
-/** Indicates that something went wrong with an activation and the container
should be removed */
-trait ActivationError extends Exception {
- val activation: WhiskActivation
+ /** Destroy the container */
+ def destroyContainer(container: Container): Any
}
-
-/** Indicates an activation with a non-successful response */
-case class ActivationUnsuccessfulError(activation: WhiskActivation) extends
ActivationError
-
-/** Indicates reading logs for an activation failed (terminally, truncated) */
-case class ActivationLogReadingError(activation: WhiskActivation) extends
ActivationError
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/DefaultContainerPool.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/DefaultContainerPool.scala
new file mode 100644
index 0000000000..e4978323d9
--- /dev/null
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/DefaultContainerPool.scala
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
+import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
+import whisk.core.WhiskConfig._
+import whisk.core.connector.MessageFeed
+import whisk.core.entity.ExecManifest.ImageName
+import whisk.core.entity._
+import whisk.core.entity.size._
+
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+sealed trait WorkerState
+case object Busy extends WorkerState
+case object Free extends WorkerState
+
+case class WorkerData(data: ContainerData, state: WorkerState)
+
+/**
+ * A pool managing containers to run actions on.
+ *
+ * This pool fulfills the other half of the ContainerProxy contract. Only
+ * one job (either Start or Run) is sent to a child-actor at any given
+ * time. The pool then waits for a response of that container, indicating
+ * the container is done with the job. Only then will the pool send another
+ * request to that container.
+ *
+ * Upon actor creation, the pool will start to prewarm containers according
+ * to the provided prewarmConfig, iff set. Those containers will **not** be
+ * part of the poolsize calculation, which is capped by the poolSize parameter.
+ * Prewarm containers are only used, if they have matching arguments
+ * (kind, memory) and there is space in the pool.
+ *
+ * @param childFactory method to create new container proxy actor
+ * @param feed actor to request more work from
+ * @param prewarmConfig optional settings for container prewarming
+ * @param poolConfig config for the ContainerPool
+ */
+class DefaultContainerPool(childFactory: ActorRefFactory => ActorRef,
+ feed: ActorRef,
+ prewarmConfig: List[PrewarmingConfig] = List.empty,
+ poolConfig: ContainerPoolConfig)
+ extends Actor {
+ implicit val logging = new AkkaLogging(context.system.log)
+
+ var freePool = immutable.Map.empty[ActorRef, ContainerData]
+ var busyPool = immutable.Map.empty[ActorRef, ContainerData]
+ var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
+ val logMessageInterval = 10.seconds
+
+ prewarmConfig.foreach { config =>
+ logging.info(this, s"pre-warming ${config.count} ${config.exec.kind}
${config.memoryLimit.toString}")(
+ TransactionId.invokerWarmup)
+ (1 to config.count).foreach { _ =>
+ prewarmContainer(config.exec, config.memoryLimit)
+ }
+ }
+
+ def logContainerStart(r: Run, containerState: String): Unit = {
+ val namespaceName = r.msg.user.namespace.name
+ val actionName = r.action.name.name
+ val activationId = r.msg.activationId.toString
+
+ r.msg.transid.mark(
+ this,
+ LoggingMarkers.INVOKER_CONTAINER_START(containerState),
+ s"containerStart containerState: $containerState action: $actionName
namespace: $namespaceName activationId: $activationId",
+ akka.event.Logging.InfoLevel)
+ }
+
+ def receive: Receive = {
+ // A job to run on a container
+ //
+ // Run messages are received either via the feed or from child containers
which cannot process
+ // their requests and send them back to the pool for rescheduling (this
may happen if "docker" operations
+ // fail for example, or a container has aged and was destroying itself
when a new request was assigned)
+ case r: Run =>
+ val createdContainer = if (busyPool.size <
poolConfig.maxActiveContainers) {
+
+ // Schedule a job to a warm container
+ DefaultContainerPool
+ .schedule(r.action, r.msg.user.namespace, freePool)
+ .map(container => {
+ (container, "warm")
+ })
+ .orElse {
+ if (busyPool.size + freePool.size <
poolConfig.maxActiveContainers) {
+ takePrewarmContainer(r.action)
+ .map(container => {
+ (container, "prewarmed")
+ })
+ .orElse {
+ Some(createContainer(), "cold")
+ }
+ } else None
+ }
+ .orElse {
+ // Remove a container and create a new one for the given job
+ DefaultContainerPool.remove(freePool).map { toDelete =>
+ removeContainer(toDelete)
+ takePrewarmContainer(r.action)
+ .map(container => {
+ (container, "recreated")
+ })
+ .getOrElse {
+ (createContainer(), "recreated")
+ }
+ }
+ }
+ } else None
+
+ createdContainer match {
+ case Some(((actor, data), containerState)) =>
+ busyPool = busyPool + (actor -> data)
+ freePool = freePool - actor
+ actor ! r // forwards the run request to the container
+ logContainerStart(r, containerState)
+ case None =>
+ // this can also happen if createContainer fails to start a new
container, or
+ // if a job is rescheduled but the container it was allocated to has
not yet destroyed itself
+ // (and a new container would over commit the pool)
+ val isErrorLogged =
r.retryLogDeadline.map(_.isOverdue).getOrElse(true)
+ val retryLogDeadline = if (isErrorLogged) {
+ logging.error(
+ this,
+ s"Rescheduling Run message, too many message in the pool,
freePoolSize: ${freePool.size}, " +
+ s"busyPoolSize: ${busyPool.size}, maxActiveContainers
${poolConfig.maxActiveContainers}, " +
+ s"userNamespace: ${r.msg.user.namespace}, action:
${r.action}")(r.msg.transid)
+ Some(logMessageInterval.fromNow)
+ } else {
+ r.retryLogDeadline
+ }
+ self ! Run(r.action, r.msg, retryLogDeadline)
+ }
+
+ // Container is free to take more work
+ case NeedWork(data: WarmedData) =>
+ freePool = freePool + (sender() -> data)
+ busyPool.get(sender()).foreach { _ =>
+ busyPool = busyPool - sender()
+ feed ! MessageFeed.Processed
+ }
+
+ // Container is prewarmed and ready to take work
+ case NeedWork(data: PreWarmedData) =>
+ prewarmedPool = prewarmedPool + (sender() -> data)
+
+ // Container got removed
+ case ContainerRemoved =>
+ freePool = freePool - sender()
+ busyPool.get(sender()).foreach { _ =>
+ busyPool = busyPool - sender()
+ // container was busy, so there is capacity to accept another job
request
+ feed ! MessageFeed.Processed
+ }
+
+ // This message is received for one of these reasons:
+ // 1. Container errored while resuming a warm container, could not process
the job, and sent the job back
+ // 2. The container aged, is destroying itself, and was assigned a job
which it had to send back
+ // 3. The container aged and is destroying itself
+ // Update the free/busy lists but no message is sent to the feed since
there is no change in capacity yet
+ case RescheduleJob =>
+ freePool = freePool - sender()
+ busyPool = busyPool - sender()
+ }
+
+ /** Creates a new container and updates state accordingly. */
+ def createContainer(): (ActorRef, ContainerData) = {
+ val ref = childFactory(context)
+ val data = NoData()
+ freePool = freePool + (ref -> data)
+ ref -> data
+ }
+
+ /** Creates a new prewarmed container */
+ def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize) =
+ childFactory(context) ! Start(exec, memoryLimit)
+
+ /**
+ * Takes a prewarm container out of the prewarmed pool
+ * iff a container with a matching kind and memory limit is found.
+ *
+ * @param action the action whose kind and memory limit are used to match a prewarm container
+ * @return the container iff found
+ */
+ def takePrewarmContainer(action: ExecutableWhiskAction): Option[(ActorRef,
ContainerData)] = {
+ val kind = action.exec.kind
+ val memory = action.limits.memory.megabytes.MB
+ prewarmedPool
+ .find {
+ case (_, PreWarmedData(_, `kind`, `memory`)) => true
+ case _ => false
+ }
+ .map {
+ case (ref, data) =>
+ // Move the container to the usual pool
+ freePool = freePool + (ref -> data)
+ prewarmedPool = prewarmedPool - ref
+ // Create a new prewarm container
+ // NOTE: prewarming ignores the action code in exec, but this is
dangerous as the field is accessible to the factory
+ prewarmContainer(action.exec, memory)
+ (ref, data)
+ }
+ }
+
+ /** Removes a container and updates state accordingly. */
+ def removeContainer(toDelete: ActorRef) = {
+ toDelete ! Remove
+ freePool = freePool - toDelete
+ busyPool = busyPool - toDelete
+ }
+}
+
+object DefaultContainerPool extends ContainerPoolProvider {
+
+ /**
+ * Finds the best container for a given job to run on.
+ *
+ * Selects an arbitrary warm container from the passed pool of idle
containers
+ * that matches the action and the invocation namespace. The implementation
uses
+ * matching such that structural equality of action and the invocation
namespace
+ * is required.
+ * Returns None iff no matching container is in the idle pool.
+ * Does not consider pre-warmed containers.
+ *
+ * @param action the action to run
+ * @param invocationNamespace the namespace, that wants to run the action
+ * @param idles a map of idle containers, awaiting work
+ * @return a container if one found
+ */
+ protected[containerpool] def schedule[A](action: ExecutableWhiskAction,
+ invocationNamespace: EntityName,
+ idles: Map[A, ContainerData]):
Option[(A, ContainerData)] = {
+ idles.find {
+ case (_, WarmedData(_, `invocationNamespace`, `action`, _)) => true
+ case _ => false
+ }
+ }
+
+ /**
+ * Finds the oldest previously used container to remove to make space for
the job passed to run.
+ *
+ * NOTE: This method is never called to remove an action that is in the pool
already,
+ * since this would be picked up earlier in the scheduler and the container
reused.
+ *
+ * @param pool a map of all free containers in the pool
+ * @return a container to be removed iff found
+ */
+ protected[containerpool] def remove[A](pool: Map[A, ContainerData]):
Option[A] = {
+ val freeContainers = pool.collect {
+ case (ref, w: WarmedData) => ref -> w
+ }
+
+ if (freeContainers.nonEmpty) {
+ val (ref, _) = freeContainers.minBy(_._2.lastUsed)
+ Some(ref)
+ } else None
+ }
+
+ override def getContainerProxyFactory(
+ factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) =>
Future[Container],
+ ack: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) =>
Future[Any],
+ store: (TransactionId, WhiskActivation) => Future[Any],
+ collectLogs: (TransactionId, Identity, WhiskActivation, Container,
ExecutableWhiskAction) => Future[ActivationLogs],
+ instance: InstanceId,
+ poolConfig: ContainerPoolConfig): ActorRefFactory => ActorRef = { f:
ActorRefFactory =>
+ f.actorOf(DefaultContainerProxy.props(factory, ack, store, collectLogs,
instance, poolConfig))
+ }
+
+ override def requiredProperties: Map[String, String] = kafkaHosts
+
+ override def props(factory: ActorRefFactory => ActorRef,
+ poolConfig: ContainerPoolConfig,
+ feed: ActorRef,
+ prewarmConfig: List[PrewarmingConfig] = List.empty) =
+ Props(new DefaultContainerPool(factory, feed, prewarmConfig, poolConfig))
+}
+
+/** Contains settings needed to perform container prewarming. */
+case class PrewarmingConfig(count: Int, exec: CodeExec[_], memoryLimit:
ByteSize)
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/DefaultContainerProxy.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/DefaultContainerProxy.scala
new file mode 100644
index 0000000000..40a9a91eed
--- /dev/null
+++
b/core/invoker/src/main/scala/whisk/core/containerpool/DefaultContainerProxy.scala
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool
+
+import java.time.Instant
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.Success
+import scala.util.Failure
+import akka.actor.FSM
+import akka.actor.Props
+import akka.actor.Stash
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.pattern.pipe
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import whisk.common.{AkkaLogging, Counter, LoggingMarkers, TransactionId}
+import whisk.core.connector.ActivationMessage
+import whisk.core.containerpool.logging.LogCollectingException
+import whisk.core.entity._
+import whisk.core.entity.size._
+import whisk.core.entity.ExecManifest.ImageName
+import whisk.http.Messages
+import akka.event.Logging.InfoLevel
+import pureconfig.loadConfigOrThrow
+import whisk.core.ConfigKeys
+
+// States
+sealed trait ContainerState
+case object Uninitialized extends ContainerState
+case object Starting extends ContainerState
+case object Started extends ContainerState
+case object Running extends ContainerState
+case object Ready extends ContainerState
+case object Pausing extends ContainerState
+case object Paused extends ContainerState
+case object Removing extends ContainerState
+
+// Data
+sealed abstract class ContainerData(val lastUsed: Instant)
+case class NoData() extends ContainerData(Instant.EPOCH)
+case class PreWarmedData(container: Container, kind: String, memoryLimit:
ByteSize) extends ContainerData(Instant.EPOCH)
+case class WarmedData(container: Container,
+ invocationNamespace: EntityName,
+ action: ExecutableWhiskAction,
+ override val lastUsed: Instant)
+ extends ContainerData(lastUsed)
+
+// Events received by the actor
+case class Start(exec: CodeExec[_], memoryLimit: ByteSize)
+case class Run(action: ExecutableWhiskAction, msg: ActivationMessage,
retryLogDeadline: Option[Deadline] = None)
+case object Remove
+
+// Events sent by the actor
+case class NeedWork(data: ContainerData)
+case object ContainerPaused
+case object ContainerRemoved // when container is destroyed
+case object RescheduleJob // job is sent back to parent and could not be
processed because container is being destroyed
+
+/**
+ * A proxy that wraps a Container. It is used to keep track of the lifecycle
+ * of a container and to guarantee a contract between the client of the
container
+ * and the container itself.
+ *
+ * The contract is as follows:
+ * 1. Only one job is to be sent to the ContainerProxy at one time.
ContainerProxy
+ * will delay all further jobs until a previous job has finished.
+ * 2. The next job can be sent to the ContainerProxy after it indicates
available
+ * capacity by sending NeedWork to its parent.
+ * 3. A Remove message can be sent at any point in time. Like multiple jobs
though,
+ * it will be delayed until the currently running job finishes.
+ *
+ * @constructor
+ * @param factory a function generating a Container
+ * @param sendActiveAck a function sending the activation via active ack
+ * @param storeActivation a function storing the activation in a persistent store
+ * @param collectLogs a function collecting the logs for an activation from the container
+ * @param instance the invoker instance this proxy belongs to
+ * @param poolConfig config for the ContainerPool (e.g. cpu share used when starting containers)
+ * @param unusedTimeout time after which the container is automatically thrown
away
+ * @param pauseGrace time to wait for new work before pausing the container
+ */
+class DefaultContainerProxy(
+ factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) =>
Future[Container],
+ sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID)
=> Future[Any],
+ storeActivation: (TransactionId, WhiskActivation) => Future[Any],
+ collectLogs: (TransactionId, Identity, WhiskActivation, Container,
ExecutableWhiskAction) => Future[ActivationLogs],
+ instance: InstanceId,
+ poolConfig: ContainerPoolConfig,
+ unusedTimeout: FiniteDuration,
+ pauseGrace: FiniteDuration)
+ extends FSM[ContainerState, ContainerData]
+ with Stash {
+ implicit val ec = context.system.dispatcher
+ implicit val logging = new AkkaLogging(context.system.log)
+ var rescheduleJob = false // true iff actor receives a job but cannot
process it because actor will destroy itself
+
+ startWith(Uninitialized, NoData())
+
+ when(Uninitialized) {
+ // pre warm a container (creates a stem cell container)
+ case Event(job: Start, _) =>
+ factory(
+ TransactionId.invokerWarmup,
+ DefaultContainerProxy.containerName(instance, "prewarm",
job.exec.kind),
+ job.exec.image,
+ job.exec.pull,
+ job.memoryLimit,
+ poolConfig.cpuShare)
+ .map(container => PreWarmedData(container, job.exec.kind,
job.memoryLimit))
+ .pipeTo(self)
+
+ goto(Starting)
+
+ // cold start (no container to reuse or available stem cell container)
+ case Event(job: Run, _) =>
+ implicit val transid = job.msg.transid
+
+ // create a new container
+ val container = factory(
+ job.msg.transid,
+ DefaultContainerProxy.containerName(instance,
job.msg.user.namespace.name, job.action.name.name),
+ job.action.exec.image,
+ job.action.exec.pull,
+ job.action.limits.memory.megabytes.MB,
+ poolConfig.cpuShare)
+
+ // container factory will either yield a new container ready to execute
the action, or
+ // starting up the container failed; for the latter, it's either an
internal error starting
+ // a container or a docker action that is not conforming to the required
action API
+ container
+ .andThen {
+ case Success(container) =>
+ // the container is ready to accept an activation; register it as
PreWarmed; this
+ // normalizes the life cycle for containers and their cleanup when
activations fail
+ self ! PreWarmedData(container, job.action.exec.kind,
job.action.limits.memory.megabytes.MB)
+
+ case Failure(t) =>
+ // the container did not come up cleanly, so disambiguate the
failure mode and then cleanup
+ // the failure is either the system fault, or for docker actions,
the application/developer fault
+ val response = t match {
+ case WhiskContainerStartupError(msg) =>
ActivationResponse.whiskError(msg)
+ case BlackboxStartupError(msg) =>
ActivationResponse.applicationError(msg)
+ case _ =>
ActivationResponse.whiskError(Messages.resourceProvisionError)
+ }
+ // construct an appropriate activation and record it in the
datastore,
+ // also update the feed and active ack; the container cleanup is
queued
+ // implicitly via a FailureMessage which will be processed later
when the state
+ // transitions to Running
+ val activation =
DefaultContainerProxy.constructWhiskActivation(job, None, Interval.zero,
response)
+ sendActiveAck(transid, activation, job.msg.blocking,
job.msg.rootControllerIndex, job.msg.user.authkey.uuid)
+ storeActivation(transid, activation)
+ }
+ .flatMap { container =>
+ // now attempt to inject the user code and run the action
+ initializeAndRun(container, job)
+ .map(_ => WarmedData(container, job.msg.user.namespace,
job.action, Instant.now))
+ }
+ .pipeTo(self)
+
+ goto(Running)
+ }
+
+ when(Starting) {
+ // container was successfully obtained
+ case Event(data: PreWarmedData, _) =>
+ context.parent ! NeedWork(data)
+ goto(Started) using data
+
+ // container creation failed
+ case Event(_: FailureMessage, _) =>
+ context.parent ! ContainerRemoved
+ stop()
+
+ case _ => delay
+ }
+
+ when(Started) {
+ case Event(job: Run, data: PreWarmedData) =>
+ implicit val transid = job.msg.transid
+ initializeAndRun(data.container, job)
+ .map(_ => WarmedData(data.container, job.msg.user.namespace,
job.action, Instant.now))
+ .pipeTo(self)
+
+ goto(Running)
+
+ case Event(Remove, data: PreWarmedData) => destroyContainer(data.container)
+ }
+
+ when(Running) {
+ // Intermediate state, we were able to start a container
+ // and we keep it in case we need to destroy it.
+ case Event(data: PreWarmedData, _) => stay using data
+
+ // Run was successful
+ case Event(data: WarmedData, _) =>
+ context.parent ! NeedWork(data)
+ goto(Ready) using data
+
+ // Failed after /init (the first run failed)
+ case Event(_: FailureMessage, data: PreWarmedData) =>
destroyContainer(data.container)
+
+ // Failed for a subsequent /run
+ case Event(_: FailureMessage, data: WarmedData) =>
destroyContainer(data.container)
+
+ // Failed at getting a container for a cold-start run
+ case Event(_: FailureMessage, _) =>
+ context.parent ! ContainerRemoved
+ stop()
+
+ case _ => delay
+ }
+
+ when(Ready, stateTimeout = pauseGrace) {
+ case Event(job: Run, data: WarmedData) =>
+ implicit val transid = job.msg.transid
+ initializeAndRun(data.container, job)
+ .map(_ => WarmedData(data.container, job.msg.user.namespace,
job.action, Instant.now))
+ .pipeTo(self)
+
+ goto(Running)
+
+ // pause grace timed out
+ case Event(StateTimeout, data: WarmedData) =>
+ data.container.suspend()(TransactionId.invokerNanny).map(_ =>
ContainerPaused).pipeTo(self)
+ goto(Pausing)
+
+ case Event(Remove, data: WarmedData) => destroyContainer(data.container)
+ }
+
+ when(Pausing) {
+ case Event(ContainerPaused, data: WarmedData) => goto(Paused)
+ case Event(_: FailureMessage, data: WarmedData) =>
destroyContainer(data.container)
+ case _ => delay
+ }
+
+ when(Paused, stateTimeout = unusedTimeout) {
+ case Event(job: Run, data: WarmedData) =>
+ implicit val transid = job.msg.transid
+ data.container
+ .resume()
+ .andThen {
+ // Sending the message to self on a failure will cause the message
+ // to ultimately be sent back to the parent (which will retry it)
+ // when container removal is done.
+ case Failure(_) =>
+ rescheduleJob = true
+ self ! job
+ }
+ .flatMap(_ => initializeAndRun(data.container, job))
+ .map(_ => WarmedData(data.container, job.msg.user.namespace,
job.action, Instant.now))
+ .pipeTo(self)
+
+ goto(Running)
+
+ // container is reclaimed by the pool or it has become too old
+ case Event(StateTimeout | Remove, data: WarmedData) =>
+ rescheduleJob = true // to suppress sending a message to the pool and avoid double counting
+ destroyContainer(data.container)
+ }
+
+ when(Removing) {
+ case Event(job: Run, _) =>
+ // Send the job back to the pool to be rescheduled
+ context.parent ! job
+ stay
+ case Event(ContainerRemoved, _) => stop()
+ case Event(_: FailureMessage, _) => stop()
+ }
+
+ // Unstash all messages stashed while in intermediate state
+ onTransition {
+ case _ -> Started => unstashAll()
+ case _ -> Ready => unstashAll()
+ case _ -> Paused => unstashAll()
+ case _ -> Removing => unstashAll()
+ }
+
+ initialize()
+
+ /** Delays all incoming messages until unstashAll() is called */
+ def delay = {
+ stash()
+ stay
+ }
+
+ /**
+ * Destroys the container after unpausing it if needed. Can be used
+ * as a state progression as it goes to Removing.
+ *
+ * @param container the container to destroy
+ */
+ def destroyContainer(container: Container) = {
+ if (!rescheduleJob) {
+ context.parent ! ContainerRemoved
+ } else {
+ context.parent ! RescheduleJob
+ }
+
+ val unpause = stateName match {
+ case Paused => container.resume()(TransactionId.invokerNanny)
+ case _ => Future.successful(())
+ }
+
+ unpause
+ .flatMap(_ => container.destroy()(TransactionId.invokerNanny))
+ .map(_ => ContainerRemoved)
+ .pipeTo(self)
+
+ goto(Removing)
+ }
+
+ /**
+ * Runs the job, initialize first if necessary.
+ * Completes the job by:
+ * 1. sending an activate ack,
+ * 2. fetching the logs for the run,
+ * 3. indicating the resource is free to the parent pool,
+ * 4. recording the result to the data store
+ *
+ * @param container the container to run the job on
+ * @param job the job to run
+ * @return a future completing after logs have been collected and
+ * added to the WhiskActivation
+ */
+ def initializeAndRun(container: Container, job: Run)(implicit tid:
TransactionId): Future[WhiskActivation] = {
+ val actionTimeout = job.action.limits.timeout.duration
+
+ // Only initialize iff we haven't yet warmed the container
+ val initialize = stateData match {
+ case data: WarmedData => Future.successful(None)
+ case _ =>
container.initialize(job.action.containerInitializer,
actionTimeout).map(Some(_))
+ }
+
+ val activation: Future[WhiskActivation] = initialize
+ .flatMap { initInterval =>
+ val parameters = job.msg.content getOrElse JsObject()
+
+ val environment = JsObject(
+ "api_key" -> job.msg.user.authkey.compact.toJson,
+ "namespace" -> job.msg.user.namespace.toJson,
+ "action_name" -> job.msg.action.qualifiedNameWithLeadingSlash.toJson,
+ "activation_id" -> job.msg.activationId.toString.toJson,
+ // compute deadline on invoker side avoids discrepancies inside
container
+ // but potentially under-estimates actual deadline
+ "deadline" -> (Instant.now.toEpochMilli +
actionTimeout.toMillis).toString.toJson)
+
+ container.run(parameters, environment,
actionTimeout)(job.msg.transid).map {
+ case (runInterval, response) =>
+ val initRunInterval = initInterval
+ .map(i =>
Interval(runInterval.start.minusMillis(i.duration.toMillis), runInterval.end))
+ .getOrElse(runInterval)
+ DefaultContainerProxy.constructWhiskActivation(job, initInterval,
initRunInterval, response)
+ }
+ }
+ .recover {
+ case InitializationError(interval, response) =>
+ DefaultContainerProxy.constructWhiskActivation(job, Some(interval),
interval, response)
+ case t =>
+ // Actually, this should never happen - but we want to make sure to
not miss a problem
+ logging.error(this, s"caught unexpected error while running
activation: ${t}")
+ DefaultContainerProxy.constructWhiskActivation(
+ job,
+ None,
+ Interval.zero,
+ ActivationResponse.whiskError(Messages.abnormalRun))
+ }
+
+ // Sending active ack. Entirely asynchronous and not waited upon.
+ activation.foreach(sendActiveAck(tid, _, job.msg.blocking,
job.msg.rootControllerIndex, job.msg.user.authkey.uuid))
+
+ // Adds logs to the raw activation.
+ val activationWithLogs: Future[Either[ActivationLogReadingError,
WhiskActivation]] = activation
+ .flatMap { activation =>
+ // Skips log collection entirely, if the limit is set to 0
+ if (job.action.limits.logs.asMegaBytes == 0.MB) {
+ Future.successful(Right(activation))
+ } else {
+ val start = tid.started(this, LoggingMarkers.INVOKER_COLLECT_LOGS,
logLevel = InfoLevel)
+ collectLogs(tid, job.msg.user, activation, container, job.action)
+ .andThen {
+ case Success(_) => tid.finished(this, start)
+ case Failure(t) => tid.failed(this, start, s"reading logs
failed: $t")
+ }
+ .map(logs => Right(activation.withLogs(logs)))
+ .recover {
+ case LogCollectingException(logs) =>
+ Left(ActivationLogReadingError(activation.withLogs(logs)))
+ case _ =>
+
Left(ActivationLogReadingError(activation.withLogs(ActivationLogs(Vector(Messages.logFailure)))))
+ }
+ }
+ }
+
+ // Storing the record. Entirely asynchronous and not waited upon.
+ activationWithLogs.map(_.fold(_.activation,
identity)).foreach(storeActivation(tid, _))
+
+ // Disambiguate activation errors and transform the Either into a
failed/successful Future respectively.
+ activationWithLogs.flatMap {
+ case Right(act) if !act.response.isSuccess =>
Future.failed(ActivationUnsuccessfulError(act))
+ case Left(error) => Future.failed(error)
+ case Right(act) => Future.successful(act)
+ }
+ }
+}
+
+final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration,
pauseGrace: FiniteDuration)
+
+object DefaultContainerProxy {
+ def props(
+ factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) =>
Future[Container],
+ ack: (TransactionId, WhiskActivation, Boolean, InstanceId, UUID) =>
Future[Any],
+ store: (TransactionId, WhiskActivation) => Future[Any],
+ collectLogs: (TransactionId, Identity, WhiskActivation, Container,
ExecutableWhiskAction) => Future[ActivationLogs],
+ instance: InstanceId,
+ poolConfig: ContainerPoolConfig,
+ unusedTimeout: FiniteDuration = timeouts.idleContainer,
+ pauseGrace: FiniteDuration = timeouts.pauseGrace) =
+ Props(new DefaultContainerProxy(factory, ack, store, collectLogs,
instance, poolConfig, unusedTimeout, pauseGrace))
+
+ // Needs to be thread-safe as it's used by multiple proxies concurrently.
+ private val containerCount = new Counter
+
+ val timeouts =
loadConfigOrThrow[ContainerProxyTimeoutConfig](ConfigKeys.containerProxyTimeouts)
+
+ /**
+ * Generates a unique container name.
+ *
+ * @param prefix the container name's prefix
+ * @param suffix the container name's suffix
+ * @return a unique container name
+ */
+ def containerName(instance: InstanceId, prefix: String, suffix: String):
String = {
+ def isAllowed(c: Char): Boolean = c.isLetterOrDigit || c == '_'
+
+ val sanitizedPrefix = prefix.filter(isAllowed)
+ val sanitizedSuffix = suffix.filter(isAllowed)
+
+
s"${ContainerFactory.containerNamePrefix(instance)}_${containerCount.next()}_${sanitizedPrefix}_${sanitizedSuffix}"
+ }
+
+ /**
+ * Creates a WhiskActivation ready to be sent via active ack.
+ *
+ * @param job the job that was executed
+ * @param initInterval the time it took to initialize the container, if initialization ran
+ * @param totalInterval the total time it took to execute the job
+ * @param response the response to return to the user
+ * @return a WhiskActivation to be sent to the user
+ */
+ def constructWhiskActivation(job: Run,
+ initInterval: Option[Interval],
+ totalInterval: Interval,
+ response: ActivationResponse) = {
+ val causedBy = Some {
+ if (job.msg.causedBySequence) {
+ Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))
+ } else {
+ // emit the internal system hold time as the 'wait' time, but only for
non-sequence
+ // actions, since the transid start time for a sequence does not
correspond
+ // with a specific component of the activation but the entire sequence;
+ // it will require some work to generate a new transaction id for a
sequence
+ // component - however, because the trace of activations is recorded
in the parent
+ // sequence, a client can determine the queue time for sequences that
way
+ val end = initInterval.map(_.start).getOrElse(totalInterval.start)
+ Parameters(
+ WhiskActivation.waitTimeAnnotation,
+ Interval(job.msg.transid.meta.start, end).duration.toMillis.toJson)
+ }
+ }
+
+ val initTime = {
+ initInterval.map(initTime =>
Parameters(WhiskActivation.initTimeAnnotation,
initTime.duration.toMillis.toJson))
+ }
+
+ WhiskActivation(
+ activationId = job.msg.activationId,
+ namespace = job.msg.user.namespace.toPath,
+ subject = job.msg.user.subject,
+ cause = job.msg.cause,
+ name = job.action.name,
+ version = job.action.version,
+ start = totalInterval.start,
+ end = totalInterval.end,
+ duration = Some(totalInterval.duration.toMillis),
+ response = response,
+ annotations = {
+ Parameters(WhiskActivation.limitsAnnotation, job.action.limits.toJson)
++
+ Parameters(WhiskActivation.pathAnnotation,
JsString(job.action.fullyQualifiedName(false).asString)) ++
+ Parameters(WhiskActivation.kindAnnotation,
JsString(job.action.exec.kind)) ++
+ causedBy ++ initTime
+ })
+ }
+}
+
+/** Indicates that something went wrong with an activation and the container
should be removed */
+trait ActivationError extends Exception {
+ val activation: WhiskActivation
+}
+
+/** Indicates an activation with a non-successful response */
+case class ActivationUnsuccessfulError(activation: WhiskActivation) extends
ActivationError
+
+/** Indicates reading logs for an activation failed (terminally, truncated) */
+case class ActivationLogReadingError(activation: WhiskActivation) extends
ActivationError
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 4a69d3cf99..eda01290fb 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -17,32 +17,26 @@
package whisk.core.invoker
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.concurrent.Future
-import scala.util.Failure
-import scala.util.Try
+import akka.Done
+import akka.actor.{ActorSystem, CoordinatedShutdown}
+import akka.stream.ActorMaterializer
import kamon.Kamon
-import org.apache.curator.retry.RetryUntilElapsed
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.shared.SharedCount
-import akka.Done
-import akka.actor.ActorSystem
-import akka.actor.CoordinatedShutdown
-import akka.stream.ActorMaterializer
-import whisk.common.AkkaLogging
-import whisk.common.Scheduler
+import org.apache.curator.retry.RetryUntilElapsed
+import whisk.common.{AkkaLogging, Scheduler, TransactionId}
import whisk.core.WhiskConfig
import whisk.core.WhiskConfig._
-import whisk.core.connector.MessagingProvider
-import whisk.core.connector.PingMessage
-import whisk.core.entity._
-import whisk.core.entity.ExecManifest
-import whisk.core.entity.InstanceId
+import whisk.core.connector.{MessagingProvider, PingMessage}
+import whisk.core.containerpool.ContainerPoolProvider
+import whisk.core.entity.{ExecManifest, InstanceId}
import whisk.http.{BasicHttpService, BasicRasService}
import whisk.spi.SpiLoader
import whisk.utils.ExecutionContextFactory
-import whisk.common.TransactionId
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration._
+import scala.util.{Failure, Try}
case class CmdLineArgs(name: Option[String] = None, id: Option[Int] = None)
@@ -56,6 +50,7 @@ object Invoker {
ExecManifest.requiredProperties ++
kafkaHosts ++
zookeeperHosts ++
+ SpiLoader.get[ContainerPoolProvider].requiredProperties ++
wskApiHost ++ Map(dockerImageTag -> "latest") ++
Map(invokerName -> "")
diff --git
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 20cbbd46cd..ce5744893e 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -20,26 +20,26 @@ package whisk.core.invoker
import java.nio.charset.StandardCharsets
import java.time.Instant
-import akka.actor.{ActorRefFactory, ActorSystem, Props}
+import akka.actor.{ActorSystem, Props}
import akka.event.Logging.InfoLevel
import akka.stream.ActorMaterializer
import org.apache.kafka.common.errors.RecordTooLargeException
import pureconfig._
+import spray.json.DefaultJsonProtocol._
import spray.json._
import whisk.common._
-import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.core.connector._
import whisk.core.containerpool._
import whisk.core.containerpool.logging.LogStoreProvider
import whisk.core.database._
import whisk.core.entity._
+import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.http.Messages
import whisk.spi.SpiLoader
-import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
-import DefaultJsonProtocol._
class InvokerReactive(
config: WhiskConfig,
@@ -167,10 +167,15 @@ class InvokerReactive(
}
/** Creates a ContainerProxy Actor when being called. */
- private val childFactory = (f: ActorRefFactory) =>
- f.actorOf(
- ContainerProxy
- .props(containerFactory.createContainer, ack, store,
logsProvider.collectLogs, instance, poolConfig))
+ private val childFactory = SpiLoader
+ .get[ContainerPoolProvider]
+ .getContainerProxyFactory(
+ containerFactory.createContainer,
+ ack,
+ store,
+ logsProvider.collectLogs,
+ instance,
+ poolConfig)
val prewarmingConfigs: List[PrewarmingConfig] = {
ExecManifest.runtimesManifest.stemcells.flatMap {
@@ -181,8 +186,10 @@ class InvokerReactive(
}.toList
}
- private val pool =
- actorSystem.actorOf(ContainerPool.props(childFactory, poolConfig,
activationFeed, prewarmingConfigs))
+ private val pool = actorSystem.actorOf(
+ SpiLoader
+ .get[ContainerPoolProvider]
+ .props(childFactory, poolConfig, activationFeed, prewarmingConfigs))
/** Is called when an ActivationMessage is read from Kafka */
def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
diff --git
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
b/tests/src/test/scala/whisk/core/containerpool/test/DefaultContainerPoolTests.scala
similarity index 84%
rename from
tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
rename to
tests/src/test/scala/whisk/core/containerpool/test/DefaultContainerPoolTests.scala
index 3fe12538fe..3634417bfa 100644
---
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/test/DefaultContainerPoolTests.scala
@@ -19,30 +19,21 @@ package whisk.core.containerpool.test
import java.time.Instant
-import scala.collection.mutable
-import scala.concurrent.duration._
-
+import akka.actor.{ActorRefFactory, ActorSystem}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FlatSpec
-import org.scalatest.FlatSpecLike
-import org.scalatest.Matchers
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, FlatSpecLike, Matchers}
import org.scalatest.junit.JUnitRunner
-
-import akka.actor.ActorRefFactory
-import akka.actor.ActorSystem
-import akka.testkit.ImplicitSender
-import akka.testkit.TestKit
-import akka.testkit.TestProbe
import whisk.common.TransactionId
-import whisk.core.connector.ActivationMessage
+import whisk.core.connector.{ActivationMessage, MessageFeed}
import whisk.core.containerpool._
+import whisk.core.entity.ExecManifest.{ImageName, RuntimeManifest}
import whisk.core.entity._
-import whisk.core.entity.ExecManifest.RuntimeManifest
-import whisk.core.entity.ExecManifest.ImageName
import whisk.core.entity.size._
-import whisk.core.connector.MessageFeed
+
+import scala.collection.mutable
+import scala.concurrent.duration._
/**
* Behavior tests for the ContainerPool
@@ -50,7 +41,7 @@ import whisk.core.connector.MessageFeed
* These tests test the runtime behavior of a ContainerPool actor.
*/
@RunWith(classOf[JUnitRunner])
-class ContainerPoolTests
+class DefaultContainerPoolTests
extends TestKit(ActorSystem("ContainerPool"))
with ImplicitSender
with FlatSpecLike
@@ -123,7 +114,7 @@ class ContainerPoolTests
it should "reuse a warm container" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
- val pool = system.actorOf(ContainerPool.props(factory,
ContainerPoolConfig(2, 2), feed.ref))
+ val pool = system.actorOf(DefaultContainerPool.props(factory,
ContainerPoolConfig(2, 2), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
@@ -137,7 +128,7 @@ class ContainerPoolTests
it should "reuse a warm container when action is the same even if revision
changes" in within(timeout) {
val (containers, factory) = testContainers(2)
val feed = TestProbe()
- val pool = system.actorOf(ContainerPool.props(factory,
ContainerPoolConfig(2, 2), feed.ref))
+ val pool = system.actorOf(DefaultContainerPool.props(factory,
ContainerPoolConfig(2, 2), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
@@ -152,7 +143,7 @@ class ContainerPoolTests
val (containers, factory) = testContainers(2)
val feed = TestProbe()
- val pool = system.actorOf(ContainerPool.props(factory,
ContainerPoolConfig(2, 2), feed.ref))
+ val pool = system.actorOf(DefaultContainerPool.props(factory,
ContainerPoolConfig(2, 2), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
// Note that the container doesn't respond, thus it's not free to take work
@@ -166,7 +157,7 @@ class ContainerPoolTests
val feed = TestProbe()
// a pool with only 1 slot
- val pool = system.actorOf(ContainerPool.props(factory,
ContainerPoolConfig(1, 1), feed.ref))
+ val pool = system.actorOf(DefaultContainerPool.props(factory,
ContainerPoolConfig(1, 1), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
@@ -181,7 +172,7 @@ class ContainerPoolTests
val feed = TestProbe()
// a pool with only 1 active slot but 2 slots in total
- val pool = system.actorOf(ContainerPool.props(factory,
ContainerPoolConfig(1, 2), feed.ref))
+ val pool = system.actorOf(DefaultContainerPool.props(factory,
ContainerPoolConfig(1, 2), feed.ref))
// Run the first container
pool ! runMessage
@@ -207,7 +198,7 @@ class ContainerPoolTests
val feed = TestProbe()
// a pool with only 1 slot
- val pool = system.actorOf(ContainerPool.props(factory,
ContainerPoolConfig(1, 1), feed.ref))
+ val pool = system.actorOf(DefaultContainerPool.props(factory,
ContainerPoolConfig(1, 1), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
@@ -222,7 +213,7 @@ class ContainerPoolTests
val feed = TestProbe()
// a pool with only 1 slot
- val pool = system.actorOf(ContainerPool.props(factory,
ContainerPoolConfig(1, 1), feed.ref))
+ val pool = system.actorOf(DefaultContainerPool.props(factory,
ContainerPoolConfig(1, 1), feed.ref))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, RescheduleJob) // emulate container failure ...
@@ -240,7 +231,8 @@ class ContainerPoolTests
val pool =
system.actorOf(
- ContainerPool.props(factory, ContainerPoolConfig(0, 0), feed.ref,
List(PrewarmingConfig(1, exec, memoryLimit))))
+ DefaultContainerPool
+ .props(factory, ContainerPoolConfig(0, 0), feed.ref,
List(PrewarmingConfig(1, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
}
@@ -250,7 +242,8 @@ class ContainerPoolTests
val pool =
system.actorOf(
- ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref,
List(PrewarmingConfig(1, exec, memoryLimit))))
+ DefaultContainerPool
+ .props(factory, ContainerPoolConfig(1, 1), feed.ref,
List(PrewarmingConfig(1, exec, memoryLimit))))
containers(0).expectMsg(Start(exec, memoryLimit))
containers(0).send(pool, NeedWork(preWarmedData(exec.kind)))
pool ! runMessage
@@ -264,7 +257,7 @@ class ContainerPoolTests
val alternativeExec = CodeExecAsString(RuntimeManifest("anotherKind",
ImageName("testImage")), "testCode", None)
val pool = system.actorOf(
- ContainerPool
+ DefaultContainerPool
.props(factory, ContainerPoolConfig(1, 1), feed.ref,
List(PrewarmingConfig(1, alternativeExec, memoryLimit))))
containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0
was prewarmed
containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind)))
@@ -280,7 +273,7 @@ class ContainerPoolTests
val pool =
system.actorOf(
- ContainerPool
+ DefaultContainerPool
.props(factory, ContainerPoolConfig(1, 1), feed.ref,
List(PrewarmingConfig(1, exec, alternativeLimit))))
containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was
prewarmed
containers(0).send(pool, NeedWork(preWarmedData(exec.kind,
alternativeLimit)))
@@ -295,7 +288,7 @@ class ContainerPoolTests
val (containers, factory) = testContainers(2)
val feed = TestProbe()
- val pool = system.actorOf(ContainerPool.props(factory,
ContainerPoolConfig(2, 2), feed.ref))
+ val pool = system.actorOf(DefaultContainerPool.props(factory,
ContainerPoolConfig(2, 2), feed.ref))
// container0 is created and used
pool ! runMessage
@@ -323,7 +316,7 @@ class ContainerPoolTests
* of the ContainerPool object.
*/
@RunWith(classOf[JUnitRunner])
-class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory
{
+class DefaultContainerPoolObjectTests extends FlatSpec with Matchers with
MockFactory {
val actionExec = CodeExecAsString(RuntimeManifest("actionKind",
ImageName("testImage")), "testCode", None)
val standardNamespace = EntityName("standardNamespace")
@@ -348,7 +341,7 @@ class ContainerPoolObjectTests extends FlatSpec with
Matchers with MockFactory {
behavior of "ContainerPool schedule()"
it should "not provide a container if idle pool is empty" in {
- ContainerPool.schedule(createAction(), standardNamespace, Map()) shouldBe
None
+ DefaultContainerPool.schedule(createAction(), standardNamespace, Map())
shouldBe None
}
it should "reuse an applicable warm container from idle pool with one
container" in {
@@ -356,14 +349,14 @@ class ContainerPoolObjectTests extends FlatSpec with
Matchers with MockFactory {
val pool = Map('name -> data)
// copy to make sure, referencial equality doesn't suffice
- ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool)
shouldBe Some('name, data)
+ DefaultContainerPool.schedule(data.action.copy(),
data.invocationNamespace, pool) shouldBe Some('name, data)
}
it should "reuse an applicable warm container from idle pool with several
applicable containers" in {
val data = warmedData()
val pool = Map('first -> data, 'second -> data)
- ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool)
should (be(Some('first, data)) or be(
+ DefaultContainerPool.schedule(data.action.copy(),
data.invocationNamespace, pool) should (be(Some('first, data)) or be(
Some('second, data)))
}
@@ -371,7 +364,7 @@ class ContainerPoolObjectTests extends FlatSpec with
Matchers with MockFactory {
val matchingData = warmedData()
val pool = Map('none -> noData(), 'pre -> preWarmedData(), 'warm ->
matchingData)
- ContainerPool.schedule(matchingData.action.copy(),
matchingData.invocationNamespace, pool) shouldBe Some(
+ DefaultContainerPool.schedule(matchingData.action.copy(),
matchingData.invocationNamespace, pool) shouldBe Some(
'warm,
matchingData)
}
@@ -381,7 +374,7 @@ class ContainerPoolObjectTests extends FlatSpec with
Matchers with MockFactory {
// data is **not** in the pool!
val pool = Map('none -> noData(), 'pre -> preWarmedData())
- ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool)
shouldBe None
+ DefaultContainerPool.schedule(data.action.copy(),
data.invocationNamespace, pool) shouldBe None
}
it should "not reuse a warm container with different invocation namespace"
in {
@@ -390,7 +383,7 @@ class ContainerPoolObjectTests extends FlatSpec with
Matchers with MockFactory {
val differentNamespace = EntityName(data.invocationNamespace.asString +
"butDifferent")
data.invocationNamespace should not be differentNamespace
- ContainerPool.schedule(data.action.copy(), differentNamespace, pool)
shouldBe None
+ DefaultContainerPool.schedule(data.action.copy(), differentNamespace,
pool) shouldBe None
}
it should "not reuse a warm container with different action name" in {
@@ -399,7 +392,7 @@ class ContainerPoolObjectTests extends FlatSpec with
Matchers with MockFactory {
val pool = Map('warm -> data)
data.action.name should not be differentAction.name
- ContainerPool.schedule(differentAction, data.invocationNamespace, pool)
shouldBe None
+ DefaultContainerPool.schedule(differentAction, data.invocationNamespace,
pool) shouldBe None
}
it should "not reuse a warm container with different action version" in {
@@ -408,24 +401,24 @@ class ContainerPoolObjectTests extends FlatSpec with
Matchers with MockFactory {
val pool = Map('warm -> data)
data.action.version should not be differentAction.version
- ContainerPool.schedule(differentAction, data.invocationNamespace, pool)
shouldBe None
+ DefaultContainerPool.schedule(differentAction, data.invocationNamespace,
pool) shouldBe None
}
behavior of "ContainerPool remove()"
it should "not provide a container if pool is empty" in {
- ContainerPool.remove(Map()) shouldBe None
+ DefaultContainerPool.remove(Map()) shouldBe None
}
it should "not provide a container from busy pool with non-warm containers"
in {
val pool = Map('none -> noData(), 'pre -> preWarmedData())
- ContainerPool.remove(pool) shouldBe None
+ DefaultContainerPool.remove(pool) shouldBe None
}
it should "provide a container from pool with one single free container" in {
val data = warmedData()
val pool = Map('warm -> data)
- ContainerPool.remove(pool) shouldBe Some('warm)
+ DefaultContainerPool.remove(pool) shouldBe Some('warm)
}
it should "provide oldest container from busy pool with multiple containers"
in {
@@ -436,6 +429,6 @@ class ContainerPoolObjectTests extends FlatSpec with
Matchers with MockFactory {
val pool = Map('first -> first, 'second -> second, 'oldest -> oldest)
- ContainerPool.remove(pool) shouldBe Some('oldest)
+ DefaultContainerPool.remove(pool) shouldBe Some('oldest)
}
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
b/tests/src/test/scala/whisk/core/containerpool/test/DefaultContainerProxyTests.scala
similarity index 98%
rename from
tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
rename to
tests/src/test/scala/whisk/core/containerpool/test/DefaultContainerProxyTests.scala
index 205834ed40..cefa6bd437 100644
---
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/test/DefaultContainerProxyTests.scala
@@ -43,7 +43,7 @@ import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
@RunWith(classOf[JUnitRunner])
-class ContainerProxyTests
+class DefaultContainerProxyTests
extends TestKit(ActorSystem("ContainerProxys"))
with ImplicitSender
with FlatSpecLike
@@ -178,7 +178,7 @@ class ContainerProxyTests
val store = createStore
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(
factory,
createAcker(),
@@ -207,7 +207,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, collector, InstanceId(0), poolConfig,
pauseGrace = timeout))
registerCallback(machine)
@@ -244,7 +244,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, collector, InstanceId(0), poolConfig,
pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -292,7 +292,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, collector, InstanceId(0), poolConfig,
pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)
@@ -331,7 +331,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, collector, InstanceId(0), poolConfig,
pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
@@ -364,7 +364,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, collector, InstanceId(0), poolConfig,
pauseGrace = timeout))
registerCallback(machine)
@@ -395,7 +395,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, collector, InstanceId(0), poolConfig,
pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
@@ -431,7 +431,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, collector, InstanceId(0), poolConfig,
pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
@@ -471,7 +471,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, collector, InstanceId(0), poolConfig,
pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
@@ -502,7 +502,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, collector, InstanceId(0), poolConfig,
pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
@@ -532,7 +532,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, collector, InstanceId(0), poolConfig,
pauseGrace = timeout))
registerCallback(machine)
machine ! Run(action, message)
@@ -566,7 +566,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, createCollector(), InstanceId(0),
poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized) // first run an activation
@@ -602,7 +602,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, createCollector(), InstanceId(0),
poolConfig, pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
@@ -639,7 +639,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, collector, InstanceId(0), poolConfig,
pauseGrace = timeout))
registerCallback(machine)
@@ -691,7 +691,7 @@ class ContainerProxyTests
val machine =
childActorOf(
- ContainerProxy
+ DefaultContainerProxy
.props(factory, acker, store, collector, InstanceId(0), poolConfig,
pauseGrace = timeout))
registerCallback(machine)
run(machine, Uninitialized)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services