style95 commented on a change in pull request #5192:
URL: https://github.com/apache/openwhisk/pull/5192#discussion_r780639830
##########
File path: core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
##########
@@ -0,0 +1,727 @@
+package org.apache.openwhisk.core.loadBalancer
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.atomic.LongAdder
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Props}
+import akka.event.Logging.InfoLevel
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.controller.Controller
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.etcd.EtcdKV.{InvokerKeys, QueueKeys, SchedulerKeys, ThrottlingKeys}
+import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.scheduler.queue.{CreateQueue, CreateQueueResponse, QueueManager}
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulerStates}
+import org.apache.openwhisk.core.service._
+import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.retry
+import pureconfig._
+import pureconfig.generic.auto._
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, Promise}
+import scala.language.postfixOps
+import scala.util.{Failure, Random, Success, Try}
+
+class FPCPoolBalancer(config: WhiskConfig,
+                      controllerInstance: ControllerInstanceId,
+                      etcdClient: EtcdClient,
+                      private val feedFactory: FeedFactory,
+                      lbConfig: ShardingContainerPoolBalancerConfig =
+                        loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer),
+                      private val messagingProvider: MessagingProvider = SpiLoader.get[MessagingProvider])(
+  implicit val actorSystem: ActorSystem,
+  logging: Logging)
+    extends LoadBalancer {
+
+  private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
+  private implicit val requestTimeout: Timeout = Timeout(5.seconds)
+
+  private val entityStore = WhiskEntityStore.datastore()
+
+  private val clusterName = loadConfigOrThrow[String](ConfigKeys.whiskClusterName)
+
+  /** key: SchedulerEndpoints, value: SchedulerStates */
+  private val schedulerEndpoints = TrieMap[SchedulerEndpoints, SchedulerStates]()
+  private val messageProducer = messagingProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+  private val watcherName = "distributed-pool-balancer"
+  val watcherService: ActorRef = actorSystem.actorOf(WatcherService.props(etcdClient))
+
+  /** State related to invocations and throttling */
+  protected[loadBalancer] val activationSlots = TrieMap[ActivationId, DistributedActivationEntry]()
+  protected[loadBalancer] val activationPromises =
+    TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
+
+  /** key: queue/${invocationNs}/ns/action/leader, value: SchedulerEndpoints */
+  private val queueEndpoints = TrieMap[String, SchedulerEndpoints]()
+
+  private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
+  private val activationsPerController = TrieMap[ControllerInstanceId, LongAdder]()
+  private val totalActivations = new LongAdder()
+  private val totalActivationMemory = new LongAdder()
+  private val throttlers = TrieMap[String, Boolean]()
+
+  /**
+   * Publishes activation message on internal bus for an invoker to pick up.
+   *
+   * @param action the action to invoke
+   * @param msg the activation message to publish on an invoker topic
+   * @param transid the transaction id for the request
+   * @return result a nested Future, the outer indicating completion of publishing and
+   *         the inner the completion of the action (i.e., the result)
+   *         if it is ready before timeout (Right) otherwise the activation id (Left).
+   *         The future is guaranteed to complete within the declared action time limit
+   *         plus a grace period (see activeAckTimeoutGrace).
+   */
+  override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
+    implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
+
+    val topicBaseName = if (schedulerEndpoints.isEmpty) {
+      logging.error(
+        this,
+        s"Failed to invoke action ${action.fullyQualifiedName(false)}, error: no scheduler endpoint available")
+      Future.failed(LoadBalancerException("No scheduler endpoint available"))
+    } else {
+      val invocationNamespace = msg.user.namespace.name.asString
+      val key = QueueKeys.queue(invocationNamespace, action.fullyQualifiedName(false), true)
+
+      queueEndpoints.get(key) match {
+        case Some(endPoint) =>
+          Future.successful(
+            schedulerEndpoints.getOrElse(endPoint, Random.shuffle(schedulerEndpoints.toList).head._2).sid.toString)
+        case None =>
+          etcdClient
+            .get(key)
+            .map { res =>
+              res.getKvsList.asScala.headOption map { kv =>
+                val endPoint: String = kv.getValue
+                SchedulerEndpoints
+                  .parse(endPoint)
+                  .map { endPoint =>
+                    queueEndpoints.update(kv.getKey, endPoint)
+                    Some(
+                      schedulerEndpoints
+                        .getOrElse(endPoint, Random.shuffle(schedulerEndpoints.toList).head._2)
+                        .sid
+                        .toString)
+                  }
+                  .getOrElse {
+                    FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map { scheduler =>
+                      createQueue(invocationNamespace, action.toWhiskAction, msg.action, msg.revision, scheduler)
+                      scheduler.sid.toString
+                    }
+                  }
+              } getOrElse {
+                FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map { scheduler =>
+                  createQueue(invocationNamespace, action.toWhiskAction, msg.action, msg.revision, scheduler)
+                  scheduler.sid.toString
+                }
+              }
+            }
+            .map { _.get }
+            .recoverWith {
+              case _ =>
+                Future.failed(LoadBalancerException("No scheduler endpoint available"))
+            }
+      }
+    }
+    topicBaseName.flatMap { baseName =>
+      val topicName = Controller.topicPrefix + baseName
+      val activationResult = setupActivation(msg, action)
+      sendActivationToKafka(messageProducer, msg, topicName).map(_ => activationResult)
+    }
+  }
+
+  private def createQueue(
+    invocationNamespace: String,
+    actionMetaData: WhiskActionMetaData,
+    fullyQualifiedEntityName: FullyQualifiedEntityName,
+    revision: DocRevision,
+    scheduler: SchedulerStates,
+    retryCount: Int = 5,
+    excludeSchedulers: Set[SchedulerInstanceId] = Set.empty)(implicit transid: TransactionId): Unit = {
+    if (retryCount >= 0)
+      scheduler
+        .getRemoteRef(QueueManager.actorName)
+        .ask(CreateQueue(invocationNamespace, fullyQualifiedEntityName.copy(binding = None), revision, actionMetaData))
+        .mapTo[CreateQueueResponse]
+        .onComplete {
+          case Success(_) =>
+            logging.info(
+              this,
+              s"Create queue successfully for $invocationNamespace/$fullyQualifiedEntityName on ${scheduler.sid}")
+          case Failure(t) =>
+            logging.error(
+              this,
+              s"failed to get response from ${scheduler}, error is $t, will retry ${retryCount} more times")
+            // try another scheduler
+            FPCPoolBalancer
+              .schedule(schedulerEndpoints.values.toIndexedSeq, excludeSchedulers + scheduler.sid)
+              .map { newScheduler =>
+                createQueue(
+                  invocationNamespace,
+                  actionMetaData,
+                  fullyQualifiedEntityName,
+                  revision,
+                  newScheduler,
+                  retryCount - 1,
+                  excludeSchedulers + scheduler.sid)
+              }
+              .getOrElse {
+                logging.error(
+                  this,
+                  s"failed to create queue for $invocationNamespace/$fullyQualifiedEntityName, no scheduler endpoint available, related activations may fail")
+              }
+        } else
+      logging.error(
+        this,
+        s"failed to create queue for $invocationNamespace/$fullyQualifiedEntityName, related activations may fail")
+  }
+
+  /**
+   * 2. Update local state with the activation to be executed.
+   *
+   * All activations are tracked in the activationSlots map. Additionally, blocking invokes
+   * are tracked in the activation results map. When a result is received via activeack, it
+   * will cause the result to be forwarded to the caller waiting on the result, and cancel
+   * the DB poll which is also trying to do the same.
+   */
+  private def setupActivation(msg: ActivationMessage,
+                              action: ExecutableWhiskActionMetaData): Future[Either[ActivationId, WhiskActivation]] = {
+    val isBlackboxInvocation = action.exec.pull
+
+    totalActivations.increment()
+    totalActivationMemory.add(action.limits.memory.megabytes)
+    activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new LongAdder()).increment()
+    activationsPerController.getOrElseUpdate(controllerInstance, new LongAdder()).increment()
+
+    // Timeout is a multiple of the configured maximum action duration. The minimum timeout is the configured standard
+    // value for action durations to avoid too tight timeouts.
+    // Timeouts in general are diluted by a configurable factor. In essence this factor controls how much slack you want
+    // to allow in your topics before you start reporting failed activations.
+    val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION) * lbConfig.timeoutFactor) + 1.minute
+
+    val resultPromise = if (msg.blocking) {
+      activationPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future
+    } else Future.successful(Left(msg.activationId))
+
+    // Install a timeout handler for the catastrophic case where an active ack is not received at all
+    // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
+    // the active ack is significantly delayed (possibly due to long queues but the subject should not be penalized);
+    // in this case, if the activation handler is still registered, remove it and update the books.
+    activationSlots.getOrElseUpdate(
+      msg.activationId, {
+        val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+          processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false)
+        }
+
+        // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
+        DistributedActivationEntry(
+          msg.activationId,
+          msg.user.namespace.uuid,
+          msg.user.namespace.name.asString,
+          msg.revision,
+          msg.transid,
+          action.limits.memory.megabytes.MB,
+          action.limits.concurrency.maxConcurrent,
+          action.fullyQualifiedName(true),
+          timeoutHandler,
+          isBlackboxInvocation,
+          msg.blocking,
+          controllerInstance)
+      })
+
+    resultPromise
+  }
+
+  /** 3. Send the activation to Kafka */
+  private def sendActivationToKafka(producer: MessageProducer,
+                                    msg: ActivationMessage,
+                                    topic: String): Future[RecordMetadata] = {
+    implicit val transid: TransactionId = msg.transid
+
+    MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
+    val start = transid.started(this, LoggingMarkers.CONTROLLER_KAFKA)
+
+    producer.send(topic, msg).andThen {
+      case Success(status) =>
+        transid.finished(
+          this,
+          start,
+          s"posted to ${status.topic()}[${status.partition()}][${status.offset()}]",
+          logLevel = InfoLevel)
+      case Failure(_) => transid.failed(this, start, s"error on posting to topic $topic")
+    }
+  }
+
+  /**
+   * Subscribes to active acks (completion messages from the invokers), and
+   * registers a handler for received active acks from invokers.
+   */
+  private val activationFeed: ActorRef =
+    feedFactory.createFeed(actorSystem, messagingProvider, processAcknowledgement)
+
+  /** 4. Get the active-ack message and parse it */
+  protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = Future {
+    val raw = new String(bytes, StandardCharsets.UTF_8)
+    AcknowledegmentMessage.parse(raw) match {
+      case Success(acknowledegment) =>
+        acknowledegment.isSlotFree.foreach { invoker =>
+          processCompletion(
+            acknowledegment.activationId,
+            acknowledegment.transid,
+            forced = false,
+            isSystemError = acknowledegment.isSystemError.getOrElse(false))
+        }
+
+        acknowledegment.result.foreach { response =>
+          processResult(acknowledegment.activationId, acknowledegment.transid, response)
+        }
+
+        activationFeed ! MessageFeed.Processed
+      case Failure(t) =>
+        activationFeed ! MessageFeed.Processed
+        logging.error(this, s"failed processing message: $raw")
+
+      case _ =>
+        activationFeed ! MessageFeed.Processed
+        logging.error(this, s"Unexpected Acknowledgment message received by loadbalancer: $raw")
+    }
+  }
+
+  private def renewTimeoutHandler(entry: DistributedActivationEntry,
+                                  msg: ActivationMessage,
+                                  isSystemError: Boolean): Unit = {
+    entry.timeoutHandler.cancel()
+
+    val timeout = (TimeLimit.MAX_DURATION * lbConfig.timeoutFactor) + 1.minute
+    val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+      processCompletion(msg.activationId, msg.transid, forced = true, isSystemError)
+    }
+    activationSlots.update(msg.activationId, entry.copy(timeoutHandler = timeoutHandler))
+  }
+
+  /** 5. Process the result ack and return it to the user */
+  private def processResult(aid: ActivationId,
+                            tid: TransactionId,
+                            response: Either[ActivationId, WhiskActivation]): Unit = {
+    // Resolve the promise to send the result back to the user.
+    // The activation will be removed from the activation slots later, when the completion message
+    // is received (because the slot in the invoker is not yet free for new activations).
+    activationPromises.remove(aid).foreach(_.trySuccess(response))
+    logging.info(this, s"received result ack for '$aid'")(tid)
+  }
+
+  private def deleteActivationSlot(aid: ActivationId, tid: TransactionId): Option[DistributedActivationEntry] = {
+    activationSlots.remove(aid) match {
+      case option =>
+        if (activationSlots.contains(aid)) {
+          logging.warn(this, s"Failed to delete $aid from activation slots")(tid)
+          throw new Exception(s"Failed to delete $aid from activation slots")
+        }
+        option
+    }
+  }
+
+  /** Process the completion ack and update the state */
+  protected[loadBalancer] def processCompletion(aid: ActivationId,
+                                                tid: TransactionId,
+                                                forced: Boolean,
+                                                isSystemError: Boolean): Unit = {
+    implicit val transid = tid
+    activationSlots.remove(aid) match {
+      case Some(entry) =>
+        if (activationSlots.contains(aid))
+          Try { retry(deleteActivationSlot(aid, tid)) } recover {
+            case _ =>
+              logging.error(this, s"Failed to delete $aid from activation slots")
+          }
+        totalActivations.decrement()
+        totalActivationMemory.add(entry.memory.toMB * (-1))
+        activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
+        activationsPerController.get(entry.controllerName).foreach(_.decrement())
+
+        if (!forced) {
+          entry.timeoutHandler.cancel()
+          // notice here that the activationPromises is not touched, because the expectation is that
+          // the active ack is received as expected, and processing that message removed the promise
+          // from the corresponding map
+          logging.info(this, s"received completion ack for '$aid', system error=$isSystemError")(tid)
+        } else {
+          logging.error(this, s"Failed to invoke action ${aid.toString}, error: timeout waiting for the active ack")
+
+          // the entry has timed out; if the active ack is still around, remove its entry also
+          // and complete the promise with a failure if necessary
+          activationPromises
+            .remove(aid)
+            .foreach(_.tryFailure(new Throwable("no completion or active ack received yet")))
+        }
+
+      // Active acks that are received here are strictly from user actions - health actions are not part of
+      // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
+      case None if !forced =>
+        // Received a completion ack that has already been taken out of the state because of a timeout (forced ack).
+        // The result is ignored because a timeout has already been reported to the invokerPool per the force.
+        // Logging this condition as a warning because the invoker processed the activation and sent a completion
+        // message - but not in time.
+        logging.warn(this, s"received completion ack for '$aid' which has no entry, system error=$isSystemError")(tid)
+      case None =>
+        // The entry has already been removed by a completion ack. This part of the code is reached by the timeout and can
+        // happen if completion ack and timeout happen roughly at the same time (the timeout was triggered before the completion
+        // ack canceled the timer). As the completion ack is already processed we don't have to do anything here.
+        logging.debug(this, s"forced completion ack for '$aid' which has no entry")(tid)
+    }
+  }
+
+  private val queueKey = QueueKeys.queuePrefix
+  private val schedulerKey = SchedulerKeys.prefix
+  private val throttlingKey = ThrottlingKeys.prefix
+  private val watchedKeys = Set(queueKey, schedulerKey, throttlingKey)
+
+  private val watcher = actorSystem.actorOf(Props(new Actor {
+    watchedKeys.foreach { key =>
+      watcherService ! WatchEndpoint(key, "", true, watcherName, Set(PutEvent, DeleteEvent))
+    }
+
+    override def receive: Receive = {
+      case WatchEndpointRemoved(watchKey, key, value, true) =>
Review comment:
These endpoints are kept alive by the lease.
When any component goes wrong and cannot send a keepalive request within a given time (10s by default), its data is automatically removed and the controllers receive this event.
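For reference, the lease mechanics look roughly like the minimal sketch below. It is hypothetical and uses the jetcd client directly rather than OpenWhisk's own EtcdClient wrapper; the endpoint URL, key, and value are made up for illustration.

```scala
import java.nio.charset.StandardCharsets

import io.etcd.jetcd.{ByteSequence, Client}
import io.etcd.jetcd.lease.LeaseKeepAliveResponse
import io.etcd.jetcd.options.PutOption
import io.grpc.stub.StreamObserver

object LeaseLivenessSketch extends App {
  // Hypothetical etcd endpoint.
  val client = Client.builder().endpoints("http://etcd:2379").build()

  // Grant a lease with a 10-second TTL, matching the default keepalive window.
  val leaseId = client.getLeaseClient.grant(10L).get().getID

  // Bind the endpoint key to the lease. If keepalives stop, etcd deletes the
  // key once the TTL expires, and watchers receive a DeleteEvent for it.
  val key = ByteSequence.from("whisk/scheduler/0", StandardCharsets.UTF_8)
  val value = ByteSequence.from("""{"host":"10.0.0.1","rpcPort":13001}""", StandardCharsets.UTF_8)
  client.getKVClient.put(key, value, PutOption.newBuilder().withLeaseId(leaseId).build()).get()

  // Keep the lease alive while this component is healthy; closing the returned
  // client (or crashing) stops the keepalives and lets the key expire.
  val keepAlive = client.getLeaseClient.keepAlive(leaseId, new StreamObserver[LeaseKeepAliveResponse] {
    override def onNext(r: LeaseKeepAliveResponse): Unit = ()
    override def onError(t: Throwable): Unit = println(s"keepalive failed: $t")
    override def onCompleted(): Unit = ()
  })

  sys.addShutdownHook { keepAlive.close(); client.close() }
}
```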
##########
File path: core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
##########
@@ -0,0 +1,727 @@
+  private val watcher = actorSystem.actorOf(Props(new Actor {
+    watchedKeys.foreach { key =>
+      watcherService ! WatchEndpoint(key, "", true, watcherName, Set(PutEvent, DeleteEvent))
+    }
+
+    override def receive: Receive = {
+      case WatchEndpointRemoved(watchKey, key, value, true) =>
+        watchKey match {
+          case `queueKey` =>
+            if (key.contains("leader")) {
+              queueEndpoints.remove(key)
+              activationSlots.values
+                .find(entry =>
+                  // the leader key is queue/invocationNamespace/ns/pkg/act/leader
+                  QueueKeys
+                    .queue(entry.invocationNamespace, entry.fullyQualifiedEntityName.copy(version = None), true) == key)
+                .foreach { entry =>
+                  implicit val transid = entry.transactionId
+                  logging.warn(
+                    this,
+                    s"The $key is deleted from ETCD, but there are still unhandled activations for this action, try to create a new queue")
+                  WhiskActionMetaData
+                    .get(
+                      entityStore,
+                      entry.fullyQualifiedEntityName.toDocId,
+                      DocRevision.empty,
+                      entry.revision != DocRevision.empty)
+                    .map { actionMetaData =>
+                      FPCPoolBalancer
+                        .schedule(schedulerEndpoints.values.toIndexedSeq)
+                        .map { scheduler =>
+                          createQueue(
+                            entry.invocationNamespace,
+                            actionMetaData,
+                            entry.fullyQualifiedEntityName,
+                            entry.revision,
+                            scheduler)
+                        }
+                        .getOrElse {
+                          logging.error(
+                            this,
+                            s"Failed to recreate queue for ${entry.fullyQualifiedEntityName}, no scheduler endpoint available")
+                        }
+                    }
+                }
+            }
+          case `schedulerKey` =>
+            SchedulerStates
+              .parse(value)
+              .map { state =>
+                logging.info(this, s"remove scheduler endpoint $state")
+                schedulerEndpoints.remove(state.endpoints)
+              }
+              .recover {
+                case t =>
+                  logging.error(this, s"Unexpected error $t")
+              }
+
+          case `throttlingKey` =>
+            throttlers.remove(key)
+          case _ =>
+        }
+
+      case WatchEndpointInserted(watchKey, key, value, true) =>
+        watchKey match {
+          case `queueKey` =>
+            // ignore the follower value; only parse the leader value when it is a normal value
+            // (in a special case, the leader key's value may be Removing)
+            if (key.contains("leader") && value.contains("host")) {
+              SchedulerEndpoints
+                .parse(value)
+                .map { endpoints =>
+                  queueEndpoints.update(key, endpoints)
+                }
+                .recover {
+                  case t =>
+                    logging.error(this, s"Unexpected error $t")
+                }
+            }
+          case `schedulerKey` =>
+            SchedulerStates
+              .parse(value)
+              .map { state =>
+                // if this is a new scheduler, warm it up
+                if (!schedulerEndpoints.contains(state.endpoints))
+                  warmUpScheduler(key, state.endpoints)
+                schedulerEndpoints.update(state.endpoints, state)
+              }
+              .recover {
+                case t =>
+                  logging.error(this, s"Unexpected error $t")
+              }
+          case `throttlingKey` =>
+            val throttled = Try {
+              value.toBoolean
+            }.getOrElse(false)
+            throttlers.update(key, throttled)
+          case _ =>
+        }
+    }
+  }))
+
+  private[loadBalancer] def getSchedulerEndpoint() = {
+    schedulerEndpoints
+  }
+
+  private val warmUpQueueCreationRequest =
+    ExecManifest.runtimesManifest
+      .resolveDefaultRuntime("nodejs:default")
+      .map { manifest =>
+        val metadata = ExecutableWhiskActionMetaData(
+          WarmUp.warmUpAction.path,
+          WarmUp.warmUpAction.name,
+          CodeExecMetaDataAsString(manifest, false, entryPoint = None))
+        CreateQueue(
+          WarmUp.warmUpActionIdentity.namespace.name.asString,
+          WarmUp.warmUpAction,
+          DocRevision.empty,
+          metadata.toWhiskAction)
+      }
+
+  private def warmUpScheduler(schedulerName: String, scheduler: SchedulerEndpoints): Unit = {
+    implicit val transId = TransactionId.warmUp
+    logging.info(this, s"Warm up scheduler $scheduler")
+    sendActivationToKafka(
+      messageProducer,
+      WarmUp.warmUpActivation(controllerInstance),
+      Controller.topicPrefix + schedulerName.replace(s"$clusterName/", "").replace("/", "")) // warm up kafka
+
+    warmUpQueueCreationRequest.foreach { request =>
+      scheduler.getRemoteRef(QueueManager.actorName).ask(request).mapTo[CreateQueueResponse].onComplete {
+        case _ => logging.info(this, s"Warmed up scheduler $scheduler")
+      }
+    }
+  }
+
+  protected def loadSchedulerEndpoint(): Unit = {
+    etcdClient
+      .getPrefix(SchedulerKeys.prefix)
+      .map { res =>
+        res.getKvsList.asScala.map { kv =>
+          val schedulerStates: String = kv.getValue
+          SchedulerStates
+            .parse(schedulerStates)
+            .map { state =>
+              // if this is a new scheduler, warm it up
+              if (!schedulerEndpoints.contains(state.endpoints))
+                warmUpScheduler(kv.getKey, state.endpoints)
+              schedulerEndpoints.update(state.endpoints, state)
+            }
+            .recover {
+              case t =>
+                logging.error(this, s"Unexpected error $t")
+            }
+        }
+      }
+  }
+
+  loadSchedulerEndpoint()
+
+  override def close(): Unit = {
+    watchedKeys.foreach { key =>
+      watcherService ! UnwatchEndpoint(key, true, watcherName)
+    }
+    activationFeed ! GracefulShutdown
+  }
+
+  /**
+   * Returns a message indicating the health of the containers and/or container pool in general.
+   *
+   * @return a Future[IndexedSeq[InvokerHealth]] representing the health of the pools managed by the loadbalancer.
+   **/
+  override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = {
+    etcdClient.getPrefix(s"${InvokerKeys.prefix}/").map { res =>
+      val healthsFromEtcd = res.getKvsList.asScala.map { kv =>
+        val (memory, busyMemory, status, tags, dedicatedNamespaces) = InvokerResourceMessage
+          .parse(kv.getValue.toString(StandardCharsets.UTF_8))
+          .map { resourceMessage =>
+            val status = resourceMessage.status match {
+              case Healthy.asString   => Healthy
+              case Unhealthy.asString => Unhealthy
+              case Offline.asString   => Offline
+            }
+            (
+              resourceMessage.freeMemory.MB,
+              resourceMessage.busyMemory.MB,
+              status,
+              resourceMessage.tags,
+              resourceMessage.dedicatedNamespaces)
+          }
+          .get
+        val temporalId = InvokerKeys.getInstanceId(kv.getKey.toString(StandardCharsets.UTF_8))
+        val invoker = temporalId.copy(
+          userMemory = memory,
+          busyMemory = Some(busyMemory),
+          tags = tags,
+          dedicatedNamespaces = dedicatedNamespaces)
+
+        new InvokerHealth(invoker, status)
+      }.toIndexedSeq
+      val missingHealths =
+        if (healthsFromEtcd.isEmpty) Set.empty[InvokerHealth]
+        else
+          ((0 to healthsFromEtcd.maxBy(_.id.toInt).id.toInt).toSet -- healthsFromEtcd.map(_.id.toInt))
+            .map(id => new InvokerHealth(InvokerInstanceId(id, Some(id.toString), userMemory = 0 MB), Offline))
+      (healthsFromEtcd ++ missingHealths) sortBy (_.id.toInt)
+    }
+  }
+
+  /** Gets the number of in-flight activations for a specific user. */
+  override def activeActivationsFor(namespace: UUID): Future[Int] =
+    Future.successful(activationsPerNamespace.get(namespace).map(_.intValue()).getOrElse(0))
+
+  /** Gets the number of in-flight activations for a specific controller. */
+  override def activeActivationsByController(controller: String): Future[Int] =
+    Future.successful(activationsPerController.get(ControllerInstanceId(controller)).map(_.intValue()).getOrElse(0))
+
+  /** Gets the in-flight activations */
+  override def activeActivationsByController: Future[List[ActivationId]] =
+    Future.successful(activationSlots.keySet.toList)
+
+  /** Gets the number of in-flight activations for a specific invoker. */
+  override def activeActivationsByInvoker(invoker: String): Future[Int] = Future.successful(0)
+
+  /** Gets the number of in-flight activations in the system. */
+  override def totalActiveActivations: Future[Int] = Future.successful(totalActivations.intValue())
+
+  /** Gets the throttling for given action. */
+  override def checkThrottle(namespace: EntityPath, fqn: String): Boolean = {
+
+    /**
+     * Note! The throttle key is assumed to exist unconditionally; if it is not present, requests are not throttled.
Review comment:
Yes, there is no such thing.
If that is necessary, we can add a new feature to the scheduler part. I believe we can control the rate at which activations are pulled out of a queue.
And if it is critical and urgent, we can indirectly limit invocations via the number of containers, under the assumption that one action's execution time does not vary drastically.
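This is not something the PR implements, but a pull-rate control on the scheduler side could look like the token-bucket sketch below. All names here (PullRateLimiter, tryPull, maxPullsPerSecond) are hypothetical.

```scala
// A minimal token bucket gating how often activations may be dequeued.
// Tokens refill continuously at maxPullsPerSecond; each pull consumes one.
final class PullRateLimiter(maxPullsPerSecond: Double) {
  private var tokens: Double = maxPullsPerSecond
  private var lastRefillNs: Long = System.nanoTime()

  /** Returns true if a pull is allowed right now, consuming one token. */
  def tryPull(): Boolean = synchronized {
    val now = System.nanoTime()
    val elapsedSec = (now - lastRefillNs) / 1e9
    lastRefillNs = now
    tokens = math.min(maxPullsPerSecond, tokens + elapsedSec * maxPullsPerSecond)
    if (tokens >= 1.0) { tokens -= 1.0; true } else false
  }
}

object PullRateLimiterExample extends App {
  val limiter = new PullRateLimiter(maxPullsPerSecond = 100)
  val queue = scala.collection.mutable.Queue("activation-1", "activation-2")
  // A container would only receive an activation when the limiter allows it.
  while (queue.nonEmpty) {
    if (limiter.tryPull()) println(s"pulled ${queue.dequeue()}")
  }
}
```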
##########
File path:
core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
##########
@@ -0,0 +1,727 @@
+package org.apache.openwhisk.core.loadBalancer
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.atomic.LongAdder
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable,
Props}
+import akka.event.Logging.InfoLevel
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.controller.Controller
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.etcd.EtcdKV.{InvokerKeys, QueueKeys,
SchedulerKeys, ThrottlingKeys}
+import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.scheduler.queue.{CreateQueue,
CreateQueueResponse, QueueManager}
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints,
SchedulerStates}
+import org.apache.openwhisk.core.service._
+import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.retry
+import pureconfig._
+import pureconfig.generic.auto._
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future,
Promise}
+import scala.language.postfixOps
+import scala.util.{Failure, Random, Success, Try}
+
+class FPCPoolBalancer(config: WhiskConfig,
+ controllerInstance: ControllerInstanceId,
+ etcdClient: EtcdClient,
+ private val feedFactory: FeedFactory,
+ lbConfig: ShardingContainerPoolBalancerConfig =
+
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer),
+ private val messagingProvider: MessagingProvider =
SpiLoader.get[MessagingProvider])(
+ implicit val actorSystem: ActorSystem,
+ logging: Logging)
+ extends LoadBalancer {
+
+ private implicit val executionContext: ExecutionContext =
actorSystem.dispatcher
+ private implicit val requestTimeout: Timeout = Timeout(5.seconds)
+
+ private val entityStore = WhiskEntityStore.datastore()
+
+ private val clusterName =
loadConfigOrThrow[String](ConfigKeys.whiskClusterName)
+
+ /** key: SchedulerEndpoints, value: SchedulerStates */
+ private val schedulerEndpoints = TrieMap[SchedulerEndpoints,
SchedulerStates]()
+ private val messageProducer = messagingProvider.getProducer(config,
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+ private val watcherName = "distributed-pool-balancer"
+ val watcherService: ActorRef =
actorSystem.actorOf(WatcherService.props(etcdClient))
+
+ /** State related to invocations and throttling */
+ protected[loadBalancer] val activationSlots = TrieMap[ActivationId,
DistributedActivationEntry]()
+ protected[loadBalancer] val activationPromises =
+ TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
+
+ /** key: queue/${invocationNs}/ns/action/leader, value: SchedulerEndpoints */
+ private val queueEndpoints = TrieMap[String, SchedulerEndpoints]()
+
+ private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
+ private val activationsPerController = TrieMap[ControllerInstanceId,
LongAdder]()
+ private val totalActivations = new LongAdder()
+ private val totalActivationMemory = new LongAdder()
+ private val throttlers = TrieMap[String, Boolean]()
+
+ /**
+ * Publishes activation message on internal bus for an invoker to pick up.
+ *
+ * @param action the action to invoke
+ * @param msg the activation message to publish on an invoker topic
+ * @param transid the transaction id for the request
+ * @return result a nested Future the outer indicating completion of
publishing and
+ * the inner the completion of the action (i.e., the result)
+ * if it is ready before timeout (Right) otherwise the activation id
(Left).
+ * The future is guaranteed to complete within the declared action
time limit
+ * plus a grace period (see activeAckTimeoutGrace).
+ */
+ override def publish(action: ExecutableWhiskActionMetaData, msg:
ActivationMessage)(
+ implicit transid: TransactionId): Future[Future[Either[ActivationId,
WhiskActivation]]] = {
+
+ val topicBaseName = if (schedulerEndpoints.isEmpty) {
+ logging.error(
+ this,
+ s"Failed to invoke action ${action.fullyQualifiedName(false)}, error:
no scheduler endpoint available")
+ Future.failed(LoadBalancerException("No scheduler endpoint available"))
+ } else {
+ val invocationNamespace = msg.user.namespace.name.asString
+ val key = QueueKeys.queue(invocationNamespace,
action.fullyQualifiedName(false), true)
+
+ queueEndpoints.get(key) match {
+ case Some(endPoint) =>
+ Future.successful(
+ schedulerEndpoints.getOrElse(endPoint,
Random.shuffle(schedulerEndpoints.toList).head._2).sid.toString)
+ case None =>
+ etcdClient
+ .get(key)
+ .map { res =>
+ res.getKvsList.asScala.headOption map { kv =>
+ val endPoint: String = kv.getValue
+ SchedulerEndpoints
+ .parse(endPoint)
+ .map { endPoint =>
+ queueEndpoints.update(kv.getKey, endPoint)
+ Some(
+ schedulerEndpoints
+ .getOrElse(endPoint,
Random.shuffle(schedulerEndpoints.toList).head._2)
+ .sid
+ .toString)
+ }
+ .getOrElse {
+
FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map {
scheduler =>
+ createQueue(invocationNamespace, action.toWhiskAction,
msg.action, msg.revision, scheduler)
+ scheduler.sid.toString
+ }
+ }
+ } getOrElse {
+
FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map {
scheduler =>
+ createQueue(invocationNamespace, action.toWhiskAction,
msg.action, msg.revision, scheduler)
+ scheduler.sid.toString
+ }
+ }
+ }
+ .map { _.get }
+ .recoverWith {
+ case _ =>
+ Future.failed(LoadBalancerException("No scheduler endpoint
available"))
+ }
+ }
+ }
+ topicBaseName.flatMap { baseName =>
+ val topicName = Controller.topicPrefix + baseName
+ val activationResult = setupActivation(msg, action)
+ sendActivationToKafka(messageProducer, msg, topicName).map(_ =>
activationResult)
+ }
+ }
+
+ private def createQueue(
+ invocationNamespace: String,
+ actionMetaData: WhiskActionMetaData,
+ fullyQualifiedEntityName: FullyQualifiedEntityName,
+ revision: DocRevision,
+ scheduler: SchedulerStates,
+ retryCount: Int = 5,
+ excludeSchedulers: Set[SchedulerInstanceId] = Set.empty)(implicit transid:
TransactionId): Unit = {
+ if (retryCount >= 0)
+ scheduler
+ .getRemoteRef(QueueManager.actorName)
+ .ask(CreateQueue(invocationNamespace,
fullyQualifiedEntityName.copy(binding = None), revision, actionMetaData))
+ .mapTo[CreateQueueResponse]
+ .onComplete {
+ case Success(_) =>
+ logging.info(
+ this,
+ s"Create queue successfully for
$invocationNamespace/$fullyQualifiedEntityName on ${scheduler.sid}")
+ case Failure(t) =>
+ logging.error(
+ this,
+ s"failed to get response from ${scheduler}, error is $t, will
retry for ${retryCount} times")
+ // try another scheduler
+ FPCPoolBalancer
+ .schedule(schedulerEndpoints.values.toIndexedSeq,
excludeSchedulers + scheduler.sid)
+ .map { newScheduler =>
+ createQueue(
+ invocationNamespace,
+ actionMetaData,
+ fullyQualifiedEntityName,
+ revision,
+ newScheduler,
+ retryCount - 1,
+ excludeSchedulers + scheduler.sid)
+ }
+ .getOrElse {
+ logging.error(
+ this,
+ s"failed to create queue for
$invocationNamespace/$fullyQualifiedEntityName, no scheduler endpoint available
related activations may failed")
+ }
+ } else
+ logging.error(
+ this,
+ s"failed to create queue for
$invocationNamespace/$fullyQualifiedEntityName, related activations may failed")
+ }
+
+ /**
+ * 2. Update local state with the to be executed activation.
+ *
+ * All activations are tracked in the activationSlots map. Additionally,
blocking invokes
+ * are tracked in the activation results map. When a result is received via
activeack, it
+ * will cause the result to be forwarded to the caller waiting on the
result, and cancel
+ * the DB poll which is also trying to do the same.
+ */
+ private def setupActivation(msg: ActivationMessage,
+ action: ExecutableWhiskActionMetaData):
Future[Either[ActivationId, WhiskActivation]] = {
+ val isBlackboxInvocation = action.exec.pull
+
+ totalActivations.increment()
+ totalActivationMemory.add(action.limits.memory.megabytes)
+ activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new
LongAdder()).increment()
+ activationsPerController.getOrElseUpdate(controllerInstance, new
LongAdder()).increment()
+
+ // Timeout is a multiple of the configured maximum action duration. The
minimum timeout is the configured standard
+ // value for action durations to avoid too tight timeouts.
+ // Timeouts in general are diluted by a configurable factor. In essence
this factor controls how much slack you want
+ // to allow in your topics before you start reporting failed activations.
+ val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION)
* lbConfig.timeoutFactor) + 1.minute
+
+ val resultPromise = if (msg.blocking) {
+ activationPromises.getOrElseUpdate(msg.activationId,
Promise[Either[ActivationId, WhiskActivation]]()).future
+ } else Future.successful(Left(msg.activationId))
+
+ // Install a timeout handler for the catastrophic case where an active ack
is not received at all
+ // (because say an invoker is down completely, or the connection to the
message bus is disrupted) or when
+ // the active ack is significantly delayed (possibly dues to long queues
but the subject should not be penalized);
+ // in this case, if the activation handler is still registered, remove it
and update the books.
+ activationSlots.getOrElseUpdate(
+ msg.activationId, {
+ val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+ processCompletion(msg.activationId, msg.transid, forced = true,
isSystemError = false)
+ }
+
+ // please note: timeoutHandler.cancel must be called on all
non-timeout paths, e.g. Success
+ DistributedActivationEntry(
+ msg.activationId,
+ msg.user.namespace.uuid,
+ msg.user.namespace.name.asString,
+ msg.revision,
+ msg.transid,
+ action.limits.memory.megabytes.MB,
+ action.limits.concurrency.maxConcurrent,
+ action.fullyQualifiedName(true),
+ timeoutHandler,
+ isBlackboxInvocation,
+ msg.blocking,
+ controllerInstance)
+ })
+
+ resultPromise
+ }
+
+ /** 3. Send the activation to the kafka */
+ private def sendActivationToKafka(producer: MessageProducer,
+ msg: ActivationMessage,
+ topic: String): Future[RecordMetadata] = {
+ implicit val transid: TransactionId = msg.transid
+
+
MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
+ val start = transid.started(this, LoggingMarkers.CONTROLLER_KAFKA)
+
+ producer.send(topic, msg).andThen {
+ case Success(status) =>
+ transid.finished(
+ this,
+ start,
+ s"posted to
${status.topic()}[${status.partition()}][${status.offset()}]",
+ logLevel = InfoLevel)
+ case Failure(_) => transid.failed(this, start, s"error on posting to
topic $topic")
+ }
+ }
+
+ /**
+ * Subscribes to active acks (completion messages from the invokers), and
+ * registers a handler for received active acks from invokers.
+ */
+ private val activationFeed: ActorRef =
+ feedFactory.createFeed(actorSystem, messagingProvider,
processAcknowledgement)
+
+ /** 4. Get the active-ack message and parse it */
+ protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]):
Future[Unit] = Future {
+ val raw = new String(bytes, StandardCharsets.UTF_8)
+ AcknowledegmentMessage.parse(raw) match {
+ case Success(acknowledegment) =>
+ acknowledegment.isSlotFree.foreach { invoker =>
+ processCompletion(
+ acknowledegment.activationId,
+ acknowledegment.transid,
+ forced = false,
+ isSystemError = acknowledegment.isSystemError.getOrElse(false))
+ }
+
+ acknowledegment.result.foreach { response =>
+ processResult(acknowledegment.activationId, acknowledegment.transid,
response)
+ }
+
+ activationFeed ! MessageFeed.Processed
+ case Failure(t) =>
+ activationFeed ! MessageFeed.Processed
+ logging.error(this, s"failed processing message: $raw")
+
+ case _ =>
+ activationFeed ! MessageFeed.Processed
+ logging.error(this, s"Unexpected Acknowledgment message received by
loadbalancer: $raw")
+ }
+ }
+
+ private def renewTimeoutHandler(entry: DistributedActivationEntry,
+ msg: ActivationMessage,
+ isSystemError: Boolean): Unit = {
+ entry.timeoutHandler.cancel()
+
+ val timeout = (TimeLimit.MAX_DURATION * lbConfig.timeoutFactor) + 1.minute
+ val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+ processCompletion(msg.activationId, msg.transid, forced = true,
isSystemError)
+ }
+ activationSlots.update(msg.activationId, entry.copy(timeoutHandler =
timeoutHandler))
+ }
+
+ /** 5. Process the result ack and return it to the user */
+ private def processResult(aid: ActivationId,
+ tid: TransactionId,
+ response: Either[ActivationId, WhiskActivation]):
Unit = {
+ // Resolve the promise to send the result back to the user.
+ // The activation will be removed from the activation slots later, when
the completion message
+ // is received (because the slot in the invoker is not yet free for new
activations).
+ activationPromises.remove(aid).foreach(_.trySuccess(response))
+ logging.info(this, s"received result ack for '$aid'")(tid)
+ }
+
+ private def deleteActivationSlot(aid: ActivationId, tid: TransactionId):
Option[DistributedActivationEntry] = {
+ activationSlots.remove(aid) match {
+ case option =>
+ if (activationSlots.contains(aid)) {
+ logging.warn(this, s"Failed to delete $aid from activation
slots")(tid)
+ throw new Exception(s"Failed to delete $aid from activation slots")
+ }
+ option
+ }
+ }
+
+ /** Process the completion ack and update the state */
+ protected[loadBalancer] def processCompletion(aid: ActivationId,
+ tid: TransactionId,
+ forced: Boolean,
+ isSystemError: Boolean): Unit
= {
+ implicit val transid = tid
+ activationSlots.remove(aid) match {
+ case Some(entry) =>
+ if (activationSlots.contains(aid))
+ Try { retry(deleteActivationSlot(aid, tid)) } recover {
+ case _ =>
+ logging.error(this, s"Failed to delete $aid from activation
slots")
+ }
+ totalActivations.decrement()
+ totalActivationMemory.add(entry.memory.toMB * (-1))
+ activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
+
activationsPerController.get(entry.controllerName).foreach(_.decrement())
+
+ if (!forced) {
+ entry.timeoutHandler.cancel()
+ // notice here that the activationPromises is not touched, because
the expectation is that
+ // the active ack is received as expected, and processing that
message removed the promise
+ // from the corresponding map
+ logging.info(this, s"received completion ack for '$aid', system
error=$isSystemError")(tid)
+ } else {
+ logging.error(this, s"Failed to invoke action ${aid.toString},
error: timeout waiting for the active ack")
+
+ // the entry has timed out; if the active ack is still around,
remove its entry also
+ // and complete the promise with a failure if necessary
+ activationPromises
+ .remove(aid)
+ .foreach(_.tryFailure(new Throwable("no completion or active ack
received yet")))
+ }
+
+ // Active acks that are received here are strictly from user actions -
health actions are not part of
+ // the load balancer's activation map. Inform the invoker pool
supervisor of the user action completion.
+ case None if !forced =>
+ // Received a completion ack that has already been taken out of the
state because of a timeout (forced ack).
+ // The result is ignored because a timeout has already been reported
to the invokerPool per the force.
+ // Logging this condition as a warning because the invoker processed
the activation and sent a completion
+ // message - but not in time.
+ logging.warn(this, s"received completion ack for '$aid' which has no
entry, system error=$isSystemError")(tid)
+ case None =>
+ // The entry has already been removed by a completion ack. This part
of the code is reached by the timeout and can
+ // happen if completion ack and timeout happen roughly at the same
time (the timeout was triggered before the completion
+ // ack canceled the timer). As the completion ack is already processed
we don't have to do anything here.
+ logging.debug(this, s"forced completion ack for '$aid' which has no
entry")(tid)
+ }
+ }
+
+ private val queueKey = QueueKeys.queuePrefix
+ private val schedulerKey = SchedulerKeys.prefix
+ private val throttlingKey = ThrottlingKeys.prefix
+ private val watchedKeys = Set(queueKey, schedulerKey, throttlingKey)
+
+ private val watcher = actorSystem.actorOf(Props(new Actor {
+ watchedKeys.foreach { key =>
+ watcherService ! WatchEndpoint(key, "", true, watcherName, Set(PutEvent,
DeleteEvent))
+ }
+
+ override def receive: Receive = {
+ case WatchEndpointRemoved(watchKey, key, value, true) =>
+ watchKey match {
+ case `queueKey` =>
+ if (key.contains("leader")) {
+ queueEndpoints.remove(key)
+ activationSlots.values
+ .find(entry =>
+ // the leader key is queue/invocationNamespace/ns/pkg/act/leader
+ QueueKeys
+ .queue(entry.invocationNamespace,
entry.fullyQualifiedEntityName.copy(version = None), true) == key)
+ .foreach {
+ entry =>
+ implicit val transid = entry.transactionId
+ logging.warn(
+ this,
+ s"The $key is deleted from ETCD, but there are still
unhandled activations for this action, try to create a new queue")
+ WhiskActionMetaData
+ .get(
+ entityStore,
+ entry.fullyQualifiedEntityName.toDocId,
+ DocRevision.empty,
+ entry.revision != DocRevision.empty)
+ .map { actionMetaData =>
+ FPCPoolBalancer
+ .schedule(schedulerEndpoints.values.toIndexedSeq)
+ .map { scheduler =>
+ createQueue(
+ entry.invocationNamespace,
+ actionMetaData,
+ entry.fullyQualifiedEntityName,
+ entry.revision,
+ scheduler)
+ }
+ .getOrElse {
+ logging.error(
+ this,
+ s"Failed to recreate queue for
${entry.fullyQualifiedEntityName}, no scheduler endpoint available")
+ }
+ }
+ }
+ }
+ case `schedulerKey` =>
+ SchedulerStates
+ .parse(value)
+ .map { state =>
+ logging.info(this, s"remove scheduler endpoint $state")
+ schedulerEndpoints.remove(state.endpoints)
+ }
+ .recover {
+ case t =>
+ logging.error(this, s"Unexpected error$t")
+ }
+
+ case `throttlingKey` =>
+ throttlers.remove(key)
+ case _ =>
+ }
+
+ case WatchEndpointInserted(watchKey, key, value, true) =>
+ watchKey match {
+ case `queueKey` =>
+ // ignore follower values and only parse the leader key's normal value; in a special case the leader key's value may be "Removing"
+ if (key.contains("leader") && value.contains("host")) {
+ SchedulerEndpoints
+ .parse(value)
+ .map { endpoints =>
+ queueEndpoints.update(key, endpoints)
+ }
+ .recover {
+ case t =>
+ logging.error(this, s"Unexpected error$t")
+ }
+ }
+ case `schedulerKey` =>
+ SchedulerStates
+ .parse(value)
+ .map { state =>
+ // if this is a new scheduler, warm it up
+ if (!schedulerEndpoints.contains(state.endpoints))
+ warmUpScheduler(key, state.endpoints)
+ schedulerEndpoints.update(state.endpoints, state)
+ }
+ .recover {
+ case t =>
+ logging.error(this, s"Unexpected error$t")
+ }
+ case `throttlingKey` =>
+ val throttled = Try {
+ value.toBoolean
+ }.getOrElse(false)
+ throttlers.update(key, throttled)
+ case _ =>
+ }
+ }
+ }))
+
+ private[loadBalancer] def getSchedulerEndpoint() = {
+ schedulerEndpoints
+ }
+
+ private val warmUpQueueCreationRequest =
+ ExecManifest.runtimesManifest
+ .resolveDefaultRuntime("nodejs:default")
+ .map { manifest =>
+ val metadata = ExecutableWhiskActionMetaData(
+ WarmUp.warmUpAction.path,
+ WarmUp.warmUpAction.name,
+ CodeExecMetaDataAsString(manifest, false, entryPoint = None))
+ CreateQueue(
+ WarmUp.warmUpActionIdentity.namespace.name.asString,
+ WarmUp.warmUpAction,
+ DocRevision.empty,
+ metadata.toWhiskAction)
+ }
+
+ private def warmUpScheduler(schedulerName: String, scheduler:
SchedulerEndpoints): Unit = {
+ implicit val transId = TransactionId.warmUp
+ logging.info(this, s"Warm up scheduler $scheduler")
+ sendActivationToKafka(
+ messageProducer,
+ WarmUp.warmUpActivation(controllerInstance),
+ Controller.topicPrefix + schedulerName.replace(s"$clusterName/",
"").replace("/", "")) // warm up kafka
+
+ warmUpQueueCreationRequest.foreach { request =>
+
scheduler.getRemoteRef(QueueManager.actorName).ask(request).mapTo[CreateQueueResponse].onComplete
{
+ case _ => logging.info(this, s"Warmed up scheduler $scheduler")
+ }
+ }
+ }
+
+ protected def loadSchedulerEndpoint(): Unit = {
+ etcdClient
+ .getPrefix(SchedulerKeys.prefix)
+ .map { res =>
+ res.getKvsList.asScala.map { kv =>
+ val schedulerStates: String = kv.getValue
+ SchedulerStates
+ .parse(schedulerStates)
+ .map { state =>
+ // if this is a new scheduler, warm it up
+ if (!schedulerEndpoints.contains(state.endpoints))
+ warmUpScheduler(kv.getKey, state.endpoints)
+ schedulerEndpoints.update(state.endpoints, state)
+ }
+ .recover {
+ case t =>
+ logging.error(this, s"Unexpected error$t")
+ }
+ }
+ }
+ }
+
+ loadSchedulerEndpoint()
+
+ override def close(): Unit = {
+ watchedKeys.foreach { key =>
+ watcherService ! UnwatchEndpoint(key, true, watcherName)
+ }
+ activationFeed ! GracefulShutdown
+ }
+
+ /**
+ * Returns a message indicating the health of the containers and/or
container pool in general.
+ *
+ * @return a Future[IndexedSeq[InvokerHealth]] representing the health of
the pools managed by the loadbalancer.
+ **/
+ override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = {
+ etcdClient.getPrefix(s"${InvokerKeys.prefix}/").map { res =>
+ val healthsFromEtcd = res.getKvsList.asScala.map { kv =>
+ val (memory, busyMemory, status, tags, dedicatedNamespaces) =
InvokerResourceMessage
+ .parse(kv.getValue.toString(StandardCharsets.UTF_8))
+ .map { resourceMessage =>
+ val status = resourceMessage.status match {
+ case Healthy.asString => Healthy
+ case Unhealthy.asString => Unhealthy
+ case Offline.asString => Offline
+ }
+ (
+ resourceMessage.freeMemory.MB,
+ resourceMessage.busyMemory.MB,
+ status,
+ resourceMessage.tags,
+ resourceMessage.dedicatedNamespaces)
+ }
+ .get
+ val temporalId =
InvokerKeys.getInstanceId(kv.getKey.toString(StandardCharsets.UTF_8))
+ val invoker = temporalId.copy(
+ userMemory = memory,
+ busyMemory = Some(busyMemory),
+ tags = tags,
+ dedicatedNamespaces = dedicatedNamespaces)
+
+ new InvokerHealth(invoker, status)
+ }.toIndexedSeq
+ val missingHealths =
+ if (healthsFromEtcd.isEmpty) Set.empty[InvokerHealth]
+ else
+ ((0 to healthsFromEtcd.maxBy(_.id.toInt).id.toInt).toSet --
healthsFromEtcd.map(_.id.toInt))
+ .map(id => new InvokerHealth(InvokerInstanceId(id,
Some(id.toString), userMemory = 0 MB), Offline))
+ (healthsFromEtcd ++ missingHealths) sortBy (_.id.toInt)
+ }
+ }
+
+ /** Gets the number of in-flight activations for a specific user. */
+ override def activeActivationsFor(namespace: UUID): Future[Int] =
+
Future.successful(activationsPerNamespace.get(namespace).map(_.intValue()).getOrElse(0))
+
+ /** Gets the number of in-flight activations for a specific controller. */
+ override def activeActivationsByController(controller: String): Future[Int] =
+
Future.successful(activationsPerController.get(ControllerInstanceId(controller)).map(_.intValue()).getOrElse(0))
+
+ /** Gets the in-flight activations */
+ override def activeActivationsByController: Future[List[ActivationId]] =
+ Future.successful(activationSlots.keySet.toList)
+
+ /** Gets the number of in-flight activations for a specific invoker. */
+ override def activeActivationsByInvoker(invoker: String): Future[Int] =
Future.successful(0)
+
+ /** Gets the number of in-flight activations in the system. */
+ override def totalActiveActivations: Future[Int] =
Future.successful(totalActivations.intValue())
+
+ /** Gets the throttling for given action. */
+ override def checkThrottle(namespace: EntityPath, fqn: String): Boolean = {
+
+ /**
+ * Note: a throttle key may not exist; a missing key is treated as not throttled.
+ *
+ * Action throttle:  true      -> 429
+ *                   false     -> pass (a container already exists for the action)
+ *                   not exist -> fall back to the namespace throttle:
+ *                                true      -> 429 (no more containers can be created)
+ *                                false     -> pass
+ *                                not exist -> pass
+ */
+ throttlers.getOrElse(
+ ThrottlingKeys.action(namespace.namespace, fqn),
+ throttlers.getOrElse(ThrottlingKeys.namespace(namespace.root), false))
+ }
+}
+
+object FPCPoolBalancer extends LoadBalancerProvider {
+
+ override def instance(whiskConfig: WhiskConfig, instance:
ControllerInstanceId)(implicit actorSystem: ActorSystem,
+
logging: Logging): LoadBalancer = {
+
+ implicit val exe: ExecutionContextExecutor = actorSystem.dispatcher
+ val activeAckTopic =
s"${Controller.topicPrefix}completed${instance.asString}"
+ val maxActiveAcksPerPoll = 128
+ val activeAckPollDuration = 1.second
+
+ val feedFactory = new FeedFactory {
+ def createFeed(f: ActorRefFactory, provider: MessagingProvider, acker:
Array[Byte] => Future[Unit]): ActorRef = {
+ f.actorOf(Props {
+ new MessageFeed(
+ "activeack",
+ logging,
+ provider.getConsumer(whiskConfig, activeAckTopic, activeAckTopic,
maxPeek = maxActiveAcksPerPoll),
+ maxActiveAcksPerPoll,
+ activeAckPollDuration,
+ acker)
+ })
+ }
+ }
+
+ val etcd = EtcdClient(loadConfigOrThrow[EtcdConfig](ConfigKeys.etcd).hosts)
+
+ new FPCPoolBalancer(whiskConfig, instance, etcd, feedFactory)
+ }
+
+ def requiredProperties: Map[String, String] = WhiskConfig.kafkaHosts
+
+ // TODO modularize rng algorithm
+ /**
+ * The rng algorithm is responsible for the invoker distribution; the better the distribution, the less rescheduling is needed.
+ */
+ def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod)
+
+ /**
+ * Assign an invoker to a message
+ *
+ * @param invokers Invoker pool
+ * @param minMemory Minimum memory an invoker must have available
+ * @return The assigned invoker, if any
+ */
+ // TODO add input/output example
+ def schedule(invokers: IndexedSeq[InvokerHealth], minMemory: ByteSize):
Option[InvokerInstanceId] = {
Review comment:
Oh, this should be deleted.
It is only required by our downstream to send messages to invokers for debugging.
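For context, a minimal sketch of what such an overload would presumably look like, assuming the `InvokerHealth`/`InvokerInstanceId` types and the `rng` helper defined above (the memory filter is illustrative, not the actual downstream implementation):

```scala
def schedule(invokers: IndexedSeq[InvokerHealth], minMemory: ByteSize): Option[InvokerInstanceId] = {
  // keep only invokers that are usable and advertise at least the requested memory
  val candidates = invokers.filter(i => i.status.isUsable && i.id.userMemory >= minMemory)
  if (candidates.isEmpty) None
  else Some(candidates(rng(candidates.size)).id) // uniform random pick via the rng helper
}
```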
##########
File path:
core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
##########
@@ -0,0 +1,727 @@
+package org.apache.openwhisk.core.loadBalancer
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.atomic.LongAdder
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable,
Props}
+import akka.event.Logging.InfoLevel
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.controller.Controller
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.etcd.EtcdKV.{InvokerKeys, QueueKeys,
SchedulerKeys, ThrottlingKeys}
+import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.scheduler.queue.{CreateQueue,
CreateQueueResponse, QueueManager}
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints,
SchedulerStates}
+import org.apache.openwhisk.core.service._
+import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.retry
+import pureconfig._
+import pureconfig.generic.auto._
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future,
Promise}
+import scala.language.postfixOps
+import scala.util.{Failure, Random, Success, Try}
+
+class FPCPoolBalancer(config: WhiskConfig,
+ controllerInstance: ControllerInstanceId,
+ etcdClient: EtcdClient,
+ private val feedFactory: FeedFactory,
+ lbConfig: ShardingContainerPoolBalancerConfig =
+
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer),
+ private val messagingProvider: MessagingProvider =
SpiLoader.get[MessagingProvider])(
+ implicit val actorSystem: ActorSystem,
+ logging: Logging)
+ extends LoadBalancer {
+
+ private implicit val executionContext: ExecutionContext =
actorSystem.dispatcher
+ private implicit val requestTimeout: Timeout = Timeout(5.seconds)
+
+ private val entityStore = WhiskEntityStore.datastore()
+
+ private val clusterName =
loadConfigOrThrow[String](ConfigKeys.whiskClusterName)
+
+ /** key: SchedulerEndpoints, value: SchedulerStates */
+ private val schedulerEndpoints = TrieMap[SchedulerEndpoints,
SchedulerStates]()
+ private val messageProducer = messagingProvider.getProducer(config,
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+ private val watcherName = "distributed-pool-balancer"
+ val watcherService: ActorRef =
actorSystem.actorOf(WatcherService.props(etcdClient))
+
+ /** State related to invocations and throttling */
+ protected[loadBalancer] val activationSlots = TrieMap[ActivationId,
DistributedActivationEntry]()
+ protected[loadBalancer] val activationPromises =
+ TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
+
+ /** key: queue/${invocationNs}/ns/action/leader, value: SchedulerEndpoints */
+ private val queueEndpoints = TrieMap[String, SchedulerEndpoints]()
+
+ private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
+ private val activationsPerController = TrieMap[ControllerInstanceId,
LongAdder]()
+ private val totalActivations = new LongAdder()
+ private val totalActivationMemory = new LongAdder()
+ private val throttlers = TrieMap[String, Boolean]()
+
+ /**
+ * Publishes activation message on internal bus for an invoker to pick up.
+ *
+ * @param action the action to invoke
+ * @param msg the activation message to publish on an invoker topic
+ * @param transid the transaction id for the request
+ * @return result a nested Future the outer indicating completion of
publishing and
+ * the inner the completion of the action (i.e., the result)
+ * if it is ready before timeout (Right) otherwise the activation id
(Left).
+ * The future is guaranteed to complete within the declared action
time limit
+ * plus a grace period (see activeAckTimeoutGrace).
+ */
+ override def publish(action: ExecutableWhiskActionMetaData, msg:
ActivationMessage)(
+ implicit transid: TransactionId): Future[Future[Either[ActivationId,
WhiskActivation]]] = {
+
+ val topicBaseName = if (schedulerEndpoints.isEmpty) {
+ logging.error(
+ this,
+ s"Failed to invoke action ${action.fullyQualifiedName(false)}, error:
no scheduler endpoint available")
+ Future.failed(LoadBalancerException("No scheduler endpoint available"))
+ } else {
+ val invocationNamespace = msg.user.namespace.name.asString
+ val key = QueueKeys.queue(invocationNamespace,
action.fullyQualifiedName(false), true)
+
+ queueEndpoints.get(key) match {
+ case Some(endPoint) =>
+ Future.successful(
+ schedulerEndpoints.getOrElse(endPoint,
Random.shuffle(schedulerEndpoints.toList).head._2).sid.toString)
+ case None =>
+ etcdClient
+ .get(key)
+ .map { res =>
+ res.getKvsList.asScala.headOption map { kv =>
+ val endPoint: String = kv.getValue
+ SchedulerEndpoints
+ .parse(endPoint)
+ .map { endPoint =>
+ queueEndpoints.update(kv.getKey, endPoint)
+ Some(
+ schedulerEndpoints
+ .getOrElse(endPoint,
Random.shuffle(schedulerEndpoints.toList).head._2)
+ .sid
+ .toString)
+ }
+ .getOrElse {
+
FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map {
scheduler =>
Review comment:
I looked into the code again.
Actually, there are two different pieces of logic: one selects the right scheduler, and the other creates a queue on the given scheduler. Since the
`createQueue` method uses recursion, it is better kept as a standalone method.
So if we created a new method for this kind of duplication, it would just be a wrapper around this code as-is. The wrapper would contain only
two method calls, `schedule` and `createQueue`, with no other logic, and its parameters would simply be the sum of the two underlying methods' parameters.
I am inclined to keep the original code, HDYT?
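To make the trade-off concrete, here is a sketch of the wrapper under discussion (the name `scheduleAndCreateQueue` is hypothetical; the body only chains the two existing calls exactly as they appear in the diff above):

```scala
private def scheduleAndCreateQueue(invocationNamespace: String,
                                   actionMetaData: WhiskActionMetaData,
                                   fqn: FullyQualifiedEntityName,
                                   revision: DocRevision)(implicit transid: TransactionId): Option[String] =
  FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map { scheduler =>
    // delegate the recursive, retrying queue creation to the standalone method
    createQueue(invocationNamespace, actionMetaData, fqn, revision, scheduler)
    scheduler.sid.toString // the topic base name for the chosen scheduler
  }
```

Since it would add no logic of its own beyond parameter plumbing, keeping the two calls inline at the call sites is a defensible choice.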
##########
File path:
core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
##########
@@ -0,0 +1,727 @@
+package org.apache.openwhisk.core.loadBalancer
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.atomic.LongAdder
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Cancellable,
Props}
+import akka.event.Logging.InfoLevel
+import akka.pattern.ask
+import akka.util.Timeout
+import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.openwhisk.common.InvokerState.{Healthy, Offline, Unhealthy}
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.controller.Controller
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.etcd.EtcdKV.{InvokerKeys, QueueKeys,
SchedulerKeys, ThrottlingKeys}
+import org.apache.openwhisk.core.etcd.EtcdType._
+import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
+import org.apache.openwhisk.core.scheduler.queue.{CreateQueue,
CreateQueueResponse, QueueManager}
+import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints,
SchedulerStates}
+import org.apache.openwhisk.core.service._
+import org.apache.openwhisk.core.{ConfigKeys, WarmUp, WhiskConfig}
+import org.apache.openwhisk.spi.SpiLoader
+import org.apache.openwhisk.utils.retry
+import pureconfig._
+import pureconfig.generic.auto._
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future,
Promise}
+import scala.language.postfixOps
+import scala.util.{Failure, Random, Success, Try}
+
+class FPCPoolBalancer(config: WhiskConfig,
+ controllerInstance: ControllerInstanceId,
+ etcdClient: EtcdClient,
+ private val feedFactory: FeedFactory,
+ lbConfig: ShardingContainerPoolBalancerConfig =
+
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer),
+ private val messagingProvider: MessagingProvider =
SpiLoader.get[MessagingProvider])(
+ implicit val actorSystem: ActorSystem,
+ logging: Logging)
+ extends LoadBalancer {
+
+ private implicit val executionContext: ExecutionContext =
actorSystem.dispatcher
+ private implicit val requestTimeout: Timeout = Timeout(5.seconds)
+
+ private val entityStore = WhiskEntityStore.datastore()
+
+ private val clusterName =
loadConfigOrThrow[String](ConfigKeys.whiskClusterName)
+
+ /** key: SchedulerEndpoints, value: SchedulerStates */
+ private val schedulerEndpoints = TrieMap[SchedulerEndpoints,
SchedulerStates]()
+ private val messageProducer = messagingProvider.getProducer(config,
Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
+
+ private val watcherName = "distributed-pool-balancer"
+ val watcherService: ActorRef =
actorSystem.actorOf(WatcherService.props(etcdClient))
+
+ /** State related to invocations and throttling */
+ protected[loadBalancer] val activationSlots = TrieMap[ActivationId,
DistributedActivationEntry]()
+ protected[loadBalancer] val activationPromises =
+ TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
+
+ /** key: queue/${invocationNs}/ns/action/leader, value: SchedulerEndpoints */
+ private val queueEndpoints = TrieMap[String, SchedulerEndpoints]()
+
+ private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
+ private val activationsPerController = TrieMap[ControllerInstanceId,
LongAdder]()
+ private val totalActivations = new LongAdder()
+ private val totalActivationMemory = new LongAdder()
+ private val throttlers = TrieMap[String, Boolean]()
+
+ /**
+ * Publishes activation message on internal bus for an invoker to pick up.
+ *
+ * @param action the action to invoke
+ * @param msg the activation message to publish on an invoker topic
+ * @param transid the transaction id for the request
+ * @return result a nested Future the outer indicating completion of
publishing and
+ * the inner the completion of the action (i.e., the result)
+ * if it is ready before timeout (Right) otherwise the activation id
(Left).
+ * The future is guaranteed to complete within the declared action
time limit
+ * plus a grace period (see activeAckTimeoutGrace).
+ */
+ override def publish(action: ExecutableWhiskActionMetaData, msg:
ActivationMessage)(
+ implicit transid: TransactionId): Future[Future[Either[ActivationId,
WhiskActivation]]] = {
+
+ val topicBaseName = if (schedulerEndpoints.isEmpty) {
+ logging.error(
+ this,
+ s"Failed to invoke action ${action.fullyQualifiedName(false)}, error:
no scheduler endpoint available")
+ Future.failed(LoadBalancerException("No scheduler endpoint available"))
+ } else {
+ val invocationNamespace = msg.user.namespace.name.asString
+ val key = QueueKeys.queue(invocationNamespace,
action.fullyQualifiedName(false), true)
+
+ queueEndpoints.get(key) match {
+ case Some(endPoint) =>
+ Future.successful(
+ schedulerEndpoints.getOrElse(endPoint,
Random.shuffle(schedulerEndpoints.toList).head._2).sid.toString)
+ case None =>
+ etcdClient
+ .get(key)
+ .map { res =>
+ res.getKvsList.asScala.headOption map { kv =>
+ val endPoint: String = kv.getValue
+ SchedulerEndpoints
+ .parse(endPoint)
+ .map { endPoint =>
+ queueEndpoints.update(kv.getKey, endPoint)
+ Some(
+ schedulerEndpoints
+ .getOrElse(endPoint,
Random.shuffle(schedulerEndpoints.toList).head._2)
+ .sid
+ .toString)
+ }
+ .getOrElse {
+
FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map {
scheduler =>
+ createQueue(invocationNamespace, action.toWhiskAction,
msg.action, msg.revision, scheduler)
+ scheduler.sid.toString
+ }
+ }
+ } getOrElse {
+
FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).map {
scheduler =>
+ createQueue(invocationNamespace, action.toWhiskAction,
msg.action, msg.revision, scheduler)
+ scheduler.sid.toString
+ }
+ }
+ }
+ .map { _.get }
+ .recoverWith {
+ case _ =>
+ Future.failed(LoadBalancerException("No scheduler endpoint
available"))
+ }
+ }
+ }
+ topicBaseName.flatMap { baseName =>
+ val topicName = Controller.topicPrefix + baseName
+ val activationResult = setupActivation(msg, action)
+ sendActivationToKafka(messageProducer, msg, topicName).map(_ =>
activationResult)
+ }
+ }
+
+ private def createQueue(
+ invocationNamespace: String,
+ actionMetaData: WhiskActionMetaData,
+ fullyQualifiedEntityName: FullyQualifiedEntityName,
+ revision: DocRevision,
+ scheduler: SchedulerStates,
+ retryCount: Int = 5,
+ excludeSchedulers: Set[SchedulerInstanceId] = Set.empty)(implicit transid:
TransactionId): Unit = {
+ if (retryCount >= 0)
+ scheduler
+ .getRemoteRef(QueueManager.actorName)
+ .ask(CreateQueue(invocationNamespace,
fullyQualifiedEntityName.copy(binding = None), revision, actionMetaData))
+ .mapTo[CreateQueueResponse]
+ .onComplete {
+ case Success(_) =>
+ logging.info(
+ this,
+ s"Create queue successfully for
$invocationNamespace/$fullyQualifiedEntityName on ${scheduler.sid}")
+ case Failure(t) =>
+ logging.error(
+ this,
+ s"failed to get response from ${scheduler}, error is $t, will
retry for ${retryCount} times")
+ // try another scheduler
+ FPCPoolBalancer
+ .schedule(schedulerEndpoints.values.toIndexedSeq,
excludeSchedulers + scheduler.sid)
+ .map { newScheduler =>
+ createQueue(
+ invocationNamespace,
+ actionMetaData,
+ fullyQualifiedEntityName,
+ revision,
+ newScheduler,
+ retryCount - 1,
+ excludeSchedulers + scheduler.sid)
+ }
+ .getOrElse {
+ logging.error(
+ this,
+ s"failed to create queue for
$invocationNamespace/$fullyQualifiedEntityName, no scheduler endpoint available
related activations may failed")
+ }
+ } else
+ logging.error(
+ this,
+ s"failed to create queue for
$invocationNamespace/$fullyQualifiedEntityName, related activations may failed")
+ }
+
+ /**
+ * 2. Update local state with the to be executed activation.
+ *
+ * All activations are tracked in the activationSlots map. Additionally,
blocking invokes
+ * are tracked in the activation results map. When a result is received via
activeack, it
+ * will cause the result to be forwarded to the caller waiting on the
result, and cancel
+ * the DB poll which is also trying to do the same.
+ */
+ private def setupActivation(msg: ActivationMessage,
+ action: ExecutableWhiskActionMetaData):
Future[Either[ActivationId, WhiskActivation]] = {
+ val isBlackboxInvocation = action.exec.pull
+
+ totalActivations.increment()
+ totalActivationMemory.add(action.limits.memory.megabytes)
+ activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new
LongAdder()).increment()
+ activationsPerController.getOrElseUpdate(controllerInstance, new
LongAdder()).increment()
+
+ // Timeout is a multiple of the configured maximum action duration. The
minimum timeout is the configured standard
+ // value for action durations to avoid too tight timeouts.
+ // Timeouts in general are diluted by a configurable factor. In essence
this factor controls how much slack you want
+ // to allow in your topics before you start reporting failed activations.
+ val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION)
* lbConfig.timeoutFactor) + 1.minute
+
+ val resultPromise = if (msg.blocking) {
+ activationPromises.getOrElseUpdate(msg.activationId,
Promise[Either[ActivationId, WhiskActivation]]()).future
+ } else Future.successful(Left(msg.activationId))
+
+ // Install a timeout handler for the catastrophic case where an active ack
is not received at all
+ // (because say an invoker is down completely, or the connection to the
message bus is disrupted) or when
+ // the active ack is significantly delayed (possibly due to long queues, but the subject should not be penalized);
+ // in this case, if the activation handler is still registered, remove it
and update the books.
+ activationSlots.getOrElseUpdate(
+ msg.activationId, {
+ val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+ processCompletion(msg.activationId, msg.transid, forced = true,
isSystemError = false)
+ }
+
+ // please note: timeoutHandler.cancel must be called on all
non-timeout paths, e.g. Success
+ DistributedActivationEntry(
+ msg.activationId,
+ msg.user.namespace.uuid,
+ msg.user.namespace.name.asString,
+ msg.revision,
+ msg.transid,
+ action.limits.memory.megabytes.MB,
+ action.limits.concurrency.maxConcurrent,
+ action.fullyQualifiedName(true),
+ timeoutHandler,
+ isBlackboxInvocation,
+ msg.blocking,
+ controllerInstance)
+ })
+
+ resultPromise
+ }
+
+ /** 3. Send the activation to the kafka */
+ private def sendActivationToKafka(producer: MessageProducer,
+ msg: ActivationMessage,
+ topic: String): Future[RecordMetadata] = {
+ implicit val transid: TransactionId = msg.transid
+
+
MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
+ val start = transid.started(this, LoggingMarkers.CONTROLLER_KAFKA)
+
+ producer.send(topic, msg).andThen {
+ case Success(status) =>
+ transid.finished(
+ this,
+ start,
+ s"posted to
${status.topic()}[${status.partition()}][${status.offset()}]",
+ logLevel = InfoLevel)
+ case Failure(_) => transid.failed(this, start, s"error on posting to
topic $topic")
+ }
+ }
+
+ /**
+ * Subscribes to active acks (completion messages from the invokers), and
+ * registers a handler for received active acks from invokers.
+ */
+ private val activationFeed: ActorRef =
+ feedFactory.createFeed(actorSystem, messagingProvider,
processAcknowledgement)
+
+ /** 4. Get the active-ack message and parse it */
+ protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]):
Future[Unit] = Future {
+ val raw = new String(bytes, StandardCharsets.UTF_8)
+ AcknowledegmentMessage.parse(raw) match {
+ case Success(acknowledegment) =>
+ acknowledegment.isSlotFree.foreach { invoker =>
+ processCompletion(
+ acknowledegment.activationId,
+ acknowledegment.transid,
+ forced = false,
+ isSystemError = acknowledegment.isSystemError.getOrElse(false))
+ }
+
+ acknowledegment.result.foreach { response =>
+ processResult(acknowledegment.activationId, acknowledegment.transid,
response)
+ }
+
+ activationFeed ! MessageFeed.Processed
+ case Failure(t) =>
+ activationFeed ! MessageFeed.Processed
+ logging.error(this, s"failed processing message: $raw, error: $t")
+ }
+ }
+
+ private def renewTimeoutHandler(entry: DistributedActivationEntry,
+ msg: ActivationMessage,
+ isSystemError: Boolean): Unit = {
+ entry.timeoutHandler.cancel()
+
+ val timeout = (TimeLimit.MAX_DURATION * lbConfig.timeoutFactor) + 1.minute
+ val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+ processCompletion(msg.activationId, msg.transid, forced = true,
isSystemError)
+ }
+ activationSlots.update(msg.activationId, entry.copy(timeoutHandler =
timeoutHandler))
+ }
+
+ /** 5. Process the result ack and return it to the user */
+ private def processResult(aid: ActivationId,
+ tid: TransactionId,
+ response: Either[ActivationId, WhiskActivation]):
Unit = {
+ // Resolve the promise to send the result back to the user.
+ // The activation will be removed from the activation slots later, when
the completion message
+ // is received (because the slot in the invoker is not yet free for new
activations).
+ activationPromises.remove(aid).foreach(_.trySuccess(response))
+ logging.info(this, s"received result ack for '$aid'")(tid)
+ }
+
+ private def deleteActivationSlot(aid: ActivationId, tid: TransactionId):
Option[DistributedActivationEntry] = {
+ activationSlots.remove(aid) match {
+ case option =>
+ if (activationSlots.contains(aid)) {
+ logging.warn(this, s"Failed to delete $aid from activation
slots")(tid)
+ throw new Exception(s"Failed to delete $aid from activation slots")
+ }
+ option
+ }
+ }
+
+ /** Process the completion ack and update the state */
+ protected[loadBalancer] def processCompletion(aid: ActivationId,
+ tid: TransactionId,
+ forced: Boolean,
+ isSystemError: Boolean): Unit
= {
+ implicit val transid = tid
+ activationSlots.remove(aid) match {
+ case Some(entry) =>
+ if (activationSlots.contains(aid))
+ Try { retry(deleteActivationSlot(aid, tid)) } recover {
+ case _ =>
+ logging.error(this, s"Failed to delete $aid from activation
slots")
+ }
+ totalActivations.decrement()
+ totalActivationMemory.add(entry.memory.toMB * (-1))
+ activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
+
activationsPerController.get(entry.controllerName).foreach(_.decrement())
+
+ if (!forced) {
+ entry.timeoutHandler.cancel()
+ // notice here that the activationPromises is not touched, because
the expectation is that
+ // the active ack is received as expected, and processing that
message removed the promise
+ // from the corresponding map
+ logging.info(this, s"received completion ack for '$aid', system
error=$isSystemError")(tid)
+ } else {
+ logging.error(this, s"Failed to invoke action ${aid.toString},
error: timeout waiting for the active ack")
+
+ // the entry has timed out; if the active ack is still around,
remove its entry also
+ // and complete the promise with a failure if necessary
+ activationPromises
+ .remove(aid)
+ .foreach(_.tryFailure(new Throwable("no completion or active ack
received yet")))
+ }
+
+ // Active acks that are received here are strictly from user actions -
health actions are not part of
+ // the load balancer's activation map. Inform the invoker pool
supervisor of the user action completion.
+ case None if !forced =>
+ // Received a completion ack that has already been taken out of the
state because of a timeout (forced ack).
+ // The result is ignored because a timeout has already been reported
to the invokerPool per the force.
+ // Logging this condition as a warning because the invoker processed
the activation and sent a completion
+ // message - but not in time.
+ logging.warn(this, s"received completion ack for '$aid' which has no
entry, system error=$isSystemError")(tid)
+ case None =>
+ // The entry has already been removed by a completion ack. This part
of the code is reached by the timeout and can
+ // happen if completion ack and timeout happen roughly at the same
time (the timeout was triggered before the completion
+ // ack canceled the timer). As the completion ack is already processed
we don't have to do anything here.
+ logging.debug(this, s"forced completion ack for '$aid' which has no
entry")(tid)
+ }
+ }
+
+ private val queueKey = QueueKeys.queuePrefix
+ private val schedulerKey = SchedulerKeys.prefix
+ private val throttlingKey = ThrottlingKeys.prefix
+ private val watchedKeys = Set(queueKey, schedulerKey, throttlingKey)
+
+ private val watcher = actorSystem.actorOf(Props(new Actor {
+ watchedKeys.foreach { key =>
+ watcherService ! WatchEndpoint(key, "", true, watcherName, Set(PutEvent,
DeleteEvent))
+ }
+
+ override def receive: Receive = {
+ case WatchEndpointRemoved(watchKey, key, value, true) =>
+ watchKey match {
+ case `queueKey` =>
+ if (key.contains("leader")) {
+ queueEndpoints.remove(key)
+ activationSlots.values
+ .find(entry =>
+ // the leader key is queue/invocationNamespace/ns/pkg/act/leader
+ QueueKeys
+ .queue(entry.invocationNamespace,
entry.fullyQualifiedEntityName.copy(version = None), true) == key)
+ .foreach {
+ entry =>
+ implicit val transid = entry.transactionId
+ logging.warn(
+ this,
+ s"The $key is deleted from ETCD, but there are still
unhandled activations for this action, try to create a new queue")
Review comment:
When there is an activation in a queue, the queue is not deleted.
Unhandled activations here come from the possibility that, while the queue is being deleted because of a timeout (no more activations), new
activations can still arrive on the controller side.
Regarding the role of a controller, yes, it is rather passive now.
It just selects the right scheduler, sends a queue creation request, and puts activations onto one of the scheduler's Kafka topics.
Still, activations can be delivered to a scheduler that has no queue for them; the scheduler then looks for (with retries) the target
scheduler that has a queue and forwards the activations to it.
(I hope we can remove Kafka from the critical path entirely in the future. Then we can probably consolidate these two calls into one.)
So basically the situation here should rarely happen.
Once a queue is created, the activations will be forwarded to the proper scheduler.
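For readers following the thread, a condensed, hypothetical view of the controller-side path described above, chaining the pieces already present in this diff (`FPCPoolBalancer.schedule`, `createQueue`, `sendActivationToKafka`); the method name `invokeViaScheduler` is illustrative only:

```scala
private def invokeViaScheduler(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
  implicit transid: TransactionId): Unit =
  FPCPoolBalancer.schedule(schedulerEndpoints.values.toIndexedSeq).foreach { scheduler =>
    // 1. ask the chosen scheduler to create a queue for the action (retried internally)
    createQueue(msg.user.namespace.name.asString, action.toWhiskAction, msg.action, msg.revision, scheduler)
    // 2. publish the activation on that scheduler's Kafka topic
    sendActivationToKafka(messageProducer, msg, Controller.topicPrefix + scheduler.sid.toString)
  }
```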
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]