This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 415ae98fd make scheduler consider action concurrency >1 (#5378)
415ae98fd is described below

commit 415ae98fd9f1fd44f5ab2dccb8b5cbe2d20932bb
Author: Brendan Doyle <[email protected]>
AuthorDate: Thu Feb 9 17:14:07 2023 -0800

    make scheduler consider action concurrency >1 (#5378)
    
    Co-authored-by: Brendan Doyle <[email protected]>
---
 .../core/scheduler/queue/MemoryQueue.scala         |   2 +
 .../scheduler/queue/SchedulingDecisionMaker.scala  |   9 +-
 .../queue/test/MemoryQueueFlowTests.scala          |   2 +-
 .../scheduler/queue/test/MemoryQueueTests.scala    |   2 +-
 .../queue/test/SchedulingDecisionMakerTests.scala  | 145 ++++++++++++++++++++-
 5 files changed, 153 insertions(+), 7 deletions(-)
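
In short, the decision maker now divides the container demand for a backlog by the action's per-container concurrency limit instead of requesting one container per stale activation. A minimal, standalone sketch of that sizing rule (not the committed code; names mirror the diff below):

    object ConcurrencyAwareSizing {
      // One container can absorb up to `maxActionConcurrency` activations at a time,
      // so a backlog of stale activations needs ceil(stale / maxActionConcurrency)
      // containers, minus whatever is already being provisioned.
      def containersForStale(staleActivationNum: Int, maxActionConcurrency: Int, inProgress: Int): Int =
        math.ceil(staleActivationNum.toDouble / maxActionConcurrency.toDouble).toInt - inProgress

      def main(args: Array[String]): Unit =
        println(containersForStale(10, 3, 0)) // 4 containers (previously 10, when concurrency was ignored)
    }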

diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index a5b80ffd1..051e28976 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -943,6 +943,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
             namespaceContainerCount.inProgressContainerNumByNamespace,
             averageDuration,
             limit,
+            actionMetaData.limits.concurrency.maxConcurrent,
             stateName,
             self)
         case Failure(_: NoDocumentException) =>
@@ -1222,6 +1223,7 @@ case class QueueSnapshot(initialized: Boolean,
                          inProgressContainerCountInNamespace: Int,
                          averageDuration: Option[Double],
                          limit: Int,
+                         maxActionConcurrency: Int,
                          stateName: MemoryQueueState,
                          recipient: ActorRef)
 
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index d5dca8bb7..ab0de2f6a 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -58,6 +58,7 @@ class SchedulingDecisionMaker(
       inProgressContainerCountInNs,
       averageDuration,
       limit,
+      maxActionConcurrency,
       stateName,
       _) = snapshot
     val totalContainers = existing + inProgress
@@ -137,7 +138,7 @@ class SchedulingDecisionMaker(
           // but it is a kind of trade-off and we place latency on top of over-provisioning
           case (Running, None) if staleActivationNum > 0 =>
             // we can safely get the value as we already checked the existence
-            val num = staleActivationNum - inProgress
+            val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
             // if it tries to create more containers than existing messages, we just create shortage
             val actualNum = if (num > availableMsg) availableMsg else num
             addServersIfPossible(
@@ -153,7 +154,7 @@ class SchedulingDecisionMaker(
           // need more containers and a message is already processed
           case (Running, Some(duration)) =>
             // we can safely get the value as we already checked the existence
-            val containerThroughput = staleThreshold / duration
+            val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
             val expectedTps = containerThroughput * (existing + inProgress)
             val availableNonStaleActivations = availableMsg - staleActivationNum
 
@@ -201,7 +202,7 @@ class SchedulingDecisionMaker(
           // this case is for that as a last resort.
           case (Removing, Some(duration)) if staleActivationNum > 0 =>
             // we can safely get the value as we already checked the existence
-            val containerThroughput = staleThreshold / duration
+            val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
             val num = ceiling(staleActivationNum.toDouble / containerThroughput)
             // if it tries to create more containers than existing messages, we just create shortage
             val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
@@ -219,7 +220,7 @@ class SchedulingDecisionMaker(
           // same with the above case but no duration exist.
           case (Removing, None) if staleActivationNum > 0 =>
             // we can safely get the value as we already checked the existence
-            val num = staleActivationNum - inProgress
+            val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
             // if it tries to create more containers than existing messages, we just create shortage
             val actualNum = if (num > availableMsg) availableMsg else num
             addServersIfPossible(
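
A worked example of the throughput change above, with illustrative numbers (staleThreshold is assumed to be 100ms here, matching the expectations in the new tests further down):

    object ThroughputExample extends App {
      val staleThreshold = 100.0       // ms before an activation counts as stale (assumed value)
      val duration = 50.0              // observed average duration per activation, ms
      val maxActionConcurrency = 3     // the action's per-container concurrency limit
      val existing = 2                 // containers already running
      val availableMsg = 20            // queued activations

      // each container now serves maxActionConcurrency activations per stale window
      val containerThroughput = (staleThreshold / duration) * maxActionConcurrency // 6.0
      val expectedTps = containerThroughput * existing                             // 12.0
      // roughly ceil(available / throughput) - existing = 4 - 2 = 2 extra containers
      val toAdd = math.ceil(availableMsg / containerThroughput).toInt - existing
      println(s"throughput=$containerThroughput expectedTps=$expectedTps add=$toAdd")
    }
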
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index a3cd73e1a..9a235a990 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -313,7 +313,7 @@ class MemoryQueueFlowTests
     // this is the case where there is no capacity in a namespace and no container can be created.
     decisionMaker.setAutoPilot((sender: ActorRef, msg) => {
       msg match {
-        case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, Running, _) =>
+        case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, Running, _) =>
           sender ! DecisionResults(EnableNamespaceThrottling(true), 0)
           TestActor.KeepRunning
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index 37191378c..c2c5ae340 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -1534,7 +1534,7 @@ class MemoryQueueTests
     // This test pilot mimic the decision maker who disable the namespace throttling when there is enough capacity.
     decisionMaker.setAutoPilot((sender: ActorRef, msg) => {
       msg match {
-        case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) =>
+        case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) =>
           sender ! DisableNamespaceThrottling
 
         case _ =>
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
index b70b71554..edd0783aa 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
@@ -63,7 +63,8 @@ class SchedulingDecisionMakerTests
       existingContainerCountInNamespace = 0,
       inProgressContainerCountInNamespace = 0,
       averageDuration = None,
-      limit = 0, // limit is 0
+      limit = 0, // limit is 0,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -87,6 +88,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 0,
       averageDuration = None,
       limit = 0, // limit is 0
+      maxActionConcurrency = 1,
       stateName = Flushing,
       recipient = testProbe.ref)
 
@@ -114,6 +116,7 @@ class SchedulingDecisionMakerTests
         inProgressContainerCountInNamespace = 2,
         averageDuration = None, // No average duration available
         limit = 10,
+        maxActionConcurrency = 1,
         stateName = state,
         recipient = testProbe.ref)
 
@@ -140,6 +143,7 @@ class SchedulingDecisionMakerTests
         inProgressContainerCountInNamespace = 8,
         averageDuration = Some(1.0), // Some average duration available
         limit = 20,
+        maxActionConcurrency = 1,
         stateName = state,
         recipient = testProbe.ref)
 
@@ -164,6 +168,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 2,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -188,6 +193,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 2, // this value includes the count of this action as well.
       averageDuration = None,
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -215,6 +221,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 2,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -242,6 +249,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 2,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -269,6 +277,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 2,
+      maxActionConcurrency = 1,
       stateName = NamespaceThrottled,
       recipient = testProbe.ref)
 
@@ -296,6 +305,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 2,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -323,6 +333,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 2, // this value includes the count of this action as well.
       averageDuration = None,
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -347,6 +358,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -370,6 +382,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = NamespaceThrottled,
       recipient = testProbe.ref)
 
@@ -393,6 +406,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = NamespaceThrottled,
       recipient = testProbe.ref)
 
@@ -417,6 +431,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -441,6 +456,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -465,6 +481,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Flushing,
       recipient = testProbe.ref)
 
@@ -488,6 +505,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Flushing,
       recipient = testProbe.ref)
 
@@ -511,6 +529,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -534,6 +553,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -557,6 +577,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = None,
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -580,6 +601,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 6,
       averageDuration = None,
       limit = 10,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -603,6 +625,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 0,
       averageDuration = Some(50), // the average duration exists
       limit = 10,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -628,6 +651,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 0,
       averageDuration = Some(1000), // the average duration exists
       limit = 10,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -653,6 +677,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 0,
       averageDuration = Some(1000), // the average duration exists
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -680,6 +705,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 0,
       averageDuration = Some(1000), // the average duration exists
       limit = 10,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -706,6 +732,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 0,
       averageDuration = Some(1000), // the average duration exists
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -733,6 +760,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 0,
       averageDuration = Some(1000), // the average duration exists
       limit = 10,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -759,6 +787,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 0,
       averageDuration = None, // the average duration exists
       limit = 10,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -782,6 +811,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 1,
       averageDuration = Some(1000), // the average duration exists
       limit = 10,
+      maxActionConcurrency = 1,
       stateName = Removing,
       recipient = testProbe.ref)
 
@@ -808,6 +838,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 2,
       averageDuration = None, // the average duration does not exist
       limit = 10,
+      maxActionConcurrency = 1,
       stateName = Removing,
       recipient = testProbe.ref)
 
@@ -834,6 +865,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 0,
       averageDuration = Some(1000), // the average duration exists
       limit = 10,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -858,6 +890,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 0,
       averageDuration = Some(50), // the average duration gives container throughput of 2
       limit = 10,
+      maxActionConcurrency = 1,
       stateName = Running,
       recipient = testProbe.ref)
 
@@ -882,6 +915,7 @@ class SchedulingDecisionMakerTests
       inProgressContainerCountInNamespace = 2,
       averageDuration = None, // the average duration does not exist
       limit = 4,
+      maxActionConcurrency = 1,
       stateName = Removing,
       recipient = testProbe.ref)
 
@@ -893,4 +927,113 @@ class SchedulingDecisionMakerTests
     // but there is not enough capacity, it becomes 1
     testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(false), 1))
   }
+
+  it should "correctly calculate demand is met when action concurrency >1 w/ 
average duration and no stale activations" in {
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
+    val testProbe = TestProbe()
+
+    // container
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 4,
+      existingContainerCount = 2,
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 2,
+      inProgressContainerCountInNamespace = 0,
+      averageDuration = Some(100.0),
+      limit = 4,
+      maxActionConcurrency = 2,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // available messages is 4 with duration equaling the stale threshold and action concurrency of 2 so needed containers
+    // should be exactly 2
+    testProbe.expectNoMessage()
+  }
+
+  it should "add containers when action concurrency >1 w/ average duration and 
demand is not met" in {
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
+    val testProbe = TestProbe()
+
+    // container
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 20,
+      existingContainerCount = 2,
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 2,
+      inProgressContainerCountInNamespace = 0,
+      averageDuration = Some(50.0),
+      limit = 10,
+      maxActionConcurrency = 3,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // available messages is 20 and throughput should be 100.0 / 50.0 * 3 = 6
+    // existing container is 2 so can handle 12 messages, therefore need 2 more containers
+    testProbe.expectMsg(DecisionResults(AddContainer, 2))
+  }
+
+  it should "add containers when action concurrency >1 w/ average duration and 
demand is not met and has stale activations" in {
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
+    val testProbe = TestProbe()
+
+    // container
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 30,
+      existingContainerCount = 2,
+      inProgressContainerCount = 0,
+      staleActivationNum = 10,
+      existingContainerCountInNamespace = 2,
+      inProgressContainerCountInNamespace = 0,
+      averageDuration = Some(50.0),
+      limit = 10,
+      maxActionConcurrency = 3,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // available messages is 30 and throughput should be 100.0 / 50.0 * 3 = 6
+    // existing container is 2 so can handle 12 messages, therefore need 2 more containers for non-stale
+    // stale has 10 activations so need another additional 2
+    testProbe.expectMsg(DecisionResults(AddContainer, 4))
+  }
+
+  it should "add containers when action concurrency >1 when no average 
duration and there are stale activations" in {
+    val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
+    val testProbe = TestProbe()
+
+    // container
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 10,
+      existingContainerCount = 1,
+      inProgressContainerCount = 0,
+      staleActivationNum = 10,
+      existingContainerCountInNamespace = 2,
+      inProgressContainerCountInNamespace = 0,
+      averageDuration = None,
+      limit = 10,
+      maxActionConcurrency = 3,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // stale messages are 10. want stale to be handled by first pass of requests from containers so
+    // 10 / 3 = 4.0
+    testProbe.expectMsg(DecisionResults(AddContainer, 4))
+  }
 }
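
For the first of the new tests above ("demand is met"), the same arithmetic shows why no message is expected back from the decision maker (again assuming the 100ms stale threshold, which is hypothetical here):

    object DemandMetExample extends App {
      val staleThreshold = 100.0      // assumed to equal the averageDuration used in that test
      val duration = 100.0
      val maxActionConcurrency = 2
      val existing = 2
      val availableMsg = 4

      val containerThroughput = (staleThreshold / duration) * maxActionConcurrency // 2.0 per container
      val expectedTps = containerThroughput * existing                             // 4.0
      // 4 queued messages do not exceed the expected throughput, so no AddContainer decision is made
      println(expectedTps >= availableMsg) // true
    }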
