This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new c3f486d  Fix order of messages being sent by the ContainerPool. (#2421)
c3f486d is described below

commit c3f486d5090606cbd8744ec0dec6448c2b26bb7a
Author: Markus Thömmes <[email protected]>
AuthorDate: Thu Jul 13 23:16:24 2017 +0200

    Fix order of messages being sent by the ContainerPool. (#2421)
    
    
    The ContainerPool indicates to the ActivationFeed that it can perform more
    work and, as a result, gets more messages from the ActivationFeed. In order
    to properly back-pressure and protect the ContainerPool, it sends such a
    message only if it has changed its internal state to an extent that it is
    guaranteed to be able to handle more work.
    
    Also add an error log message that complains when a Run message has to be
    rescheduled because the pool cannot accept it (improper buffering).
---
 .../scala/whisk/core/containerpool/ContainerPool.scala    |  9 +++------
 .../scala/whisk/core/containerpool/ContainerProxy.scala   | 13 -------------
 .../src/main/scala/whisk/core/invoker/Invoker.scala       |  2 +-
 .../core/containerpool/test/ContainerPoolTests.scala      | 15 ---------------
 .../core/containerpool/test/ContainerProxyTests.scala     |  5 -----
 5 files changed, 4 insertions(+), 40 deletions(-)

diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala 
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
index f1f51f5..3689050 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
@@ -106,13 +106,14 @@ class ContainerPool(
                     freePool.remove(actor)
                     actor ! r // forwards the run request to the container
                 case None =>
+                    logging.error(this, "Rescheduling Run message, too many 
message in the pool")(r.msg.transid)
                     self ! r
             }
 
         // Container is free to take more work
         case NeedWork(data: WarmedData) =>
             freePool.update(sender(), data)
-            busyPool.remove(sender())
+            busyPool.remove(sender()).foreach(_ => feed ! 
MessageFeed.Processed)
 
         // Container is prewarmed and ready to take work
         case NeedWork(data: PreWarmedData) =>
@@ -121,11 +122,7 @@ class ContainerPool(
         // Container got removed
         case ContainerRemoved =>
             freePool.remove(sender())
-            busyPool.remove(sender())
-
-        // Activation completed
-        case ActivationCompleted =>
-            feed ! MessageFeed.Processed
+            busyPool.remove(sender()).foreach(_ => feed ! 
MessageFeed.Processed)
     }
 
     /** Creates a new container and updates state accordingly. */
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala 
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 1b8c7fe..652453d 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -67,12 +67,6 @@ case object Remove
 case class NeedWork(data: ContainerData)
 case object ContainerPaused
 case object ContainerRemoved
-/**
- * Indicates the container resource is now free to receive a new request.
- * This message is sent to the parent which in turn notifies the feed that a
- * resource slot is available.
- */
-case object ActivationCompleted
 
 /**
  * A proxy that wraps a Container. It is used to keep track of the lifecycle
@@ -153,7 +147,6 @@ class ContainerProxy(
                     // implicitly via a FailureMessage which will be processed 
later when the state
                     // transitions to Running
                     val activation = 
ContainerProxy.constructWhiskActivation(job, Interval.zero, response)
-                    self ! ActivationCompleted
                     sendActiveAck(transid, activation, 
job.msg.rootControllerIndex)
                     storeActivation(transid, activation)
             }.flatMap {
@@ -213,11 +206,6 @@ class ContainerProxy(
             context.parent ! ContainerRemoved
             stop()
 
-        // Activation finished either successfully or not
-        case Event(ActivationCompleted, _) =>
-            context.parent ! ActivationCompleted
-            stay
-
         case _ => delay
     }
 
@@ -369,7 +357,6 @@ class ContainerProxy(
         }.andThen {
             case Success(activation) => storeActivation(tid, activation)
         }.flatMap { activation =>
-            self ! ActivationCompleted
             // Fail the future iff the activation was unsuccessful to 
facilitate
             // better cleanup logic.
             if (activation.response.isSuccess) Future.successful(activation)
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 239370f..a789e4e 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -484,7 +484,7 @@ object Invoker {
         val maxdepth = ContainerPool.getDefaultMaxActive(config)
         val consumer = new KafkaConsumerConnector(config.kafkaHost, 
"invokers", topic, maxdepth, maxPollInterval = TimeLimit.MAX_DURATION + 
1.minute)
         val producer = new KafkaProducerConnector(config.kafkaHost, ec)
-        val dispatcher = new Dispatcher(consumer, 500 milliseconds, 2 * 
maxdepth, actorSystem)
+        val dispatcher = new Dispatcher(consumer, 500 milliseconds, maxdepth, 
actorSystem)
 
         val invoker = if 
(Try(config.invokerUseReactivePool.toBoolean).getOrElse(false)) {
             new InvokerReactive(config, invokerInstance, 
dispatcher.activationFeed, producer)
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 866d370..5202ef8 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -108,15 +108,6 @@ class ContainerPoolTests extends 
TestKit(ActorSystem("ContainerPool"))
 
     behavior of "ContainerPool"
 
-    it should "indicate free resources to the feed once activations finish" in 
within(timeout) {
-        val (containers, factory) = testContainers(1)
-        val feed = TestProbe()
-
-        val pool = system.actorOf(ContainerPool.props(factory, 0, 0, feed.ref))
-        containers(0).send(pool, ActivationCompleted)
-        feed.expectMsg(MessageFeed.Processed)
-    }
-
     /*
      * CONTAINER SCHEDULING
      *
@@ -158,7 +149,6 @@ class ContainerPoolTests extends 
TestKit(ActorSystem("ContainerPool"))
         pool ! runMessage
         containers(0).expectMsg(runMessage)
         containers(0).send(pool, NeedWork(warmedData()))
-        containers(0).send(pool, ActivationCompleted)
         feed.expectMsg(MessageFeed.Processed)
         pool ! runMessageDifferentEverything
         containers(0).expectMsg(Remove)
@@ -176,16 +166,13 @@ class ContainerPoolTests extends 
TestKit(ActorSystem("ContainerPool"))
         pool ! runMessage
         containers(0).expectMsg(runMessage)
         containers(0).send(pool, NeedWork(warmedData(lastUsed = 
Instant.EPOCH)))
-        containers(0).send(pool, ActivationCompleted)
         feed.expectMsg(MessageFeed.Processed)
 
         // Run the second container, don't remove the first one
         pool ! runMessageDifferentEverything
         containers(1).expectMsg(runMessageDifferentEverything)
         containers(1).send(pool, NeedWork(warmedData(lastUsed = Instant.now)))
-        containers(1).send(pool, ActivationCompleted)
         feed.expectMsg(MessageFeed.Processed)
-
         pool ! runMessageDifferentNamespace
         containers(2).expectMsg(runMessageDifferentNamespace)
 
@@ -202,7 +189,6 @@ class ContainerPoolTests extends 
TestKit(ActorSystem("ContainerPool"))
         pool ! runMessage
         containers(0).expectMsg(runMessage)
         containers(0).send(pool, NeedWork(warmedData()))
-        containers(0).send(pool, ActivationCompleted)
         feed.expectMsg(MessageFeed.Processed)
         pool ! runMessageDifferentNamespace
         containers(0).expectMsg(Remove)
@@ -218,7 +204,6 @@ class ContainerPoolTests extends 
TestKit(ActorSystem("ContainerPool"))
         pool ! runMessage
         containers(0).expectMsg(runMessage)
         containers(0).send(pool, NeedWork(warmedData()))
-        containers(0).send(pool, ActivationCompleted)
         feed.expectMsg(MessageFeed.Processed)
         pool ! runMessage
         containers(0).expectMsg(runMessage)
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 3f3f24b..ab9c69e 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -106,7 +106,6 @@ class ContainerProxyTests extends 
TestKit(ActorSystem("ContainerProxys"))
     def run(machine: ActorRef, currentState: ContainerState) = {
         machine ! Run(action, message)
         expectMsg(Transition(machine, currentState, Running))
-        expectMsg(ActivationCompleted)
         expectWarmed(invocationNamespace.name, action)
         expectMsg(Transition(machine, Running, Ready))
     }
@@ -273,7 +272,6 @@ class ContainerProxyTests extends 
TestKit(ActorSystem("ContainerProxys"))
         registerCallback(machine)
         machine ! Run(action, message)
         expectMsg(Transition(machine, Uninitialized, Running))
-        expectMsg(ActivationCompleted)
         expectMsg(ContainerRemoved)
 
         awaitAssert {
@@ -302,7 +300,6 @@ class ContainerProxyTests extends 
TestKit(ActorSystem("ContainerProxys"))
         registerCallback(machine)
         machine ! Run(action, message)
         expectMsg(Transition(machine, Uninitialized, Running))
-        expectMsg(ActivationCompleted)
         expectMsg(ContainerRemoved) // The message is sent as soon as the 
container decides to destroy itself
         expectMsg(Transition(machine, Running, Removing))
 
@@ -331,7 +328,6 @@ class ContainerProxyTests extends 
TestKit(ActorSystem("ContainerProxys"))
         registerCallback(machine)
         machine ! Run(action, message)
         expectMsg(Transition(machine, Uninitialized, Running))
-        expectMsg(ActivationCompleted)
         expectMsg(ContainerRemoved) // The message is sent as soon as the 
container decides to destroy itself
         expectMsg(Transition(machine, Running, Removing))
 
@@ -431,7 +427,6 @@ class ContainerProxyTests extends 
TestKit(ActorSystem("ContainerProxys"))
 
         // Finish /init, note that /run and log-collecting happens nonetheless
         initPromise.success(Interval.zero)
-        expectMsg(ActivationCompleted)
         expectWarmed(invocationNamespace.name, action)
         expectMsg(Transition(machine, Running, Ready))
 

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to