tysonnorris closed pull request #2984: Dynamic LoadBalancer load using SpiLoader
URL: https://github.com/apache/incubator-openwhisk/pull/2984
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/common/scala/src/main/resources/reference.conf
b/common/scala/src/main/resources/reference.conf
index 45543e5865..4530aef344 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -3,4 +3,5 @@ whisk.spi{
MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider
ContainerFactoryProvider =
whisk.core.containerpool.docker.DockerContainerFactoryProvider
LogStoreProvider =
whisk.core.containerpool.logging.DockerToActivationLogStoreProvider
+ LoadBalancerProvider = whisk.core.loadBalancer.ContainerPoolBalancer
}
diff --git
a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index ba5845276c..53408c3438 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -46,7 +46,7 @@ import whisk.core.entitlement._
import whisk.core.entity._
import whisk.core.entity.ActivationId.ActivationIdGenerator
import whisk.core.entity.ExecManifest.Runtimes
-import whisk.core.loadBalancer.{LoadBalancerService}
+import whisk.core.loadBalancer.LoadBalancerProvider
import whisk.http.BasicHttpService
import whisk.http.BasicRasService
import whisk.spi.SpiLoader
@@ -117,7 +117,8 @@ class Controller(val instance: InstanceId,
})
// initialize backend services
- private implicit val loadBalancer = new LoadBalancerService(whiskConfig,
instance, entityStore)
+ private implicit val loadBalancer =
+ SpiLoader.get[LoadBalancerProvider].loadBalancer(whiskConfig, instance)
private implicit val entitlementProvider = new
LocalEntitlementProvider(whiskConfig, loadBalancer)
private implicit val activationIdFactory = new ActivationIdGenerator {}
private implicit val logStore =
SpiLoader.get[LogStoreProvider].logStore(actorSystem)
@@ -137,12 +138,13 @@ class Controller(val instance: InstanceId,
*/
private val internalInvokerHealth = {
implicit val executionContext = actorSystem.dispatcher
-
(path("invokers") & get) {
complete {
- loadBalancer.allInvokers.map(_.map {
- case (instance, state) => s"invoker${instance.toInt}" ->
state.asString
- }.toMap.toJson.asJsObject)
+ loadBalancer
+ .invokerHealth()
+ .map(_.map {
+ case i => s"invoker${i.id.toInt}" -> i.status.asString
+ }.toMap.toJson.asJsObject)
}
}
}
@@ -163,7 +165,7 @@ object Controller {
Map(WhiskConfig.controllerInstances -> null) ++
ExecManifest.requiredProperties ++
RestApiCommons.requiredProperties ++
- LoadBalancerService.requiredProperties ++
+ SpiLoader.get[LoadBalancerProvider].requiredProperties ++
EntitlementProvider.requiredProperties
private def info(config: WhiskConfig, runtimes: Runtimes, apis:
List[String]) =
diff --git
a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
index ffcf01bd8a..161ef4bc38 100644
--- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala
@@ -46,7 +46,7 @@ import whisk.core.entity._
import whisk.core.entity.ActivationId.ActivationIdGenerator
import whisk.core.entity.WhiskAuthStore
import whisk.core.entity.types._
-import whisk.core.loadBalancer.LoadBalancerService
+import whisk.core.loadBalancer.LoadBalancer
import whisk.http.Messages
/**
@@ -161,7 +161,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String,
apiVersion: String)(
implicit val entityStore: EntityStore,
implicit val entitlementProvider: EntitlementProvider,
implicit val activationIdFactory: ActivationIdGenerator,
- implicit val loadBalancer: LoadBalancerService,
+ implicit val loadBalancer: LoadBalancer,
implicit val cacheChangeNotification: Some[CacheChangeNotification],
implicit val activationStore: ActivationStore,
implicit val logStore: LogStore,
@@ -243,7 +243,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String,
apiVersion: String)(
override val activationStore: ActivationStore,
override val entitlementProvider: EntitlementProvider,
override val activationIdFactory: ActivationIdGenerator,
- override val loadBalancer: LoadBalancerService,
+ override val loadBalancer: LoadBalancer,
override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
@@ -266,7 +266,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String,
apiVersion: String)(
implicit override val entityStore: EntityStore,
override val entitlementProvider: EntitlementProvider,
override val activationIdFactory: ActivationIdGenerator,
- override val loadBalancer: LoadBalancerService,
+ override val loadBalancer: LoadBalancer,
override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
@@ -279,7 +279,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String,
apiVersion: String)(
override val entityStore: EntityStore,
override val entitlementProvider: EntitlementProvider,
override val activationIdFactory: ActivationIdGenerator,
- override val loadBalancer: LoadBalancerService,
+ override val loadBalancer: LoadBalancer,
override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
@@ -293,7 +293,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String,
apiVersion: String)(
override val entitlementProvider: EntitlementProvider,
override val activationStore: ActivationStore,
override val activationIdFactory: ActivationIdGenerator,
- override val loadBalancer: LoadBalancerService,
+ override val loadBalancer: LoadBalancer,
override val cacheChangeNotification: Some[CacheChangeNotification],
override val executionContext: ExecutionContext,
override val logging: Logging,
@@ -310,7 +310,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String,
apiVersion: String)(
override val activationStore: ActivationStore,
override val entitlementProvider: EntitlementProvider,
override val activationIdFactory: ActivationIdGenerator,
- override val loadBalancer: LoadBalancerService,
+ override val loadBalancer: LoadBalancer,
override val actorSystem: ActorSystem,
override val executionContext: ExecutionContext,
override val logging: Logging,
diff --git
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
similarity index 82%
rename from
core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
rename to
core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index 89ba90045f..ffd831d079 100644
---
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala
+++
b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -19,82 +19,43 @@ package whisk.core.loadBalancer
import java.nio.charset.StandardCharsets
-import scala.annotation.tailrec
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.concurrent.duration._
-import scala.util.Failure
-import scala.util.Success
-import org.apache.kafka.clients.producer.RecordMetadata
-import akka.actor.ActorRefFactory
-import akka.actor.ActorSystem
-import akka.actor.Props
+import akka.actor.{ActorRefFactory, ActorSystem, Props}
import akka.cluster.Cluster
-import akka.util.Timeout
import akka.pattern.ask
-import whisk.common.Logging
-import whisk.common.LoggingMarkers
-import whisk.common.TransactionId
-import whisk.core.ConfigKeys
-import whisk.core.WhiskConfig
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import org.apache.kafka.clients.producer.RecordMetadata
+import pureconfig._
+import whisk.common.{Logging, LoggingMarkers, TransactionId}
import whisk.core.WhiskConfig._
-import whisk.core.connector.{ActivationMessage, CompletionMessage}
-import whisk.core.connector.MessageFeed
-import whisk.core.connector.MessageProducer
-import whisk.core.connector.MessagingProvider
+import whisk.core.connector._
import whisk.core.database.NoDocumentException
import whisk.core.entity._
-import whisk.core.entity.{ActivationId, WhiskActivation}
-import whisk.core.entity.EntityName
-import whisk.core.entity.ExecutableWhiskActionMetaData
-import whisk.core.entity.Identity
-import whisk.core.entity.InstanceId
-import whisk.core.entity.UUID
-import whisk.core.entity.WhiskAction
import whisk.core.entity.types.EntityStore
+import whisk.core.{ConfigKeys, WhiskConfig}
import whisk.spi.SpiLoader
-import pureconfig._
-case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold:
Int)
-
-trait LoadBalancer {
-
- val activeAckTimeoutGrace = 1.minute
-
- /** Gets the number of in-flight activations for a specific user. */
- def activeActivationsFor(namespace: UUID): Future[Int]
-
- /** Gets the number of in-flight activations in the system. */
- def totalActiveActivations: Future[Int]
-
- /**
- * Publishes activation message on internal bus for an invoker to pick up.
- *
- * @param action the action to invoke
- * @param msg the activation message to publish on an invoker topic
- * @param transid the transaction id for the request
- * @return result a nested Future the outer indicating completion of
publishing and
- * the inner the completion of the action (i.e., the result)
- * if it is ready before timeout (Right) otherwise the activation id
(Left).
- * The future is guaranteed to complete within the declared action
time limit
- * plus a grace period (see activeAckTimeoutGrace).
- */
- def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
- implicit transid: TransactionId): Future[Future[Either[ActivationId,
WhiskActivation]]]
+import scala.annotation.tailrec
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
-}
+case class LoadbalancerConfig(blackboxFraction: Double, invokerBusyThreshold:
Int)
-class LoadBalancerService(config: WhiskConfig, instance: InstanceId,
entityStore: EntityStore)(
- implicit val actorSystem: ActorSystem,
- logging: Logging)
+class ContainerPoolBalancer(config: WhiskConfig, instance:
InstanceId)(implicit val actorSystem: ActorSystem,
+
logging: Logging,
+
materializer: ActorMaterializer)
extends LoadBalancer {
private val lbConfig =
loadConfigOrThrow[LoadbalancerConfig](ConfigKeys.loadbalancer)
+ /** Used to manage an action for testing invoker health */ /** Used to
manage an action for testing invoker health */
+ private val entityStore = WhiskEntityStore.datastore(config)
+
/** The execution context for futures */
- implicit val executionContext: ExecutionContext = actorSystem.dispatcher
+ private implicit val executionContext: ExecutionContext =
actorSystem.dispatcher
+
+ private val activeAckTimeoutGrace = 1.minute
/** How many invokers are dedicated to blackbox images. We range bound to
something sensical regardless of configuration. */
private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0,
lbConfig.blackboxFraction))
@@ -113,10 +74,6 @@ class LoadBalancerService(config: WhiskConfig, instance:
InstanceId, entityStore
}
}
- override def activeActivationsFor(namespace: UUID) =
loadBalancerData.activationCountOn(namespace)
-
- override def totalActiveActivations = loadBalancerData.totalActivationCount
-
override def publish(action: ExecutableWhiskActionMetaData, msg:
ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId,
WhiskActivation]]] = {
chooseInvoker(msg.user, action).flatMap { invokerName =>
@@ -127,11 +84,16 @@ class LoadBalancerService(config: WhiskConfig, instance:
InstanceId, entityStore
}
}
- /** An indexed sequence of all invokers in the current system */
- def allInvokers: Future[IndexedSeq[(InstanceId, InvokerState)]] =
+ /** An indexed sequence of all invokers in the current system. */
+ override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = {
invokerPool
.ask(GetStatus)(Timeout(5.seconds))
- .mapTo[IndexedSeq[(InstanceId, InvokerState)]]
+ .mapTo[IndexedSeq[InvokerHealth]]
+ }
+
+ override def activeActivationsFor(namespace: UUID) =
loadBalancerData.activationCountOn(namespace)
+
+ override def totalActiveActivations = loadBalancerData.totalActivationCount
/**
* Tries to fill in the result slot (i.e., complete the promise) when a
completion message arrives.
@@ -307,9 +269,8 @@ class LoadBalancerService(config: WhiskConfig, instance:
InstanceId, entityStore
/** Compute the number of blackbox-dedicated invokers by applying a rounded
down fraction of all invokers (but at least 1). */
private def numBlackbox(totalInvokers: Int) = Math.max(1,
(totalInvokers.toDouble * blackboxFraction).toInt)
- /** Return invokers (almost) dedicated to running blackbox actions. */
- private def blackboxInvokers(
- invokers: IndexedSeq[(InstanceId, InvokerState)]): IndexedSeq[(InstanceId,
InvokerState)] = {
+ /** Return invokers dedicated to running blackbox actions. */
+ private def blackboxInvokers(invokers: IndexedSeq[InvokerHealth]):
IndexedSeq[InvokerHealth] = {
val blackboxes = numBlackbox(invokers.size)
invokers.takeRight(blackboxes)
}
@@ -318,8 +279,7 @@ class LoadBalancerService(config: WhiskConfig, instance:
InstanceId, entityStore
* Return (at least one) invokers for running non black-box actions.
* This set can overlap with the blackbox set if there is only one invoker.
*/
- private def managedInvokers(
- invokers: IndexedSeq[(InstanceId, InvokerState)]): IndexedSeq[(InstanceId,
InvokerState)] = {
+ private def managedInvokers(invokers: IndexedSeq[InvokerHealth]):
IndexedSeq[InvokerHealth] = {
val managed = Math.max(1, invokers.length - numBlackbox(invokers.length))
invokers.take(managed)
}
@@ -329,14 +289,14 @@ class LoadBalancerService(config: WhiskConfig, instance:
InstanceId, entityStore
val hash = generateHash(user.namespace, action)
loadBalancerData.activationCountPerInvoker.flatMap { currentActivations =>
- allInvokers.flatMap { invokers =>
+ invokerHealth().flatMap { invokers =>
val invokersToUse = if (action.exec.pull) blackboxInvokers(invokers)
else managedInvokers(invokers)
val invokersWithUsage = invokersToUse.view.map {
// Using a view defers the comparably expensive lookup to actual
access of the element
- case (instance, state) => (instance, state,
currentActivations.getOrElse(instance.toString, 0))
+ case invoker => (invoker.id, invoker.status,
currentActivations.getOrElse(instance.toString, 0))
}
- LoadBalancerService.schedule(invokersWithUsage,
lbConfig.invokerBusyThreshold, hash) match {
+ ContainerPoolBalancer.schedule(invokersWithUsage,
lbConfig.invokerBusyThreshold, hash) match {
case Some(invoker) => Future.successful(invoker)
case None =>
logging.error(this, s"all invokers
down")(TransactionId.invokerHealth)
@@ -352,7 +312,13 @@ class LoadBalancerService(config: WhiskConfig, instance:
InstanceId, entityStore
}
}
-object LoadBalancerService {
+object ContainerPoolBalancer extends LoadBalancerProvider {
+
+ override def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(
+ implicit actorSystem: ActorSystem,
+ logging: Logging,
+ materializer: ActorMaterializer): LoadBalancer = new
ContainerPoolBalancer(whiskConfig, instance)
+
def requiredProperties =
kafkaHosts ++
Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
@@ -367,7 +333,7 @@ object LoadBalancerService {
def gcd(a: Int, b: Int): Int = if (b == 0) a else gcd(b, a % b)
/** Returns pairwise coprime numbers until x. Result is memoized. */
- val pairwiseCoprimeNumbersUntil: Int => IndexedSeq[Int] =
LoadBalancerService.memoize {
+ val pairwiseCoprimeNumbersUntil: Int => IndexedSeq[Int] =
ContainerPoolBalancer.memoize {
case x =>
(1 to x).foldLeft(IndexedSeq.empty[Int])((primes, cur) => {
if (gcd(cur, x) == 1 && primes.forall(i => gcd(i, cur) == 1)) {
@@ -394,7 +360,7 @@ object LoadBalancerService {
val numInvokers = invokers.size
if (numInvokers > 0) {
val homeInvoker = hash % numInvokers
- val stepSizes =
LoadBalancerService.pairwiseCoprimeNumbersUntil(numInvokers)
+ val stepSizes =
ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(numInvokers)
val step = stepSizes(hash % stepSizes.size)
val invokerProgression = Stream
diff --git
a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index cd80a8ee12..13c3a70b95 100644
---
a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++
b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -86,7 +86,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InstanceId)
=> ActorRef,
// from leaking the state for external mutation
var instanceToRef = immutable.Map.empty[InstanceId, ActorRef]
var refToInstance = immutable.Map.empty[ActorRef, InstanceId]
- var status = IndexedSeq[(InstanceId, InvokerState)]()
+ var status = IndexedSeq[InvokerHealth]()
def receive = {
case p: PingMessage =>
@@ -103,13 +103,13 @@ class InvokerPool(childFactory: (ActorRefFactory,
InstanceId) => ActorRef,
case CurrentState(invoker, currentState: InvokerState) =>
refToInstance.get(invoker).foreach { instance =>
- status = status.updated(instance.toInt, (instance, currentState))
+ status = status.updated(instance.toInt, new InvokerHealth(instance,
currentState))
}
logStatus()
case Transition(invoker, oldState: InvokerState, newState: InvokerState) =>
refToInstance.get(invoker).foreach { instance =>
- status = status.updated(instance.toInt, (instance, newState))
+ status = status.updated(instance.toInt, new InvokerHealth(instance,
newState))
}
logStatus()
@@ -118,7 +118,7 @@ class InvokerPool(childFactory: (ActorRefFactory,
InstanceId) => ActorRef,
}
def logStatus() = {
- val pretty = status.map { case (instance, state) => s"${instance.toInt} ->
$state" }
+ val pretty = status.map(i => s"${i.id.toInt} -> ${i.status}")
logging.info(this, s"invoker status changed to ${pretty.mkString(", ")}")
}
@@ -155,7 +155,7 @@ class InvokerPool(childFactory: (ActorRefFactory,
InstanceId) => ActorRef,
def registerInvoker(instanceId: InstanceId): ActorRef = {
logging.info(this, s"registered a new invoker:
invoker${instanceId.toInt}")(TransactionId.invokerHealth)
- status = padToIndexed(status, instanceId.toInt + 1, i => (InstanceId(i),
Offline))
+ status = padToIndexed(status, instanceId.toInt + 1, i => new
InvokerHealth(InstanceId(i), Offline))
val ref = childFactory(context, instanceId)
diff --git
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
new file mode 100644
index 0000000000..8f2227feba
--- /dev/null
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer
+
+import scala.concurrent.Future
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import whisk.common.{Logging, TransactionId}
+import whisk.core.WhiskConfig
+import whisk.core.connector._
+import whisk.core.entity._
+import whisk.spi.Spi
+
+/**
+ * Describes an abstract invoker. An invoker is a local container pool manager
that
+ * is in charge of the container life cycle management.
+ *
+ * @param id a unique instance identifier for the invoker
+ * @param status it status (healthy, unhealthy, offline)
+ */
+class InvokerHealth(val id: InstanceId, val status: InvokerState) {
+ override def equals(obj: scala.Any): Boolean = obj match {
+ case that: InvokerHealth => that.id == this.id && that.status ==
this.status
+ case _ => false
+ }
+}
+
+trait LoadBalancer {
+
+ /**
+ * Publishes activation message on internal bus for an invoker to pick up.
+ *
+ * @param action the action to invoke
+ * @param msg the activation message to publish on an invoker topic
+ * @param transid the transaction id for the request
+ * @return result a nested Future the outer indicating completion of
publishing and
+ * the inner the completion of the action (i.e., the result)
+ * if it is ready before timeout (Right) otherwise the activation id
(Left).
+ * The future is guaranteed to complete within the declared action
time limit
+ * plus a grace period (see activeAckTimeoutGrace).
+ */
+ def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
+ implicit transid: TransactionId): Future[Future[Either[ActivationId,
WhiskActivation]]]
+
+ /**
+ * Returns a message indicating the health of the containers and/or
container pool in general.
+ *
+ * @return a Future[IndexedSeq[InvokerHealth]] representing the health of
the pools managed by the loadbalancer.
+ */
+ def invokerHealth(): Future[IndexedSeq[InvokerHealth]]
+
+ /** Gets the number of in-flight activations for a specific user. */
+ def activeActivationsFor(namespace: UUID): Future[Int]
+
+ /** Gets the number of in-flight activations in the system. */
+ def totalActiveActivations: Future[Int]
+}
+
+/**
+ * An Spi for providing load balancer implementations.
+ */
+trait LoadBalancerProvider extends Spi {
+ def requiredProperties: Map[String, String]
+
+ def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(implicit
actorSystem: ActorSystem,
+ logging:
Logging,
+
materializer: ActorMaterializer): LoadBalancer
+}
diff --git
a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
index f0c9fe5c78..7344ba7849 100644
--- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala
@@ -200,4 +200,5 @@ class DegenerateLoadBalancerService(config:
WhiskConfig)(implicit ec: ExecutionC
} getOrElse Future.failed(new IllegalArgumentException("Unit test does
not need fast path"))
}
+ override def invokerHealth() = Future.successful(IndexedSeq.empty)
}
diff --git
a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
b/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
similarity index 68%
rename from
tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
rename to
tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
index 3f3dca4794..60eda84a79 100644
---
a/tests/src/test/scala/whisk/core/loadBalancer/test/LoadBalancerServiceObjectTests.scala
+++
b/tests/src/test/scala/whisk/core/loadBalancer/test/ContainerPoolBalancerObjectTests.scala
@@ -21,7 +21,7 @@ import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.FlatSpec
import org.scalatest.Matchers
-import whisk.core.loadBalancer.LoadBalancerService
+import whisk.core.loadBalancer.ContainerPoolBalancer
import whisk.core.loadBalancer.Healthy
import whisk.core.loadBalancer.Offline
import whisk.core.loadBalancer.UnHealthy
@@ -34,12 +34,12 @@ import whisk.core.entity.InstanceId
* of the ContainerPool object.
*/
@RunWith(classOf[JUnitRunner])
-class LoadBalancerServiceObjectTests extends FlatSpec with Matchers {
+class ContainerPoolBalancerObjectTests extends FlatSpec with Matchers {
behavior of "memoize"
it should "not recompute a value which was already given" in {
var calls = 0
- val add1: Int => Int = LoadBalancerService.memoize {
+ val add1: Int => Int = ContainerPoolBalancer.memoize {
case second =>
calls += 1
1 + second
@@ -58,18 +58,18 @@ class LoadBalancerServiceObjectTests extends FlatSpec with
Matchers {
behavior of "pairwiseCoprimeNumbersUntil"
it should "return an empty set for malformed inputs" in {
- LoadBalancerService.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
- LoadBalancerService.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0) shouldBe Seq()
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(-1) shouldBe Seq()
}
it should "return all coprime numbers until the number given" in {
- LoadBalancerService.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
- LoadBalancerService.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
- LoadBalancerService.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
- LoadBalancerService.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
- LoadBalancerService.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
- LoadBalancerService.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5, 7)
- LoadBalancerService.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(1) shouldBe Seq(1)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(2) shouldBe Seq(1)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(3) shouldBe Seq(1, 2)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(4) shouldBe Seq(1, 3)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(5) shouldBe Seq(1, 2, 3)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(9) shouldBe Seq(1, 2, 5,
7)
+ ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(10) shouldBe Seq(1, 3, 7)
}
behavior of "chooseInvoker"
@@ -78,24 +78,24 @@ class LoadBalancerServiceObjectTests extends FlatSpec with
Matchers {
def hashInto[A](list: Seq[A], hash: Int) = list(hash % list.size)
it should "return None on an empty invokers list" in {
- LoadBalancerService.schedule(IndexedSeq(), 0, 1) shouldBe None
+ ContainerPoolBalancer.schedule(IndexedSeq(), 0, 1) shouldBe None
}
it should "return None on a list of offline/unhealthy invokers" in {
val invs = IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1),
UnHealthy, 0))
- LoadBalancerService.schedule(invs, 0, 1) shouldBe None
+ ContainerPoolBalancer.schedule(invs, 0, 1) shouldBe None
}
it should "schedule to the home invoker" in {
val invs = invokers(10)
val hash = 2
- LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId(hash
% invs.size))
+ ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe
Some(InstanceId(hash % invs.size))
}
it should "take the only online invoker" in {
- LoadBalancerService.schedule(
+ ContainerPoolBalancer.schedule(
IndexedSeq((InstanceId(0), Offline, 0), (InstanceId(1), UnHealthy, 0),
(InstanceId(2), Healthy, 0)),
0,
1) shouldBe Some(InstanceId(2))
@@ -105,7 +105,7 @@ class LoadBalancerServiceObjectTests extends FlatSpec with
Matchers {
val hash = 0
val invs = IndexedSeq((InstanceId(0), Healthy, 10), (InstanceId(1),
UnHealthy, 0), (InstanceId(2), Healthy, 0))
- LoadBalancerService.schedule(invs, 10, hash) shouldBe Some(InstanceId(2))
+ ContainerPoolBalancer.schedule(invs, 10, hash) shouldBe Some(InstanceId(2))
}
it should "jump to the next invoker determined by a hashed stepsize if the
home invoker is overloaded" in {
@@ -114,9 +114,9 @@ class LoadBalancerServiceObjectTests extends FlatSpec with
Matchers {
val targetInvoker = hash % invokerCount
val invs = invokers(invokerCount).updated(targetInvoker,
(InstanceId(targetInvoker), Healthy, 1))
- val step =
hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash)
+ val step =
hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash)
- LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash
+ step) % invs.size))
+ ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe
Some(InstanceId((hash + step) % invs.size))
}
it should "wrap the search at the end of the invoker list" in {
@@ -125,12 +125,12 @@ class LoadBalancerServiceObjectTests extends FlatSpec
with Matchers {
val hash = 1
val targetInvoker = hashInto(invs, hash) // will be invoker1
- val step =
hashInto(LoadBalancerService.pairwiseCoprimeNumbersUntil(invokerCount), hash)
// will be 2
+ val step =
hashInto(ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(invokerCount), hash)
// will be 2
step shouldBe 2
// invoker1 is overloaded so it will step (2 steps) to the next one --> 1
2 0 --> invoker0 is next target
// invoker0 is overloaded so it will step to the next one --> 0 1 2 -->
invoker2 is next target and underloaded
- LoadBalancerService.schedule(invs, 1, hash) shouldBe Some(InstanceId((hash
+ step + step) % invs.size))
+ ContainerPoolBalancer.schedule(invs, 1, hash) shouldBe
Some(InstanceId((hash + step + step) % invs.size))
}
it should "multiply its threshold in 3 iterations to find an invoker with a
good warm-chance" in {
@@ -140,22 +140,22 @@ class LoadBalancerServiceObjectTests extends FlatSpec
with Matchers {
// even though invoker1 is not the home invoker in this case, it gets
chosen over
// the others because it's the first one encountered by the iteration
mechanism to be below
// the threshold of 3 * 16 invocations
- LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
+ ContainerPoolBalancer.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
}
it should "choose the home invoker if all invokers are overloaded even above
the muliplied threshold" in {
val invs = IndexedSeq((InstanceId(0), Healthy, 51), (InstanceId(1),
Healthy, 50), (InstanceId(2), Healthy, 49))
val hash = 0 // home is 0, stepsize is 1
- LoadBalancerService.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
+ ContainerPoolBalancer.schedule(invs, 16, hash) shouldBe Some(InstanceId(0))
}
it should "transparently work with partitioned sets of invokers" in {
val invs = IndexedSeq((InstanceId(3), Healthy, 0), (InstanceId(4),
Healthy, 0), (InstanceId(5), Healthy, 0))
- LoadBalancerService.schedule(invs, 1, 0) shouldBe Some(InstanceId(3))
- LoadBalancerService.schedule(invs, 1, 1) shouldBe Some(InstanceId(4))
- LoadBalancerService.schedule(invs, 1, 2) shouldBe Some(InstanceId(5))
- LoadBalancerService.schedule(invs, 1, 3) shouldBe Some(InstanceId(3))
+ ContainerPoolBalancer.schedule(invs, 1, 0) shouldBe Some(InstanceId(3))
+ ContainerPoolBalancer.schedule(invs, 1, 1) shouldBe Some(InstanceId(4))
+ ContainerPoolBalancer.schedule(invs, 1, 2) shouldBe Some(InstanceId(5))
+ ContainerPoolBalancer.schedule(invs, 1, 3) shouldBe Some(InstanceId(3))
}
}
diff --git
a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index 8e7d2911cd..6f0d05ce42 100644
---
a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++
b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -68,6 +68,7 @@ import whisk.core.loadBalancer.InvokerPool
import whisk.core.loadBalancer.InvokerState
import whisk.core.loadBalancer.Offline
import whisk.core.loadBalancer.UnHealthy
+import whisk.core.loadBalancer.InvokerHealth
import whisk.utils.retry
import whisk.core.connector.test.TestConnector
import whisk.core.entitlement.Privilege
@@ -101,7 +102,7 @@ class InvokerSupervisionTests
/** Helper to generate a list of (InstanceId, InvokerState) */
def zipWithInstance(list: IndexedSeq[InvokerState]) = list.zipWithIndex.map {
- case (state, index) => (InstanceId(index), state)
+ case (state, index) => new InvokerHealth(InstanceId(index), state)
}
val pC = new TestConnector("pingFeedTtest", 4, false) {}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services