This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new de3e0a8fd fix bug in average ring buffer and add negative protection to scheduling decision maker (#5396)
de3e0a8fd is described below

commit de3e0a8fdf4bf6341ccc423727628d554e53edee
Author: Brendan Doyle <[email protected]>
AuthorDate: Mon Apr 17 19:28:04 2023 -0700

    fix bug in average ring buffer and add negative protection to scheduling decision maker (#5396)
    
    Co-authored-by: Brendan Doyle <[email protected]>
---
 .../org/apache/openwhisk/common/AverageRingBuffer.scala | 14 +-------------
 .../core/scheduler/queue/SchedulingDecisionMaker.scala  | 17 +++++++++++++----
 2 files changed, 14 insertions(+), 17 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala
index 0903daaad..57140d440 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala
@@ -32,18 +32,12 @@ object AverageRingBuffer {
 class AverageRingBuffer(private val maxSize: Int) {
   private var elements = Vector.empty[Double]
   private var sum = 0.0
-  private var max = 0.0
-  private var min = 0.0
 
   def nonEmpty: Boolean = elements.nonEmpty
 
   def average: Double = {
     val size = elements.size
-    if (size > 2) {
-      (sum - max - min) / (size - 2)
-    } else {
-      sum / size
-    }
+    sum / size
   }
 
   def add(el: Double): Unit = synchronized {
@@ -54,12 +48,6 @@ class AverageRingBuffer(private val maxSize: Int) {
       sum += el
       elements = elements :+ el
     }
-    if (el > max) {
-      max = el
-    }
-    if (el < min) {
-      min = el
-    }
   }
 
   def size(): Int = elements.size
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index ab0de2f6a..4d830762c 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -153,19 +153,24 @@ class SchedulingDecisionMaker(
               Running)
           // need more containers and a message is already processed
           case (Running, Some(duration)) =>
-            // we can safely get the value as we already checked the existence
-            val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
+            // we can safely get the value as we already checked the existence, have extra protection in case duration is somehow negative
+            val containerThroughput = if (duration <= 0) {
+              maxActionConcurrency
+            } else {
+              (staleThreshold / duration) * maxActionConcurrency
+            }
             val expectedTps = containerThroughput * (existing + inProgress)
             val availableNonStaleActivations = availableMsg - staleActivationNum
 
             var staleContainerProvision = 0
             if (staleActivationNum > 0) {
+
               val num = ceiling(staleActivationNum.toDouble / containerThroughput)
               // if it tries to create more containers than existing messages, we just create shortage
               staleContainerProvision = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
             }
 
-            if (availableNonStaleActivations >= expectedTps && existing + inProgress < availableNonStaleActivations) {
+            if (availableNonStaleActivations >= expectedTps && existing + inProgress < availableNonStaleActivations && duration > 0) {
               val num = ceiling((availableNonStaleActivations / containerThroughput) - existing - inProgress)
               // if it tries to create more containers than existing messages, we just create shortage
               val actualNum =
@@ -202,7 +207,11 @@ class SchedulingDecisionMaker(
           // this case is for that as a last resort.
           case (Removing, Some(duration)) if staleActivationNum > 0 =>
             // we can safely get the value as we already checked the existence
-            val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
+            val containerThroughput = if (duration <= 0) {
+              maxActionConcurrency
+            } else {
+              (staleThreshold / duration) * maxActionConcurrency
+            }
             val num = ceiling(staleActivationNum.toDouble / containerThroughput)
             // if it tries to create more containers than existing messages, we just create shortage
             val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress

Reply via email to