This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new c3f486d Fix order of messages being sent by the ContainerPool. (#2421)
c3f486d is described below
commit c3f486d5090606cbd8744ec0dec6448c2b26bb7a
Author: Markus Thömmes <[email protected]>
AuthorDate: Thu Jul 13 23:16:24 2017 +0200
Fix order of messages being sent by the ContainerPool. (#2421)
The ContainerPool indicates to the ActivationFeed that it can perform more
work and, as a result, gets more messages from the ActivationFeed. In order to
properly apply back-pressure and protect the ContainerPool, it sends such a
message only if it has changed its internal state to an extent that it is
guaranteed to be able to handle more work.
Also add a log message to complain about an improper buffer.
---
.../scala/whisk/core/containerpool/ContainerPool.scala | 9 +++------
.../scala/whisk/core/containerpool/ContainerProxy.scala | 13 -------------
.../src/main/scala/whisk/core/invoker/Invoker.scala | 2 +-
.../core/containerpool/test/ContainerPoolTests.scala | 15 ---------------
.../core/containerpool/test/ContainerProxyTests.scala | 5 -----
5 files changed, 4 insertions(+), 40 deletions(-)
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index f1f51f5..3689050 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -106,13 +106,14 @@ class ContainerPool(
freePool.remove(actor)
actor ! r // forwards the run request to the container
case None =>
+ logging.error(this, "Rescheduling Run message, too many
message in the pool")(r.msg.transid)
self ! r
}
// Container is free to take more work
case NeedWork(data: WarmedData) =>
freePool.update(sender(), data)
- busyPool.remove(sender())
+ busyPool.remove(sender()).foreach(_ => feed !
MessageFeed.Processed)
// Container is prewarmed and ready to take work
case NeedWork(data: PreWarmedData) =>
@@ -121,11 +122,7 @@ class ContainerPool(
// Container got removed
case ContainerRemoved =>
freePool.remove(sender())
- busyPool.remove(sender())
-
- // Activation completed
- case ActivationCompleted =>
- feed ! MessageFeed.Processed
+ busyPool.remove(sender()).foreach(_ => feed !
MessageFeed.Processed)
}
/** Creates a new container and updates state accordingly. */
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 1b8c7fe..652453d 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -67,12 +67,6 @@ case object Remove
case class NeedWork(data: ContainerData)
case object ContainerPaused
case object ContainerRemoved
-/**
- * Indicates the container resource is now free to receive a new request.
- * This message is sent to the parent which in turn notifies the feed that a
- * resource slot is available.
- */
-case object ActivationCompleted
/**
* A proxy that wraps a Container. It is used to keep track of the lifecycle
@@ -153,7 +147,6 @@ class ContainerProxy(
// implicitly via a FailureMessage which will be processed
later when the state
// transitions to Running
val activation =
ContainerProxy.constructWhiskActivation(job, Interval.zero, response)
- self ! ActivationCompleted
sendActiveAck(transid, activation,
job.msg.rootControllerIndex)
storeActivation(transid, activation)
}.flatMap {
@@ -213,11 +206,6 @@ class ContainerProxy(
context.parent ! ContainerRemoved
stop()
- // Activation finished either successfully or not
- case Event(ActivationCompleted, _) =>
- context.parent ! ActivationCompleted
- stay
-
case _ => delay
}
@@ -369,7 +357,6 @@ class ContainerProxy(
}.andThen {
case Success(activation) => storeActivation(tid, activation)
}.flatMap { activation =>
- self ! ActivationCompleted
// Fail the future iff the activation was unsuccessful to
facilitate
// better cleanup logic.
if (activation.response.isSuccess) Future.successful(activation)
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 239370f..a789e4e 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -484,7 +484,7 @@ object Invoker {
val maxdepth = ContainerPool.getDefaultMaxActive(config)
val consumer = new KafkaConsumerConnector(config.kafkaHost,
"invokers", topic, maxdepth, maxPollInterval = TimeLimit.MAX_DURATION +
1.minute)
val producer = new KafkaProducerConnector(config.kafkaHost, ec)
- val dispatcher = new Dispatcher(consumer, 500 milliseconds, 2 *
maxdepth, actorSystem)
+ val dispatcher = new Dispatcher(consumer, 500 milliseconds, maxdepth,
actorSystem)
val invoker = if
(Try(config.invokerUseReactivePool.toBoolean).getOrElse(false)) {
new InvokerReactive(config, invokerInstance,
dispatcher.activationFeed, producer)
diff --git
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 866d370..5202ef8 100644
---
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -108,15 +108,6 @@ class ContainerPoolTests extends
TestKit(ActorSystem("ContainerPool"))
behavior of "ContainerPool"
- it should "indicate free resources to the feed once activations finish" in
within(timeout) {
- val (containers, factory) = testContainers(1)
- val feed = TestProbe()
-
- val pool = system.actorOf(ContainerPool.props(factory, 0, 0, feed.ref))
- containers(0).send(pool, ActivationCompleted)
- feed.expectMsg(MessageFeed.Processed)
- }
-
/*
* CONTAINER SCHEDULING
*
@@ -158,7 +149,6 @@ class ContainerPoolTests extends
TestKit(ActorSystem("ContainerPool"))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
- containers(0).send(pool, ActivationCompleted)
feed.expectMsg(MessageFeed.Processed)
pool ! runMessageDifferentEverything
containers(0).expectMsg(Remove)
@@ -176,16 +166,13 @@ class ContainerPoolTests extends
TestKit(ActorSystem("ContainerPool"))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData(lastUsed =
Instant.EPOCH)))
- containers(0).send(pool, ActivationCompleted)
feed.expectMsg(MessageFeed.Processed)
// Run the second container, don't remove the first one
pool ! runMessageDifferentEverything
containers(1).expectMsg(runMessageDifferentEverything)
containers(1).send(pool, NeedWork(warmedData(lastUsed = Instant.now)))
- containers(1).send(pool, ActivationCompleted)
feed.expectMsg(MessageFeed.Processed)
-
pool ! runMessageDifferentNamespace
containers(2).expectMsg(runMessageDifferentNamespace)
@@ -202,7 +189,6 @@ class ContainerPoolTests extends
TestKit(ActorSystem("ContainerPool"))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
- containers(0).send(pool, ActivationCompleted)
feed.expectMsg(MessageFeed.Processed)
pool ! runMessageDifferentNamespace
containers(0).expectMsg(Remove)
@@ -218,7 +204,6 @@ class ContainerPoolTests extends
TestKit(ActorSystem("ContainerPool"))
pool ! runMessage
containers(0).expectMsg(runMessage)
containers(0).send(pool, NeedWork(warmedData()))
- containers(0).send(pool, ActivationCompleted)
feed.expectMsg(MessageFeed.Processed)
pool ! runMessage
containers(0).expectMsg(runMessage)
diff --git
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 3f3f24b..ab9c69e 100644
---
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -106,7 +106,6 @@ class ContainerProxyTests extends
TestKit(ActorSystem("ContainerProxys"))
def run(machine: ActorRef, currentState: ContainerState) = {
machine ! Run(action, message)
expectMsg(Transition(machine, currentState, Running))
- expectMsg(ActivationCompleted)
expectWarmed(invocationNamespace.name, action)
expectMsg(Transition(machine, Running, Ready))
}
@@ -273,7 +272,6 @@ class ContainerProxyTests extends
TestKit(ActorSystem("ContainerProxys"))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
- expectMsg(ActivationCompleted)
expectMsg(ContainerRemoved)
awaitAssert {
@@ -302,7 +300,6 @@ class ContainerProxyTests extends
TestKit(ActorSystem("ContainerProxys"))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
- expectMsg(ActivationCompleted)
expectMsg(ContainerRemoved) // The message is sent as soon as the
container decides to destroy itself
expectMsg(Transition(machine, Running, Removing))
@@ -331,7 +328,6 @@ class ContainerProxyTests extends
TestKit(ActorSystem("ContainerProxys"))
registerCallback(machine)
machine ! Run(action, message)
expectMsg(Transition(machine, Uninitialized, Running))
- expectMsg(ActivationCompleted)
expectMsg(ContainerRemoved) // The message is sent as soon as the
container decides to destroy itself
expectMsg(Transition(machine, Running, Removing))
@@ -431,7 +427,6 @@ class ContainerProxyTests extends
TestKit(ActorSystem("ContainerProxys"))
// Finish /init, note that /run and log-collecting happens nonetheless
initPromise.success(Interval.zero)
- expectMsg(ActivationCompleted)
expectWarmed(invocationNamespace.name, action)
expectMsg(Transition(machine, Running, Ready))
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].