style95 commented on code in PR #5284:
URL: https://github.com/apache/openwhisk/pull/5284#discussion_r1009995134


##########
tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala:
##########
@@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests
     testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = 
false), 0))
   }
 
-  it should "add an initial container if there is no any" in {
+  it should "add one container when there is no container, and namespace 
over-provision has capacity" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.5)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = false,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 0, // there is no container for this action
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an initial container so enable throttling and 
drop messages.
+    testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
+  }
+
+  it should "enable namespace throttling with dropping msg when there is no 
container, and namespace over-provision has no capacity" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.0)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 0, // there is no container for this action
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an initial container so enable throttling and 
drop messages.
+    testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = 
true), 0))
+  }
+
+  it should "disable namespace throttling when namespace over-provision has 
capacity again" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.1)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 1, // there is no container for this action

Review Comment:
   ```suggestion
         existingContainerCount = 1, // there is one container for this action
   ```



##########
tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala:
##########
@@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests
     testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = 
false), 0))
   }
 
-  it should "add an initial container if there is no any" in {
+  it should "add one container when there is no container, and namespace 
over-provision has capacity" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.5)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = false,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 0, // there is no container for this action
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an initial container so enable throttling and 
drop messages.
+    testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
+  }
+
+  it should "enable namespace throttling with dropping msg when there is no 
container, and namespace over-provision has no capacity" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.0)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 0, // there is no container for this action
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an initial container so enable throttling and 
drop messages.
+    testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = 
true), 0))
+  }
+
+  it should "disable namespace throttling when namespace over-provision has 
capacity again" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.1)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 1, // there is no container for this action
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = NamespaceThrottled,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an initial container so enable throttling and 
drop messages.
+    testProbe.expectMsg(DecisionResults(DisableNamespaceThrottling, 0))
+  }
+
+  it should "enable namespace throttling with dropping msg when there is a 
container, and namespace over-provision has no additional capacity" in {

Review Comment:
   ```suggestion
     it should "enable namespace throttling without dropping msg when there is 
a container, and namespace over-provision has no additional capacity" in {
   ```



##########
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala:
##########
@@ -79,12 +89,15 @@ class SchedulingDecisionMaker(
            *
            * However, if the container exists(totalContainers != 0), the 
activation is not treated as a failure and the activation is delivered to the 
container.
            */
-          case Running =>
+          case Running
+              if !schedulingConfig.allowOverProvisionBeforeThrottle || 
(schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
+                limit * 
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - 
existingContainerCountInNs - inProgressContainerCountInNs <= 0) =>
             logging.info(
               this,
               s"there is no capacity activations will be dropped or throttled, 
(availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit, 
namespaceContainers: ${existingContainerCountInNs}, 
namespaceInProgressContainer: ${inProgressContainerCountInNs}) 
[$invocationNamespace:$action]")
             
Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = 
totalContainers == 0), 0))
-
+          case NamespaceThrottled if 
schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit * 
schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - 
existingContainerCountInNs - inProgressContainerCountInNs > 0 =>

Review Comment:
   👍 



##########
tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala:
##########
@@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests
     testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = 
false), 0))
   }
 
-  it should "add an initial container if there is no any" in {
+  it should "add one container when there is no container, and namespace 
over-provision has capacity" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.5)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = false,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 0, // there is no container for this action
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an initial container so enable throttling and 
drop messages.
+    testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
+  }
+
+  it should "enable namespace throttling with dropping msg when there is no 
container, and namespace over-provision has no capacity" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.0)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 0, // there is no container for this action
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an initial container so enable throttling and 
drop messages.
+    testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = 
true), 0))
+  }
+
+  it should "disable namespace throttling when namespace over-provision has 
capacity again" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.1)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 1, // there is no container for this action
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = NamespaceThrottled,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an initial container so enable throttling and 
drop messages.
+    testProbe.expectMsg(DecisionResults(DisableNamespaceThrottling, 0))
+  }
+
+  it should "enable namespace throttling with dropping msg when there is a 
container, and namespace over-provision has no additional capacity" in {
+    val schedulingConfigNamespaceOverProvisioning =
+      SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 
1.0)
+    val decisionMaker =
+      system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, 
schedulingConfigNamespaceOverProvisioning))
+    val testProbe = TestProbe()
+
+    val msg = QueueSnapshot(
+      initialized = true,
+      incomingMsgCount = new AtomicInteger(0),
+      currentMsgCount = 0,
+      existingContainerCount = 1,
+      inProgressContainerCount = 0,
+      staleActivationNum = 0,
+      existingContainerCountInNamespace = 1, // but there are already 2 
containers in this namespace
+      inProgressContainerCountInNamespace = 1,
+      averageDuration = None,
+      limit = 2,
+      stateName = Running,
+      recipient = testProbe.ref)
+
+    decisionMaker ! msg
+
+    // this queue cannot create an initial container so enable throttling and 
drop messages.

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to