style95 commented on code in PR #5284:
URL: https://github.com/apache/openwhisk/pull/5284#discussion_r1009995134
##########
tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala:
##########
@@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg =
false), 0))
}
- it should "add an initial container if there is no any" in {
+ it should "add one container when there is no container, and namespace
over-provision has capacity" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.5)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = false,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 0, // there is no container for this action
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // this queue cannot create an initial container so enable throttling and
drop messages.
+ testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
+ }
+
+ it should "enable namespace throttling with dropping msg when there is no
container, and namespace over-provision has no capacity" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.0)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 0, // there is no container for this action
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // this queue cannot create an initial container so enable throttling and
drop messages.
+ testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg =
true), 0))
+ }
+
+ it should "disable namespace throttling when namespace over-provision has
capacity again" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.1)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 1, // there is no container for this action
Review Comment:
```suggestion
existingContainerCount = 1, // there is one container for this action
```
##########
tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala:
##########
@@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg =
false), 0))
}
- it should "add an initial container if there is no any" in {
+ it should "add one container when there is no container, and namespace
over-provision has capacity" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.5)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = false,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 0, // there is no container for this action
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // this queue cannot create an initial container so enable throttling and
drop messages.
+ testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
+ }
+
+ it should "enable namespace throttling with dropping msg when there is no
container, and namespace over-provision has no capacity" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.0)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 0, // there is no container for this action
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // this queue cannot create an initial container so enable throttling and
drop messages.
+ testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg =
true), 0))
+ }
+
+ it should "disable namespace throttling when namespace over-provision has
capacity again" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.1)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 1, // there is no container for this action
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = NamespaceThrottled,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // this queue cannot create an initial container so enable throttling and
drop messages.
+ testProbe.expectMsg(DecisionResults(DisableNamespaceThrottling, 0))
+ }
+
+ it should "enable namespace throttling with dropping msg when there is a
container, and namespace over-provision has no additional capacity" in {
Review Comment:
```suggestion
it should "enable namespace throttling without dropping msg when there is
a container, and namespace over-provision has no additional capacity" in {
```
##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala:
##########
@@ -79,12 +89,15 @@ class SchedulingDecisionMaker(
*
* However, if the container exists(totalContainers != 0), the
activation is not treated as a failure and the activation is delivered to the
container.
*/
- case Running =>
+ case Running
+ if !schedulingConfig.allowOverProvisionBeforeThrottle ||
(schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
+ limit *
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) -
existingContainerCountInNs - inProgressContainerCountInNs <= 0) =>
logging.info(
this,
s"there is no capacity activations will be dropped or throttled,
(availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit,
namespaceContainers: ${existingContainerCountInNs},
namespaceInProgressContainer: ${inProgressContainerCountInNs})
[$invocationNamespace:$action]")
Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg =
totalContainers == 0), 0))
-
+ case NamespaceThrottled if
schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit *
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) -
existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
Review Comment:
👍
##########
tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala:
##########
@@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg =
false), 0))
}
- it should "add an initial container if there is no any" in {
+ it should "add one container when there is no container, and namespace
over-provision has capacity" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.5)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = false,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 0, // there is no container for this action
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // this queue cannot create an initial container so enable throttling and
drop messages.
+ testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
+ }
+
+ it should "enable namespace throttling with dropping msg when there is no
container, and namespace over-provision has no capacity" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.0)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 0, // there is no container for this action
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // this queue cannot create an initial container so enable throttling and
drop messages.
+ testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg =
true), 0))
+ }
+
+ it should "disable namespace throttling when namespace over-provision has
capacity again" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.1)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 1, // there is no container for this action
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = NamespaceThrottled,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // this queue cannot create an initial container so enable throttling and
drop messages.
+ testProbe.expectMsg(DecisionResults(DisableNamespaceThrottling, 0))
+ }
+
+ it should "enable namespace throttling with dropping msg when there is a
container, and namespace over-provision has no additional capacity" in {
+ val schedulingConfigNamespaceOverProvisioning =
+ SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true,
1.0)
+ val decisionMaker =
+ system.actorOf(SchedulingDecisionMaker.props(testNamespace, action,
schedulingConfigNamespaceOverProvisioning))
+ val testProbe = TestProbe()
+
+ val msg = QueueSnapshot(
+ initialized = true,
+ incomingMsgCount = new AtomicInteger(0),
+ currentMsgCount = 0,
+ existingContainerCount = 1,
+ inProgressContainerCount = 0,
+ staleActivationNum = 0,
+ existingContainerCountInNamespace = 1, // but there are already 2
containers in this namespace
+ inProgressContainerCountInNamespace = 1,
+ averageDuration = None,
+ limit = 2,
+ stateName = Running,
+ recipient = testProbe.ref)
+
+ decisionMaker ! msg
+
+ // this queue cannot create an initial container so enable throttling and
drop messages.
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]