This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new de3e0a8fd fix bug in average ring buffer and add negative protection
to scheduling decision maker (#5396)
de3e0a8fd is described below
commit de3e0a8fdf4bf6341ccc423727628d554e53edee
Author: Brendan Doyle <[email protected]>
AuthorDate: Mon Apr 17 19:28:04 2023 -0700
fix bug in average ring buffer and add negative protection to scheduling
decision maker (#5396)
Co-authored-by: Brendan Doyle <[email protected]>
---
.../org/apache/openwhisk/common/AverageRingBuffer.scala | 14 +-------------
.../core/scheduler/queue/SchedulingDecisionMaker.scala | 17 +++++++++++++----
2 files changed, 14 insertions(+), 17 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala
b/common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala
index 0903daaad..57140d440 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala
@@ -32,18 +32,12 @@ object AverageRingBuffer {
class AverageRingBuffer(private val maxSize: Int) {
private var elements = Vector.empty[Double]
private var sum = 0.0
- private var max = 0.0
- private var min = 0.0
def nonEmpty: Boolean = elements.nonEmpty
def average: Double = {
val size = elements.size
- if (size > 2) {
- (sum - max - min) / (size - 2)
- } else {
- sum / size
- }
+ sum / size
}
def add(el: Double): Unit = synchronized {
@@ -54,12 +48,6 @@ class AverageRingBuffer(private val maxSize: Int) {
sum += el
elements = elements :+ el
}
- if (el > max) {
- max = el
- }
- if (el < min) {
- min = el
- }
}
def size(): Int = elements.size
diff --git
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
index ab0de2f6a..4d830762c 100644
---
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
+++
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala
@@ -153,19 +153,24 @@ class SchedulingDecisionMaker(
Running)
// need more containers and a message is already processed
case (Running, Some(duration)) =>
- // we can safely get the value as we already checked the existence
- val containerThroughput = (staleThreshold / duration) *
maxActionConcurrency
+ // we can safely get the value as we already checked the
existence, have extra protection in case duration is somehow negative
+ val containerThroughput = if (duration <= 0) {
+ maxActionConcurrency
+ } else {
+ (staleThreshold / duration) * maxActionConcurrency
+ }
val expectedTps = containerThroughput * (existing + inProgress)
val availableNonStaleActivations = availableMsg -
staleActivationNum
var staleContainerProvision = 0
if (staleActivationNum > 0) {
+
val num = ceiling(staleActivationNum.toDouble /
containerThroughput)
// if it tries to create more containers than existing messages,
we just create shortage
staleContainerProvision = (if (num > staleActivationNum)
staleActivationNum else num) - inProgress
}
- if (availableNonStaleActivations >= expectedTps && existing +
inProgress < availableNonStaleActivations) {
+ if (availableNonStaleActivations >= expectedTps && existing +
inProgress < availableNonStaleActivations && duration > 0) {
val num = ceiling((availableNonStaleActivations /
containerThroughput) - existing - inProgress)
// if it tries to create more containers than existing messages,
we just create shortage
val actualNum =
@@ -202,7 +207,11 @@ class SchedulingDecisionMaker(
// this case is for that as a last resort.
case (Removing, Some(duration)) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
- val containerThroughput = (staleThreshold / duration) *
maxActionConcurrency
+ val containerThroughput = if (duration <= 0) {
+ maxActionConcurrency
+ } else {
+ (staleThreshold / duration) * maxActionConcurrency
+ }
val num = ceiling(staleActivationNum.toDouble /
containerThroughput)
// if it tries to create more containers than existing messages,
we just create shortage
val actualNum = (if (num > staleActivationNum) staleActivationNum
else num) - inProgress