This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 8cd10bb Send capacity, system overload metrics for managed and
blackbox invokers separately. (#4219)
8cd10bb is described below
commit 8cd10bb09d64c1bc2638f05d1fb8dadb24b7f36e
Author: Su <[email protected]>
AuthorDate: Tue Jan 29 13:14:43 2019 +0100
Send capacity, system overload metrics for managed and blackbox invokers
separately. (#4219)
Send metrics on system overload conditions, capacity in flight vs. total
capacity, and counts of healthy, unhealthy, unresponsive and down invokers
(managed and blackbox separately) in order to visualise them as graphs on the
Metrics dashboard.
---
.../org/apache/openwhisk/common/Logging.scala | 22 ++++-
.../ShardingContainerPoolBalancer.scala | 103 +++++++++++++++++----
.../test/ShardingContainerPoolBalancerTests.scala | 52 ++++++++---
3 files changed, 143 insertions(+), 34 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index c45cb3f..1eb0083 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -335,14 +335,15 @@ object LoggingMarkers {
val CONTROLLER_ACTIVATION_BLOCKING_DATABASE_RETRIEVAL =
LogMarkerToken(controller, "blockingActivationDatabaseRetrieval", count)
- // Time that is needed load balance the activation
+ // Time that is needed to load balance the activation
val CONTROLLER_LOADBALANCER = LogMarkerToken(controller, loadbalancer, start)
// Time that is needed to produce message in kafka
val CONTROLLER_KAFKA = LogMarkerToken(controller, kafka, start)
// System overload and random invoker assignment
- val SYSTEM_OVERLOAD = LogMarkerToken(controller, "systemOverload", count)
+ val MANAGED_SYSTEM_OVERLOAD = LogMarkerToken(controller,
"managedInvokerSystemOverload", count)
+ val BLACKBOX_SYSTEM_OVERLOAD = LogMarkerToken(controller,
"blackBoxInvokerSystemOverload", count)
/*
* Invoker related markers
*/
@@ -355,8 +356,8 @@ object LoggingMarkers {
def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance:
ControllerInstanceId) =
LogMarkerToken(loadbalancer + controllerInstance.asString,
"activationsInflight", count)
- def LOADBALANCER_MEMORY_INFLIGHT(controllerInstance: ControllerInstanceId) =
- LogMarkerToken(loadbalancer + controllerInstance.asString,
"memoryInflight", count)
+ def LOADBALANCER_MEMORY_INFLIGHT(controllerInstance: ControllerInstanceId,
actionType: String) =
+ LogMarkerToken(loadbalancer + controllerInstance.asString,
s"memory${actionType}Inflight", count)
// Time that is needed to execute the action
val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
@@ -383,6 +384,19 @@ object LoggingMarkers {
val CONTAINER_CLIENT_RETRIES =
LogMarkerToken(containerClient, "retries", count)
+ val INVOKER_TOTALMEM_BLACKBOX = LogMarkerToken(loadbalancer,
"totalCapacityBlackBox", count)
+ val INVOKER_TOTALMEM_MANAGED = LogMarkerToken(loadbalancer,
"totalCapacityManaged", count)
+
+ val HEALTHY_INVOKER_MANAGED = LogMarkerToken(loadbalancer,
"totalHealthyInvokerManaged", count)
+ val UNHEALTHY_INVOKER_MANAGED = LogMarkerToken(loadbalancer,
"totalUnhealthyInvokerManaged", count)
+ val UNRESPONSIVE_INVOKER_MANAGED = LogMarkerToken(loadbalancer,
"totalUnresponsiveInvokerManaged", count)
+ val OFFLINE_INVOKER_MANAGED = LogMarkerToken(loadbalancer,
"totalOfflineInvokerManaged", count)
+
+ val HEALTHY_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer,
"totalHealthyInvokerBlackBox", count)
+ val UNHEALTHY_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer,
"totalUnhealthyInvokerBlackBox", count)
+ val UNRESPONSIVE_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer,
"totalUnresponsiveInvokerBlackBox", count)
+ val OFFLINE_INVOKER_BLACKBOX = LogMarkerToken(loadbalancer,
"totalOfflineInvokerBlackBox", count)
+
// Kafka related markers
def KAFKA_QUEUE(topic: String) = LogMarkerToken(kafka, topic, count)
def KAFKA_MESSAGE_DELAY(topic: String) = LogMarkerToken(kafka, topic, start,
Some("delay"))
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 4d24725..914b3ac 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -38,6 +38,7 @@ import org.apache.openwhisk.core.WhiskConfig._
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.loadBalancer.InvokerState.{Healthy, Offline,
Unhealthy, Unresponsive}
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.spi.SpiLoader
@@ -179,14 +180,63 @@ class ShardingContainerPoolBalancer(
TrieMap[ActivationId, Promise[Either[ActivationId, WhiskActivation]]]()
private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
private val totalActivations = new LongAdder()
- private val totalActivationMemory = new LongAdder()
+ private val totalBlackBoxActivationMemory = new LongAdder()
+ private val totalManagedActivationMemory = new LongAdder()
/** State needed for scheduling. */
protected[loadBalancer] val schedulingState =
ShardingContainerPoolBalancerState()(lbConfig)
actorSystem.scheduler.schedule(0.seconds, 10.seconds) {
MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance),
totalActivations.longValue)
-
MetricEmitter.emitHistogramMetric(LOADBALANCER_MEMORY_INFLIGHT(controllerInstance),
totalActivationMemory.longValue)
+ MetricEmitter.emitHistogramMetric(
+ LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, ""),
+ totalBlackBoxActivationMemory.longValue +
totalManagedActivationMemory.longValue)
+ MetricEmitter.emitHistogramMetric(
+ LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Blackbox"),
+ totalBlackBoxActivationMemory.longValue)
+ MetricEmitter.emitHistogramMetric(
+ LOADBALANCER_MEMORY_INFLIGHT(controllerInstance, "Managed"),
+ totalManagedActivationMemory.longValue)
+ MetricEmitter.emitHistogramMetric(INVOKER_TOTALMEM_BLACKBOX,
schedulingState.blackboxInvokers.foldLeft(0L) {
+ (total, curr) =>
+ if (curr.status.isUsable) {
+ curr.id.userMemory.toMB + total
+ } else {
+ total
+ }
+ })
+ MetricEmitter.emitHistogramMetric(INVOKER_TOTALMEM_MANAGED,
schedulingState.managedInvokers.foldLeft(0L) {
+ (total, curr) =>
+ if (curr.status.isUsable) {
+ curr.id.userMemory.toMB + total
+ } else {
+ total
+ }
+ })
+ MetricEmitter.emitHistogramMetric(
+ HEALTHY_INVOKER_MANAGED,
+ schedulingState.managedInvokers.count(_.status == Healthy))
+ MetricEmitter.emitHistogramMetric(
+ UNHEALTHY_INVOKER_MANAGED,
+ schedulingState.managedInvokers.count(_.status == Unhealthy))
+ MetricEmitter.emitHistogramMetric(
+ UNRESPONSIVE_INVOKER_MANAGED,
+ schedulingState.managedInvokers.count(_.status == Unresponsive))
+ MetricEmitter.emitHistogramMetric(
+ OFFLINE_INVOKER_MANAGED,
+ schedulingState.managedInvokers.count(_.status == Offline))
+ MetricEmitter.emitHistogramMetric(
+ HEALTHY_INVOKER_BLACKBOX,
+ schedulingState.blackboxInvokers.count(_.status == Healthy))
+ MetricEmitter.emitHistogramMetric(
+ UNHEALTHY_INVOKER_BLACKBOX,
+ schedulingState.blackboxInvokers.count(_.status == Unhealthy))
+ MetricEmitter.emitHistogramMetric(
+ UNRESPONSIVE_INVOKER_BLACKBOX,
+ schedulingState.blackboxInvokers.count(_.status == Unresponsive))
+ MetricEmitter.emitHistogramMetric(
+ OFFLINE_INVOKER_BLACKBOX,
+ schedulingState.blackboxInvokers.count(_.status == Offline))
}
/**
@@ -244,7 +294,6 @@ class ShardingContainerPoolBalancer(
val isBlackboxInvocation = action.exec.pull
val actionType = if (!isBlackboxInvocation) "managed" else "blackbox"
-
val (invokersToUse, stepSizes) =
if (!isBlackboxInvocation) (schedulingState.managedInvokers,
schedulingState.managedStepSizes)
else (schedulingState.blackboxInvokers,
schedulingState.blackboxStepSizes)
@@ -252,7 +301,7 @@ class ShardingContainerPoolBalancer(
val hash =
ShardingContainerPoolBalancer.generateHash(msg.user.namespace.name,
action.fullyQualifiedName(false))
val homeInvoker = hash % invokersToUse.size
val stepSize = stepSizes(hash % stepSizes.size)
- val invoker = ShardingContainerPoolBalancer.schedule(
+ val invoker: Option[(InvokerInstanceId, Boolean)] =
ShardingContainerPoolBalancer.schedule(
action.limits.concurrency.maxConcurrent,
action.fullyQualifiedName(true),
invokersToUse,
@@ -260,7 +309,17 @@ class ShardingContainerPoolBalancer(
action.limits.memory.megabytes,
homeInvoker,
stepSize)
- invoker
+ invoker.foreach {
+ case (_, true) =>
+ val metric =
+ if (isBlackboxInvocation)
+ LoggingMarkers.BLACKBOX_SYSTEM_OVERLOAD
+ else
+ LoggingMarkers.MANAGED_SYSTEM_OVERLOAD
+ MetricEmitter.emitCounterMetric(metric)
+ case _ =>
+ }
+ invoker.map(_._1)
} else {
None
}
@@ -298,7 +357,11 @@ class ShardingContainerPoolBalancer(
instance: InvokerInstanceId):
Future[Either[ActivationId, WhiskActivation]] = {
totalActivations.increment()
+ val isBlackboxInvocation = action.exec.pull
+ val totalActivationMemory =
+ if (isBlackboxInvocation) totalBlackBoxActivationMemory else
totalManagedActivationMemory
totalActivationMemory.add(action.limits.memory.megabytes)
+
activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new
LongAdder()).increment()
// Timeout is a multiple of the configured maximum action duration. The
minimum timeout is the configured standard
@@ -329,7 +392,8 @@ class ShardingContainerPoolBalancer(
action.limits.memory.megabytes.MB,
action.limits.concurrency.maxConcurrent,
action.fullyQualifiedName(true),
- timeoutHandler)
+ timeoutHandler,
+ isBlackboxInvocation)
})
resultPromise
@@ -426,6 +490,8 @@ class ShardingContainerPoolBalancer(
activationSlots.remove(aid) match {
case Some(entry) =>
totalActivations.decrement()
+ val totalActivationMemory =
+ if (entry.isBlackbox) totalBlackBoxActivationMemory else
totalManagedActivationMemory
totalActivationMemory.add(entry.memory.toMB * (-1))
activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
schedulingState.invokerSlots
@@ -555,21 +621,22 @@ object ShardingContainerPoolBalancer extends
LoadBalancerProvider {
* @return an invoker to schedule to or None if no invoker is available
*/
@tailrec
- def schedule(maxConcurrent: Int,
- fqn: FullyQualifiedEntityName,
- invokers: IndexedSeq[InvokerHealth],
- dispatched:
IndexedSeq[NestedSemaphore[FullyQualifiedEntityName]],
- slots: Int,
- index: Int,
- step: Int,
- stepsDone: Int = 0)(implicit logging: Logging, transId:
TransactionId): Option[InvokerInstanceId] = {
+ def schedule(
+ maxConcurrent: Int,
+ fqn: FullyQualifiedEntityName,
+ invokers: IndexedSeq[InvokerHealth],
+ dispatched: IndexedSeq[NestedSemaphore[FullyQualifiedEntityName]],
+ slots: Int,
+ index: Int,
+ step: Int,
+ stepsDone: Int = 0)(implicit logging: Logging, transId: TransactionId):
Option[(InvokerInstanceId, Boolean)] = {
val numInvokers = invokers.size
if (numInvokers > 0) {
val invoker = invokers(index)
//test this invoker - if this action supports concurrency, use the
scheduleConcurrent function
if (invoker.status.isUsable &&
dispatched(invoker.id.toInt).tryAcquireConcurrent(fqn, maxConcurrent, slots)) {
- Some(invoker.id)
+ Some(invoker.id, false)
} else {
// If we've gone through all invokers
if (stepsDone == numInvokers + 1) {
@@ -579,8 +646,7 @@ object ShardingContainerPoolBalancer extends
LoadBalancerProvider {
val random =
healthyInvokers(ThreadLocalRandom.current().nextInt(healthyInvokers.size)).id
dispatched(random.toInt).forceAcquireConcurrent(fqn,
maxConcurrent, slots)
logging.warn(this, s"system is overloaded. Chose
invoker${random.toInt} by random assignment.")
- MetricEmitter.emitCounterMetric(LoggingMarkers.SYSTEM_OVERLOAD)
- Some(random)
+ Some(random, true)
} else {
None
}
@@ -745,4 +811,5 @@ case class ActivationEntry(id: ActivationId,
memory: ByteSize,
maxConcurrent: Int,
fullyQualifiedEntityName: FullyQualifiedEntityName,
- timeoutHandler: Cancellable)
+ timeoutHandler: Cancellable,
+ isBlackbox: Boolean)
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index da166e7..8feb9a6 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -269,7 +269,8 @@ class ShardingContainerPoolBalancerTests
it should "choose the first available invoker, jumping in stepSize steps,
falling back to randomized scheduling once all invokers are full" in {
val invokerCount = 3
- val invokerSlots = semaphores(invokerCount + 3, 3) // needs to be offset
by 3 as well
+ val slotPerInvoker = 3
+ val invokerSlots = semaphores(invokerCount + 3, slotPerInvoker) // needs
to be offset by 3 as well
val invokers = (0 until invokerCount).map(i => healthy(i + 3)) // offset
by 3 to asset InstanceId is returned
val expectedResult = Seq(3, 3, 3, 5, 5, 5, 4, 4, 4)
@@ -277,6 +278,7 @@ class ShardingContainerPoolBalancerTests
ShardingContainerPoolBalancer
.schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 2)
.get
+ ._1
.toInt
}
@@ -286,21 +288,23 @@ class ShardingContainerPoolBalancerTests
ShardingContainerPoolBalancer
.schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 2)
.get
- .toInt
}
- bruteResult should contain allOf (3, 4, 5)
+ bruteResult.map(_._1.toInt) should contain allOf (3, 4, 5)
+ bruteResult.map(_._2) should contain only true
}
it should "ignore unhealthy or offline invokers" in {
val invokers = IndexedSeq(healthy(0), unhealthy(1), offline(2), healthy(3))
- val invokerSlots = semaphores(invokers.size, 3)
+ val slotPerInvoker = 3
+ val invokerSlots = semaphores(invokers.size, slotPerInvoker)
val expectedResult = Seq(0, 0, 0, 3, 3, 3)
val result = expectedResult.map { _ =>
ShardingContainerPoolBalancer
.schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 1)
.get
+ ._1
.toInt
}
@@ -308,11 +312,14 @@ class ShardingContainerPoolBalancerTests
// more schedules will result in randomized invokers, but the unhealthy
and offline invokers should not be part
val bruteResult = (0 to 100).map { _ =>
- ShardingContainerPoolBalancer.schedule(1, fqn, invokers, invokerSlots,
1, index = 0, step = 1).get.toInt
+ ShardingContainerPoolBalancer
+ .schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 1)
+ .get
}
- bruteResult should contain allOf (0, 3)
- bruteResult should contain noneOf (1, 2)
+ bruteResult.map(_._1.toInt) should contain allOf (0, 3)
+ bruteResult.map(_._1.toInt) should contain noneOf (1, 2)
+ bruteResult.map(_._2) should contain only true
}
it should "only take invokers that have enough free slots" in {
@@ -322,15 +329,35 @@ class ShardingContainerPoolBalancerTests
val invokers = (0 until invokerCount).map(i => healthy(i))
// Ask for three slots -> First invoker should be used
- ShardingContainerPoolBalancer.schedule(1, fqn, invokers, invokerSlots, 3,
index = 0, step = 1).get.toInt shouldBe 0
+ ShardingContainerPoolBalancer
+ .schedule(1, fqn, invokers, invokerSlots, 3, index = 0, step = 1)
+ .get
+ ._1
+ .toInt shouldBe 0
// Ask for two slots -> Second invoker should be used
- ShardingContainerPoolBalancer.schedule(1, fqn, invokers, invokerSlots, 2,
index = 0, step = 1).get.toInt shouldBe 1
+ ShardingContainerPoolBalancer
+ .schedule(1, fqn, invokers, invokerSlots, 2, index = 0, step = 1)
+ .get
+ ._1
+ .toInt shouldBe 1
// Ask for 1 slot -> First invoker should be used
- ShardingContainerPoolBalancer.schedule(1, fqn, invokers, invokerSlots, 1,
index = 0, step = 1).get.toInt shouldBe 0
+ ShardingContainerPoolBalancer
+ .schedule(1, fqn, invokers, invokerSlots, 1, index = 0, step = 1)
+ .get
+ ._1
+ .toInt shouldBe 0
// Ask for 4 slots -> Third invoker should be used
- ShardingContainerPoolBalancer.schedule(1, fqn, invokers, invokerSlots, 4,
index = 0, step = 1).get.toInt shouldBe 2
+ ShardingContainerPoolBalancer
+ .schedule(1, fqn, invokers, invokerSlots, 4, index = 0, step = 1)
+ .get
+ ._1
+ .toInt shouldBe 2
// Ask for 2 slots -> Second invoker should be used
- ShardingContainerPoolBalancer.schedule(1, fqn, invokers, invokerSlots, 2,
index = 0, step = 1).get.toInt shouldBe 1
+ ShardingContainerPoolBalancer
+ .schedule(1, fqn, invokers, invokerSlots, 2, index = 0, step = 1)
+ .get
+ ._1
+ .toInt shouldBe 1
invokerSlots.foreach(_.availablePermits shouldBe 0)
}
@@ -367,6 +394,7 @@ class ShardingContainerPoolBalancerTests
ShardingContainerPoolBalancer
.schedule(concurrency, fqn, invokers, invokerSlots, 1, 0, 1)
.get
+ ._1
.toInt shouldBe i
invokerSlots
.lift(i)