cbickel closed pull request #2665: Add ability to use all controllers round robin
URL: https://github.com/apache/incubator-openwhisk/pull/2665
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index d7dfae9c83..d90271edc5 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -117,6 +117,8 @@ controller:
       bindPort: 2551
       # at this moment all controllers are seed nodes
       seedNodes: "{{ groups['controllers'] | map('extract', hostvars, 
'ansible_host') | list }}"
+  # We recommend to enable HA for the controllers only, if bookkeeping data 
are shared too. (localBookkeeping: false)
+  ha: "{{ controller_enable_ha | default(false) }}"
 
 registry:
   confdir: "{{ config_root_dir }}/registry"
diff --git a/ansible/roles/nginx/templates/nginx.conf.j2 
b/ansible/roles/nginx/templates/nginx.conf.j2
index 6a0b4dc875..9efae0088b 100644
--- a/ansible/roles/nginx/templates/nginx.conf.j2
+++ b/ansible/roles/nginx/templates/nginx.conf.j2
@@ -30,7 +30,7 @@ http {
         server {{ hostvars[groups['controllers'] | first].ansible_host }}:{{ 
controller.basePort }} fail_timeout=60s;
 {% for ip in groups['controllers'] %}
 {% if groups['controllers'].index(ip) > 0 %}
-        server {{ hostvars[ip].ansible_host }}:{{ controller.basePort + 
groups['controllers'].index(ip) }} backup;
+        server {{ hostvars[ip].ansible_host }}:{{ controller.basePort + 
groups['controllers'].index(ip) }} {% if controller.ha %}fail_timeout=60s{% 
else %}backup{% endif %};
 {% endif %}
 {% endfor %}
         keepalive 512;
diff --git 
a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
 
b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
index ef51097274..6fa76d22c5 100644
--- 
a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
+++ 
b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala
@@ -161,8 +161,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     if (cacheEnabled) {
       logger.info(this, s"invalidating $key on delete")
 
-      notifier.foreach(_(key))
-
       // try inserting our desired entry...
       val desiredEntry = Entry(transid, InvalidateInProgress, None)
       cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -206,6 +204,8 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
             // a pre-existing owner will take care of the invalidation
             invalidator
         }
+      } andThen {
+        case _ => notifier.foreach(_(key))
       }
     } else invalidator // not caching
   }
@@ -265,8 +265,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
     notifier: Option[CacheChangeNotification]): Future[Winfo] = {
     if (cacheEnabled) {
 
-      notifier.foreach(_(key))
-
       // try inserting our desired entry...
       val desiredEntry = Entry(transid, WriteInProgress, 
Some(Future.successful(doc)))
       cache(key)(desiredEntry) flatMap { actualEntry =>
@@ -292,6 +290,8 @@ trait MultipleReadersSingleWriterCache[W, Winfo] {
             invalidateEntryAfter(generator, key, actualEntry)
           }
         }
+      } andThen {
+        case _ => notifier.foreach(_(key))
       }
     } else generator // not caching
   }
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala 
b/tests/src/test/scala/limits/ThrottleTests.scala
index 8da66e811d..947c501f74 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -70,8 +70,9 @@ class ThrottleTests
 
   val throttleWindow = 1.minute
 
-  val maximumInvokesPerMinute = getLimit("limits.actions.invokes.perMinute")
-  val maximumFiringsPerMinute = getLimit("limits.triggers.fires.perMinute")
+  // Due to the overhead of the per minute limit in the controller, we add 
this overhead here as well.
+  val maximumInvokesPerMinute = 
math.ceil(getLimit("limits.actions.invokes.perMinute") * 1.2).toInt
+  val maximumFiringsPerMinute = 
math.ceil(getLimit("limits.triggers.fires.perMinute") * 1.2).toInt
   val maximumConcurrentInvokes = getLimit("limits.actions.invokes.concurrent")
 
   println(s"maximumInvokesPerMinute  = $maximumInvokesPerMinute")
@@ -366,11 +367,13 @@ class NamespaceSpecificThrottleTests
       trigger.create(triggerName)
     }
 
+    val deployedControllers = 
WhiskProperties.getControllerHosts.split(",").length
+
     // One invoke should be allowed, the second one throttled.
     // Due to the current implementation of the rate throttling,
     // it is possible that the counter gets deleted, because the minute 
switches.
     retry({
-      val results = (1 to 2).map { _ =>
+      val results = (1 to deployedControllers + 1).map { _ =>
         wsk.action.invoke(actionName, expectedExitCode = 
TestUtils.DONTCARE_EXIT)
       }
       results.map(_.exitCode) should contain(TestUtils.THROTTLED)
@@ -383,7 +386,7 @@ class NamespaceSpecificThrottleTests
     // Due to the current implementation of the rate throttling,
     // it is possible, that the counter gets deleted, because the minute 
switches.
     retry({
-      val results = (1 to 2).map { _ =>
+      val results = (1 to deployedControllers + 1).map { _ =>
         wsk.trigger.fire(triggerName, expectedExitCode = 
TestUtils.DONTCARE_EXIT)
       }
       results.map(_.exitCode) should contain(TestUtils.THROTTLED)
diff --git 
a/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala 
b/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
index 7bc56ba7d6..d0568c1165 100644
--- a/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
+++ b/tests/src/test/scala/whisk/core/database/test/CacheConcurrencyTests.scala
@@ -18,6 +18,7 @@
 package whisk.core.database.test
 
 import scala.collection.parallel._
+import scala.concurrent.duration.DurationInt
 import scala.concurrent.forkjoin.ForkJoinPool
 
 import org.junit.runner.RunWith
@@ -25,13 +26,15 @@ import org.scalatest.BeforeAndAfter
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 
-import common.TestUtils
 import common.TestUtils._
+import common.TestUtils
+import common.WhiskProperties
 import common.Wsk
 import common.WskProps
 import common.WskTestHelpers
 import spray.json.JsString
 import whisk.common.TransactionId
+import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
 class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with 
BeforeAndAfter {
@@ -96,9 +99,18 @@ class CacheConcurrencyTests extends FlatSpec with 
WskTestHelpers with BeforeAndA
         }
       }
 
-      run("get after delete") { name =>
-        wsk.action.get(name, expectedExitCode = NOT_FOUND)
-      }
+      // Give some time to replicate the state between the controllers
+      retry(
+        {
+          // Check that every controller has the correct state (used round 
robin)
+          WhiskProperties.getControllerHosts.split(",").foreach { _ =>
+            run("get after delete") { name =>
+              wsk.action.get(name, expectedExitCode = NOT_FOUND)
+            }
+          }
+        },
+        10,
+        Some(2.second))
 
       run("recreate") { name =>
         wsk.action.create(name, Some(actionFile))
@@ -121,14 +133,24 @@ class CacheConcurrencyTests extends FlatSpec with 
WskTestHelpers with BeforeAndA
         }
       }
 
-      run("get after update") { name =>
-        wsk.action.get(name)
-      } map {
-        case (name, rr) =>
-          withClue(s"get after update: failed check for $name") {
-            rr.stdout should include("blue")
-            rr.stdout should not include ("red")
+      // All controllers should have the correct action
+      // As they are used round robin, we ask every controller for the action.
+      // We add a retry to tolerate a short interval to bring the controllers 
in sync.
+      retry(
+        {
+          WhiskProperties.getControllerHosts.split(",").foreach { _ =>
+            run("get after update") { name =>
+              wsk.action.get(name)
+            } map {
+              case (name, rr) =>
+                withClue(s"get after update: failed check for $name") {
+                  rr.stdout should include("blue")
+                  rr.stdout should not include ("red")
+                }
+            }
           }
-      }
+        },
+        10,
+        Some(2.second))
     }
 }
diff --git 
a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
 
b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
index 3b046a3a57..719672217d 100644
--- 
a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
+++ 
b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala
@@ -19,8 +19,9 @@ package whisk.core.database.test
 
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Await
 import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
 
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
@@ -59,17 +60,17 @@ class MultipleReadersSingleWriterCacheTests
     }
 
     // Create an cache entry
-    cacheUpdate("doc", key, Future.successful("db save successful"))
+    Await.ready(cacheUpdate("doc", key, Future.successful("db save 
successful")), 10.seconds)
     ctr.get shouldBe 1
 
     // Callback should be called if entry exists
-    cacheInvalidate(key, Future.successful(()))
+    Await.ready(cacheInvalidate(key, Future.successful(())), 10.seconds)
     ctr.get shouldBe 2
-    cacheUpdate("docdoc", key, Future.successful("update in db successful"))
+    Await.ready(cacheUpdate("docdoc", key, Future.successful("update in db 
successful")), 10.seconds)
     ctr.get shouldBe 3
 
     // Callback should be called if entry does not exist
-    cacheInvalidate(CacheKey("abc"), Future.successful(()))
+    Await.ready(cacheInvalidate(CacheKey("abc"), Future.successful(())), 
10.seconds)
     ctr.get shouldBe 4
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to