markusthoemmes closed pull request #2888: Adapt trigger throttle for multiple 
controllers
URL: https://github.com/apache/incubator-openwhisk/pull/2888
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 79642fed01..c6ad149c4c 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -117,6 +117,8 @@ controller:
       bindPort: 2551
       # at this moment all controllers are seed nodes
       seedNodes: "{{ groups['controllers'] | map('extract', hostvars, 
'ansible_host') | list }}"
+  # We recommend enabling HA for the controllers only if the bookkeeping data
is shared too. (localBookkeeping: false)
+  ha: "{{ controller_enable_ha | default(false) }}"
 
 registry:
   confdir: "{{ config_root_dir }}/registry"
diff --git a/ansible/roles/controller/tasks/deploy.yml 
b/ansible/roles/controller/tasks/deploy.yml
index c669395dcb..bba75d72f3 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -82,12 +82,11 @@
       "AKKA_CLUSTER_SEED_NODES": "{{seed_nodes_list | join(' ') }}"
       "AKKA_CLUSTER_BIND_PORT": "{{ controller.akka.cluster.bindPort }}"
       "AKKA_ACTOR_PROVIDER": "{{ controller.akka.provider }}"
-
       "METRICS_KAMON": "{{ metrics.kamon.enabled }}"
       "METRICS_LOG": "{{ metrics.log.enabled }}"
       "METRICS_KAMON_HOST": "{{ metrics.kamon.host }}"
       "METRICS_KAMON_PORT": "{{ metrics.kamon.port }}"
-
+      "CONTROLLER_HA": "{{ controller.ha }}"
     volumes:
       - "{{ whisk_logs_dir }}/controller{{ 
groups['controllers'].index(inventory_hostname) }}:/logs"
     ports:
diff --git a/ansible/roles/nginx/templates/nginx.conf.j2 
b/ansible/roles/nginx/templates/nginx.conf.j2
index 6a0b4dc875..9efae0088b 100644
--- a/ansible/roles/nginx/templates/nginx.conf.j2
+++ b/ansible/roles/nginx/templates/nginx.conf.j2
@@ -30,7 +30,7 @@ http {
         server {{ hostvars[groups['controllers'] | first].ansible_host }}:{{ 
controller.basePort }} fail_timeout=60s;
 {% for ip in groups['controllers'] %}
 {% if groups['controllers'].index(ip) > 0 %}
-        server {{ hostvars[ip].ansible_host }}:{{ controller.basePort + 
groups['controllers'].index(ip) }} backup;
+        server {{ hostvars[ip].ansible_host }}:{{ controller.basePort + 
groups['controllers'].index(ip) }} {% if controller.ha %}fail_timeout=60s{% 
else %}backup{% endif %};
 {% endif %}
 {% endfor %}
         keepalive 512;
diff --git a/ansible/templates/whisk.properties.j2 
b/ansible/templates/whisk.properties.j2
index 2c1d644cfb..ecef131f90 100644
--- a/ansible/templates/whisk.properties.j2
+++ b/ansible/templates/whisk.properties.j2
@@ -58,6 +58,7 @@ invoker.hosts.baseport={{ invoker.port }}
 controller.hosts={{ groups["controllers"] | map('extract', hostvars, 
'ansible_host') | list | join(",") }}
 controller.host.basePort={{ controller.basePort }}
 controller.instances={{ controller.instances }}
+controller.ha={{ controller.ha }}
 
 invoker.container.network=bridge
 invoker.container.policy={{ invoker_container_policy_name | default()}}
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 726d1bcabb..bc63dc2bea 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -105,6 +105,7 @@ class WhiskConfig(requiredProperties: Map[String, String],
   val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit)
   val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes)
   val controllerLocalBookkeeping = 
getAsBoolean(WhiskConfig.controllerLocalBookkeeping, false)
+  val controllerHighAvailability = 
getAsBoolean(WhiskConfig.controllerHighAvailability, false)
 }
 
 object WhiskConfig {
@@ -240,4 +241,5 @@ object WhiskConfig {
   val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute"
   val controllerSeedNodes = "akka.cluster.seed.nodes"
   val controllerLocalBookkeeping = "controller.localBookkeeping"
+  val controllerHighAvailability = "controller.ha"
 }
diff --git 
a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
 
b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
index ef51097274..6fa76d22c5 100644
--- 
a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
+++ 
b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
@@ -161,8 +161,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     if (cacheEnabled) {
       logger.info(this, s"invalidating $key on delete")
 
-      notifier.foreach(_(key))
-
       // try inserting our desired entry...
       val desiredEntry = Entry(transid, InvalidateInProgress, None)
       cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -206,6 +204,8 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
             // a pre-existing owner will take care of the invalidation
             invalidator
         }
+      } andThen {
+        case _ => notifier.foreach(_(key))
       }
     } else invalidator // not caching
   }
@@ -265,8 +265,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     notifier: Option[CacheChangeNotification]): Future[Winfo] = {
     if (cacheEnabled) {
 
-      notifier.foreach(_(key))
-
       // try inserting our desired entry...
       val desiredEntry = Entry(transid, WriteInProgress, 
Some(Future.successful(doc)))
       cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -292,6 +290,8 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
             invalidateEntryAfter(generator, key, actualEntry)
           }
         }
+      } andThen {
+        case _ => notifier.foreach(_(key))
       }
     } else generator // not caching
   }
diff --git 
a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala 
b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 9afa83aa78..c0682815e2 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -65,7 +65,9 @@ protected[core] object EntitlementProvider {
     WhiskConfig.actionInvokePerMinuteLimit -> null,
     WhiskConfig.actionInvokeConcurrentLimit -> null,
     WhiskConfig.triggerFirePerMinuteLimit -> null,
-    WhiskConfig.actionInvokeSystemOverloadLimit -> null)
+    WhiskConfig.actionInvokeSystemOverloadLimit -> null,
+    WhiskConfig.controllerInstances -> null,
+    WhiskConfig.controllerHighAvailability -> null)
 }
 
 /**
@@ -78,10 +80,36 @@ protected[core] abstract class EntitlementProvider(config: 
WhiskConfig, loadBala
 
   private implicit val executionContext = actorSystem.dispatcher
 
+  /**
+   * The number of controllers if HA is enabled, 1 otherwise
+   */
+  private val diviser = if (config.controllerHighAvailability) 
config.controllerInstances.toInt else 1
+
+  /**
+   * Allows 20% additional requests on top of the limit to mitigate possibly
unfair round-robin load balancing between
+   * controllers
+   */
+  private val overcommit = if (config.controllerHighAvailability) 1.2 else 1
+
+  /**
+   * Adjust the throttles for a single controller with the diviser and the 
overcommit.
+   *
+   * @param originalThrottle The throttle that needs to be adjusted for this 
controller.
+   */
+  private def dilateThrottle(originalThrottle: Int): Int = {
+    Math.ceil((originalThrottle.toDouble / diviser.toDouble) * 
overcommit).toInt
+  }
+
   private val invokeRateThrottler =
-    new RateThrottler("actions per minute", 
config.actionInvokePerMinuteLimit.toInt, _.limits.invocationsPerMinute)
+    new RateThrottler(
+      "actions per minute",
+      dilateThrottle(config.actionInvokePerMinuteLimit.toInt),
+      _.limits.invocationsPerMinute.map(dilateThrottle))
   private val triggerRateThrottler =
-    new RateThrottler("triggers per minute", 
config.triggerFirePerMinuteLimit.toInt, _.limits.firesPerMinute)
+    new RateThrottler(
+      "triggers per minute",
+      dilateThrottle(config.triggerFirePerMinuteLimit.toInt),
+      _.limits.firesPerMinute.map(dilateThrottle))
   private val concurrentInvokeThrottler = new ActivationThrottler(
     loadBalancer,
     config.actionInvokeConcurrentLimit.toInt,
diff --git a/tests/src/test/scala/ha/ShootComponentsTests.scala 
b/tests/src/test/scala/ha/ShootComponentsTests.scala
index fa1aba121e..5fd1525936 100644
--- a/tests/src/test/scala/ha/ShootComponentsTests.scala
+++ b/tests/src/test/scala/ha/ShootComponentsTests.scala
@@ -53,6 +53,13 @@ class ShootComponentsTests extends FlatSpec with Matchers 
with WskTestHelpers wi
   implicit val materializer = ActorMaterializer()
   implicit val testConfig = PatienceConfig(1.minute)
 
+  // Throttle requests to the remaining controllers to avoid getting 429s. (60 
req/min)
+  val amountOfControllers = 
WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt
+  val limit = 
WhiskProperties.getProperty(WhiskConfig.actionInvokeConcurrentLimit).toDouble
+  val limitPerController = limit / amountOfControllers
+  val allowedRequestsPerMinute = (amountOfControllers - 1.0) * 
limitPerController
+  val timeBeweenRequests = 60.seconds / allowedRequestsPerMinute
+
   val controller0DockerHost = WhiskProperties.getBaseControllerHost() + ":" + 
WhiskProperties.getProperty(
     WhiskConfig.dockerPort)
 
@@ -94,9 +101,8 @@ class ShootComponentsTests extends FlatSpec with Matchers 
with WskTestHelpers wi
 
       println(s"Done rerquests with responses: invoke: 
${invokeExit.futureValue} and get: ${getExit.futureValue}")
 
-      // Do at most one action invocation per second to avoid getting 429s. 
(60 req/min - limit)
-      val wait = 1000 - (Instant.now.toEpochMilli - start.toEpochMilli)
-      Thread.sleep(if (wait < 0) 0L else if (wait > 1000) 1000L else wait)
+      val remainingWait = timeBeweenRequests.toMillis - 
(Instant.now.toEpochMilli - start.toEpochMilli)
+      Thread.sleep(if (remainingWait < 0) 0L else remainingWait)
       (invokeExit.futureValue, getExit.futureValue)
     }
   }
@@ -104,15 +110,15 @@ class ShootComponentsTests extends FlatSpec with Matchers 
with WskTestHelpers wi
   behavior of "Controllers hot standby"
 
   it should "use controller1 if controller0 goes down" in 
withAssetCleaner(wskprops) { (wp, assetHelper) =>
-    if (WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt >= 
2) {
+    if (amountOfControllers >= 2) {
       val actionName = "shootcontroller"
 
       assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
         action.create(actionName, defaultAction)
       }
 
-      // Produce some load on the system for 100 seconds (each second one 
request). Kill the controller after 4 requests
-      val totalRequests = 100
+      // Produce some load on the system for 100 seconds. Kill the controller 
after 4 requests
+      val totalRequests = (100.seconds / timeBeweenRequests).toInt
 
       val requestsBeforeRestart = doRequests(4, actionName)
 
@@ -130,7 +136,7 @@ class ShootComponentsTests extends FlatSpec with Matchers 
with WskTestHelpers wi
       val requests = requestsBeforeRestart ++ requestsAfterRestart
 
       val unsuccessfulInvokes = requests.map(_._1).count(_ != 
TestUtils.SUCCESS_EXIT)
-      // Allow 3 failures for the 90 seconds
+      // Allow 3 failures for the 100 seconds
       unsuccessfulInvokes should be <= 3
 
       val unsuccessfulGets = requests.map(_._2).count(_ != 
TestUtils.SUCCESS_EXIT)
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala 
b/tests/src/test/scala/limits/ThrottleTests.scala
index 511a1b3412..7c59aec150 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -27,11 +27,13 @@ import scala.concurrent.Promise
 import scala.concurrent.duration._
 
 import org.junit.runner.RunWith
+import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
 
+import common.RunWskAdminCmd
 import common.TestHelpers
 import common.TestUtils
 import common.TestUtils._
@@ -42,10 +44,9 @@ import common.WskProps
 import common.WskTestHelpers
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+import whisk.core.WhiskConfig
 import whisk.http.Messages._
 import whisk.utils.ExecutionContextFactory
-import org.scalatest.BeforeAndAfterAll
-import common.RunWskAdminCmd
 import whisk.utils.retry
 
 protected[limits] trait LocalHelper {
@@ -72,8 +73,10 @@ class ThrottleTests
 
   val throttleWindow = 1.minute
 
-  val maximumInvokesPerMinute = getLimit("limits.actions.invokes.perMinute")
-  val maximumFiringsPerMinute = getLimit("limits.triggers.fires.perMinute")
+  // Due to the 20% overcommit applied to the per-minute limits in the controller,
we allow the same overhead here as well.
+  val overhead = if 
(WhiskProperties.getProperty(WhiskConfig.controllerHighAvailability).toBoolean) 
1.2 else 1.0
+  val maximumInvokesPerMinute = 
math.ceil(getLimit("limits.actions.invokes.perMinute") * overhead).toInt
+  val maximumFiringsPerMinute = 
math.ceil(getLimit("limits.triggers.fires.perMinute") * overhead).toInt
   val maximumConcurrentInvokes = getLimit("limits.actions.invokes.concurrent")
 
   println(s"maximumInvokesPerMinute  = $maximumInvokesPerMinute")
@@ -368,11 +371,13 @@ class NamespaceSpecificThrottleTests
       trigger.create(triggerName)
     }
 
+    val deployedControllers = 
WhiskProperties.getControllerHosts.split(",").length
+
     // One invoke should be allowed, the second one throttled.
     // Due to the current implementation of the rate throttling,
     // it is possible that the counter gets deleted, because the minute 
switches.
     retry({
-      val results = (1 to 2).map { _ =>
+      val results = (1 to deployedControllers + 1).map { _ =>
         wsk.action.invoke(actionName, expectedExitCode = 
TestUtils.DONTCARE_EXIT)
       }
       results.map(_.exitCode) should contain(TestUtils.THROTTLED)
@@ -385,7 +390,7 @@ class NamespaceSpecificThrottleTests
     // Due to the current implementation of the rate throttling,
     // it is possible, that the counter gets deleted, because the minute 
switches.
     retry({
-      val results = (1 to 2).map { _ =>
+      val results = (1 to deployedControllers + 1).map { _ =>
         wsk.trigger.fire(triggerName, expectedExitCode = 
TestUtils.DONTCARE_EXIT)
       }
       results.map(_.exitCode) should contain(TestUtils.THROTTLED)
diff --git 
a/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala 
b/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
index 112152258d..5ae8167737 100644
--- a/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
@@ -17,9 +17,8 @@
 
 package whisk.core.database.test
 
-import akka.http.scaladsl.model.StatusCodes.NotFound
-
 import scala.collection.parallel._
+import scala.concurrent.duration.DurationInt
 import scala.concurrent.forkjoin.ForkJoinPool
 
 import org.junit.runner.RunWith
@@ -27,13 +26,16 @@ import org.scalatest.BeforeAndAfter
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 
-import common.TestUtils
+import akka.http.scaladsl.model.StatusCodes.NotFound
 import common.TestUtils._
-import common.rest.WskRest
+import common.TestUtils
+import common.WhiskProperties
 import common.WskProps
 import common.WskTestHelpers
+import common.rest.WskRest
 import spray.json.JsString
 import whisk.common.TransactionId
+import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
 class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with 
BeforeAndAfter {
@@ -98,9 +100,18 @@ class CacheConcurrencyTests extends FlatSpec with 
WskTestHelpers with BeforeAndA
         }
       }
 
-      run("get after delete") { name =>
-        wsk.action.get(name, expectedExitCode = NotFound.intValue)
-      }
+      // Give some time to replicate the state between the controllers
+      retry(
+        {
+          // Check that every controller has the correct state (used round 
robin)
+          WhiskProperties.getControllerHosts.split(",").foreach { _ =>
+            run("get after delete") { name =>
+              wsk.action.get(name, expectedExitCode = NotFound.intValue)
+            }
+          }
+        },
+        10,
+        Some(2.second))
 
       run("recreate") { name =>
         wsk.action.create(name, Some(actionFile))
@@ -123,14 +134,24 @@ class CacheConcurrencyTests extends FlatSpec with 
WskTestHelpers with BeforeAndA
         }
       }
 
-      run("get after update") { name =>
-        wsk.action.get(name)
-      } map {
-        case (name, rr) =>
-          withClue(s"get after update: failed check for $name") {
-            rr.stdout should include("blue")
-            rr.stdout should not include ("red")
+      // All controllers should have the correct action
+      // As they are used round robin, we ask every controller for the action.
+      // We add a retry to tolerate a short interval needed to bring the controllers
in sync.
+      retry(
+        {
+          WhiskProperties.getControllerHosts.split(",").foreach { _ =>
+            run("get after update") { name =>
+              wsk.action.get(name)
+            } map {
+              case (name, rr) =>
+                withClue(s"get after update: failed check for $name") {
+                  rr.stdout should include("blue")
+                  rr.stdout should not include ("red")
+                }
+            }
           }
-      }
+        },
+        10,
+        Some(2.second))
     }
 }
diff --git 
a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
 
b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
index 3b046a3a57..719672217d 100644
--- 
a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
+++ 
b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
@@ -19,8 +19,9 @@ package whisk.core.database.test
 
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Await
 import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
 
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
@@ -59,17 +60,17 @@ class MultipleReadersSingleWriterCacheTests
     }
 
     // Create an cache entry
-    cacheUpdate("doc", key, Future.successful("db save successful"))
+    Await.ready(cacheUpdate("doc", key, Future.successful("db save 
successful")), 10.seconds)
     ctr.get shouldBe 1
 
     // Callback should be called if entry exists
-    cacheInvalidate(key, Future.successful(()))
+    Await.ready(cacheInvalidate(key, Future.successful(())), 10.seconds)
     ctr.get shouldBe 2
-    cacheUpdate("docdoc", key, Future.successful("update in db successful"))
+    Await.ready(cacheUpdate("docdoc", key, Future.successful("update in db 
successful")), 10.seconds)
     ctr.get shouldBe 3
 
     // Callback should be called if entry does not exist
-    cacheInvalidate(CacheKey("abc"), Future.successful(()))
+    Await.ready(cacheInvalidate(CacheKey("abc"), Future.successful(())), 
10.seconds)
     ctr.get shouldBe 4
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to