This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 20a7b1c6a add config to fail async scheduler throttles as whisk errors (#5305)
20a7b1c6a is described below
commit 20a7b1c6ad3c57b583484c1fc12a069da42ad847
Author: Brendan Doyle <[email protected]>
AuthorDate: Thu Sep 1 14:08:16 2022 -0700
add config to fail async scheduler throttles as whisk errors (#5305)
* add config to fail async scheduler throttles as whisk errors
* fix tests
Co-authored-by: Brendan Doyle <[email protected]>
---
core/scheduler/src/main/resources/application.conf | 1 +
.../apache/openwhisk/core/scheduler/queue/MemoryQueue.scala | 11 ++++++-----
.../core/scheduler/queue/test/MemoryQueueFlowTests.scala | 2 +-
.../core/scheduler/queue/test/MemoryQueueTests.scala | 8 ++++----
.../core/scheduler/queue/test/MemoryQueueTestsFixture.scala | 2 +-
5 files changed, 13 insertions(+), 11 deletions(-)
diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf
index e73f764c1..23ca734bb 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -79,6 +79,7 @@ whisk {
max-blackbox-retention-ms = "300000"
throttling-fraction = "0.9"
duration-buffer-size = "10"
+ fail-throttle-as-whisk-error = "false"
}
queue-manager {
max-scheduling-time = "20 seconds"
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 436b53d05..45fa2d3a2 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -227,7 +227,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
enableNamespaceThrottling()
if (dropMsg)
- completeAllActivations(tooManyConcurrentRequests, isWhiskError = false)
+ completeAllActivations(tooManyConcurrentRequests, isWhiskError =
queueConfig.failThrottleAsWhiskError)
goto(NamespaceThrottled) using ThrottledData(data.schedulerActor,
data.droppingActor)
case Event(StateTimeout, data: RunningData) =>
@@ -269,7 +269,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
when(NamespaceThrottled) {
case Event(msg: ActivationMessage, _: ThrottledData) =>
if (containers.size + creationIds.size == 0) {
- completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError =
false)
+ completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError =
queueConfig.failThrottleAsWhiskError)
} else {
handleActivationMessage(msg)
}
@@ -285,7 +285,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
when(ActionThrottled) {
// since there are already too many activation messages, it drops the new messages
case Event(msg: ActivationMessage, ThrottledData(_, _)) =>
- completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError =
false)
+ completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError =
queueConfig.failThrottleAsWhiskError)
stay
}
@@ -823,7 +823,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
CompletionMessage(activation.transid, activationResponse, instance)
}
- if (!isWhiskError && message == tooManyConcurrentRequests) {
+ if (message == tooManyConcurrentRequests) {
val metric = Metric("ConcurrentRateLimit", 1)
UserEvents.send(
messagingProducer,
@@ -1208,7 +1208,8 @@ case class QueueConfig(idleGrace: FiniteDuration,
maxRetentionMs: Long,
maxBlackboxRetentionMs: Long,
throttlingFraction: Double,
- durationBufferSize: Int)
+ durationBufferSize: Int,
+ failThrottleAsWhiskError: Boolean)
case class BufferedRequest(containerId: String, promise:
Promise[Either[MemoryQueueError, ActivationMessage]])
case object DropOld
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index e68de5b58..1cb2a5cda 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -517,7 +517,7 @@ class MemoryQueueFlowTests
// max retention size is 10 and throttling fraction is 0.8
// queue will be action throttled at 10 messages and disabled action throttling at 8 messages
- val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 5000, 10000, 0.8, 10)
+ val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 5000, 10000, 0.8, 10, false)
// limit is 1
val getUserLimit = (_: String) => Future.successful(1)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index e64262827..711847269 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -1116,7 +1116,7 @@ class MemoryQueueTests
expectDurationChecking(mockEsClient, testInvocationNamespace)
- val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 10000, 20000, 0.9, 10)
+ val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 10000, 20000, 0.9, 10, false)
val fsm =
TestFSMRef(
@@ -1342,7 +1342,7 @@ class MemoryQueueTests
expectDurationChecking(mockEsClient, testInvocationNamespace)
- val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 10000, 20000, 0.9, 10)
+ val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 10000, 20000, 0.9, 10, false)
val fsm =
TestFSMRef(
@@ -1585,7 +1585,7 @@ class MemoryQueueTests
// it always induces the throttling
val getZeroLimit = (_: String) => { Future.successful(2) }
- val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 1, 5000, 10000, 0.9, 10)
+ val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 1, 5000, 10000, 0.9, 10, false)
expectDurationChecking(mockEsClient, testInvocationNamespace)
@@ -1632,7 +1632,7 @@ class MemoryQueueTests
val probe = TestProbe()
val parent = TestProbe()
- val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 5000, 10000, 0.9, 10)
+ val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 5000, 10000, 0.9, 10, false)
val msgRetentionSize = queueConfig.maxRetentionSize
val tid = TransactionId(TransactionId.generateTid())
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
index c7a3ff095..090ce3d6e 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
@@ -154,7 +154,7 @@ class MemoryQueueTestsFixture
val actionThrottlingKey = ThrottlingKeys.action(testInvocationNamespace,
fqn.copy(version = None))
// queue variables
- val queueConfig = QueueConfig(5 seconds, 10 seconds, 5 seconds, 5 seconds,
10, 10000, 20000, 0.9, 10)
+ val queueConfig = QueueConfig(5 seconds, 10 seconds, 5 seconds, 5 seconds,
10, 10000, 20000, 0.9, 10, false)
val idleGrace = queueConfig.idleGrace
val stopGrace = queueConfig.stopGrace
val flushGrace = queueConfig.flushGrace