This is an automated email from the ASF dual-hosted git repository. cbickel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 27c3e10 Externalize InvokerPool initialization logic. (#3238) 27c3e10 is described below commit 27c3e10266bbd9e1a0a0e64aa35054c965f3d4bf Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Thu Feb 1 08:43:12 2018 +0100 Externalize InvokerPool initialization logic. (#3238) This piece of logic clutters the loadbalancer's code for no good reason. We should externalize it. --- .../core/loadBalancer/ContainerPoolBalancer.scala | 57 +++------------------- .../core/loadBalancer/InvokerSupervision.scala | 57 ++++++++++++++++++---- 2 files changed, 55 insertions(+), 59 deletions(-) diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala index aed332e..786a94a 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala @@ -19,7 +19,7 @@ package whisk.core.loadBalancer import java.nio.charset.StandardCharsets -import akka.actor.{ActorRefFactory, ActorSystem, Props} +import akka.actor.{ActorSystem, Props} import akka.cluster.Cluster import akka.pattern.ask import akka.stream.ActorMaterializer @@ -29,16 +29,14 @@ import pureconfig._ import whisk.common.{Logging, LoggingMarkers, TransactionId} import whisk.core.WhiskConfig._ import whisk.core.connector._ -import whisk.core.database.NoDocumentException import whisk.core.entity._ -import whisk.core.entity.types.EntityStore import whisk.core.{ConfigKeys, WhiskConfig} import whisk.spi.SpiLoader import akka.event.Logging.InfoLevel import scala.annotation.tailrec import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success} case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int) @@ -50,9 +48,6 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId) private val lbConfig = loadConfigOrThrow[LoadbalancerConfig](ConfigKeys.loadbalancer) - /** Used to manage an action for testing invoker health */ /** Used to manage an action for testing invoker health */ - private val entityStore = WhiskEntityStore.datastore(config) - /** The execution context for futures */ private implicit val executionContext: ExecutionContext = actorSystem.dispatcher @@ -168,28 +163,6 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId) }) } - /** - * Creates or updates a health test action by updating the entity store. - * This method is intended for use on startup. - * @return Future that completes successfully iff the action is added to the database - */ - private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = { - implicit val tid = TransactionId.loadbalancer - WhiskAction - .get(db, action.docid) - .flatMap { oldAction => - WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = None) - } - .recover { - case _: NoDocumentException => WhiskAction.put(db, action)(tid, notifier = None) - } - .map(_ => {}) - .andThen { - case Success(_) => logging.info(this, "test action for invoker health now exists") - case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e") - } - } - /** Gets a producer which can publish messages to the kafka bus. */ private val messagingProvider = SpiLoader.get[MessagingProvider] private val messageProducer = messagingProvider.getProducer(config, executionContext) @@ -216,29 +189,15 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId) case Failure(e) => transid.failed(this, start, s"error on posting to topic $topic") } } - private val invokerPool = { - // Do not create the invokerPool if it is not possible to create the health test action to recover the invokers. - InvokerPool - .healthAction(controllerInstance) - .map { - // Await the creation of the test action; on failure, this will abort the constructor which should - // in turn abort the startup of the controller. - a => - Await.result(createTestActionForInvokerHealth(entityStore, a), 1.minute) - } - .orElse { - throw new IllegalStateException( - "cannot create test action for invoker health because runtime manifest is not valid") - } - val maxPingsPerPoll = 128 - val pingConsumer = - messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = maxPingsPerPoll) - val invokerFactory = (f: ActorRefFactory, invokerInstance: InstanceId) => - f.actorOf(InvokerActor.props(invokerInstance, controllerInstance)) + private val invokerPool = { + InvokerPool.prepare(controllerInstance, WhiskEntityStore.datastore(config)) actorSystem.actorOf( - InvokerPool.props(invokerFactory, (m, i) => sendActivationToInvoker(messageProducer, m, i), pingConsumer)) + InvokerPool.props( + (f, i) => f.actorOf(InvokerActor.props(i, controllerInstance)), + (m, i) => sendActivationToInvoker(messageProducer, m, i), + messagingProvider.getConsumer(config, s"health${controllerInstance.toInt}", "health", maxPeek = 128))) } /** diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala index 13c3a70..0c75176 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala @@ -20,29 +20,24 @@ package whisk.core.loadBalancer import java.nio.charset.StandardCharsets import scala.collection.immutable -import scala.concurrent.Future +import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import org.apache.kafka.clients.producer.RecordMetadata -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.ActorRefFactory -import akka.actor.FSM +import akka.actor.{Actor, ActorRef, ActorRefFactory, FSM, Props} import akka.actor.FSM.CurrentState import akka.actor.FSM.SubscribeTransitionCallBack import akka.actor.FSM.Transition -import akka.actor.Props import akka.pattern.pipe import akka.util.Timeout -import whisk.common.AkkaLogging -import whisk.common.LoggingMarkers -import whisk.common.RingBuffer -import whisk.common.TransactionId +import whisk.common._ import whisk.core.connector._ +import whisk.core.database.NoDocumentException import whisk.core.entitlement.Privilege import whisk.core.entity.ActivationId.ActivationIdGenerator import whisk.core.entity._ +import whisk.core.entity.types.EntityStore // Received events case object GetStatus @@ -169,6 +164,48 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef, } object InvokerPool { + private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = { + implicit val tid = TransactionId.loadbalancer + implicit val ec = db.executionContext + implicit val logging = db.logging + + WhiskAction + .get(db, action.docid) + .flatMap { oldAction => + WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = None) + } + .recover { + case _: NoDocumentException => WhiskAction.put(db, action)(tid, notifier = None) + } + .map(_ => {}) + .andThen { + case Success(_) => logging.info(this, "test action for invoker health now exists") + case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e") + } + } + + /** + * Prepares everything for the health protocol to work (i.e. creates a testaction) + * + * @param controllerInstance instance of the controller we run in + * @param entityStore store to write the action to + * @return throws an exception on failure to prepare + */ + def prepare(controllerInstance: InstanceId, entityStore: EntityStore): Unit = { + InvokerPool + .healthAction(controllerInstance) + .map { + // Await the creation of the test action; on failure, this will abort the constructor which should + // in turn abort the startup of the controller. + a => + Await.result(createTestActionForInvokerHealth(entityStore, a), 1.minute) + } + .orElse { + throw new IllegalStateException( + "cannot create test action for invoker health because runtime manifest is not valid") + } + } + def props(f: (ActorRefFactory, InstanceId) => ActorRef, p: (ActivationMessage, InstanceId) => Future[RecordMetadata], pc: MessageConsumer) = { -- To stop receiving notification emails like this one, please contact cbic...@apache.org.