This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 077fb6d24 Add scheduler overprovision for new actions before namespace
throttling (#5284)
077fb6d24 is described below
commit 077fb6d24f0132e7755ea47d7ee9b35f0966daf3
Author: Brendan Doyle <[email protected]>
AuthorDate: Tue Nov 1 11:37:30 2022 -0700
Add scheduler overprovision for new actions before namespace throttling
(#5284)
* initial attempt
* tests
* fix tests
* enable throttling when last capacity used in overprovisioning
* add case to correctly disable namespace throttling when namespace
overprovisioning has space
* feedback
Co-authored-by: Brendan Doyle <[email protected]>
---
core/scheduler/src/main/resources/application.conf | 2 +
.../openwhisk/core/scheduler/Scheduler.scala | 6 +-
.../scheduler/queue/SchedulingDecisionMaker.scala | 19 ++-
.../queue/test/MemoryQueueTestsFixture.scala | 2 +-
.../queue/test/SchedulingDecisionMakerTests.scala | 144 ++++++++++++++++++++-
5 files changed, 164 insertions(+), 9 deletions(-)
diff --git a/core/scheduler/src/main/resources/application.conf
b/core/scheduler/src/main/resources/application.conf
index 1f696fc43..f04d2888f 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -79,6 +79,8 @@ whisk {
stale-threshold = "100 milliseconds"
check-interval = "100 milliseconds"
drop-interval = "10 seconds"
+ allow-over-provision-before-throttle = false
+ namespace-over-provision-before-throttle-ratio = 1.5
}
queue {
idle-grace = "20 seconds"
diff --git
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index e3ed70d8a..f260feb4f 100644
---
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -421,4 +421,8 @@ object SchedulerStates extends DefaultJsonProtocol {
def parse(states: String) = Try(serdes.read(states.parseJson))
}
-case class SchedulingConfig(staleThreshold: FiniteDuration, checkInterval:
FiniteDuration, dropInterval: FiniteDuration)
+case class SchedulingConfig(staleThreshold: FiniteDuration,
+ checkInterval: FiniteDuration,
+ dropInterval: FiniteDuration,
+ allowOverProvisionBeforeThrottle: Boolean,
+ namespaceOverProvisionBeforeThrottleRatio: Double)
diff --git
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index d6ae8f63e..19c458fc9 100644
---
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -70,7 +70,17 @@ class SchedulingDecisionMaker(
case _ => Future.successful(DecisionResults(Pausing, 0))
}
} else {
- val capacity = limit - existingContainerCountInNs -
inProgressContainerCountInNs
+ val capacity = if (schedulingConfig.allowOverProvisionBeforeThrottle &&
totalContainers == 0) {
+ // if space available within the over provision ratio amount above
namespace limit, create one container for new
+ // action so namespace traffic can attempt to re-balance without
blocking entire action
+ if ((ceiling(limit *
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) -
existingContainerCountInNs - inProgressContainerCountInNs) > 0) {
+ 1
+ } else {
+ 0
+ }
+ } else {
+ limit - existingContainerCountInNs - inProgressContainerCountInNs
+ }
if (capacity <= 0) {
stateName match {
@@ -79,12 +89,15 @@ class SchedulingDecisionMaker(
*
* However, if the container exists(totalContainers != 0), the
activation is not treated as a failure and the activation is delivered to the
container.
*/
- case Running =>
+ case Running
+ if !schedulingConfig.allowOverProvisionBeforeThrottle ||
(schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
+ limit *
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) -
existingContainerCountInNs - inProgressContainerCountInNs <= 0) =>
logging.info(
this,
s"there is no capacity activations will be dropped or throttled,
(availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit,
namespaceContainers: ${existingContainerCountInNs},
namespaceInProgressContainer: ${inProgressContainerCountInNs})
[$invocationNamespace:$action]")
Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg =
totalContainers == 0), 0))
-
+ case NamespaceThrottled if
schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit *
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) -
existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
+ Future.successful(DecisionResults(DisableNamespaceThrottling, 0))
// do nothing
case _ =>
// no need to print any messages if the state is already
NamespaceThrottled
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
index 090ce3d6e..d793c5848 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
@@ -86,7 +86,7 @@ class MemoryQueueTestsFixture
val testNamespace = "test-namespace"
val testAction = "test-action"
- val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds,
10.seconds)
+ val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds,
10.seconds, false, 1.5)
val fqn = FullyQualifiedEntityName(EntityPath(testNamespace),
EntityName(testAction), Some(SemVer(0, 0, 1)))
val revision = DocRevision("1-testRev")
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
index ce1c86682..b70b71554 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala
@@ -47,7 +47,7 @@ class SchedulingDecisionMakerTests
val testAction = "test-action"
val action = FullyQualifiedEntityName(EntityPath(testNamespace),
EntityName(testAction), Some(SemVer(0, 0, 1)))
- val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds,
10.seconds)
+ val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds,
10.seconds, false, 1.5)
it should "decide pausing when the limit is less than equal to 0" in {
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfig))
@@ -149,7 +149,7 @@ class SchedulingDecisionMakerTests
}
}
- it should "enable namespace throttling with dropping msg when there is not
enough capacity and no container" in {
+ it should "enable namespace throttling with dropping msg when there is not
enough capacity, no container, and namespace over-provision disabled" in {
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfig))
val testProbe = TestProbe()
@@ -173,7 +173,7 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg =
true), 0))
}
- it should "enable namespace throttling without dropping msg when there is
not enough capacity but are some containers" in {
+ it should "enable namespace throttling without dropping msg when there is
not enough capacity but are some containers and namespace over-provision
disabled" in {
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfig))
val testProbe = TestProbe()
@@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg =
false), 0))
}
- it should "add an initial container if there is no any" in {
+ it should "add one container when there is no container, and namespace
over-provision has capacity" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.5)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = false,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 0, // there is no container for this action
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // the namespace limit is reached, but over-provision still has room, so one initial container is added.
+ testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
+ }
+
+ it should "enable namespace throttling with dropping msg when there is no
container, and namespace over-provision has no capacity" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.0)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 0, // there is no container for this action
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // this queue cannot create an initial container so enable throttling and
drop messages.
+ testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg =
true), 0))
+ }
+
+ it should "disable namespace throttling when namespace over-provision has
capacity again" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.1)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 1, // there is one container for this action
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = NamespaceThrottled,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // over-provision has capacity again, so namespace throttling is disabled for this queue.
+ testProbe.expectMsg(DecisionResults(DisableNamespaceThrottling, 0))
+ }
+
+ it should "enable namespace throttling without dropping msg when there is a
container, and namespace over-provision has no additional capacity" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.0)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 1,
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // this queue cannot create an additional container, so enable throttling without dropping messages.
+ testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg =
false), 0))
+ }
+
+ it should "not enable namespace throttling when there is not enough capacity
but are some containers and namespace over-provision is enabled with capacity"
in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.5)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 1, // there are some containers for this action
+ inProgressContainerCount = 1,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 2, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 2, // this value includes the
count of this action as well.
+ averageDuration = None,
+ limit = 4,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // the namespace limit is reached but over-provision still has room, so no throttling decision is sent.
+ testProbe.expectNoMessage()
+ }
+
+ it should "add an initial container if there is not any" in {
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfig))
val testProbe = TestProbe()
@@ -219,6 +354,7 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
}
+
it should "disable the namespace throttling with adding an initial container
when there is no container" in {
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfig))
val testProbe = TestProbe()