This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a72ee3e  Fix several loadbalancer bugs. (#3451)
a72ee3e is described below

commit a72ee3e3b0f300dd71fc87fc4ad2be83b2acf8bd
Author: Christian Bickel <[email protected]>
AuthorDate: Wed Mar 21 13:32:10 2018 +0100

    Fix several loadbalancer bugs. (#3451)
    
    1. `padTo` (used when generating the list of semaphores for all invokers)
    evaluates its padding element only once, so every element it appends to the
    underlying list is the same object. As a result, all invokers added in one
    update (i.e. **all** invokers on startup) were backed by a single shared
    Semaphore, which made per-invoker slot accounting meaningless.
    
    2. Step sizes must be computed for the specific list of invokers they are
    used to step through. Therefore we need separate lists of step sizes for the
    managed and the blackbox invoker lists.
    
    3. We should free our resources (and thus update the scheduling state)
    **before** completing the request for the user. This requires a minor change
    in code order.
    
    4. In the overload case, we need to look up the invoker we want to schedule
    to in the `healthyInvokers` list and use its ID, rather than using the
    `random` index directly to address per-invoker state. The `healthyInvokers`
    list may be shorter than, or ordered differently from, the list of all
    underlying invokers.
    
    Furthermore, this adds a metric that reports, from the loadbalancer's point
    of view, how many activations are currently in flight in the system. It can
    be used to determine overall system utilization.
    
    Co-authored-by: Markus Thömmes <[email protected]>
---
 .../src/main/scala/whisk/common/Logging.scala      | 11 ++--
 .../ShardingContainerPoolBalancer.scala            | 59 ++++++++++++++--------
 .../test/ShardingContainerPoolBalancerTests.scala  | 11 ++--
 3 files changed, 51 insertions(+), 30 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala 
b/common/scala/src/main/scala/whisk/common/Logging.scala
index 1780211..0e21fe3 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -18,15 +18,13 @@
 package whisk.common
 
 import java.io.PrintStream
-import java.time.Clock
-import java.time.Instant
-import java.time.ZoneId
+import java.time.{Clock, Instant, ZoneId}
 import java.time.format.DateTimeFormatter
 
-import akka.event.Logging.{DebugLevel, ErrorLevel, InfoLevel, WarningLevel}
-import akka.event.Logging.LogLevel
+import akka.event.Logging._
 import akka.event.LoggingAdapter
 import kamon.Kamon
+import whisk.core.entity.InstanceId
 
 trait Logging {
 
@@ -279,6 +277,9 @@ object LoggingMarkers {
   val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, 
"invokerUnhealthy", count)
   val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer, 
"activations", count)
 
+  def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: InstanceId) =
+    LogMarkerToken(loadbalancer + controllerInstance.toInt, 
"activationsInflight", count)
+
   // Time that is needed to execute the action
   val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
 
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 19b959a..1f5f398 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -18,8 +18,8 @@
 package whisk.core.loadBalancer
 
 import java.nio.charset.StandardCharsets
-import java.util.concurrent.atomic.LongAdder
 import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.atomic.LongAdder
 
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.cluster.ClusterEvent._
@@ -28,6 +28,7 @@ import akka.event.Logging.InfoLevel
 import akka.stream.ActorMaterializer
 import org.apache.kafka.clients.producer.RecordMetadata
 import pureconfig._
+import whisk.common.LoggingMarkers._
 import whisk.common._
 import whisk.core.WhiskConfig._
 import whisk.core.connector._
@@ -71,6 +72,10 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Ins
   /** State needed for scheduling. */
   private val schedulingState = ShardingContainerPoolBalancerState()()
 
+  actorSystem.scheduler.schedule(0.seconds, 10.seconds) {
+    
MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance),
 totalActivations.longValue)
+  }
+
   /**
    * Monitors invoker supervision and the cluster to update the state 
sequentially
    *
@@ -123,11 +128,13 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Ins
   override def publish(action: ExecutableWhiskActionMetaData, msg: 
ActivationMessage)(
     implicit transid: TransactionId): Future[Future[Either[ActivationId, 
WhiskActivation]]] = {
 
-    val invokersToUse = if (!action.exec.pull) schedulingState.managedInvokers 
else schedulingState.blackboxInvokers
+    val (invokersToUse, stepSizes) =
+      if (!action.exec.pull) (schedulingState.managedInvokers, 
schedulingState.managedStepSizes)
+      else (schedulingState.blackboxInvokers, 
schedulingState.blackboxStepSizes)
     val chosen = if (invokersToUse.nonEmpty) {
       val hash = 
ShardingContainerPoolBalancer.generateHash(msg.user.namespace, 
action.fullyQualifiedName(false))
       val homeInvoker = hash % invokersToUse.size
-      val stepSize = schedulingState.stepSizes(hash % 
schedulingState.stepSizes.size)
+      val stepSize = stepSizes(hash % stepSizes.size)
       ShardingContainerPoolBalancer.schedule(invokersToUse, 
schedulingState.invokerSlots, homeInvoker, stepSize)
     } else {
       None
@@ -247,6 +254,10 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Ins
 
     activations.remove(aid) match {
       case Some(entry) =>
+        totalActivations.decrement()
+        activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
+        schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
+
         if (!forced) {
           entry.timeoutHandler.cancel()
           entry.promise.trySuccess(response)
@@ -254,10 +265,6 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: Ins
           entry.promise.tryFailure(new Throwable("no active ack received"))
         }
 
-        totalActivations.decrement()
-        activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
-        schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
-
         logging.info(this, s"${if (!forced) "received" else "forced"} active 
ack for '$aid'")(tid)
         // Active acks that are received here are strictly from user actions - 
health actions are not part of
         // the load balancer's activation map. Inform the invoker pool 
supervisor of the user action completion.
@@ -317,7 +324,7 @@ object ShardingContainerPoolBalancer extends 
LoadBalancerProvider {
                dispatched: IndexedSeq[ForcableSemaphore],
                index: Int,
                step: Int,
-               stepsDone: Int = 0): Option[InstanceId] = {
+               stepsDone: Int = 0)(implicit logging: Logging): 
Option[InstanceId] = {
     val numInvokers = invokers.size
 
     if (numInvokers > 0) {
@@ -331,9 +338,10 @@ object ShardingContainerPoolBalancer extends 
LoadBalancerProvider {
           val healthyInvokers = invokers.filter(_.status == Healthy)
           if (healthyInvokers.nonEmpty) {
             // Choose a healthy invoker randomly
-            val random = 
ThreadLocalRandom.current().nextInt(healthyInvokers.size)
-            dispatched(random).forceAcquire()
-            Some(healthyInvokers(random).id)
+            val random = 
healthyInvokers(ThreadLocalRandom.current().nextInt(healthyInvokers.size)).id
+            dispatched(random.toInt).forceAcquire()
+            logging.warn(this, s"system is overloaded. Chose 
invoker${random.toInt} by random assignment.")
+            Some(random)
           } else {
             None
           }
@@ -354,14 +362,16 @@ object ShardingContainerPoolBalancer extends 
LoadBalancerProvider {
  * @param _invokers all of the known invokers in the system
  * @param _managedInvokers all invokers for managed runtimes
  * @param _blackboxInvokers all invokers for blackbox runtimes
- * @param _stepSizes the step-sizes possible for the current invoker count
+ * @param _managedStepSizes the step-sizes possible for the current managed 
invoker count
+ * @param _blackboxStepSizes the step-sizes possible for the current blackbox 
invoker count
  * @param _invokerSlots state of accessible slots of each invoker
  */
 case class ShardingContainerPoolBalancerState(
   private var _invokers: IndexedSeq[InvokerHealth] = 
IndexedSeq.empty[InvokerHealth],
   private var _managedInvokers: IndexedSeq[InvokerHealth] = 
IndexedSeq.empty[InvokerHealth],
   private var _blackboxInvokers: IndexedSeq[InvokerHealth] = 
IndexedSeq.empty[InvokerHealth],
-  private var _stepSizes: Seq[Int] = 
ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+  private var _managedStepSizes: Seq[Int] = 
ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+  private var _blackboxStepSizes: Seq[Int] = 
ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
   private var _invokerSlots: IndexedSeq[ForcableSemaphore] = 
IndexedSeq.empty[ForcableSemaphore],
   private var _clusterSize: Int = 1)(
   lbConfig: ShardingContainerPoolBalancerConfig =
@@ -377,7 +387,8 @@ case class ShardingContainerPoolBalancerState(
   def invokers: IndexedSeq[InvokerHealth] = _invokers
   def managedInvokers: IndexedSeq[InvokerHealth] = _managedInvokers
   def blackboxInvokers: IndexedSeq[InvokerHealth] = _blackboxInvokers
-  def stepSizes: Seq[Int] = _stepSizes
+  def managedStepSizes: Seq[Int] = _managedStepSizes
+  def blackboxStepSizes: Seq[Int] = _blackboxStepSizes
   def invokerSlots: IndexedSeq[ForcableSemaphore] = _invokerSlots
   def clusterSize: Int = _clusterSize
 
@@ -396,14 +407,6 @@ case class ShardingContainerPoolBalancerState(
     val oldSize = _invokers.size
     val newSize = newInvokers.size
 
-    if (oldSize != newSize) {
-      _stepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(newSize)
-      if (oldSize < newSize) {
-        // Keeps the existing state..
-        _invokerSlots = _invokerSlots.padTo(newSize, new 
ForcableSemaphore(currentInvokerThreshold))
-      }
-    }
-
     val blackboxes = Math.max(1, (newSize.toDouble * blackboxFraction).toInt)
     val managed = Math.max(1, newSize - blackboxes)
 
@@ -411,6 +414,18 @@ case class ShardingContainerPoolBalancerState(
     _blackboxInvokers = _invokers.takeRight(blackboxes)
     _managedInvokers = _invokers.take(managed)
 
+    if (oldSize != newSize) {
+      _managedStepSizes = 
ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(managed)
+      _blackboxStepSizes = 
ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(blackboxes)
+
+      if (oldSize < newSize) {
+        // Keeps the existing state..
+        _invokerSlots = _invokerSlots ++ IndexedSeq.fill(newSize - oldSize) {
+          new ForcableSemaphore(currentInvokerThreshold)
+        }
+      }
+    }
+
     logging.info(
       this,
       s"loadbalancer invoker status updated. managedInvokers = $managed 
blackboxInvokers = $blackboxes")(
diff --git 
a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
 
b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index ca8442f..50201a6 100644
--- 
a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ 
b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -50,7 +50,8 @@ class ShardingContainerPoolBalancerTests extends FlatSpec 
with Matchers with Str
     state.blackboxInvokers shouldBe 'empty
     state.managedInvokers shouldBe 'empty
     state.invokerSlots shouldBe 'empty
-    state.stepSizes shouldBe Seq()
+    state.managedStepSizes shouldBe Seq()
+    state.blackboxStepSizes shouldBe Seq()
 
     // apply one update, verify everything is updated accordingly
     val update1 = IndexedSeq(healthy(0))
@@ -59,8 +60,10 @@ class ShardingContainerPoolBalancerTests extends FlatSpec 
with Matchers with Str
     state.invokers shouldBe update1
     state.blackboxInvokers shouldBe update1 // fallback to at least one
     state.managedInvokers shouldBe update1 // fallback to at least one
+    state.invokerSlots should have size update1.size
     state.invokerSlots.head.availablePermits shouldBe slots
-    state.stepSizes shouldBe Seq(1)
+    state.managedStepSizes shouldBe Seq(1)
+    state.blackboxStepSizes shouldBe Seq(1)
 
     // aquire a slot to alter invoker state
     state.invokerSlots.head.tryAcquire()
@@ -73,9 +76,11 @@ class ShardingContainerPoolBalancerTests extends FlatSpec 
with Matchers with Str
     state.invokers shouldBe update2
     state.managedInvokers shouldBe IndexedSeq(update2.head)
     state.blackboxInvokers shouldBe IndexedSeq(update2.last)
+    state.invokerSlots should have size update2.size
     state.invokerSlots.head.availablePermits shouldBe slots - 1
     state.invokerSlots(1).availablePermits shouldBe slots
-    state.stepSizes shouldBe Seq(1)
+    state.managedStepSizes shouldBe Seq(1)
+    state.blackboxStepSizes shouldBe Seq(1)
   }
 
   it should "update the cluster size, adjusting the invoker slots accordingly" 
in {

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to