This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cd10bb  Send capacity, system overload metrics for managed and 
blackbox invokers separately. (#4219)
8cd10bb is described below

commit 8cd10bb09d64c1bc2638f05d1fb8dadb24b7f36e
Author: Su <[email protected]>
AuthorDate: Tue Jan 29 13:14:43 2019 +0100

    Send capacity, system overload metrics for managed and blackbox invokers 
separately. (#4219)
    
    To send metrics on system overload condition, capacity in flight v/s total 
and count of healthy, unhealthy, unresponsive and down invokers (managed and 
blackbox separately) in order to visualise it as graph on Metrics dashboard.
---
 .../org/apache/openwhisk/common/Logging.scala      |  22 ++++-
 .../ShardingContainerPoolBalancer.scala            | 103 +++++++++++++++++----
 .../test/ShardingContainerPoolBalancerTests.scala  |  52 ++++++++---
 3 files changed, 143 insertions(+), 34 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index c45cb3f..1eb0083 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -335,14 +335,15 @@ object LoggingMarkers {
   val CONTROLLER_ACTIVATION_BLOCKING_DATABASE_RETRIEVAL =
     LogMarkerToken(controller, "blockingActivationDatabaseRetrieval", count)
 
-  // Time that is needed load balance the activation
+  // Time that is needed to load balance the activation
   val CONTROLLER_LOADBALANCER = LogMarkerToken(controller, loadbalancer, start)
 
   // Time that is needed to produce message in kafka
   val CONTROLLER_KAFKA = LogMarkerToken(controller, kafka, start)
 
   // System overload and random invoker assignment
-  val SYSTEM_OVERLOAD = LogMarkerToken(controller, "systemOverload", count)
+  val MANAGED_SYSTEM_OVERLOAD = LogMarkerToken(controller, 
"managedInvokerSystemOverload", count)
+  val BLACKBOX_SYSTEM_OVERLOAD = LogMarkerToken(controller, 
"blackBoxInvokerSystemOverload", count)
   /*
    * Invoker related markers
    */
@@ -355,8 +356,8 @@ object LoggingMarkers {
 
   def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: 
ControllerInstanceId) =
     LogMarkerToken(loadbalancer + controllerInstance.asString, 
"activationsInflight", count)
-  def LOADBALANCER_MEMORY_INFLIGHT(controllerInstance: ControllerInstanceId) =
-    LogMarkerToken(loadbalancer + controllerInstance.asString, 
"memoryInflight", count)
+  def LOADBALANCER_MEMORY_INFLIGHT(controllerInstance: ControllerInstanceId, 
actionType: String) =
+    LogMarkerToken(loadbalancer + controllerInstance.asString, 
s"memory${actionType}Inflight", count)
 
   // Time that is needed to execute the action
   val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
@@ -383,6 +384,19 @@ object LoggingMarkers {
   val CONTAINER_CLIENT_RETRIES =
     LogMarkerToken(containerClient, "retries", count)
 
+  val INVOKER_TOTALMEM_BLACKBOX = LogMarkerToken(loadbalancer, 
"totalCapacityBlackBox", count)
+  val INVOKER_TOTALMEM_MANAGED = LogMarkerToken(loadbalancer, 
"totalCapacityManaged", count)
+
+  val HEALTHY_INVOKER_MANAGED = LogMarkerToken(loadbalancer, 
"totalHealthyInvokerManaged", count)
+  val UNHEALTHY_INVOKER_MANAGED = LogMarkerToken(loadbalancer, 
"totalUnhealthyInvokerManaged", count)
+  val UNRESPONSIVE_INVOKER_MANAGED = LogMarkerToken(loadbalancer, 
"totalUnresponsiveInvokerManaged", count)
+  val OFFLINE_INVOKER_MANAGED = LogMarkerToken(loadbalancer, 
"totalOfflineInvokerManaged", count)
+
+  val HEALTHY_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer, 
"totalHealthyInvokerBlackBox", count)
+  val UNHEALTHY_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer, 
"totalUnhealthyInvokerBlackBox", count)
+  val UNRESPONSIVE_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer, 
"totalUnresponsiveInvokerBlackBox", count)
+  val OFFLINE_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer, 
"totalOfflineInvokerBlackBox", count)
+
   // Kafka related markers
   def KAFKA_QUEUE(topic: String) = LogMarkerToken(kafka, topic, count)
   def KAFKA_MESSAGE_DELAY(topic: String) = LogMarkerToken(kafka, topic, start, 
Some("delay"))
diff --git 
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
 
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 4d24725..914b3ac 100644
--- 
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ 
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -38,6 +38,7 @@ import org.apache.openwhisk.core.WhiskConfig._
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.loadBalancer.InvokerState.{Healthy, Offline, 
Unhealthy, Unresponsive}
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.spi.SpiLoader
 
@@ -179,14 +180,63 @@ class ShardingContainerPoolBalancer(
     TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
   private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
   private val totalActivations = new LongAdder()
-  private val totalActivationMemory = new LongAdder()
+  private val totalBlackBoxActivationMemory = new LongAdder()
+  private val totalManagedActivationMemory = new LongAdder()
 
   /** State needed for scheduling. */
   protected[loadBalancer] val schedulingState = 
ShardingContainerPoolBalancerState()(lbConfig)
 
   actorSystem.scheduler.schedule(0.seconds, 10.seconds) {
     
MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance),
 totalActivations.longValue)
-    
MetricEmitter.emitHistogramMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance),
 totalActivationMemory.longValue)
+    MetricEmitter.emitHistogramMetric(
+      LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""),
+      totalBlackBoxActivationMemory.longValue + 
totalManagedActivationMemory.longValue)
+    MetricEmitter.emitHistogramMetric(
+      LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Blackbox"),
+      totalBlackBoxActivationMemory.longValue)
+    MetricEmitter.emitHistogramMetric(
+      LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Managed"),
+      totalManagedActivationMemory.longValue)
+    MetricEmitter.emitHistogramMetric(INVOKER_TOTALMEM_BLACKBOX, 
schedulingState.blackboxInvokers.foldLeft(0L) {
+      (total, curr) =>
+        if (curr.status.isUsable) {
+          curr.id.userMemory.toMB + total
+        } else {
+          total
+        }
+    })
+    MetricEmitter.emitHistogramMetric(INVOKER_TOTALMEM_MANAGED, 
schedulingState.managedInvokers.foldLeft(0L) {
+      (total, curr) =>
+        if (curr.status.isUsable) {
+          curr.id.userMemory.toMB + total
+        } else {
+          total
+        }
+    })
+    MetricEmitter.emitHistogramMetric(
+      HEALTHY_INVOKER_MANAGED,
+      schedulingState.managedInvokers.count(_.status == Healthy))
+    MetricEmitter.emitHistogramMetric(
+      UNHEALTHY_INVOKER_MANAGED,
+      schedulingState.managedInvokers.count(_.status == Unhealthy))
+    MetricEmitter.emitHistogramMetric(
+      UNRESPONSIVE_INVOKER_MANAGED,
+      schedulingState.managedInvokers.count(_.status == Unresponsive))
+    MetricEmitter.emitHistogramMetric(
+      OFFLINE_INVOKER_MANAGED,
+      schedulingState.managedInvokers.count(_.status == Offline))
+    MetricEmitter.emitHistogramMetric(
+      HEALTHY_INVOKER_BLACKBOX,
+      schedulingState.blackboxInvokers.count(_.status == Healthy))
+    MetricEmitter.emitHistogramMetric(
+      UNHEALTHY_INVOKER_BLACKBOX,
+      schedulingState.blackboxInvokers.count(_.status == Unhealthy))
+    MetricEmitter.emitHistogramMetric(
+      UNRESPONSIVE_INVOKER_BLACKBOX,
+      schedulingState.blackboxInvokers.count(_.status == Unresponsive))
+    MetricEmitter.emitHistogramMetric(
+      OFFLINE_INVOKER_BLACKBOX,
+      schedulingState.blackboxInvokers.count(_.status == Offline))
   }
 
   /**
@@ -244,7 +294,6 @@ class ShardingContainerPoolBalancer(
 
     val isBlackboxInvocation = action.exec.pull
     val actionType = if (!isBlackboxInvocation) "managed" else "blackbox"
-
     val (invokersToUse, stepSizes) =
       if (!isBlackboxInvocation) (schedulingState.managedInvokers, 
schedulingState.managedStepSizes)
       else (schedulingState.blackboxInvokers, 
schedulingState.blackboxStepSizes)
@@ -252,7 +301,7 @@ class ShardingContainerPoolBalancer(
       val hash = 
ShardingContainerPoolBalancer.generateHash(msg.user.namespace.name, 
action.fullyQualifiedName(false))
       val homeInvoker = hash % invokersToUse.size
       val stepSize = stepSizes(hash % stepSizes.size)
-      val invoker = ShardingContainerPoolBalancer.schedule(
+      val invoker: Option[(InvokerInstanceId, Boolean)] = 
ShardingContainerPoolBalancer.schedule(
         action.limits.concurrency.maxConcurrent,
         action.fullyQualifiedName(true),
         invokersToUse,
@@ -260,7 +309,17 @@ class ShardingContainerPoolBalancer(
         action.limits.memory.megabytes,
         homeInvoker,
         stepSize)
-      invoker
+      invoker.foreach {
+        case (_, true) =>
+          val metric =
+            if (isBlackboxInvocation)
+              LoggingMarkers.BLACKBOX_SYSTEM_OVERLOAD
+            else
+              LoggingMarkers.MANAGED_SYSTEM_OVERLOAD
+          MetricEmitter.emitCounterMetric(metric)
+        case _ =>
+      }
+      invoker.map(_._1)
     } else {
       None
     }
@@ -298,7 +357,11 @@ class ShardingContainerPoolBalancer(
                               instance: InvokerInstanceId): 
Future[Either[ActivationId, WhiskActivation]] = {
 
     totalActivations.increment()
+    val isBlackboxInvocation = action.exec.pull
+    val totalActivationMemory =
+      if (isBlackboxInvocation) totalBlackBoxActivationMemory else 
totalManagedActivationMemory
     totalActivationMemory.add(action.limits.memory.megabytes)
+
     activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new 
LongAdder()).increment()
 
     // Timeout is a multiple of the configured maximum action duration. The 
minimum timeout is the configured standard
@@ -329,7 +392,8 @@ class ShardingContainerPoolBalancer(
           action.limits.memory.megabytes.MB,
           action.limits.concurrency.maxConcurrent,
           action.fullyQualifiedName(true),
-          timeoutHandler)
+          timeoutHandler,
+          isBlackboxInvocation)
       })
 
     resultPromise
@@ -426,6 +490,8 @@ class ShardingContainerPoolBalancer(
     activationSlots.remove(aid) match {
       case Some(entry) =>
         totalActivations.decrement()
+        val totalActivationMemory =
+          if (entry.isBlackbox) totalBlackBoxActivationMemory else 
totalManagedActivationMemory
         totalActivationMemory.add(entry.memory.toMB * (-1))
         activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
         schedulingState.invokerSlots
@@ -555,21 +621,22 @@ object ShardingContainerPoolBalancer extends 
LoadBalancerProvider {
    * @return an invoker to schedule to or None of no invoker is available
    */
   @tailrec
-  def schedule(maxConcurrent: Int,
-               fqn: FullyQualifiedEntityName,
-               invokers: IndexedSeq[InvokerHealth],
-               dispatched: 
IndexedSeq[NestedSemaphore[FullyQualifiedEntityName]],
-               slots: Int,
-               index: Int,
-               step: Int,
-               stepsDone: Int = 0)(implicit logging: Logging, transId: 
TransactionId): Option[InvokerInstanceId] = {
+  def schedule(
+    maxConcurrent: Int,
+    fqn: FullyQualifiedEntityName,
+    invokers: IndexedSeq[InvokerHealth],
+    dispatched: IndexedSeq[NestedSemaphore[FullyQualifiedEntityName]],
+    slots: Int,
+    index: Int,
+    step: Int,
+    stepsDone: Int = 0)(implicit logging: Logging, transId: TransactionId): 
Option[(InvokerInstanceId, Boolean)] = {
     val numInvokers = invokers.size
 
     if (numInvokers > 0) {
       val invoker = invokers(index)
       //test this invoker - if this action supports concurrency, use the 
scheduleConcurrent function
       if (invoker.status.isUsable && 
dispatched(invoker.id.toInt).tryAcquireConcurrent(fqn, maxConcurrent, slots)) {
-        Some(invoker.id)
+        Some(invoker.id, false)
       } else {
         // If we've gone through all invokers
         if (stepsDone == numInvokers + 1) {
@@ -579,8 +646,7 @@ object ShardingContainerPoolBalancer extends 
LoadBalancerProvider {
             val random = 
healthyInvokers(ThreadLocalRandom.current().nextInt(healthyInvokers.size)).id
             dispatched(random.toInt).forceAcquireConcurrent(fqn, 
maxConcurrent, slots)
             logging.warn(this, s"system is overloaded. Chose 
invoker${random.toInt} by random assignment.")
-            MetricEmitter.emitCounterMetric(LoggingMarkers.SYSTEM_OVERLOAD)
-            Some(random)
+            Some(random, true)
           } else {
             None
           }
@@ -745,4 +811,5 @@ case class ActivationEntry(id: ActivationId,
                            memory: ByteSize,
                            maxConcurrent: Int,
                            fullyQualifiedEntityName: FullyQualifiedEntityName,
-                           timeoutHandler: Cancellable)
+                           timeoutHandler: Cancellable,
+                           isBlackbox: Boolean)
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index da166e7..8feb9a6 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -269,7 +269,8 @@ class ShardingContainerPoolBalancerTests
 
   it should "choose the first available invoker, jumping in stepSize steps, 
falling back to randomized scheduling once all invokers are full" in {
     val invokerCount = 3
-    val invokerSlots = semaphores(invokerCount + 3, 3) // needs to be offset 
by 3 as well
+    val slotPerInvoker = 3
+    val invokerSlots = semaphores(invokerCount + 3, slotPerInvoker) // needs 
to be offset by 3 as well
     val invokers = (0 until invokerCount).map(i => healthy(i + 3)) // offset 
by 3 to asset InstanceId is returned
 
     val expectedResult = Seq(3, 3, 3, 5, 5, 5, 4, 4, 4)
@@ -277,6 +278,7 @@ class ShardingContainerPoolBalancerTests
       ShardingContainerPoolBalancer
         .schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 2)
         .get
+        ._1
         .toInt
     }
 
@@ -286,21 +288,23 @@ class ShardingContainerPoolBalancerTests
       ShardingContainerPoolBalancer
         .schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 2)
         .get
-        .toInt
     }
 
-    bruteResult should contain allOf (3, 4, 5)
+    bruteResult.map(_._1.toInt) should contain allOf (3, 4, 5)
+    bruteResult.map(_._2) should contain only true
   }
 
   it should "ignore unhealthy or offline invokers" in {
     val invokers = IndexedSeq(healthy(0), unhealthy(1), offline(2), healthy(3))
-    val invokerSlots = semaphores(invokers.size, 3)
+    val slotPerInvoker = 3
+    val invokerSlots = semaphores(invokers.size, slotPerInvoker)
 
     val expectedResult = Seq(0, 0, 0, 3, 3, 3)
     val result = expectedResult.map { _ =>
       ShardingContainerPoolBalancer
         .schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 1)
         .get
+        ._1
         .toInt
     }
 
@@ -308,11 +312,14 @@ class ShardingContainerPoolBalancerTests
 
     // more schedules will result in randomized invokers, but the unhealthy 
and offline invokers should not be part
     val bruteResult = (0 to 100).map { _ =>
-      ShardingContainerPoolBalancer.schedule(1, fqn, invokers, invokerSlots, 
1, index = 0, step = 1).get.toInt
+      ShardingContainerPoolBalancer
+        .schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 1)
+        .get
     }
 
-    bruteResult should contain allOf (0, 3)
-    bruteResult should contain noneOf (1, 2)
+    bruteResult.map(_._1.toInt) should contain allOf (0, 3)
+    bruteResult.map(_._1.toInt) should contain noneOf (1, 2)
+    bruteResult.map(_._2) should contain only true
   }
 
   it should "only take invokers that have enough free slots" in {
@@ -322,15 +329,35 @@ class ShardingContainerPoolBalancerTests
     val invokers = (0 until invokerCount).map(i => healthy(i))
 
     // Ask for three slots -> First invoker should be used
-    ShardingContainerPoolBalancer.schedule(1, fqn, invokers, invokerSlots, 3, 
index = 0, step = 1).get.toInt shouldBe 0
+    ShardingContainerPoolBalancer
+      .schedule(1, fqn, invokers, invokerSlots, 3, index = 0, step = 1)
+      .get
+      ._1
+      .toInt shouldBe 0
     // Ask for two slots -> Second invoker should be used
-    ShardingContainerPoolBalancer.schedule(1, fqn, invokers, invokerSlots, 2, 
index = 0, step = 1).get.toInt shouldBe 1
+    ShardingContainerPoolBalancer
+      .schedule(1, fqn, invokers, invokerSlots, 2, index = 0, step = 1)
+      .get
+      ._1
+      .toInt shouldBe 1
     // Ask for 1 slot -> First invoker should be used
-    ShardingContainerPoolBalancer.schedule(1, fqn, invokers, invokerSlots, 1, 
index = 0, step = 1).get.toInt shouldBe 0
+    ShardingContainerPoolBalancer
+      .schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 1)
+      .get
+      ._1
+      .toInt shouldBe 0
     // Ask for 4 slots -> Third invoker should be used
-    ShardingContainerPoolBalancer.schedule(1, fqn, invokers, invokerSlots, 4, 
index = 0, step = 1).get.toInt shouldBe 2
+    ShardingContainerPoolBalancer
+      .schedule(1, fqn, invokers, invokerSlots, 4, index = 0, step = 1)
+      .get
+      ._1
+      .toInt shouldBe 2
     // Ask for 2 slots -> Second invoker should be used
-    ShardingContainerPoolBalancer.schedule(1, fqn, invokers, invokerSlots, 2, 
index = 0, step = 1).get.toInt shouldBe 1
+    ShardingContainerPoolBalancer
+      .schedule(1, fqn, invokers, invokerSlots, 2, index = 0, step = 1)
+      .get
+      ._1
+      .toInt shouldBe 1
 
     invokerSlots.foreach(_.availablePermits shouldBe 0)
   }
@@ -367,6 +394,7 @@ class ShardingContainerPoolBalancerTests
           ShardingContainerPoolBalancer
             .schedule(concurrency, fqn, invokers, invokerSlots, 1, 0, 1)
             .get
+            ._1
             .toInt shouldBe i
           invokerSlots
             .lift(i)

Reply via email to