This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 077fb6d24 Add scheduler overprovision for new actions before namespace 
throttling (#5284)
077fb6d24 is described below

commit 077fb6d24f0132e7755ea47d7ee9b35f0966daf3
Author: Brendan Doyle <[email protected]>
AuthorDate: Tue Nov 1 11:37:30 2022 -0700

    Add scheduler overprovision for new actions before namespace throttling 
(#5284)
    
    * initial attempt
    
    * tests
    
    * fix tests
    
    * enable throttling when last capacity used in overprovisioning
    
    * add case to correctly disable namespace throttling when namespace 
overprovisioning has space
    
    * feedback
    
    Co-authored-by: Brendan Doyle <[email protected]>
---
 core/scheduler/src/main/resources/application.conf |   2 +
 .../openwhisk/core/scheduler/Scheduler.scala       |   6 +-
 .../scheduler/queue/SchedulingDecisionMaker.scala  |  19 ++-
 .../queue/test/MemoryQueueTestsFixture.scala       |   2 +-
 .../queue/test/SchedulingDecisionMakerTests.scala  | 144 ++++++++++++++++++++-
 5 files changed, 164 insertions(+), 9 deletions(-)

diff --git a/core/scheduler/src/main/resources/application.conf 
b/core/scheduler/src/main/resources/application.conf
index 1f696fc43..f04d2888f 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -79,6 +79,8 @@ whisk {
       stale-threshold = "100 milliseconds"
       check-interval = "100 milliseconds"
       drop-interval = "10 seconds"
+      allow-over-provision-before-throttle = false
+      namespace-over-provision-before-throttle-ratio = 1.5
     }
     queue {
       idle-grace = "20 seconds"
diff --git 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index e3ed70d8a..f260feb4f 100644
--- 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -421,4 +421,8 @@ object SchedulerStates extends DefaultJsonProtocol {
   def parse(states: String) = Try(serdes.read(states.parseJson))
 }
 
-case class SchedulingConfig(staleThreshold: FiniteDuration, checkInterval: 
FiniteDuration, dropInterval: FiniteDuration)
+case class SchedulingConfig(staleThreshold: FiniteDuration,
+                            checkInterval: FiniteDuration,
+                            dropInterval: FiniteDuration,
+                            allowOverProvisionBeforeThrottle: Boolean,
+                            namespaceOverProvisionBeforeThrottleRatio: Double)
diff --git 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index d6ae8f63e..19c458fc9 100644
--- 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++ 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -70,7 +70,17 @@ class SchedulingDecisionMaker(
         case _        => Future.successful(DecisionResults(Pausing, 0))
       }
     } else {
-      val capacity = limit - existingContainerCountInNs - 
inProgressContainerCountInNs
+      val capacity = if (schedulingConfig.allowOverProvisionBeforeThrottle && 
totalContainers == 0) {
+        // if space available within the over provision ratio amount above 
namespace limit, create one container for new
+        // action so namespace traffic can attempt to re-balance without 
blocking entire action
+        if ((ceiling(limit * 
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - 
existingContainerCountInNs - inProgressContainerCountInNs) > 0) {
+          1
+        } else {
+          0
+        }
+      } else {
+        limit - existingContainerCountInNs - inProgressContainerCountInNs
+      }
       if (capacity <= 0) {
         stateName match {
 
@@ -79,12 +89,15 @@ class SchedulingDecisionMaker(
            *
            * However, if the container exists(totalContainers != 0), the 
activation is not treated as a failure and the activation is delivered to the 
container.
            */
-          case Running =>
+          case Running
+              if !schedulingConfig.allowOverProvisionBeforeThrottle || 
(schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
+                limit * 
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - 
existingContainerCountInNs - inProgressContainerCountInNs <= 0) =>
             logging.info(
               this,
               s"there is no capacity activations will be dropped or throttled, 
(availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit, 
namespaceContainers: ${existingContainerCountInNs}, 
namespaceInProgressContainer: ${inProgressContainerCountInNs}) 
[$invocationNamespace:$action]")
             
Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = 
totalContainers == 0), 0))
-
+          case NamespaceThrottled if 
schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit * 
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - 
existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
+            Future.successful(DecisionResults(DisableNamespaceThrottling, 0))
           // do nothing
           case _ =>
             // no need to print any messages if the state is already 
NamespaceThrottled
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
index 090ce3d6e..d793c5848 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
@@ -86,7 +86,7 @@ class MemoryQueueTestsFixture
   val testNamespace = "test-namespace"
   val testAction = "test-action"
 
-  val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 
10.seconds)
+  val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 
10.seconds, false, 1.5)
 
   val fqn = FullyQualifiedEntityName(EntityPath(testNamespace), 
EntityName(testAction), Some(SemVer(0, 0, 1)))
   val revision = DocRevision("1-testRev")
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
index ce1c86682..b70b71554 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
@@ -47,7 +47,7 @@ class SchedulingDecisionMakerTests
   val testAction = "test-action"
   val action = FullyQualifiedEntityName(EntityPath(testNamespace), 
EntityName(testAction), Some(SemVer(0, 0, 1)))
 
-  val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 
10.seconds)
+  val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 
10.seconds, false, 1.5)
 
   it should "decide pausing when the limit is less than equal to 0" in {
     val decisionMaker = 
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfig))
@@ -149,7 +149,7 @@ class SchedulingDecisionMakerTests
     }
   }
 
-  it should "enable namespace throttling with dropping msg when there is not 
enough capacity and no container" in {
+  it should "enable namespace throttling with dropping msg when there is not 
enough capacity, no container, and namespace over-provision disabled" in {
     val decisionMaker = 
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfig))
     val testProbe = TestProbe()
 
@@ -173,7 +173,7 @@ class SchedulingDecisionMakerTests
     testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = 
true), 0))
   }
 
-  it should "enable namespace throttling without dropping msg when there is 
not enough capacity but are some containers" in {
+  it should "enable namespace throttling without dropping msg when there is 
not enough capacity but are some containers and namespace over-provision 
disabled" in {
     val decisionMaker = 
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfig))
     val testProbe = TestProbe()
 
@@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests
     testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = 
false), 0))
   }
 
-  it should "add an initial container if there is no any" in {
+  it should "add one container when there is no container, and namespace 
over-provision has capacity" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.5)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = false,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 0, // there is no container for this action
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an initial container so enable throttling and 
drop messages.
+    testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
+  }
+
+  it should "enable namespace throttling with dropping msg when there is no 
container, and namespace over-provision has no capacity" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.0)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 0, // there is no container for this action
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an initial container so enable throttling and 
drop messages.
+    testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = 
true), 0))
+  }
+
+  it should "disable namespace throttling when namespace over-provision has 
capacity again" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.1)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 1, // there is one container for this action
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = NamespaceThrottled,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an initial container so enable throttling and 
drop messages.
+    testProbe.expectMsg(DecisionResults(DisableNamespaceThrottling, 0))
+  }
+
+  it should "enable namespace throttling without dropping msg when there is a 
container, and namespace over-provision has no additional capacity" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.0)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 1,
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an additional container so enable throttling 
and drop messages.
+    testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = 
false), 0))
+  }
+
+  it should "not enable namespace throttling when there is not enough capacity 
but are some containers and namespace over-provision is enabled with capacity" 
in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.5)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 1, // there are some containers for this action
+      inProgressContainerCount = 1,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 2, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 2, // this value includes the 
count of this action as well.
+      averageDuration = None,
+      limit = 4,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create more containers
+    testProbe.expectNoMessage()
+  }
+
+  it should "add an initial container if there is not any" in {
     val decisionMaker = 
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfig))
     val testProbe = TestProbe()
 
@@ -219,6 +354,7 @@ class SchedulingDecisionMakerTests
 
     testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
   }
+
   it should "disable the namespace throttling with adding an initial container 
when there is no container" in {
     val decisionMaker = 
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfig))
     val testProbe = TestProbe()

Reply via email to