markusthoemmes closed pull request #2888: Adapt trigger throttle for multiple
controllers
URL: https://github.com/apache/incubator-openwhisk/pull/2888
This pull request was merged from a forked repository. Because GitHub
hides the original diff of a pull request from a fork once it has been
merged, the diff is reproduced below for the sake of provenance:
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 79642fed01..c6ad149c4c 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -117,6 +117,8 @@ controller:
bindPort: 2551
# at this moment all controllers are seed nodes
seedNodes: "{{ groups['controllers'] | map('extract', hostvars,
'ansible_host') | list }}"
+ # We recommend to enable HA for the controllers only, if bookkeeping data
are shared too. (localBookkeeping: false)
+ ha: "{{ controller_enable_ha | default(false) }}"
registry:
confdir: "{{ config_root_dir }}/registry"
diff --git a/ansible/roles/controller/tasks/deploy.yml
b/ansible/roles/controller/tasks/deploy.yml
index c669395dcb..bba75d72f3 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -82,12 +82,11 @@
"AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}"
"AKKA_CLUSTER_BIND_PORT": "{{ controller.akka.cluster.bindPort }}"
"AKKA_ACTOR_PROVIDER": "{{ controller.akka.provider }}"
-
"METRICS_KAMON": "{{ metrics.kamon.enabled }}"
"METRICS_LOG": "{{ metrics.log.enabled }}"
"METRICS_KAMON_HOST": "{{ metrics.kamon.host }}"
"METRICS_KAMON_PORT": "{{ metrics.kamon.port }}"
-
+ "CONTROLLER_HA": "{{ controller.ha }}"
volumes:
- "{{ whisk_logs_dir }}/controller{{
groups['controllers'].index(inventory_hostname) }}:/logs"
ports:
diff --git a/ansible/roles/nginx/templates/nginx.conf.j2
b/ansible/roles/nginx/templates/nginx.conf.j2
index 6a0b4dc875..9efae0088b 100644
--- a/ansible/roles/nginx/templates/nginx.conf.j2
+++ b/ansible/roles/nginx/templates/nginx.conf.j2
@@ -30,7 +30,7 @@ http {
server {{ hostvars[groups['controllers'] | first].ansible_host }}:{{
controller.basePort }} fail_timeout=60s;
{% for ip in groups['controllers'] %}
{% if groups['controllers'].index(ip) > 0 %}
- server {{ hostvars[ip].ansible_host }}:{{ controller.basePort +
groups['controllers'].index(ip) }} backup;
+ server {{ hostvars[ip].ansible_host }}:{{ controller.basePort +
groups['controllers'].index(ip) }} {% if controller.ha %}fail_timeout=60s{%
else %}backup{% endif %};
{% endif %}
{% endfor %}
keepalive 512;
diff --git a/ansible/templates/whisk.properties.j2
b/ansible/templates/whisk.properties.j2
index 2c1d644cfb..ecef131f90 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -58,6 +58,7 @@ invoker.hosts.baseport={{ invoker.port }}
controller.hosts={{ groups["controllers"] | map('extract', hostvars,
'ansible_host') | list | join(",") }}
controller.host.basePort={{ controller.basePort }}
controller.instances={{ controller.instances }}
+controller.ha={{ controller.ha }}
invoker.container.network=bridge
invoker.container.policy={{ invoker_container_policy_name | default()}}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 726d1bcabb..bc63dc2bea 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -105,6 +105,7 @@ class WhiskConfig(requiredProperties: Map[String, String],
val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit)
val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes)
val controllerLocalBookkeeping =
getAsBoolean(WhiskConfig.controllerLocalBookkeeping, false)
+ val controllerHighAvailability =
getAsBoolean(WhiskConfig.controllerHighAvailability, false)
}
object WhiskConfig {
@@ -240,4 +241,5 @@ object WhiskConfig {
val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute"
val controllerSeedNodes = "akka.cluster.seed.nodes"
val controllerLocalBookkeeping = "controller.localBookkeeping"
+ val controllerHighAvailability = "controller.ha"
}
diff --git
a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
index ef51097274..6fa76d22c5 100644
---
a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
+++
b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
@@ -161,8 +161,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
if (cacheEnabled) {
logger.info(this, s"invalidating $key on delete")
- notifier.foreach(_(key))
-
// try inserting our desired entry...
val desiredEntry = Entry(transid, InvalidateInProgress, None)
cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -206,6 +204,8 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
// a pre-existing owner will take care of the invalidation
invalidator
}
+ } andThen {
+ case _ => notifier.foreach(_(key))
}
} else invalidator // not caching
}
@@ -265,8 +265,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
notifier: Option[CacheChangeNotification]): Future[Winfo] = {
if (cacheEnabled) {
- notifier.foreach(_(key))
-
// try inserting our desired entry...
val desiredEntry = Entry(transid, WriteInProgress,
Some(Future.successful(doc)))
cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -292,6 +290,8 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
invalidateEntryAfter(generator, key, actualEntry)
}
}
+ } andThen {
+ case _ => notifier.foreach(_(key))
}
} else generator // not caching
}
diff --git
a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 9afa83aa78..c0682815e2 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -65,7 +65,9 @@ protected[core] object EntitlementProvider {
WhiskConfig.actionInvokePerMinuteLimit -> null,
WhiskConfig.actionInvokeConcurrentLimit -> null,
WhiskConfig.triggerFirePerMinuteLimit -> null,
- WhiskConfig.actionInvokeSystemOverloadLimit -> null)
+ WhiskConfig.actionInvokeSystemOverloadLimit -> null,
+ WhiskConfig.controllerInstances -> null,
+ WhiskConfig.controllerHighAvailability -> null)
}
/**
@@ -78,10 +80,36 @@ protected[core] abstract class EntitlementProvider(config:
WhiskConfig, loadBala
private implicit val executionContext = actorSystem.dispatcher
+ /**
+ * The number of controllers if HA is enabled, 1 otherwise
+ */
+ private val diviser = if (config.controllerHighAvailability)
config.controllerInstances.toInt else 1
+
+ /**
+ * Allows 20% of additional requests on top of the limit to mitigate
possible unfair round-robin loadbalancing between
+ * controllers
+ */
+ private val overcommit = if (config.controllerHighAvailability) 1.2 else 1
+
+ /**
+ * Adjust the throttles for a single controller with the diviser and the
overcommit.
+ *
+ * @param originalThrottle The throttle that needs to be adjusted for this
controller.
+ */
+ private def dilateThrottle(originalThrottle: Int): Int = {
+ Math.ceil((originalThrottle.toDouble / diviser.toDouble) *
overcommit).toInt
+ }
+
private val invokeRateThrottler =
- new RateThrottler("actions per minute",
config.actionInvokePerMinuteLimit.toInt, _.limits.invocationsPerMinute)
+ new RateThrottler(
+ "actions per minute",
+ dilateThrottle(config.actionInvokePerMinuteLimit.toInt),
+ _.limits.invocationsPerMinute.map(dilateThrottle))
private val triggerRateThrottler =
- new RateThrottler("triggers per minute",
config.triggerFirePerMinuteLimit.toInt, _.limits.firesPerMinute)
+ new RateThrottler(
+ "triggers per minute",
+ dilateThrottle(config.triggerFirePerMinuteLimit.toInt),
+ _.limits.firesPerMinute.map(dilateThrottle))
private val concurrentInvokeThrottler = new ActivationThrottler(
loadBalancer,
config.actionInvokeConcurrentLimit.toInt,
diff --git a/tests/src/test/scala/ha/ShootComponentsTests.scala
b/tests/src/test/scala/ha/ShootComponentsTests.scala
index fa1aba121e..5fd1525936 100644
--- a/tests/src/test/scala/ha/ShootComponentsTests.scala
+++ b/tests/src/test/scala/ha/ShootComponentsTests.scala
@@ -53,6 +53,13 @@ class ShootComponentsTests extends FlatSpec with Matchers
with WskTestHelpers wi
implicit val materializer = ActorMaterializer()
implicit val testConfig = PatienceConfig(1.minute)
+ // Throttle requests to the remaining controllers to avoid getting 429s. (60
req/min)
+ val amountOfControllers =
WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt
+ val limit =
WhiskProperties.getProperty(WhiskConfig.actionInvokeConcurrentLimit).toDouble
+ val limitPerController = limit / amountOfControllers
+ val allowedRequestsPerMinute = (amountOfControllers - 1.0) *
limitPerController
+ val timeBeweenRequests = 60.seconds / allowedRequestsPerMinute
+
val controller0DockerHost = WhiskProperties.getBaseControllerHost() + ":" +
WhiskProperties.getProperty(
WhiskConfig.dockerPort)
@@ -94,9 +101,8 @@ class ShootComponentsTests extends FlatSpec with Matchers
with WskTestHelpers wi
println(s"Done rerquests with responses: invoke:
${invokeExit.futureValue} and get: ${getExit.futureValue}")
- // Do at most one action invocation per second to avoid getting 429s.
(60 req/min - limit)
- val wait = 1000 - (Instant.now.toEpochMilli - start.toEpochMilli)
- Thread.sleep(if (wait < 0) 0L else if (wait > 1000) 1000L else wait)
+ val remainingWait = timeBeweenRequests.toMillis -
(Instant.now.toEpochMilli - start.toEpochMilli)
+ Thread.sleep(if (remainingWait < 0) 0L else remainingWait)
(invokeExit.futureValue, getExit.futureValue)
}
}
@@ -104,15 +110,15 @@ class ShootComponentsTests extends FlatSpec with Matchers
with WskTestHelpers wi
behavior of "Controllers hot standby"
it should "use controller1 if controller0 goes down" in
withAssetCleaner(wskprops) { (wp, assetHelper) =>
- if (WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt >=
2) {
+ if (amountOfControllers >= 2) {
val actionName = "shootcontroller"
assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
action.create(actionName, defaultAction)
}
- // Produce some load on the system for 100 seconds (each second one
request). Kill the controller after 4 requests
- val totalRequests = 100
+ // Produce some load on the system for 100 seconds. Kill the controller
after 4 requests
+ val totalRequests = (100.seconds / timeBeweenRequests).toInt
val requestsBeforeRestart = doRequests(4, actionName)
@@ -130,7 +136,7 @@ class ShootComponentsTests extends FlatSpec with Matchers
with WskTestHelpers wi
val requests = requestsBeforeRestart ++ requestsAfterRestart
val unsuccessfulInvokes = requests.map(_._1).count(_ !=
TestUtils.SUCCESS_EXIT)
- // Allow 3 failures for the 90 seconds
+ // Allow 3 failures for the 100 seconds
unsuccessfulInvokes should be <= 3
val unsuccessfulGets = requests.map(_._2).count(_ !=
TestUtils.SUCCESS_EXIT)
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala
b/tests/src/test/scala/limits/ThrottleTests.scala
index 511a1b3412..7c59aec150 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -27,11 +27,13 @@ import scala.concurrent.Promise
import scala.concurrent.duration._
import org.junit.runner.RunWith
+import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
+import common.RunWskAdminCmd
import common.TestHelpers
import common.TestUtils
import common.TestUtils._
@@ -42,10 +44,9 @@ import common.WskProps
import common.WskTestHelpers
import spray.json._
import spray.json.DefaultJsonProtocol._
+import whisk.core.WhiskConfig
import whisk.http.Messages._
import whisk.utils.ExecutionContextFactory
-import org.scalatest.BeforeAndAfterAll
-import common.RunWskAdminCmd
import whisk.utils.retry
protected[limits] trait LocalHelper {
@@ -72,8 +73,10 @@ class ThrottleTests
val throttleWindow = 1.minute
- val maximumInvokesPerMinute = getLimit("limits.actions.invokes.perMinute")
- val maximumFiringsPerMinute = getLimit("limits.triggers.fires.perMinute")
+ // Due to the overhead of the per minute limit in the controller, we add
this overhead here as well.
+ val overhead = if
(WhiskProperties.getProperty(WhiskConfig.controllerHighAvailability).toBoolean)
1.2 else 1.0
+ val maximumInvokesPerMinute =
math.ceil(getLimit("limits.actions.invokes.perMinute") * overhead).toInt
+ val maximumFiringsPerMinute =
math.ceil(getLimit("limits.triggers.fires.perMinute") * overhead).toInt
val maximumConcurrentInvokes = getLimit("limits.actions.invokes.concurrent")
println(s"maximumInvokesPerMinute = $maximumInvokesPerMinute")
@@ -368,11 +371,13 @@ class NamespaceSpecificThrottleTests
trigger.create(triggerName)
}
+ val deployedControllers =
WhiskProperties.getControllerHosts.split(",").length
+
// One invoke should be allowed, the second one throttled.
// Due to the current implementation of the rate throttling,
// it is possible that the counter gets deleted, because the minute
switches.
retry({
- val results = (1 to 2).map { _ =>
+ val results = (1 to deployedControllers + 1).map { _ =>
wsk.action.invoke(actionName, expectedExitCode =
TestUtils.DONTCARE_EXIT)
}
results.map(_.exitCode) should contain(TestUtils.THROTTLED)
@@ -385,7 +390,7 @@ class NamespaceSpecificThrottleTests
// Due to the current implementation of the rate throttling,
// it is possible, that the counter gets deleted, because the minute
switches.
retry({
- val results = (1 to 2).map { _ =>
+ val results = (1 to deployedControllers + 1).map { _ =>
wsk.trigger.fire(triggerName, expectedExitCode =
TestUtils.DONTCARE_EXIT)
}
results.map(_.exitCode) should contain(TestUtils.THROTTLED)
diff --git
a/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
b/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
index 112152258d..5ae8167737 100644
--- a/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
@@ -17,9 +17,8 @@
package whisk.core.database.test
-import akka.http.scaladsl.model.StatusCodes.NotFound
-
import scala.collection.parallel._
+import scala.concurrent.duration.DurationInt
import scala.concurrent.forkjoin.ForkJoinPool
import org.junit.runner.RunWith
@@ -27,13 +26,16 @@ import org.scalatest.BeforeAndAfter
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
-import common.TestUtils
+import akka.http.scaladsl.model.StatusCodes.NotFound
import common.TestUtils._
-import common.rest.WskRest
+import common.TestUtils
+import common.WhiskProperties
import common.WskProps
import common.WskTestHelpers
+import common.rest.WskRest
import spray.json.JsString
import whisk.common.TransactionId
+import whisk.utils.retry
@RunWith(classOf[JUnitRunner])
class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with
BeforeAndAfter {
@@ -98,9 +100,18 @@ class CacheConcurrencyTests extends FlatSpec with
WskTestHelpers with BeforeAndA
}
}
- run("get after delete") { name =>
- wsk.action.get(name, expectedExitCode = NotFound.intValue)
- }
+ // Give some time to replicate the state between the controllers
+ retry(
+ {
+ // Check that every controller has the correct state (used round
robin)
+ WhiskProperties.getControllerHosts.split(",").foreach { _ =>
+ run("get after delete") { name =>
+ wsk.action.get(name, expectedExitCode = NotFound.intValue)
+ }
+ }
+ },
+ 10,
+ Some(2.second))
run("recreate") { name =>
wsk.action.create(name, Some(actionFile))
@@ -123,14 +134,24 @@ class CacheConcurrencyTests extends FlatSpec with
WskTestHelpers with BeforeAndA
}
}
- run("get after update") { name =>
- wsk.action.get(name)
- } map {
- case (name, rr) =>
- withClue(s"get after update: failed check for $name") {
- rr.stdout should include("blue")
- rr.stdout should not include ("red")
+ // All controllers should have the correct action
+ // As they are used round robin, we ask every controller for the action.
+ // We add a retry to tollarate a short interval to bring the controllers
in sync.
+ retry(
+ {
+ WhiskProperties.getControllerHosts.split(",").foreach { _ =>
+ run("get after update") { name =>
+ wsk.action.get(name)
+ } map {
+ case (name, rr) =>
+ withClue(s"get after update: failed check for $name") {
+ rr.stdout should include("blue")
+ rr.stdout should not include ("red")
+ }
+ }
}
- }
+ },
+ 10,
+ Some(2.second))
}
}
diff --git
a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
index 3b046a3a57..719672217d 100644
---
a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
+++
b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
@@ -19,8 +19,9 @@ package whisk.core.database.test
import java.util.concurrent.atomic.AtomicInteger
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Await
import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
@@ -59,17 +60,17 @@ class MultipleReadersSingleWriterCacheTests
}
// Create an cache entry
- cacheUpdate("doc", key, Future.successful("db save successful"))
+ Await.ready(cacheUpdate("doc", key, Future.successful("db save
successful")), 10.seconds)
ctr.get shouldBe 1
// Callback should be called if entry exists
- cacheInvalidate(key, Future.successful(()))
+ Await.ready(cacheInvalidate(key, Future.successful(())), 10.seconds)
ctr.get shouldBe 2
- cacheUpdate("docdoc", key, Future.successful("update in db successful"))
+ Await.ready(cacheUpdate("docdoc", key, Future.successful("update in db
successful")), 10.seconds)
ctr.get shouldBe 3
// Callback should be called if entry does not exist
- cacheInvalidate(CacheKey("abc"), Future.successful(()))
+ Await.ready(cacheInvalidate(CacheKey("abc"), Future.successful(())),
10.seconds)
ctr.get shouldBe 4
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services