This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new a72ee3e Fix several loadbalancer bugs. (#3451)
a72ee3e is described below
commit a72ee3e3b0f300dd71fc87fc4ad2be83b2acf8bd
Author: Christian Bickel <[email protected]>
AuthorDate: Wed Mar 21 13:32:10 2018 +0100
Fix several loadbalancer bugs. (#3451)
1. `padTo` (used when generating the list of semaphores for all invokers)
evaluates its padding element only once, instead of creating a new value for
each element it appends. As a result, **all** invokers' state was backed by the
same Semaphore, so per-invoker slot accounting did not work at all.
2. The step sizes must be calculated for the list of invokers they are used to
step through. Therefore we need separate lists of step sizes for the managed
and the blackbox invoker lists.
3. The assumption should be that we have freed our resources (and thus updated
the state) **before** we return the response to the user. A minor change in code
order is warranted here.
4. In the overload case we need to look up the invoker we schedule to in the
`healthyInvokers` list, rather than using the `random` value directly as an
index into the list of all invokers. The `healthyInvokers` list might be shorter
than, or ordered differently from, the list of all underlying invokers.
Furthermore, this adds a useful metric that shows (from the load balancer's
point of view) how many activations are currently running in the system. It
can be used to determine overall system utilization.
Co-authored-by: Markus Thömmes <[email protected]>
---
.../src/main/scala/whisk/common/Logging.scala | 11 ++--
.../ShardingContainerPoolBalancer.scala | 59 ++++++++++++++--------
.../test/ShardingContainerPoolBalancerTests.scala | 11 ++--
3 files changed, 51 insertions(+), 30 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala
b/common/scala/src/main/scala/whisk/common/Logging.scala
index 1780211..0e21fe3 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -18,15 +18,13 @@
package whisk.common
import java.io.PrintStream
-import java.time.Clock
-import java.time.Instant
-import java.time.ZoneId
+import java.time.{Clock, Instant, ZoneId}
import java.time.format.DateTimeFormatter
-import akka.event.Logging.{DebugLevel, ErrorLevel, InfoLevel, WarningLevel}
-import akka.event.Logging.LogLevel
+import akka.event.Logging._
import akka.event.LoggingAdapter
import kamon.Kamon
+import whisk.core.entity.InstanceId
trait Logging {
@@ -279,6 +277,9 @@ object LoggingMarkers {
val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer,
"invokerUnhealthy", count)
val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer,
"activations", count)
+ def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: InstanceId) =
+ LogMarkerToken(loadbalancer + controllerInstance.toInt,
"activationsInflight", count)
+
// Time that is needed to execute the action
val INVOKER_ACTIVATION_RUN = LogMarkerToken(invoker, "activationRun", start)
diff --git
a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 19b959a..1f5f398 100644
---
a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -18,8 +18,8 @@
package whisk.core.loadBalancer
import java.nio.charset.StandardCharsets
-import java.util.concurrent.atomic.LongAdder
import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent.atomic.LongAdder
import akka.actor.{Actor, ActorSystem, Props}
import akka.cluster.ClusterEvent._
@@ -28,6 +28,7 @@ import akka.event.Logging.InfoLevel
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.RecordMetadata
import pureconfig._
+import whisk.common.LoggingMarkers._
import whisk.common._
import whisk.core.WhiskConfig._
import whisk.core.connector._
@@ -71,6 +72,10 @@ class ShardingContainerPoolBalancer(config: WhiskConfig,
controllerInstance: Ins
/** State needed for scheduling. */
private val schedulingState = ShardingContainerPoolBalancerState()()
+ actorSystem.scheduler.schedule(0.seconds, 10.seconds) {
+
MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance),
totalActivations.longValue)
+ }
+
/**
* Monitors invoker supervision and the cluster to update the state
sequentially
*
@@ -123,11 +128,13 @@ class ShardingContainerPoolBalancer(config: WhiskConfig,
controllerInstance: Ins
override def publish(action: ExecutableWhiskActionMetaData, msg:
ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId,
WhiskActivation]]] = {
- val invokersToUse = if (!action.exec.pull) schedulingState.managedInvokers
else schedulingState.blackboxInvokers
+ val (invokersToUse, stepSizes) =
+ if (!action.exec.pull) (schedulingState.managedInvokers,
schedulingState.managedStepSizes)
+ else (schedulingState.blackboxInvokers,
schedulingState.blackboxStepSizes)
val chosen = if (invokersToUse.nonEmpty) {
val hash =
ShardingContainerPoolBalancer.generateHash(msg.user.namespace,
action.fullyQualifiedName(false))
val homeInvoker = hash % invokersToUse.size
- val stepSize = schedulingState.stepSizes(hash %
schedulingState.stepSizes.size)
+ val stepSize = stepSizes(hash % stepSizes.size)
ShardingContainerPoolBalancer.schedule(invokersToUse,
schedulingState.invokerSlots, homeInvoker, stepSize)
} else {
None
@@ -247,6 +254,10 @@ class ShardingContainerPoolBalancer(config: WhiskConfig,
controllerInstance: Ins
activations.remove(aid) match {
case Some(entry) =>
+ totalActivations.decrement()
+ activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
+ schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
+
if (!forced) {
entry.timeoutHandler.cancel()
entry.promise.trySuccess(response)
@@ -254,10 +265,6 @@ class ShardingContainerPoolBalancer(config: WhiskConfig,
controllerInstance: Ins
entry.promise.tryFailure(new Throwable("no active ack received"))
}
- totalActivations.decrement()
- activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
- schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
-
logging.info(this, s"${if (!forced) "received" else "forced"} active
ack for '$aid'")(tid)
// Active acks that are received here are strictly from user actions -
health actions are not part of
// the load balancer's activation map. Inform the invoker pool
supervisor of the user action completion.
@@ -317,7 +324,7 @@ object ShardingContainerPoolBalancer extends
LoadBalancerProvider {
dispatched: IndexedSeq[ForcableSemaphore],
index: Int,
step: Int,
- stepsDone: Int = 0): Option[InstanceId] = {
+ stepsDone: Int = 0)(implicit logging: Logging):
Option[InstanceId] = {
val numInvokers = invokers.size
if (numInvokers > 0) {
@@ -331,9 +338,10 @@ object ShardingContainerPoolBalancer extends
LoadBalancerProvider {
val healthyInvokers = invokers.filter(_.status == Healthy)
if (healthyInvokers.nonEmpty) {
// Choose a healthy invoker randomly
- val random =
ThreadLocalRandom.current().nextInt(healthyInvokers.size)
- dispatched(random).forceAcquire()
- Some(healthyInvokers(random).id)
+ val random =
healthyInvokers(ThreadLocalRandom.current().nextInt(healthyInvokers.size)).id
+ dispatched(random.toInt).forceAcquire()
+ logging.warn(this, s"system is overloaded. Chose
invoker${random.toInt} by random assignment.")
+ Some(random)
} else {
None
}
@@ -354,14 +362,16 @@ object ShardingContainerPoolBalancer extends
LoadBalancerProvider {
* @param _invokers all of the known invokers in the system
* @param _managedInvokers all invokers for managed runtimes
* @param _blackboxInvokers all invokers for blackbox runtimes
- * @param _stepSizes the step-sizes possible for the current invoker count
+ * @param _managedStepSizes the step-sizes possible for the current managed
invoker count
+ * @param _blackboxStepSizes the step-sizes possible for the current blackbox
invoker count
* @param _invokerSlots state of accessible slots of each invoker
*/
case class ShardingContainerPoolBalancerState(
private var _invokers: IndexedSeq[InvokerHealth] =
IndexedSeq.empty[InvokerHealth],
private var _managedInvokers: IndexedSeq[InvokerHealth] =
IndexedSeq.empty[InvokerHealth],
private var _blackboxInvokers: IndexedSeq[InvokerHealth] =
IndexedSeq.empty[InvokerHealth],
- private var _stepSizes: Seq[Int] =
ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+ private var _managedStepSizes: Seq[Int] =
ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+ private var _blackboxStepSizes: Seq[Int] =
ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
private var _invokerSlots: IndexedSeq[ForcableSemaphore] =
IndexedSeq.empty[ForcableSemaphore],
private var _clusterSize: Int = 1)(
lbConfig: ShardingContainerPoolBalancerConfig =
@@ -377,7 +387,8 @@ case class ShardingContainerPoolBalancerState(
def invokers: IndexedSeq[InvokerHealth] = _invokers
def managedInvokers: IndexedSeq[InvokerHealth] = _managedInvokers
def blackboxInvokers: IndexedSeq[InvokerHealth] = _blackboxInvokers
- def stepSizes: Seq[Int] = _stepSizes
+ def managedStepSizes: Seq[Int] = _managedStepSizes
+ def blackboxStepSizes: Seq[Int] = _blackboxStepSizes
def invokerSlots: IndexedSeq[ForcableSemaphore] = _invokerSlots
def clusterSize: Int = _clusterSize
@@ -396,14 +407,6 @@ case class ShardingContainerPoolBalancerState(
val oldSize = _invokers.size
val newSize = newInvokers.size
- if (oldSize != newSize) {
- _stepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(newSize)
- if (oldSize < newSize) {
- // Keeps the existing state..
- _invokerSlots = _invokerSlots.padTo(newSize, new
ForcableSemaphore(currentInvokerThreshold))
- }
- }
-
val blackboxes = Math.max(1, (newSize.toDouble * blackboxFraction).toInt)
val managed = Math.max(1, newSize - blackboxes)
@@ -411,6 +414,18 @@ case class ShardingContainerPoolBalancerState(
_blackboxInvokers = _invokers.takeRight(blackboxes)
_managedInvokers = _invokers.take(managed)
+ if (oldSize != newSize) {
+ _managedStepSizes =
ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(managed)
+ _blackboxStepSizes =
ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(blackboxes)
+
+ if (oldSize < newSize) {
+ // Keeps the existing state..
+ _invokerSlots = _invokerSlots ++ IndexedSeq.fill(newSize - oldSize) {
+ new ForcableSemaphore(currentInvokerThreshold)
+ }
+ }
+ }
+
logging.info(
this,
s"loadbalancer invoker status updated. managedInvokers = $managed
blackboxInvokers = $blackboxes")(
diff --git
a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index ca8442f..50201a6 100644
---
a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++
b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -50,7 +50,8 @@ class ShardingContainerPoolBalancerTests extends FlatSpec
with Matchers with Str
state.blackboxInvokers shouldBe 'empty
state.managedInvokers shouldBe 'empty
state.invokerSlots shouldBe 'empty
- state.stepSizes shouldBe Seq()
+ state.managedStepSizes shouldBe Seq()
+ state.blackboxStepSizes shouldBe Seq()
// apply one update, verify everything is updated accordingly
val update1 = IndexedSeq(healthy(0))
@@ -59,8 +60,10 @@ class ShardingContainerPoolBalancerTests extends FlatSpec
with Matchers with Str
state.invokers shouldBe update1
state.blackboxInvokers shouldBe update1 // fallback to at least one
state.managedInvokers shouldBe update1 // fallback to at least one
+ state.invokerSlots should have size update1.size
state.invokerSlots.head.availablePermits shouldBe slots
- state.stepSizes shouldBe Seq(1)
+ state.managedStepSizes shouldBe Seq(1)
+ state.blackboxStepSizes shouldBe Seq(1)
// aquire a slot to alter invoker state
state.invokerSlots.head.tryAcquire()
@@ -73,9 +76,11 @@ class ShardingContainerPoolBalancerTests extends FlatSpec
with Matchers with Str
state.invokers shouldBe update2
state.managedInvokers shouldBe IndexedSeq(update2.head)
state.blackboxInvokers shouldBe IndexedSeq(update2.last)
+ state.invokerSlots should have size update2.size
state.invokerSlots.head.availablePermits shouldBe slots - 1
state.invokerSlots(1).availablePermits shouldBe slots
- state.stepSizes shouldBe Seq(1)
+ state.managedStepSizes shouldBe Seq(1)
+ state.blackboxStepSizes shouldBe Seq(1)
}
it should "update the cluster size, adjusting the invoker slots accordingly"
in {
--
To stop receiving notification emails like this one, please contact
[email protected].