This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 07c920249 optimize scheduling decision when there are stale
activations (#5344)
07c920249 is described below
commit 07c920249d0a0db5fe3bc603add73e410f40dddd
Author: Brendan Doyle <[email protected]>
AuthorDate: Mon Oct 31 22:21:27 2022 -0700
optimize scheduling decision when there are stale activations (#5344)
* optimize scheduling decision when there are stale activations
* further optimization
* scalafmt
* add new test cases
Co-authored-by: Brendan Doyle <[email protected]>
---
.../scheduler/queue/SchedulingDecisionMaker.scala | 51 ++++++++++++----------
.../queue/test/SchedulingDecisionMakerTests.scala | 48 ++++++++++++++++++++
2 files changed, 75 insertions(+), 24 deletions(-)
diff --git
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index 0e1f4ffa8..d6ae8f63e 100644
---
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -135,41 +135,44 @@ class SchedulingDecisionMaker(
staleActivationNum,
0.0,
Running)
-
- case (Running, Some(duration)) if staleActivationNum > 0 =>
- // we can safely get the value as we already checked the existence
- val containerThroughput = staleThreshold / duration
- val num = ceiling(availableMsg.toDouble / containerThroughput)
- // if it tries to create more containers than existing messages,
we just create shortage
- val actualNum = (if (num > availableMsg) availableMsg else num) -
inProgress
- addServersIfPossible(
- existing,
- inProgress,
- containerThroughput,
- availableMsg,
- capacity,
- actualNum,
- staleActivationNum,
- duration,
- Running)
-
// need more containers and a message is already processed
case (Running, Some(duration)) =>
// we can safely get the value as we already checked the existence
val containerThroughput = staleThreshold / duration
val expectedTps = containerThroughput * (existing + inProgress)
+ val availableNonStaleActivations = availableMsg -
staleActivationNum
+
+ var staleContainerProvision = 0
+ if (staleActivationNum > 0) {
+ val num = ceiling(staleActivationNum.toDouble /
containerThroughput)
+ // if it tries to create more containers than existing messages,
we just create shortage
+ staleContainerProvision = (if (num > staleActivationNum)
staleActivationNum else num) - inProgress
+ }
- if (availableMsg >= expectedTps && existing + inProgress <
availableMsg) {
- val num = ceiling((availableMsg / containerThroughput) -
existing - inProgress)
+ if (availableNonStaleActivations >= expectedTps && existing +
inProgress < availableNonStaleActivations) {
+ val num = ceiling((availableNonStaleActivations /
containerThroughput) - existing - inProgress)
// if it tries to create more containers than existing messages,
we just create shortage
- val actualNum = if (num + totalContainers > availableMsg)
availableMsg - totalContainers else num
+ val actualNum =
+ if (num + totalContainers > availableNonStaleActivations)
availableNonStaleActivations - totalContainers
+ else num
+ addServersIfPossible(
+ existing,
+ inProgress,
+ containerThroughput,
+ availableMsg,
+ capacity,
+ actualNum + staleContainerProvision,
+ staleActivationNum,
+ duration,
+ Running)
+ } else if (staleContainerProvision > 0) {
addServersIfPossible(
existing,
inProgress,
containerThroughput,
availableMsg,
capacity,
- actualNum,
+ staleContainerProvision,
staleActivationNum,
duration,
Running)
@@ -184,9 +187,9 @@ class SchedulingDecisionMaker(
case (Removing, Some(duration)) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
val containerThroughput = staleThreshold / duration
- val num = ceiling(availableMsg.toDouble / containerThroughput)
+ val num = ceiling(staleActivationNum.toDouble /
containerThroughput)
// if it tries to create more containers than existing messages,
we just create shortage
- val actualNum = (if (num > availableMsg) availableMsg else num) -
inProgress
+ val actualNum = (if (num > staleActivationNum) staleActivationNum
else num) - inProgress
addServersIfPossible(
existing,
inProgress,
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
index 72adc8be2..ce1c86682 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
@@ -683,6 +683,54 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(AddContainer, 2))
}
+ it should "add more containers when there are stale messages and non-stale
messages and both message classes need more containers" in {
+ val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfig))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 5,
+ existingContainerCount = 2,
+ inProgressContainerCount = 0,
+ staleActivationNum = 2,
+ existingContainerCountInNamespace = 2,
+ inProgressContainerCountInNamespace = 0,
+ averageDuration = Some(1000), // the average duration exists
+ limit = 10,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ //should add two for the stale messages and one to increase tps of
non-stale available messages
+ testProbe.expectMsg(DecisionResults(AddContainer, 3))
+ }
+
+ it should "add more containers when there are stale messages and non-stale
messages have needed tps" in {
+ val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfig))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 5,
+ existingContainerCount = 2,
+ inProgressContainerCount = 0,
+ staleActivationNum = 2,
+ existingContainerCountInNamespace = 2,
+ inProgressContainerCountInNamespace = 0,
+ averageDuration = Some(50), // the average duration gives container
throughput of 2
+ limit = 10,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ //should add one additional container for stale messages and non-stale
messages still meet tps
+ testProbe.expectMsg(DecisionResults(AddContainer, 1))
+ }
+
it should "enable namespace throttling while adding more container when
there are stale messages even in the GracefulShuttingDown" in {
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfig))
val testProbe = TestProbe()