This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 27c3e10  Externalize InvokerPool initialization logic. (#3238)
27c3e10 is described below

commit 27c3e10266bbd9e1a0a0e64aa35054c965f3d4bf
Author: Markus Thömmes <markusthoem...@me.com>
AuthorDate: Thu Feb 1 08:43:12 2018 +0100

    Externalize InvokerPool initialization logic. (#3238)
    
    This piece of logic clutters the loadbalancer's code for no good reason. We 
should externalize it.
---
 .../core/loadBalancer/ContainerPoolBalancer.scala  | 57 +++-------------------
 .../core/loadBalancer/InvokerSupervision.scala     | 57 ++++++++++++++++++----
 2 files changed, 55 insertions(+), 59 deletions(-)

diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index aed332e..786a94a 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -19,7 +19,7 @@ package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
 
-import akka.actor.{ActorRefFactory, ActorSystem, Props}
+import akka.actor.{ActorSystem, Props}
 import akka.cluster.Cluster
 import akka.pattern.ask
 import akka.stream.ActorMaterializer
@@ -29,16 +29,14 @@ import pureconfig._
 import whisk.common.{Logging, LoggingMarkers, TransactionId}
 import whisk.core.WhiskConfig._
 import whisk.core.connector._
-import whisk.core.database.NoDocumentException
 import whisk.core.entity._
-import whisk.core.entity.types.EntityStore
 import whisk.core.{ConfigKeys, WhiskConfig}
 import whisk.spi.SpiLoader
 import akka.event.Logging.InfoLevel
 
 import scala.annotation.tailrec
 import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext, Future, Promise}
+import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.util.{Failure, Success}
 
 case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: 
Int)
@@ -50,9 +48,6 @@ class ContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: InstanceId)
 
   private val lbConfig = 
loadConfigOrThrow[LoadbalancerConfig](ConfigKeys.loadbalancer)
 
-  /** Used to manage an action for testing invoker health */ /** Used to 
manage an action for testing invoker health */
-  private val entityStore = WhiskEntityStore.datastore(config)
-
   /** The execution context for futures */
   private implicit val executionContext: ExecutionContext = 
actorSystem.dispatcher
 
@@ -168,28 +163,6 @@ class ContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: InstanceId)
       })
   }
 
-  /**
-   * Creates or updates a health test action by updating the entity store.
-   * This method is intended for use on startup.
-   * @return Future that completes successfully iff the action is added to the 
database
-   */
-  private def createTestActionForInvokerHealth(db: EntityStore, action: 
WhiskAction): Future[Unit] = {
-    implicit val tid = TransactionId.loadbalancer
-    WhiskAction
-      .get(db, action.docid)
-      .flatMap { oldAction =>
-        WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = 
None)
-      }
-      .recover {
-        case _: NoDocumentException => WhiskAction.put(db, action)(tid, 
notifier = None)
-      }
-      .map(_ => {})
-      .andThen {
-        case Success(_) => logging.info(this, "test action for invoker health 
now exists")
-        case Failure(e) => logging.error(this, s"error creating test action 
for invoker health: $e")
-      }
-  }
-
   /** Gets a producer which can publish messages to the kafka bus. */
   private val messagingProvider = SpiLoader.get[MessagingProvider]
   private val messageProducer = messagingProvider.getProducer(config, 
executionContext)
@@ -216,29 +189,15 @@ class ContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: InstanceId)
       case Failure(e) => transid.failed(this, start, s"error on posting to 
topic $topic")
     }
   }
-  private val invokerPool = {
-    // Do not create the invokerPool if it is not possible to create the 
health test action to recover the invokers.
-    InvokerPool
-      .healthAction(controllerInstance)
-      .map {
-        // Await the creation of the test action; on failure, this will abort 
the constructor which should
-        // in turn abort the startup of the controller.
-        a =>
-          Await.result(createTestActionForInvokerHealth(entityStore, a), 
1.minute)
-      }
-      .orElse {
-        throw new IllegalStateException(
-          "cannot create test action for invoker health because runtime 
manifest is not valid")
-      }
 
-    val maxPingsPerPoll = 128
-    val pingConsumer =
-      messagingProvider.getConsumer(config, 
s"health${controllerInstance.toInt}", "health", maxPeek = maxPingsPerPoll)
-    val invokerFactory = (f: ActorRefFactory, invokerInstance: InstanceId) =>
-      f.actorOf(InvokerActor.props(invokerInstance, controllerInstance))
+  private val invokerPool = {
+    InvokerPool.prepare(controllerInstance, WhiskEntityStore.datastore(config))
 
     actorSystem.actorOf(
-      InvokerPool.props(invokerFactory, (m, i) => 
sendActivationToInvoker(messageProducer, m, i), pingConsumer))
+      InvokerPool.props(
+        (f, i) => f.actorOf(InvokerActor.props(i, controllerInstance)),
+        (m, i) => sendActivationToInvoker(messageProducer, m, i),
+        messagingProvider.getConsumer(config, 
s"health${controllerInstance.toInt}", "health", maxPeek = 128)))
   }
 
   /**
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 13c3a70..0c75176 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -20,29 +20,24 @@ package whisk.core.loadBalancer
 import java.nio.charset.StandardCharsets
 
 import scala.collection.immutable
-import scala.concurrent.Future
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration._
 import scala.util.Failure
 import scala.util.Success
 import org.apache.kafka.clients.producer.RecordMetadata
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.ActorRefFactory
-import akka.actor.FSM
+import akka.actor.{Actor, ActorRef, ActorRefFactory, FSM, Props}
 import akka.actor.FSM.CurrentState
 import akka.actor.FSM.SubscribeTransitionCallBack
 import akka.actor.FSM.Transition
-import akka.actor.Props
 import akka.pattern.pipe
 import akka.util.Timeout
-import whisk.common.AkkaLogging
-import whisk.common.LoggingMarkers
-import whisk.common.RingBuffer
-import whisk.common.TransactionId
+import whisk.common._
 import whisk.core.connector._
+import whisk.core.database.NoDocumentException
 import whisk.core.entitlement.Privilege
 import whisk.core.entity.ActivationId.ActivationIdGenerator
 import whisk.core.entity._
+import whisk.core.entity.types.EntityStore
 
 // Received events
 case object GetStatus
@@ -169,6 +164,48 @@ class InvokerPool(childFactory: (ActorRefFactory, 
InstanceId) => ActorRef,
 }
 
 object InvokerPool {
+  private def createTestActionForInvokerHealth(db: EntityStore, action: 
WhiskAction): Future[Unit] = {
+    implicit val tid = TransactionId.loadbalancer
+    implicit val ec = db.executionContext
+    implicit val logging = db.logging
+
+    WhiskAction
+      .get(db, action.docid)
+      .flatMap { oldAction =>
+        WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = 
None)
+      }
+      .recover {
+        case _: NoDocumentException => WhiskAction.put(db, action)(tid, 
notifier = None)
+      }
+      .map(_ => {})
+      .andThen {
+        case Success(_) => logging.info(this, "test action for invoker health 
now exists")
+        case Failure(e) => logging.error(this, s"error creating test action 
for invoker health: $e")
+      }
+  }
+
+  /**
+   * Prepares everything for the health protocol to work (i.e. creates a 
testaction)
+   *
+   * @param controllerInstance instance of the controller we run in
+   * @param entityStore store to write the action to
+   * @return throws an exception on failure to prepare
+   */
+  def prepare(controllerInstance: InstanceId, entityStore: EntityStore): Unit 
= {
+    InvokerPool
+      .healthAction(controllerInstance)
+      .map {
+        // Await the creation of the test action; on failure, this will abort 
the constructor which should
+        // in turn abort the startup of the controller.
+        a =>
+          Await.result(createTestActionForInvokerHealth(entityStore, a), 
1.minute)
+      }
+      .orElse {
+        throw new IllegalStateException(
+          "cannot create test action for invoker health because runtime 
manifest is not valid")
+      }
+  }
+
   def props(f: (ActorRefFactory, InstanceId) => ActorRef,
             p: (ActivationMessage, InstanceId) => Future[RecordMetadata],
             pc: MessageConsumer) = {

-- 
To stop receiving notification emails like this one, please contact
cbic...@apache.org.

Reply via email to