This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 8d31e96e1 [Scheduler Enhancement] Increase the retention timeout for the blackbox action. (#5266)
8d31e96e1 is described below
commit 8d31e96e1a321987f9f5c3b7289ba83bf4eebff6
Author: Dominic Kim <[email protected]>
AuthorDate: Tue Jul 12 19:56:13 2022 +0900
[Scheduler Enhancement] Increase the retention timeout for the blackbox action. (#5266)
* Increase the retention timeout for the blackbox action.
* Fix test cases.
* Apply scalaFmt.
* Add GracefulShutdown case back.
* Increase the blackbox timeout for test cases.
* Access the private method directly.
* Replace Thread.sleep with awaitAssert.
* Add the missing configuration.
* Enhance the test code.
* Remove thread.sleep.
* Fix test cases.
* Fix test cases.
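
For quick orientation, the gist of the change is that blackbox actions now get their own, longer retention windows. Below is a minimal Scala sketch of the two timeout decisions this patch introduces; the helper and object names here are illustrative only, while the configuration keys and default values are the ones visible in the diff further down.

    import scala.concurrent.duration._

    object RetentionSketch {
      // Per-action retention used by MemoryQueue: blackbox actions now read the new
      // queue.max-blackbox-retention-ms (default 300000 ms) instead of
      // max-retention-ms (default 60000 ms).
      def retentionTimeoutMs(isBlackbox: Boolean, maxRetentionMs: Long, maxBlackboxRetentionMs: Long): Long =
        if (isBlackbox) maxBlackboxRetentionMs else maxRetentionMs

      // In-progress creation-job retention in CreationJobManager: the blackbox timeout
      // now scales by the configurable scheduler.blackbox-multiple (default 15)
      // instead of the previously hard-coded factor of 3.
      def creationJobTimeout(baseTimeout: FiniteDuration, isBlackbox: Boolean, blackboxMultiple: Int): FiniteDuration =
        if (isBlackbox) (baseTimeout.toSeconds * blackboxMultiple).seconds else baseTimeout
    }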
---
ansible/group_vars/all | 1 +
ansible/roles/schedulers/tasks/deploy.yml | 2 +
.../org/apache/openwhisk/common/Logging.scala | 3 +-
.../org/apache/openwhisk/core/WhiskConfig.scala | 1 +
.../v2/FunctionPullingContainerPool.scala | 12 +-
core/scheduler/src/main/resources/application.conf | 2 +
.../scheduler/container/CreationJobManager.scala | 22 ++-
.../core/scheduler/queue/MemoryQueue.scala | 85 ++++++++---
tests/src/test/resources/application.conf.j2 | 1 +
.../test/FunctionPullingContainerPoolTests.scala | 143 ++++++++----------
.../container/test/CreationJobManagerTests.scala | 60 ++++----
.../queue/test/MemoryQueueFlowTests.scala | 118 +++++++--------
.../scheduler/queue/test/MemoryQueueTests.scala | 168 ++++++++++++++++++---
.../queue/test/MemoryQueueTestsFixture.scala | 4 +-
14 files changed, 395 insertions(+), 227 deletions(-)
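
Related to the retention change, MemoryQueue.dropOld now receives the error reason from its caller instead of formatting one from the removed global MaxRetentionTime. A simplified, self-contained sketch of that shape follows (types reduced for illustration and the object name is hypothetical; the authoritative version is in the MemoryQueue.scala hunk below):

    import java.time.{Duration, Instant}
    import scala.collection.immutable.Queue
    import scala.concurrent.Future

    object DropOldSketch {
      final case class Entry(timestamp: Instant, msg: String)

      def dropOld(queue: Queue[Entry],
                  retention: Duration,
                  reason: String,
                  completeErrorActivation: (String, String, Boolean) => Future[Any]): Queue[Entry] =
        if (queue.isEmpty || Duration.between(queue.head.timestamp, Instant.now).compareTo(retention) < 0)
          queue
        else {
          // fail the stale head with the caller-supplied reason; `true` marks it as a whisk error
          completeErrorActivation(queue.head.msg, reason, true)
          dropOld(queue.tail, retention, reason, completeErrorActivation)
        }
    }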
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 2d8365ea1..97a3c0cbb 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -518,6 +518,7 @@ scheduler:
dataManagementService:
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
inProgressJobRetention: "{{ scheduler_inProgressJobRetention | default('20 seconds') }}"
+ blackboxMultiple: "{{ scheduler_blackboxMultiple | default(15) }}"
managedFraction: "{{ scheduler_managed_fraction | default(1.0 - (scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction))) }}"
blackboxFraction: "{{ scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction) }}"
scheduling:
diff --git a/ansible/roles/schedulers/tasks/deploy.yml b/ansible/roles/schedulers/tasks/deploy.yml
index 507f4813f..82f82d4c1 100644
--- a/ansible/roles/schedulers/tasks/deploy.yml
+++ b/ansible/roles/schedulers/tasks/deploy.yml
@@ -113,6 +113,7 @@
"CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}"
"CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{
scheduler.dataManagementService.retryInterval }}"
"CONFIG_whisk_scheduler_inProgressJobRetention": "{{
scheduler.inProgressJobRetention }}"
+ "CONFIG_whisk_scheduler_blackboxMultiple": "{{
scheduler.blackboxMultiple }}"
"CONFIG_whisk_scheduler_scheduling_staleThreshold": "{{
scheduler.scheduling.staleThreshold }}"
"CONFIG_whisk_scheduler_scheduling_checkInterval": "{{
scheduler.scheduling.checkInterval }}"
"CONFIG_whisk_scheduler_scheduling_dropInterval": "{{
scheduler.scheduling.dropInterval }}"
@@ -124,6 +125,7 @@
"CONFIG_whisk_scheduler_queue_gracefulShutdownTimeout": "{{
scheduler.queue.gracefulShutdownTimeout }}"
"CONFIG_whisk_scheduler_queue_maxRetentionSize": "{{
scheduler.queue.maxRetentionSize }}"
"CONFIG_whisk_scheduler_queue_maxRetentionMs": "{{
scheduler.queue.maxRetentionMs }}"
+ "CONFIG_whisk_scheduler_queue_maxBlackboxRetentionMs": "{{
scheduler.queue.maxBlackboxRetentionMs }}"
"CONFIG_whisk_scheduler_queue_throttlingFraction": "{{
scheduler.queue.throttlingFraction }}"
"CONFIG_whisk_scheduler_queue_durationBufferSize": "{{
scheduler.queue.durationBufferSize }}"
"CONFIG_whisk_durationChecker_timeWindow": "{{
durationChecker.timeWindow }}"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index a9da001b0..ff82ef5fb 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -594,7 +594,8 @@ object LoggingMarkers {
val SCHEDULER_KAFKA_WAIT_TIME =
LogMarkerToken(scheduler, "kafkaWaitTime",
counter)(MeasurementUnit.time.milliseconds)
def SCHEDULER_WAIT_TIME(action: String) =
- LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action"
-> action))(MeasurementUnit.time.milliseconds)
+ LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action"
-> action))(
+ MeasurementUnit.time.milliseconds)
def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" ->
leaseId.toString))(MeasurementUnit.none)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 95520f242..57d4a8b03 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -305,6 +305,7 @@ object ConfigKeys {
val schedulerQueue = "whisk.scheduler.queue"
val schedulerQueueManager = "whisk.scheduler.queue-manager"
val schedulerInProgressJobRetention = "whisk.scheduler.in-progress-job-retention"
+ val schedulerBlackboxMultiple = "whisk.scheduler.blackbox-multiple"
val schedulerStaleThreshold = "whisk.scheduler.stale-threshold"
val whiskClusterName = "whisk.cluster.name"
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
index ac0d2e1ea..5b0c283c3 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
@@ -88,14 +88,14 @@ class FunctionPullingContainerPool(
implicit val ec = context.system.dispatcher
- private var busyPool = immutable.Map.empty[ActorRef, Data]
- private var inProgressPool = immutable.Map.empty[ActorRef, Data]
- private var warmedPool = immutable.Map.empty[ActorRef, WarmData]
- private var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
- private var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
+ protected[containerpool] var busyPool = immutable.Map.empty[ActorRef, Data]
+ protected[containerpool] var inProgressPool = immutable.Map.empty[ActorRef, Data]
+ protected[containerpool] var warmedPool = immutable.Map.empty[ActorRef, WarmData]
+ protected[containerpool] var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
+ protected[containerpool] var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
// for shutting down
- private var disablingPool = immutable.Set.empty[ActorRef]
+ protected[containerpool] var disablingPool = immutable.Set.empty[ActorRef]
private var shuttingDown = false
diff --git a/core/scheduler/src/main/resources/application.conf b/core/scheduler/src/main/resources/application.conf
index 211ae5f0d..e73f764c1 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -76,6 +76,7 @@ whisk {
graceful-shutdown-timeout = "5 seconds"
max-retention-size = "10000"
max-retention-ms = "60000"
+ max-blackbox-retention-ms = "300000"
throttling-fraction = "0.9"
duration-buffer-size = "10"
}
@@ -85,6 +86,7 @@ whisk {
}
max-peek = "128"
in-progress-job-retention = "20 seconds"
+ blackbox-multiple = "15"
data-management-service {
retry-interval = "1 second"
}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala
index 1ac75e8ff..ecbc528a4 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala
@@ -47,10 +47,11 @@ case class JobEntry(action: FullyQualifiedEntityName, timer: Cancellable)
class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
schedulerInstanceId: SchedulerInstanceId,
- dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
+ dataManagementService: ActorRef,
+ baseTimeout: FiniteDuration,
+ blackboxMultiple: Int)(implicit actorSystem: ActorSystem, logging: Logging)
extends Actor {
private implicit val ec: ExecutionContext = actorSystem.dispatcher
- private val baseTimeout = loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetention)
private val retryLimit = 5
/**
@@ -152,10 +153,10 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
// If there is a JobEntry, delete it.
creationJobPool
.remove(creationId)
- .foreach(entry => {
- sendState(state)
- entry.timer.cancel()
- })
+ .map(entry => entry.timer.cancel())
+
+ // even if there is no entry because of timeout, we still need to send the state to the queue if the queue exists
+ sendState(state)
dataManagementService ! UnregisterData(key)
Future.successful({})
@@ -176,7 +177,8 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
revision: DocRevision,
creationId: CreationId,
isBlackbox: Boolean): Cancellable = {
- val timeout = if (isBlackbox) FiniteDuration(baseTimeout.toSeconds * 3, TimeUnit.SECONDS) else baseTimeout
+ val timeout =
+ if (isBlackbox) FiniteDuration(baseTimeout.toSeconds * blackboxMultiple, TimeUnit.SECONDS) else baseTimeout
actorSystem.scheduler.scheduleOnce(timeout) {
logging.warn(
this,
@@ -222,8 +224,12 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Arr
}
object CreationJobManager {
+ private val baseTimeout = loadConfigOrThrow[Int](ConfigKeys.schedulerInProgressJobRetention).seconds
+ private val blackboxMultiple = loadConfigOrThrow[Int](ConfigKeys.schedulerBlackboxMultiple)
+
def props(feedFactory: (ActorRefFactory, String, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
schedulerInstanceId: SchedulerInstanceId,
dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging) =
- Props(new CreationJobManager(feedFactory, schedulerInstanceId, dataManagementService))
+ Props(
+ new CreationJobManager(feedFactory, schedulerInstanceId, dataManagementService, baseTimeout, blackboxMultiple))
}
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index e4ba0022b..fabc785a4 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -85,7 +85,6 @@ case class QueueRemoved(invocationNamespace: String, action: DocInfo, leaderKey:
case class QueueReactivated(invocationNamespace: String, action: FullyQualifiedEntityName, docInfo: DocInfo)
case class CancelPoll(promise: Promise[Either[MemoryQueueError, ActivationMessage]])
case object QueueRemovedCompleted
-case object FlushPulse
// Events received by the actor
case object Start
@@ -125,7 +124,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
checkToDropStaleActivation: (Queue[TimeSeriesActivationEntry],
Long,
String,
- FullyQualifiedEntityName,
+ WhiskActionMetaData,
MemoryQueueState,
ActorRef) => Unit,
queueConfig: QueueConfig)(implicit logging: Logging)
@@ -151,6 +150,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
private val memory = actionMetaData.limits.memory.megabytes.MB
private val queueRemovedMsg = QueueRemoved(invocationNamespace, action.toDocId.asDocInfo(revision), Some(leaderKey))
private val staleQueueRemovedMsg = QueueRemoved(invocationNamespace, action.toDocId.asDocInfo(revision), None)
+ private val actionRetentionTimeout = MemoryQueue.getRetentionTimeout(actionMetaData, queueConfig)
private[queue] var containers = Set.empty[String]
private[queue] var creationIds = Set.empty[String]
@@ -197,7 +197,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
when(Uninitialized) {
case Event(Start, _) =>
- logging.info(this, s"[$invocationNamespace:$action:$stateName] a new queue is created.")
+ logging.info(
+ this,
+ s"[$invocationNamespace:$action:$stateName] a new queue is created, retentionTimeout: $actionRetentionTimeout, kind: ${actionMetaData.exec.kind}.")
val (schedulerActor, droppingActor) = startMonitoring()
initializeThrottling()
@@ -256,7 +258,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// when there is no container, it moves to the Flushing state as no activations can be invoked
if (containers.size <= 0) {
val isWhiskError = ContainerCreationError.whiskErrors.contains(error)
- completeAllActivations(message, isWhiskError)
+ if (!isWhiskError) {
+ completeAllActivations(message, isWhiskError)
+ }
logging.error(
this,
s"[$invocationNamespace:$action:$stateName] Failed to create an
initial container due to ${if (isWhiskError) "whiskError"
@@ -271,7 +275,11 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// there is no timeout for this state as when there is no further message, it would move to the Running state again.
when(NamespaceThrottled) {
case Event(msg: ActivationMessage, _: ThrottledData) =>
- handleActivationMessage(msg)
+ if (containers.size + creationIds.size == 0) {
+ completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = false)
+ } else {
+ handleActivationMessage(msg)
+ }
stay
case Event(DisableNamespaceThrottling, data: ThrottledData) =>
@@ -328,33 +336,51 @@ class MemoryQueue(private val etcdClient: EtcdClient,
goto(Running) using RunningData(schedulerActor, droppingActor)
// log the failed information
- case Event(FailedCreationJob(creationId, _, _, _, _, message), data: FlushingData) =>
+ case Event(FailedCreationJob(creationId, _, _, _, error, message), data: FlushingData) =>
creationIds -= creationId.asString
logging.info(
this,
s"[$invocationNamespace:$action:$stateName][$creationId] Failed to
create a container due to $message")
// keep updating the reason
- stay using data.copy(reason = message)
+ stay using data.copy(error = error, reason = message)
// since there is no container, activations cannot be handled.
case Event(msg: ActivationMessage, data: FlushingData) =>
- completeErrorActivation(msg, data.reason, ContainerCreationError.whiskErrors.contains(data.error))
+ logging.info(this, s"[$invocationNamespace:$action:$stateName] got a new activation message ${msg.activationId}")(
+ msg.transid)
+ val whiskError = isWhiskError(data.error)
+ if (whiskError)
+ queue = queue.enqueue(TimeSeriesActivationEntry(Instant.now, msg))
+ else
+ completeErrorActivation(msg, data.reason, whiskError)
stay() using data.copy(activeDuringFlush = true)
// Since SchedulingDecisionMaker keep sending a message to create a container, this state is not automatically timed out.
// Instead, StateTimeout message will be sent by a timer.
- case Event(StateTimeout, data: FlushingData) =>
- completeAllActivations(data.reason, ContainerCreationError.whiskErrors.contains(data.error))
- if (data.activeDuringFlush)
+ case Event(StateTimeout | DropOld, data: FlushingData) =>
+ logging.info(this, s"[$invocationNamespace:$action:$stateName] Received
StateTimeout, drop stale messages.")
+ queue =
+ MemoryQueue.dropOld(queue, Duration.ofMillis(actionRetentionTimeout),
data.reason, completeErrorActivation)
+ if (data.activeDuringFlush || queue.nonEmpty)
stay using data.copy(activeDuringFlush = false)
else
cleanUpActorsAndGotoRemoved(data)
case Event(GracefulShutdown, data: FlushingData) =>
- completeAllActivations(data.reason, ContainerCreationError.whiskErrors.contains(data.error))
+ completeAllActivations(data.reason, isWhiskError(data.error))
logging.info(this, s"[$invocationNamespace:$action:$stateName] Received GracefulShutdown, stop the queue.")
cleanUpActorsAndGotoRemoved(data)
+
+ case Event(StopSchedulingAsOutdated, data: FlushingData) =>
+ logging.info(this, s"[$invocationNamespace:$action:$stateName] stop
further scheduling.")
+ completeAllActivations(data.reason, isWhiskError(data.error))
+ // let QueueManager know this queue is no longer in charge.
+ context.parent ! staleQueueRemovedMsg
+ cleanUpActors(data)
+ cleanUpData()
+
+ goto(Removed) using NoData()
}
// in case there is any activation in the queue, it waits until all of them are handled.
@@ -399,6 +425,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// actors and data are already wiped
case Event(QueueRemovedCompleted, _: NoData) =>
+ logging.info(this, "stop fsm")
stop()
// This is not supposed to happen. This will ensure the queue does not run forever.
@@ -523,7 +550,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
case Event(DropOld, _) =>
if (queue.nonEmpty && Duration
.between(queue.head.timestamp, Instant.now)
- .compareTo(Duration.ofMillis(queueConfig.maxRetentionMs)) < 0) {
+ .compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) {
logging.error(
this,
s"[$invocationNamespace:$action:$stateName] Drop some stale
activations for $revision, existing container is ${containers.size}, inProgress
container is ${creationIds.size}, state data: $stateData, in is $in, current:
${queue.size}.")
@@ -531,7 +558,11 @@ class MemoryQueue(private val etcdClient: EtcdClient,
this,
s"[$invocationNamespace:$action:$stateName] the head stale message:
${queue.head.msg.activationId}")
}
- queue = MemoryQueue.dropOld(queue, Duration.ofMillis(queueConfig.maxRetentionMs), completeErrorActivation)
+ queue = MemoryQueue.dropOld(
+ queue,
+ Duration.ofMillis(actionRetentionTimeout),
+ s"Activation processing is not initiated for $actionRetentionTimeout
ms",
+ completeErrorActivation)
stay
@@ -861,7 +892,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
// these schedulers will run forever and stop when the memory queue stops
private def startMonitoring(): (ActorRef, ActorRef) = {
val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
- checkToDropStaleActivation(queue, queueConfig.maxRetentionMs, invocationNamespace, action, stateName, self)
+ checkToDropStaleActivation(queue, actionRetentionTimeout, invocationNamespace, actionMetaData, stateName, self)
Future.successful(())
}
@@ -1055,11 +1086,12 @@ class MemoryQueue(private val etcdClient: EtcdClient,
causedBy ++ limits ++ binding
})
}
+
+ private def isWhiskError(error: ContainerCreationError): Boolean = ContainerCreationError.whiskErrors.contains(error)
}
object MemoryQueue {
private[queue] val queueConfig = loadConfigOrThrow[QueueConfig](ConfigKeys.schedulerQueue)
- private[queue] val MaxRetentionTime = queueConfig.maxRetentionMs
def props(etcdClient: EtcdClient,
durationChecker: DurationChecker,
@@ -1105,21 +1137,27 @@ object MemoryQueue {
def dropOld(
queue: Queue[TimeSeriesActivationEntry],
retention: Duration,
+ reason: String,
completeErrorActivation: (ActivationMessage, String, Boolean) => Future[Any]): Queue[TimeSeriesActivationEntry] = {
if (queue.isEmpty || Duration.between(queue.head.timestamp, Instant.now).compareTo(retention) < 0)
queue
else {
- completeErrorActivation(queue.head.msg, s"activation processing is not
initiated for $MaxRetentionTime ms", true)
- dropOld(queue.tail, retention, completeErrorActivation)
+ completeErrorActivation(queue.head.msg, reason, true)
+ dropOld(queue.tail, retention, reason, completeErrorActivation)
}
}
def checkToDropStaleActivation(queue: Queue[TimeSeriesActivationEntry],
maxRetentionMs: Long,
invocationNamespace: String,
- action: FullyQualifiedEntityName,
+ actionMetaData: WhiskActionMetaData,
stateName: MemoryQueueState,
queueRef: ActorRef)(implicit logging: Logging) = {
+ val action = actionMetaData.fullyQualifiedName(true)
+ logging.debug(
+ this,
+ s"[$invocationNamespace:$action:$stateName] use the given retention
timeout: $maxRetentionMs for this action kind: ${actionMetaData.exec.kind}.")
+
if (queue.nonEmpty && Duration
.between(queue.head.timestamp, Instant.now)
.compareTo(Duration.ofMillis(maxRetentionMs)) >= 0) {
@@ -1130,6 +1168,14 @@ object MemoryQueue {
queueRef ! DropOld
}
}
+
+ private def getRetentionTimeout(actionMetaData: WhiskActionMetaData, queueConfig: QueueConfig): Long = {
+ if (actionMetaData.exec.kind == ExecMetaDataBase.BLACKBOX) {
+ queueConfig.maxBlackboxRetentionMs
+ } else {
+ queueConfig.maxRetentionMs
+ }
+ }
}
case class QueueSnapshot(initialized: Boolean,
@@ -1151,6 +1197,7 @@ case class QueueConfig(idleGrace: FiniteDuration,
gracefulShutdownTimeout: FiniteDuration,
maxRetentionSize: Int,
maxRetentionMs: Long,
+ maxBlackboxRetentionMs: Long,
throttlingFraction: Double,
durationBufferSize: Int)
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index 0ec5f839f..b0f393238 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -152,6 +152,7 @@ whisk {
graceful-shutdown-timeout = "{{ scheduler.queue.gracefulShutdownTimeout | default('5 seconds') }}"
max-retention-size = "{{ scheduler.queue.maxRetentionSize | default(10000) }}"
max-retention-ms = "{{ scheduler.queue.maxRetentionMs | default(60000) }}"
+ max-blackbox-retention-ms = "{{ scheduler.queue.maxBlackboxRetentionMs}}"
throttling-fraction = "{{ scheduler.queue.throttlingFraction | default(0.9) }}"
duration-buffer-size = "{{ scheduler.queue.durationBufferSize | default(10) }}"
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
index 6f7787675..14cf432b5 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -20,7 +20,7 @@ package org.apache.openwhisk.core.containerpool.v2.test
import java.time.Instant
import java.util.concurrent.TimeUnit
import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
-import akka.testkit.{ImplicitSender, TestActor, TestKit, TestProbe}
+import akka.testkit.{ImplicitSender, TestActor, TestActorRef, TestKit, TestProbe}
import common.StreamLogging
import org.apache.openwhisk.common.{Enable, GracefulShutdown, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
@@ -55,6 +55,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach,
FlatSpecLike, Match
import org.scalatest.concurrent.Eventually
import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -311,21 +312,38 @@ class FunctionPullingContainerPoolTests
}
}
+ private def retry[T](fn: => T) = org.apache.openwhisk.utils.retry(fn, 10,
Some(1.second))
+
it should "stop containers gradually when shut down" in within(timeout * 20)
{
val (containers, factory) = testContainers(10)
+ val disablingContainers = ListBuffer[ActorRef]()
+
+ for (container <- containers) {
+ container.setAutoPilot((_: ActorRef, msg: Any) =>
+ msg match {
+ case GracefulShutdown =>
+ disablingContainers += container.ref
+ TestActor.KeepRunning
+
+ case _ =>
+ TestActor.KeepRunning
+ })
+ }
+
val doc = put(entityStore, bigWhiskAction)
val topic = s"creationAck${schedulerInstanceId.asString}"
val consumer = new TestConnector(topic, 4, true)
- val pool = system.actorOf(
- Props(new FunctionPullingContainerPool(
+ val pool = TestActorRef(
+ new FunctionPullingContainerPool(
factory,
invokerHealthService.ref,
poolConfig(MemoryLimit.STD_MEMORY * 20, batchDeletionSize = 3),
invokerInstance,
List.empty,
- sendAckToScheduler(consumer.getProducer()))))
+ sendAckToScheduler(consumer.getProducer())))
(0 to 10).foreach(_ => pool !
CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)) // 11
* stdMemory taken)
+
(0 to 10).foreach(i => {
containers(i).expectMsgPF() {
case Initialize(invocationNamespace, fqn, executeAction,
schedulerHost, rpcPort, _) => true
@@ -346,81 +364,54 @@ class FunctionPullingContainerPoolTests
TestProbe().ref)))
})
+ retry {
+ pool.underlyingActor.warmedPool.size shouldBe 6
+ pool.underlyingActor.busyPool.size shouldBe 5
+ }
+
// disable
pool ! GracefulShutdown
+
// at first, 3 containers will be removed from busy pool, and left
containers will not
- var disablingContainers = Set.empty[Int]
- (0 to 10).foreach(i => {
- try {
- containers(i).expectMsg(1.second, GracefulShutdown)
- disablingContainers += i
- } catch {
- case _: Throwable =>
- }
- })
- assert(disablingContainers.size == 3, "more than 3 containers is shutting
down")
- disablingContainers.foreach(i => containers(i).send(pool,
ContainerRemoved(false)))
-
- Thread.sleep(3000)
- var completedContainer = -1
- (0 to 10)
- .filter(!disablingContainers.contains(_))
- .foreach(i => {
- try {
- containers(i).expectMsg(1.second, GracefulShutdown)
- disablingContainers += i
- // only make one container complete shutting down
- if (completedContainer == -1)
- completedContainer = i
- } catch {
- case _: Throwable =>
- }
- })
- assert(disablingContainers.size == 6, "more than 3 containers is shutting
down")
- containers(completedContainer).send(pool, ContainerRemoved(false))
-
- Thread.sleep(3000)
- (0 to 10)
- .filter(!disablingContainers.contains(_))
- .foreach(i => {
- try {
- containers(i).expectMsg(1.second, GracefulShutdown)
- disablingContainers += i
- } catch {
- case _: Throwable =>
- }
- })
- // there should be only one more container going to shut down
- assert(disablingContainers.size == 7, "more than 3 containers is shutting
down")
- disablingContainers.foreach(i => containers(i).send(pool,
ContainerRemoved(false)))
-
- Thread.sleep(3000)
- (0 to 10)
- .filter(!disablingContainers.contains(_))
- .foreach(i => {
- try {
- containers(i).expectMsg(1.second, GracefulShutdown)
- disablingContainers += i
- } catch {
- case _: Throwable =>
- }
- })
- assert(disablingContainers.size == 10, "more than 3 containers is shutting
down")
- disablingContainers.foreach(i => containers(i).send(pool,
ContainerRemoved(false)))
-
- Thread.sleep(3000)
- (0 to 10)
- .filter(!disablingContainers.contains(_))
- .foreach(i => {
- try {
- containers(i).expectMsg(1.second, GracefulShutdown)
- disablingContainers += i
- } catch {
- case _: Throwable =>
- }
- })
- assert(disablingContainers.size == 11, "unexpected containers is shutting
down")
- disablingContainers.foreach(i => containers(i).send(pool,
ContainerRemoved(false)))
+ retry {
+ disablingContainers.size shouldBe 3
+ }
+
+ // all 3 containers finish termination
+ disablingContainers.foreach(pool.tell(ContainerRemoved(false), _))
+
+ retry {
+ pool.underlyingActor.warmedPool.size +
pool.underlyingActor.busyPool.size shouldBe 8
+ }
+
+ // it will disable 3 more containers.
+ retry {
+ disablingContainers.size shouldBe 6
+ }
+
+ // only one container of them finishes termination
+ pool.tell(ContainerRemoved(false), disablingContainers.last)
+
+ // there should be only one more container going to shut down as more than
3 containers are shutting down.
+ retry {
+ disablingContainers.size shouldBe 7
+ }
+
+ // all 3 containers finish termination
+ disablingContainers.foreach(pool.tell(ContainerRemoved(false), _))
+
+ retry {
+ disablingContainers.size shouldBe 10
+ }
+
+ // all disabling containers finish termination
+ disablingContainers.foreach(pool.tell(ContainerRemoved(false), _))
+
+ // the last container is shutting down.
+ retry {
+ disablingContainers.size shouldBe 11
+ }
+ disablingContainers.foreach(pool.tell(ContainerRemoved(false), _))
}
it should "create prewarmed containers on startup" in within(timeout) {
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala
index 75db03a8d..61e8199b7 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala
@@ -17,12 +17,12 @@
package org.apache.openwhisk.core.scheduler.container.test
-import java.util.concurrent.TimeUnit
-import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
import com.ibm.etcd.client.{EtcdClient => Client}
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.entity.ExecManifest.{ImageName,
RuntimeManifest}
import org.apache.openwhisk.core.entity._
@@ -32,15 +32,14 @@ import org.apache.openwhisk.core.scheduler.container._
import org.apache.openwhisk.core.scheduler.message._
import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey,
MemoryQueueValue, QueuePool}
import org.apache.openwhisk.core.service.{RegisterData, UnregisterData}
-import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike,
Matchers}
-import pureconfig.loadConfigOrThrow
-import scala.concurrent.duration.FiniteDuration
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContextExecutor, Future}
@RunWith(classOf[JUnitRunner])
@@ -55,8 +54,9 @@ class CreationJobManagerTests
with BeforeAndAfterEach
with StreamLogging {
- private val timeout =
loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetention)
- val blackboxTimeout = FiniteDuration(timeout.toSeconds * 3, TimeUnit.SECONDS)
+ val timeout = 20.seconds
+ val blackboxMultiple = 2
+ val blackboxTimeout = FiniteDuration(timeout.toSeconds * blackboxMultiple,
TimeUnit.SECONDS)
implicit val ece: ExecutionContextExecutor = system.dispatcher
val config = new WhiskConfig(ExecManifest.requiredProperties)
val creationIdTest = CreationId.generate()
@@ -139,8 +139,7 @@ class CreationJobManagerTests
it should "register creation job" in {
val probe = TestProbe()
- val manager =
- system.actorOf(CreationJobManager.props(feedFactory, sid, probe.ref))
+ val manager = TestActorRef(new CreationJobManager(feedFactory, sid,
probe.ref, timeout, blackboxMultiple))
manager ! registerMessage
@@ -150,8 +149,7 @@ class CreationJobManagerTests
it should "skip duplicated creation job" in {
val probe = TestProbe()
- val manager =
- system.actorOf(CreationJobManager.props(feedFactory, sid, probe.ref))
+ val manager = TestActorRef(new CreationJobManager(feedFactory, sid,
probe.ref, timeout, blackboxMultiple))
manager ! registerMessage
manager ! registerMessage
@@ -206,8 +204,9 @@ class CreationJobManagerTests
val containerManager = TestProbe()
val dataManagementService = TestProbe()
val probe = TestProbe()
- val jobManager =
- containerManager.childActorOf(CreationJobManager.props(feedFactory, sid,
dataManagementService.ref))
+ val jobManager = TestActorRef(
+ Props(new CreationJobManager(feedFactory, sid,
dataManagementService.ref, timeout, blackboxMultiple)),
+ containerManager.ref)
QueuePool.put(
MemoryQueueKey(testInvocationNamespace,
action.toDocId.asDocInfo(revision)),
@@ -238,8 +237,9 @@ class CreationJobManagerTests
val containerManager = TestProbe()
val dataManagementService = TestProbe()
- val jobManager =
- containerManager.childActorOf(CreationJobManager.props(feedFactory, sid,
dataManagementService.ref))
+ val jobManager = TestActorRef(
+ Props(new CreationJobManager(feedFactory, sid,
dataManagementService.ref, timeout, blackboxMultiple)),
+ containerManager.ref)
jobManager ! registerMessage
@@ -257,8 +257,9 @@ class CreationJobManagerTests
val containerManager = TestProbe()
val dataManagementService = TestProbe()
val probe = TestProbe()
- val jobManager =
- containerManager.childActorOf(CreationJobManager.props(feedFactory, sid,
dataManagementService.ref))
+ val jobManager = TestActorRef(
+ Props(new CreationJobManager(feedFactory, sid,
dataManagementService.ref, timeout, blackboxMultiple)),
+ containerManager.ref)
QueuePool.put(
MemoryQueueKey(testInvocationNamespace,
action.toDocId.asDocInfo(revision)),
@@ -294,8 +295,9 @@ class CreationJobManagerTests
val containerManager = TestProbe()
val dataManagementService = TestProbe()
- val jobManager =
- containerManager.childActorOf(CreationJobManager.props(feedFactory, sid,
dataManagementService.ref))
+ val jobManager = TestActorRef(
+ Props(new CreationJobManager(feedFactory, sid,
dataManagementService.ref, timeout, blackboxMultiple)),
+ containerManager.ref)
jobManager ! failedFinish.copy(ack = failedFinish.ack.copy(retryCount = 5))
@@ -306,8 +308,9 @@ class CreationJobManagerTests
val containerManager = TestProbe()
val dataManagementService = TestProbe()
- val jobManager =
- containerManager.childActorOf(CreationJobManager.props(feedFactory, sid,
dataManagementService.ref))
+ val jobManager = TestActorRef(
+ Props(new CreationJobManager(feedFactory, sid,
dataManagementService.ref, timeout, blackboxMultiple)),
+ containerManager.ref)
jobManager ! registerMessage
@@ -329,8 +332,9 @@ class CreationJobManagerTests
val containerManager = TestProbe()
val dataManagementService = TestProbe()
- val jobManager =
- containerManager.childActorOf(CreationJobManager.props(feedFactory, sid,
dataManagementService.ref))
+ val jobManager = TestActorRef(
+ Props(new CreationJobManager(feedFactory, sid,
dataManagementService.ref, timeout, blackboxMultiple)),
+ containerManager.ref)
val execMetadata =
BlackBoxExecMetaData(ImageName("test image"), Some("main"), native =
false);
@@ -364,7 +368,7 @@ class CreationJobManagerTests
// no message for timeout
dataManagementService.expectNoMessage(timeout)
- Thread.sleep(timeout.toMillis * 2) // timeout is doubled for blackbox
actions
+ Thread.sleep(timeout.toMillis * blackboxMultiple) // timeout is doubled
for blackbox actions
dataManagementService.expectMsg(UnregisterData(testKey))
containerManager.expectMsg(
FailedCreationJob(
@@ -380,8 +384,10 @@ class CreationJobManagerTests
val containerManager = TestProbe()
val dataManagementService = TestProbe()
val probe = TestProbe()
- val jobManager =
- containerManager.childActorOf(CreationJobManager.props(feedFactory, sid,
dataManagementService.ref))
+ val jobManager = TestActorRef(
+ Props(new CreationJobManager(feedFactory, sid,
dataManagementService.ref, timeout, blackboxMultiple)),
+ containerManager.ref)
+
QueuePool.put(
MemoryQueueKey(testInvocationNamespace,
action.toDocId.asDocInfo(revision)),
MemoryQueueValue(probe.ref, true))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index edacd26f3..267d3a81d 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -47,7 +47,7 @@ import spray.json.{JsObject, JsString}
import java.time.Instant
import scala.collection.immutable.Queue
import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}
import scala.language.postfixOps
@RunWith(classOf[JUnitRunner])
@@ -74,7 +74,6 @@ class MemoryQueueFlowTests
behavior of "MemoryQueueFlow"
- // this is 1. normal case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "normally be created and handle an activation and became idle an
finally removed" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
@@ -168,7 +167,6 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}
- // this is 1-2. normal case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "became Idle and Running again if a message arrives" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
@@ -282,7 +280,6 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}
- // this is 2. NamespaceThrottled case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "go to the Flushing state dropping messages when it can't create
an initial container" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
@@ -365,7 +362,6 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}
- // this is 3. NamespaceThrottled case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "go to the NamespaceThrottled state without dropping messages and
get back to the Running container" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
@@ -488,7 +484,6 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}
- // this is 4. ActionThrottled case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "go to the ActionThrottled state when there are too many stale
activations including transition to NamespaceThrottling" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
@@ -501,7 +496,7 @@ class MemoryQueueFlowTests
// max retention size is 10 and throttling fraction is 0.8
// queue will be action throttled at 10 messages and disabled action
throttling at 8 messages
- val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 5000, 0.8, 10)
+ val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 5000, 10000, 0.8, 10)
// limit is 1
val getUserLimit = (_: String) => Future.successful(1)
@@ -637,7 +632,6 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}
- // this is 5. Paused case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "be Flushing when the limit is 0 and restarted back to Running
state when the limit is increased" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
@@ -691,26 +685,18 @@ class MemoryQueueFlowTests
expectInitialData(watcher, dataMgmtService)
probe.expectMsg(Transition(fsm, Uninitialized, Running))
- awaitAssert({
- ackedMessageCount shouldBe 1
- lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString(namespaceLimitUnderZero)))
- storedMessageCount shouldBe 1
- lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString(namespaceLimitUnderZero)))
- fsm.underlyingActor.queue.length shouldBe 0
- }, 5.seconds)
-
probe.expectMsg(Transition(fsm, Running, Flushing))
+ // activation received in Flushing state won't be flushed immediately if
Flushing state is caused by a whisk error
+ Thread.sleep(flushGrace.toMillis)
+ fsm ! messages(1)
awaitAssert({
- // in the paused state, all incoming messages should be dropped
immediately
- fsm ! messages(1)
- ackedMessageCount shouldBe 2
+ ackedMessageCount shouldBe 1
lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString(namespaceLimitUnderZero)))
- storedMessageCount shouldBe 2
+ storedMessageCount shouldBe 1
lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString(namespaceLimitUnderZero)))
- fsm.underlyingActor.queue.length shouldBe 0
- }, 5.seconds)
-
+ fsm.underlyingActor.queue.length shouldBe 1
+ }, FiniteDuration(retentionTimeout, MILLISECONDS))
// limit is increased by an operator
limit = 10
@@ -728,14 +714,12 @@ class MemoryQueueFlowTests
// Queue is now working
probe.expectMsg(Transition(fsm, Flushing, Running))
- fsm ! messages(2)
-
// one container is created
fsm.underlyingActor.namespaceContainerCount.existingContainerNumByNamespace += 1
// only one message is handled
container.send(fsm, getActivation(true, "testContainerId1"))
- container.expectMsg(ActivationResponse(Right(messages(2))))
+ container.expectMsg(ActivationResponse(Right(messages(1))))
// deleting the container from containers set
container.send(fsm, getActivation(false, "testContainerId1"))
@@ -758,7 +742,6 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}
- // this is 5-2. Paused case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "be Flushing when the limit is 0 and be terminated without
recovering" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
@@ -770,7 +753,7 @@ class MemoryQueueFlowTests
system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace,
fqn, schedulingConfig))
// generate 2 activations
- val messages = getActivationMessages(3)
+ val messages = getActivationMessages(2)
val getUserLimit = (_: String) => Future.successful(0)
@@ -804,35 +787,25 @@ class MemoryQueueFlowTests
registerCallback(probe, fsm)
fsm ! Start
+ fsm ! messages(0)
expectInitialData(watcher, dataMgmtService)
fsm ! testInitialDataStorageResult
probe.expectMsg(Transition(fsm, Uninitialized, Running))
- fsm ! messages(0)
- awaitAssert({
- ackedMessageCount shouldBe 1
- lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString(namespaceLimitUnderZero)))
- storedMessageCount shouldBe 1
- lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString(namespaceLimitUnderZero)))
- fsm.underlyingActor.queue.length shouldBe 0
- }, 5.seconds)
-
probe.expectMsg(Transition(fsm, Running, Flushing))
-
- // in the paused state, all incoming messages should be dropped immediately
fsm ! messages(1)
+ // activation received in Flushing state won't be flushed immediately if
Flushing state is caused by a whisk error
+ Thread.sleep(flushGrace.toMillis)
+
awaitAssert({
ackedMessageCount shouldBe 2
lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString(namespaceLimitUnderZero)))
storedMessageCount shouldBe 2
lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString(namespaceLimitUnderZero)))
fsm.underlyingActor.queue.length shouldBe 0
- }, 5.seconds)
-
- // normal termination process
- Thread.sleep(flushGrace.toMillis * 2)
+ }, FiniteDuration(retentionTimeout, MILLISECONDS))
// In this case data clean up happens first.
expectDataCleanUp(watcher, dataMgmtService)
@@ -844,7 +817,6 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}
- // this is 6. Waiting case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "be the Flushing state when a whisk error happens" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
@@ -921,13 +893,15 @@ class MemoryQueueFlowTests
fsm ! messages(1)
+ Thread.sleep(flushGrace.toMillis)
+
awaitAssert({
ackedMessageCount shouldBe 2
lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString("whisk error")))
storedMessageCount shouldBe 2
lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString("whisk error")))
fsm.underlyingActor.queue.length shouldBe 0
- }, 5.seconds)
+ }, FiniteDuration(retentionTimeout, MILLISECONDS))
Thread.sleep(flushGrace.toMillis * 2)
@@ -941,7 +915,6 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}
- // this is 6-2. Waiting case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "be the Flushing state when a whisk error happens and be recovered
when a container is created" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
@@ -952,6 +925,8 @@ class MemoryQueueFlowTests
system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace,
fqn, schedulingConfig))
val probe = TestProbe()
val container = TestProbe()
+ // generate 2 activations
+ val messages = getActivationMessages(2)
expectDurationChecking(mockEsClient, testInvocationNamespace)
@@ -988,36 +963,39 @@ class MemoryQueueFlowTests
probe.expectMsg(Transition(fsm, Uninitialized, Running))
- fsm ! message
- // any id is fine because it would be overridden
- var creationId = CreationId.generate()
+ fsm ! messages(0)
+ // Failed to create a container
containerManager.expectMsgPF() {
case ContainerCreation(List(ContainerCreationMessage(_, _, _, _, _, _,
_, _, _, id)), _, _) =>
- creationId = id
+ fsm ! FailedCreationJob(id, testInvocationNamespace, fqn, revision,
WhiskError, "whisk error")
}
- // Failed to create a container
- fsm ! FailedCreationJob(creationId, testInvocationNamespace, fqn,
revision, WhiskError, "whisk error")
+
+ probe.expectMsg(Transition(fsm, Running, Flushing))
+ Thread.sleep(1000)
+ fsm ! messages(1)
+
+ // activation received in Flushing state won't be flushed immediately if
Flushing state is caused by a whisk error
+ Thread.sleep(flushGrace.toMillis)
awaitAssert({
ackedMessageCount shouldBe 1
lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString("whisk error")))
storedMessageCount shouldBe 1
lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString("whisk error")))
- fsm.underlyingActor.queue.length shouldBe 0
- }, 5.seconds)
+ fsm.underlyingActor.queue.length shouldBe 1
+ }, FiniteDuration(retentionTimeout, MILLISECONDS))
- probe.expectMsg(Transition(fsm, Running, Flushing))
-
- // Failed to create a container
- fsm ! SuccessfulCreationJob(creationId, testInvocationNamespace, fqn,
revision)
+ // Succeed to create a container
+ containerManager.expectMsgPF() {
+ case ContainerCreation(List(ContainerCreationMessage(_, _, _, _, _, _,
_, _, _, id)), _, _) =>
+ fsm ! SuccessfulCreationJob(id, testInvocationNamespace, fqn, revision)
+ }
probe.expectMsg(Transition(fsm, Flushing, Running))
- fsm ! message
-
container.send(fsm, getActivation())
- container.expectMsg(ActivationResponse(Right(message)))
+ container.expectMsg(ActivationResponse(Right(messages(1))))
// deleting the container from containers set
container.send(fsm, getActivation(false))
@@ -1039,7 +1017,6 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}
- // this is 7. Flushing case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "be the Flushing state when a developer error happens" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
@@ -1143,7 +1120,6 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}
- // this is 8. GracefulShuttingDown case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "be gracefully terminated when it receives a GracefulShutDown
message" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
@@ -1250,7 +1226,6 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}
- // this is 10. deprecated case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "be deprecated when a new queue supersedes it." in {
// GracefulShuttingDown is not applicable
val allStates = List(Running, Idle, Flushing, ActionThrottled,
NamespaceThrottled, Removing, Removed)
@@ -1335,7 +1310,6 @@ class MemoryQueueFlowTests
}
}
- // this is 10-2. deprecated case in
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
it should "be deprecated and stops even if the queue manager could not
respond." in {
// GracefulShuttingDown is not applicable
val allStates = List(Running, Idle, Flushing, ActionThrottled,
NamespaceThrottled, Removing, Removed)
@@ -1564,6 +1538,20 @@ class MemoryQueueFlowTests
fsm ! QueueRemovedCompleted
probe.expectTerminated(fsm, 10.seconds)
+ case Flushing =>
+ // queue is stale and will be removed
+ parent.expectMsg(staleQueueRemovedMsg)
+ probe.expectMsg(Transition(fsm, state, Removed))
+
+ fsm ! QueueRemovedCompleted
+
+ Thread.sleep(gracefulShutdownTimeout.toMillis)
+
+ watcher.expectMsgAllOf(
+ UnwatchEndpoint(inProgressContainerKey, isPrefix = true,
watcherName),
+ UnwatchEndpoint(existingContainerKey, isPrefix = true,
watcherName),
+ UnwatchEndpoint(leaderKey, isPrefix = false, watcherName))
+
case _ =>
parent.expectMsg(staleQueueRemovedMsg)
parent.expectMsg(message)
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index 15ac5f4c3..950af0292 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -20,7 +20,6 @@ package org.apache.openwhisk.core.scheduler.queue.test
import java.time.Instant
import java.util.concurrent.Executor
import java.{lang, util}
-
import akka.actor.ActorRef
import akka.actor.FSM.{CurrentState, StateTimeout,
SubscribeTransitionCallBack, Transition}
import akka.pattern.ask
@@ -39,6 +38,7 @@ import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool.ContainerId
import org.apache.openwhisk.core.database.NoDocumentException
+import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import
org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.{existingContainers,
inProgressContainer}
@@ -953,8 +953,12 @@ class MemoryQueueTests
parent.expectMsg(Transition(fsm, Running, Flushing))
(1 to expectedCount).foreach(_ =>
probe.expectMsg(ActivationResponse.developerError("nonExecutbleAction error")))
+ // flush msg immediately
+ fsm ! message
+ probe.expectMsg(ActivationResponse.developerError("nonExecutbleAction
error"))
+
parent.expectMsg(
- queueConfig.stopGrace + 5.seconds,
+ 2 * queueConfig.flushGrace + 5.seconds,
QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev),
Some(leaderKey)))
parent.expectMsg(Transition(fsm, Flushing, Removed))
fsm ! QueueRemovedCompleted
@@ -971,11 +975,9 @@ class MemoryQueueTests
val parent = TestProbe()
val expectedCount = 3
- val probe = TestProbe()
-
expectDurationChecking(mockEsClient, testInvocationNamespace)
- val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 180000, 0.9, 10)
+ val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 10000, 20000, 0.9, 10)
val fsm =
TestFSMRef(
@@ -1018,6 +1020,15 @@ class MemoryQueueTests
ContainerCreationError.NoAvailableInvokersError,
"no available invokers")
+ parent.expectMsg(Transition(fsm, Running, Flushing))
+ parent.expectNoMessage(5.seconds)
+
+ // Add 3 more messages.
+ (1 to expectedCount).foreach(_ => fsm ! message)
+ parent.expectNoMessage(5.seconds)
+
+ // After 10 seconds(action retention timeout), the first 3 messages are
timed out.
+ // It does not get removed as there are still 3 messages in the queue.
awaitAssert({
ackedMessageCount shouldBe 3
lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString("no available invokers")))
@@ -1025,15 +1036,12 @@ class MemoryQueueTests
lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString("no available invokers")))
}, 5.seconds)
- parent.expectMsg(Transition(fsm, Running, Flushing))
-
// should goto Running
fsm ! SuccessfulCreationJob(testCreationId,
message.user.namespace.name.asString, message.action, message.revision)
- (1 to expectedCount).foreach(_ => fsm ! message)
+
parent.expectMsg(Transition(fsm, Flushing, Running))
- probe.expectNoMessage(2.seconds)
- // should goto WaitForFlush again as existing is always 0
+ // should goto Flushing again as there is no container running.
fsm ! FailedCreationJob(
testCreationId,
message.user.namespace.name.asString,
@@ -1042,23 +1050,24 @@ class MemoryQueueTests
ContainerCreationError.ResourceNotEnoughError,
"resource not enough")
parent.expectMsg(Transition(fsm, Running, Flushing))
- (1 to expectedCount).foreach(_ => fsm ! message)
- // wait for event `FlushPulse`, and then all existing activations will be
flushed
- Thread.sleep(flushGrace.toMillis + 3.seconds.toMillis)
+ // wait for the flush grace, and then all existing activations will be
flushed
+ Thread.sleep(queueConfig.maxBlackboxRetentionMs +
queueConfig.flushGrace.toMillis)
+ // The error message is updated from the recent error message of the
FailedCreationJob.
awaitAssert({
- ackedMessageCount shouldBe 9
+ ackedMessageCount shouldBe 6
lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString("resource not enough")))
- storedMessageCount shouldBe 9
+ storedMessageCount shouldBe 6
lastAckedActivationResult.response.result shouldBe Some(JsObject("error"
-> JsString("resource not enough")))
}, 5.seconds)
- // should goto Running
- fsm ! SuccessfulCreationJob(testCreationId,
message.user.namespace.name.asString, message.action, message.revision)
- (1 to expectedCount).foreach(_ => fsm ! message)
- parent.expectMsg(Transition(fsm, Flushing, Running))
- probe.expectNoMessage(2.seconds)
+ parent.expectMsg(queueRemovedMsg)
+
+ // should goto Removed
+ parent.expectMsg(Transition(fsm, Flushing, Removed))
+ fsm ! QueueRemovedCompleted
+
fsm.stop()
}
@@ -1149,6 +1158,109 @@ class MemoryQueueTests
fsm.stop()
}
+ it should "complete error activation after blackbox timeout when the action
is a blackbox action and received FailedCreationJob with a whisk
error(recoverable)" in {
+ val mockEtcdClient = mock[EtcdClient]
+ val testProbe = TestProbe()
+ val decisionMaker = TestProbe()
+ decisionMaker.ignoreMsg { case _: QueueSnapshot => true }
+ val parent = TestProbe()
+ val expectedCount = 3
+
+ val probe = TestProbe()
+ val newAck = new ActiveAck {
+ override def apply(tid: TransactionId,
+ activationResult: WhiskActivation,
+ blockingInvoke: Boolean,
+ controllerInstance: ControllerInstanceId,
+ userId: UUID,
+ acknowledegment: AcknowledegmentMessage): Future[Any]
= {
+ probe.ref ! activationResult.response
+ Future.successful({})
+ }
+ }
+
+ val execMetadata = BlackBoxExecMetaData(ImageName("test"), None, native =
false)
+
+ val blackboxActionMetadata =
+ WhiskActionMetaData(
+ action.namespace,
+ action.name,
+ execMetadata,
+ action.parameters,
+ action.limits,
+ action.version,
+ action.publish,
+ action.annotations)
+ .revision[WhiskActionMetaData](action.rev)
+
+ expectDurationChecking(mockEsClient, testInvocationNamespace)
+
+ val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 10000, 20000, 0.9, 10)
+
+ val fsm =
+ TestFSMRef(
+ new MemoryQueue(
+ mockEtcdClient,
+ durationChecker,
+ fqn,
+ mockMessaging(),
+ schedulingConfig,
+ testInvocationNamespace,
+ revision,
+ endpoints,
+ blackboxActionMetadata,
+ testProbe.ref,
+ testProbe.ref,
+ testProbe.ref,
+ decisionMaker.ref,
+ schedulerId,
+ newAck,
+ store,
+ (s: String) => { Future.successful(10000) }, // avoid exceed user
limit
+ checkToDropStaleActivation,
+ queueConfig),
+ parent.ref,
+ "MemoryQueue")
+
+ fsm ! SubscribeTransitionCallBack(parent.ref)
+ parent.expectMsg(CurrentState(fsm, Uninitialized))
+ parent watch fsm
+
+ fsm ! Start
+
+ parent.expectMsg(Transition(fsm, Uninitialized, Running))
+
+ (1 to expectedCount).foreach(_ => fsm ! message)
+ fsm ! FailedCreationJob(
+ testCreationId,
+ message.user.namespace.name.asString,
+ message.action,
+ message.revision,
+ ContainerCreationError.NoAvailableInvokersError,
+ "no available invokers")
+
+ parent.expectMsg(Transition(fsm, Running, Flushing))
+ probe.expectNoMessage()
+
+ // should wait for sometime before flush message
+ fsm ! message
+
+ // wait for the flush grace, and then some existing activations will be
flushed
+ Thread.sleep(queueConfig.maxBlackboxRetentionMs +
queueConfig.flushGrace.toMillis)
+ (1 to expectedCount).foreach(_ =>
probe.expectMsg(ActivationResponse.whiskError("no available invokers")))
+
+ val duration = FiniteDuration(queueConfig.maxBlackboxRetentionMs,
MILLISECONDS) + queueConfig.flushGrace
+ probe.expectMsg(duration, ActivationResponse.whiskError("no available
invokers"))
+ parent.expectMsg(
+ duration,
+ QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev),
Some(leaderKey)))
+ parent.expectMsg(Transition(fsm, Flushing, Removed))
+ fsm ! QueueRemovedCompleted
+ parent.expectTerminated(fsm)
+
+ fsm.stop()
+ }
+
it should "stop scheduling if the namespace does not exist" in {
val mockEtcdClient = mock[EtcdClient]
val getZeroLimit = (_: String) => {
Future.failed(NoDocumentException("namespace does not exist")) }
@@ -1320,7 +1432,7 @@ class MemoryQueueTests
// it always induces the throttling
val getZeroLimit = (_: String) => { Future.successful(2) }
- val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 1, 5000, 0.9, 10)
+ val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 1, 5000, 10000, 0.9, 10)
expectDurationChecking(mockEsClient, testInvocationNamespace)
@@ -1366,7 +1478,7 @@ class MemoryQueueTests
val probe = TestProbe()
val parent = TestProbe()
- val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 5000, 0.9, 10)
+ val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5
seconds, 10, 5000, 10000, 0.9, 10)
val msgRetentionSize = queueConfig.maxRetentionSize
val tid = TransactionId(TransactionId.generateTid())
@@ -1717,7 +1829,11 @@ class MemoryQueueTests
Thread.sleep(5000)
- queue = MemoryQueue.dropOld(queue, java.time.Duration.ofMillis(1000),
completeErrorActivation)
+ queue = MemoryQueue.dropOld(
+ queue,
+ java.time.Duration.ofMillis(1000),
+ "activation processing is not initiated for 1000 ms",
+ completeErrorActivation)
queue.size shouldBe 3
}
@@ -1726,7 +1842,11 @@ class MemoryQueueTests
var queue = Queue.empty[TimeSeriesActivationEntry]
noException should be thrownBy {
- queue = MemoryQueue.dropOld(queue, java.time.Duration.ofMillis(1000),
completeErrorActivation)
+ queue = MemoryQueue.dropOld(
+ queue,
+ java.time.Duration.ofMillis(1000),
+ "activation processing is not initiated for 1000 ms",
+ completeErrorActivation)
}
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
index 38f13ceee..c7a3ff095 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
@@ -154,10 +154,12 @@ class MemoryQueueTestsFixture
val actionThrottlingKey = ThrottlingKeys.action(testInvocationNamespace, fqn.copy(version = None))
// queue variables
- val queueConfig = QueueConfig(5 seconds, 10 seconds, 5 seconds, 5 seconds, 10, 10000, 0.9, 10)
+ val queueConfig = QueueConfig(5 seconds, 10 seconds, 5 seconds, 5 seconds, 10, 10000, 20000, 0.9, 10)
val idleGrace = queueConfig.idleGrace
val stopGrace = queueConfig.stopGrace
val flushGrace = queueConfig.flushGrace
+ val retentionTimeout = queueConfig.maxRetentionMs
+ val blackboxTimeout = queueConfig.maxBlackboxRetentionMs
val gracefulShutdownTimeout = queueConfig.gracefulShutdownTimeout
val testRetentionSize = queueConfig.maxRetentionSize
val testThrottlingFraction = queueConfig.throttlingFraction