This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 415ae98fd make scheduler consider action concurrency >1 (#5378)
415ae98fd is described below
commit 415ae98fd9f1fd44f5ab2dccb8b5cbe2d20932bb
Author: Brendan Doyle <[email protected]>
AuthorDate: Thu Feb 9 17:14:07 2023 -0800
make scheduler consider action concurrency >1 (#5378)
Co-authored-by: Brendan Doyle <[email protected]>
---
.../core/scheduler/queue/MemoryQueue.scala | 2 +
.../scheduler/queue/SchedulingDecisionMaker.scala | 9 +-
.../queue/test/MemoryQueueFlowTests.scala | 2 +-
.../scheduler/queue/test/MemoryQueueTests.scala | 2 +-
.../queue/test/SchedulingDecisionMakerTests.scala | 145 ++++++++++++++++++++-
5 files changed, 153 insertions(+), 7 deletions(-)
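
For context, the diff below makes the scheduler's container-sizing math account for the action's per-container concurrency limit (limits.concurrency.maxConcurrent). The following is a minimal, self-contained sketch of that math, not the OpenWhisk implementation itself; the object name, the standalone ceiling helper, and the parameter names are illustrative assumptions.

    // Illustrative sketch of the provisioning math this commit changes.
    object ConcurrencyAwareSizing {
      private def ceiling(d: Double): Int = math.ceil(d).toInt

      // Extra containers needed when an average activation duration is known.
      def containersForThroughput(availableMsg: Int,
                                  existing: Int,
                                  inProgress: Int,
                                  staleThresholdMs: Double,
                                  avgDurationMs: Double,
                                  maxConcurrent: Int): Int = {
        // A container can now serve maxConcurrent activations at a time, so its
        // throughput is scaled by the action's concurrency limit.
        val containerThroughput = (staleThresholdMs / avgDurationMs) * maxConcurrent
        val expectedTps = containerThroughput * (existing + inProgress)
        if (availableMsg <= expectedTps) 0
        else ceiling((availableMsg - expectedTps) / containerThroughput)
      }

      // Containers needed for stale activations when no duration is known yet.
      def containersForStale(staleActivationNum: Int, inProgress: Int, maxConcurrent: Int): Int =
        ceiling(staleActivationNum.toDouble / maxConcurrent.toDouble) - inProgress
    }

For example, 20 pending messages served by 2 containers with a 50 ms average duration, a 100 ms stale threshold, and a concurrency limit of 3 gives containersForThroughput(20, 2, 0, 100.0, 50.0, 3) == 2, which matches one of the new tests below.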
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index a5b80ffd1..051e28976 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -943,6 +943,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
namespaceContainerCount.inProgressContainerNumByNamespace,
averageDuration,
limit,
+ actionMetaData.limits.concurrency.maxConcurrent,
stateName,
self)
case Failure(_: NoDocumentException) =>
@@ -1222,6 +1223,7 @@ case class QueueSnapshot(initialized: Boolean,
inProgressContainerCountInNamespace: Int,
averageDuration: Option[Double],
limit: Int,
+ maxActionConcurrency: Int,
stateName: MemoryQueueState,
recipient: ActorRef)
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index d5dca8bb7..ab0de2f6a 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -58,6 +58,7 @@ class SchedulingDecisionMaker(
inProgressContainerCountInNs,
averageDuration,
limit,
+ maxActionConcurrency,
stateName,
_) = snapshot
val totalContainers = existing + inProgress
@@ -137,7 +138,7 @@ class SchedulingDecisionMaker(
// but it is a kind of trade-off and we place latency on top of over-provisioning
case (Running, None) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
- val num = staleActivationNum - inProgress
+ val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = if (num > availableMsg) availableMsg else num
addServersIfPossible(
@@ -153,7 +154,7 @@ class SchedulingDecisionMaker(
// need more containers and a message is already processed
case (Running, Some(duration)) =>
// we can safely get the value as we already checked the existence
- val containerThroughput = staleThreshold / duration
+ val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
val expectedTps = containerThroughput * (existing + inProgress)
val availableNonStaleActivations = availableMsg - staleActivationNum
@@ -201,7 +202,7 @@ class SchedulingDecisionMaker(
// this case is for that as a last resort.
case (Removing, Some(duration)) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
- val containerThroughput = staleThreshold / duration
+ val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
@@ -219,7 +220,7 @@ class SchedulingDecisionMaker(
// same as the above case but no duration exists.
case (Removing, None) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
- val num = staleActivationNum - inProgress
+ val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = if (num > availableMsg) availableMsg else num
addServersIfPossible(
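
As a quick, illustrative check of the (Running, None) and (Removing, None) branches above (values chosen to match the new test at the bottom of this mail, not taken from the scheduler config): with 10 stale activations, no in-progress containers, and a per-container concurrency of 3, the branch requests ceiling(10 / 3) = 4 containers.

    // One container per batch of maxActionConcurrency stale activations (illustrative values).
    val staleActivationNum = 10
    val maxActionConcurrency = 3
    val inProgress = 0
    val num = math.ceil(staleActivationNum.toDouble / maxActionConcurrency.toDouble).toInt - inProgress
    assert(num == 4)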
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index a3cd73e1a..9a235a990 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -313,7 +313,7 @@ class MemoryQueueFlowTests
// this is the case where there is no capacity in a namespace and no container can be created.
decisionMaker.setAutoPilot((sender: ActorRef, msg) => {
msg match {
- case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, Running, _) =>
+ case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, Running, _) =>
sender ! DecisionResults(EnableNamespaceThrottling(true), 0)
TestActor.KeepRunning
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index 37191378c..c2c5ae340 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -1534,7 +1534,7 @@ class MemoryQueueTests
// This test pilot mimics the decision maker, which disables namespace throttling when there is enough capacity.
decisionMaker.setAutoPilot((sender: ActorRef, msg) => {
msg match {
- case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) =>
+ case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) =>
sender ! DisableNamespaceThrottling
case _ =>
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
index b70b71554..edd0783aa 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
@@ -63,7 +63,8 @@ class SchedulingDecisionMakerTests
existingContainerCountInNamespace = 0,
inProgressContainerCountInNamespace = 0,
averageDuration = None,
- limit = 0, // limit is 0
+ limit = 0, // limit is 0,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -87,6 +88,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 0,
averageDuration = None,
limit = 0, // limit is 0
+ maxActionConcurrency = 1,
stateName = Flushing,
recipient = testProbe.ref)
@@ -114,6 +116,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 2,
averageDuration = None, // No average duration available
limit = 10,
+ maxActionConcurrency = 1,
stateName = state,
recipient = testProbe.ref)
@@ -140,6 +143,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 8,
averageDuration = Some(1.0), // Some average duration available
limit = 20,
+ maxActionConcurrency = 1,
stateName = state,
recipient = testProbe.ref)
@@ -164,6 +168,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -188,6 +193,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 2, // this value includes the count of this action as well.
averageDuration = None,
limit = 4,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -215,6 +221,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -242,6 +249,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -269,6 +277,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
+ maxActionConcurrency = 1,
stateName = NamespaceThrottled,
recipient = testProbe.ref)
@@ -296,6 +305,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -323,6 +333,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 2, // this value includes the count of this action as well.
averageDuration = None,
limit = 4,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -347,6 +358,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 4,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -370,6 +382,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 4,
+ maxActionConcurrency = 1,
stateName = NamespaceThrottled,
recipient = testProbe.ref)
@@ -393,6 +406,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 4,
+ maxActionConcurrency = 1,
stateName = NamespaceThrottled,
recipient = testProbe.ref)
@@ -417,6 +431,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 4,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -441,6 +456,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 4,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -465,6 +481,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 4,
+ maxActionConcurrency = 1,
stateName = Flushing,
recipient = testProbe.ref)
@@ -488,6 +505,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 4,
+ maxActionConcurrency = 1,
stateName = Flushing,
recipient = testProbe.ref)
@@ -511,6 +529,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 4,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -534,6 +553,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 4,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -557,6 +577,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 4,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -580,6 +601,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 6,
averageDuration = None,
limit = 10,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -603,6 +625,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 0,
averageDuration = Some(50), // the average duration exists
limit = 10,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -628,6 +651,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 0,
averageDuration = Some(1000), // the average duration exists
limit = 10,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -653,6 +677,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 0,
averageDuration = Some(1000), // the average duration exists
limit = 4,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -680,6 +705,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 0,
averageDuration = Some(1000), // the average duration exists
limit = 10,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -706,6 +732,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 0,
averageDuration = Some(1000), // the average duration exists
limit = 4,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -733,6 +760,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 0,
averageDuration = Some(1000), // the average duration exists
limit = 10,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -759,6 +787,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 0,
averageDuration = None, // the average duration does not exist
limit = 10,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -782,6 +811,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 1,
averageDuration = Some(1000), // the average duration exists
limit = 10,
+ maxActionConcurrency = 1,
stateName = Removing,
recipient = testProbe.ref)
@@ -808,6 +838,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 2,
averageDuration = None, // the average duration does not exist
limit = 10,
+ maxActionConcurrency = 1,
stateName = Removing,
recipient = testProbe.ref)
@@ -834,6 +865,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 0,
averageDuration = Some(1000), // the average duration exists
limit = 10,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -858,6 +890,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 0,
averageDuration = Some(50), // the average duration gives container throughput of 2
limit = 10,
+ maxActionConcurrency = 1,
stateName = Running,
recipient = testProbe.ref)
@@ -882,6 +915,7 @@ class SchedulingDecisionMakerTests
inProgressContainerCountInNamespace = 2,
averageDuration = None, // the average duration does not exist
limit = 4,
+ maxActionConcurrency = 1,
stateName = Removing,
recipient = testProbe.ref)
@@ -893,4 +927,113 @@ class SchedulingDecisionMakerTests
// but there is not enough capacity, it becomes 1
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(false), 1))
}
+
+ it should "correctly calculate demand is met when action concurrency >1 w/
average duration and no stale activations" in {
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
+ val testProbe = TestProbe()
+
+ // container
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 4,
+ existingContainerCount = 2,
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 2,
+ inProgressContainerCountInNamespace = 0,
+ averageDuration = Some(100.0),
+ limit = 4,
+ maxActionConcurrency = 2,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // available messages is 4 with duration equaling the stale threshold and action concurrency of 2 so needed containers
+ // should be exactly 2
+ testProbe.expectNoMessage()
+ }
+
+ it should "add containers when action concurrency >1 w/ average duration and
demand is not met" in {
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
+ val testProbe = TestProbe()
+
+ // container
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 20,
+ existingContainerCount = 2,
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 2,
+ inProgressContainerCountInNamespace = 0,
+ averageDuration = Some(50.0),
+ limit = 10,
+ maxActionConcurrency = 3,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // available messages is 20 and throughput should be 100.0 / 50.0 * 3 = 6
+ // existing container is 2 so can handle 12 messages, therefore need 2 more containers
+ testProbe.expectMsg(DecisionResults(AddContainer, 2))
+ }
+
+ it should "add containers when action concurrency >1 w/ average duration and
demand is not met and has stale activations" in {
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
+ val testProbe = TestProbe()
+
+ // container
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 30,
+ existingContainerCount = 2,
+ inProgressContainerCount = 0,
+ staleActivationNum = 10,
+ existingContainerCountInNamespace = 2,
+ inProgressContainerCountInNamespace = 0,
+ averageDuration = Some(50.0),
+ limit = 10,
+ maxActionConcurrency = 3,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // available messages is 30 and throughput should be 100.0 / 50.0 * 3 = 6
+ // existing container is 2 so can handle 12 messages, therefore need 2 more containers for non-stale
+ // stale has 10 activations so need an additional 2
+ testProbe.expectMsg(DecisionResults(AddContainer, 4))
+ }
+
+ it should "add containers when action concurrency >1 when no average
duration and there are stale activations" in {
+ val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
+ val testProbe = TestProbe()
+
+ // container
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 10,
+ existingContainerCount = 1,
+ inProgressContainerCount = 0,
+ staleActivationNum = 10,
+ existingContainerCountInNamespace = 2,
+ inProgressContainerCountInNamespace = 0,
+ averageDuration = None,
+ limit = 10,
+ maxActionConcurrency = 3,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // stale messages are 10. want stale to be handled by first pass of requests from containers so
+ // ceil(10 / 3) = 4
+ testProbe.expectMsg(DecisionResults(AddContainer, 4))
+ }
}
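
The expected DecisionResults in the new tests line up with the sizing sketch near the top of this mail (again illustrative only; a 100 ms stale threshold is assumed, as the in-test comments imply, but it is configured elsewhere):

    import ConcurrencyAwareSizing._

    // "demand is met": 4 messages, 2 containers, throughput (100/100)*2 = 2 each -> no message expected
    assert(containersForThroughput(4, 2, 0, 100.0, 100.0, 2) == 0)

    // "demand is not met": 20 messages, 2 containers, throughput (100/50)*3 = 6 each -> AddContainer, 2
    assert(containersForThroughput(20, 2, 0, 100.0, 50.0, 3) == 2)

    // stale + non-stale: 10 stale need ceiling(10/6) = 2, the remaining 20 need 2 more -> AddContainer, 4
    assert(math.ceil(10.0 / 6).toInt + containersForThroughput(20, 2, 0, 100.0, 50.0, 3) == 4)

    // no average duration: ceiling(10 / 3) = 4 -> AddContainer, 4
    assert(containersForStale(10, 0, 3) == 4)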