This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d31e96e1 [Scheduler Enhancement] Increase the retention timeout for the blackbox action. (#5266)
8d31e96e1 is described below

commit 8d31e96e1a321987f9f5c3b7289ba83bf4eebff6
Author: Dominic Kim <[email protected]>
AuthorDate: Tue Jul 12 19:56:13 2022 +0900

    [Scheduler Enhancement] Increase the retention timeout for the blackbox action. (#5266)
    
    * Increase the retention timeout for the blackbox action.
    
    * Fix test cases.
    
    * Apply scalaFmt.
    
    * Add GracefulShutdown case back.
    
    * Increase the blackbox timeout for test cases.
    
    * Access the private method directly.
    
    * Replace Thread.sleep with awaitAssert
    
    * Add the missing configuration.
    
    * Enhance the test code.
    
    * Remove thread.sleep.
    
    * Fix test cases.
    
    * Fix test cases.
---
 ansible/group_vars/all                             |   1 +
 ansible/roles/schedulers/tasks/deploy.yml          |   2 +
 .../org/apache/openwhisk/common/Logging.scala      |   3 +-
 .../org/apache/openwhisk/core/WhiskConfig.scala    |   1 +
 .../v2/FunctionPullingContainerPool.scala          |  12 +-
 core/scheduler/src/main/resources/application.conf |   2 +
 .../scheduler/container/CreationJobManager.scala   |  22 ++-
 .../core/scheduler/queue/MemoryQueue.scala         |  85 ++++++++---
 tests/src/test/resources/application.conf.j2       |   1 +
 .../test/FunctionPullingContainerPoolTests.scala   | 143 ++++++++----------
 .../container/test/CreationJobManagerTests.scala   |  60 ++++----
 .../queue/test/MemoryQueueFlowTests.scala          | 118 +++++++--------
 .../scheduler/queue/test/MemoryQueueTests.scala    | 168 ++++++++++++++++++---
 .../queue/test/MemoryQueueTestsFixture.scala       |   4 +-
 14 files changed, 395 insertions(+), 227 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 2d8365ea1..97a3c0cbb 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -518,6 +518,7 @@ scheduler:
   dataManagementService:
     retryInterval: "{{ scheduler_dataManagementService_retryInterval | 
default('1 second') }}"
   inProgressJobRetention: "{{ scheduler_inProgressJobRetention | default('20 
seconds') }}"
+  blackboxMultiple: "{{ scheduler_blackboxMultiple | default(15) }}"
   managedFraction: "{{ scheduler_managed_fraction | default(1.0 - 
(scheduler_blackbox_fraction | default(__scheduler_blackbox_fraction))) }}"
   blackboxFraction: "{{ scheduler_blackbox_fraction | 
default(__scheduler_blackbox_fraction) }}"
   scheduling:
diff --git a/ansible/roles/schedulers/tasks/deploy.yml 
b/ansible/roles/schedulers/tasks/deploy.yml
index 507f4813f..82f82d4c1 100644
--- a/ansible/roles/schedulers/tasks/deploy.yml
+++ b/ansible/roles/schedulers/tasks/deploy.yml
@@ -113,6 +113,7 @@
       "CONFIG_whisk_scheduler_maxPeek": "{{ scheduler.maxPeek }}"
       "CONFIG_whisk_scheduler_dataManagementService_retryInterval": "{{ 
scheduler.dataManagementService.retryInterval }}"
       "CONFIG_whisk_scheduler_inProgressJobRetention": "{{ 
scheduler.inProgressJobRetention }}"
+      "CONFIG_whisk_scheduler_blackboxMultiple": "{{ 
scheduler.blackboxMultiple }}"
       "CONFIG_whisk_scheduler_scheduling_staleThreshold": "{{ 
scheduler.scheduling.staleThreshold }}"
       "CONFIG_whisk_scheduler_scheduling_checkInterval": "{{ 
scheduler.scheduling.checkInterval }}"
       "CONFIG_whisk_scheduler_scheduling_dropInterval": "{{ 
scheduler.scheduling.dropInterval }}"
@@ -124,6 +125,7 @@
       "CONFIG_whisk_scheduler_queue_gracefulShutdownTimeout": "{{ 
scheduler.queue.gracefulShutdownTimeout }}"
       "CONFIG_whisk_scheduler_queue_maxRetentionSize": "{{ 
scheduler.queue.maxRetentionSize }}"
       "CONFIG_whisk_scheduler_queue_maxRetentionMs": "{{ 
scheduler.queue.maxRetentionMs }}"
+      "CONFIG_whisk_scheduler_queue_maxBlackboxRetentionMs": "{{ 
scheduler.queue.maxBlackboxRetentionMs }}"
       "CONFIG_whisk_scheduler_queue_throttlingFraction": "{{ 
scheduler.queue.throttlingFraction }}"
       "CONFIG_whisk_scheduler_queue_durationBufferSize": "{{ 
scheduler.queue.durationBufferSize }}"
       "CONFIG_whisk_durationChecker_timeWindow": "{{ 
durationChecker.timeWindow }}"
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index a9da001b0..ff82ef5fb 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -594,7 +594,8 @@ object LoggingMarkers {
   val SCHEDULER_KAFKA_WAIT_TIME =
     LogMarkerToken(scheduler, "kafkaWaitTime", 
counter)(MeasurementUnit.time.milliseconds)
   def SCHEDULER_WAIT_TIME(action: String) =
-    LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action" 
-> action))(MeasurementUnit.time.milliseconds)
+    LogMarkerToken(scheduler, "waitTime", counter, Some(action), Map("action" 
-> action))(
+      MeasurementUnit.time.milliseconds)
 
   def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
     LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> 
leaseId.toString))(MeasurementUnit.none)
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 95520f242..57d4a8b03 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -305,6 +305,7 @@ object ConfigKeys {
   val schedulerQueue = "whisk.scheduler.queue"
   val schedulerQueueManager = "whisk.scheduler.queue-manager"
   val schedulerInProgressJobRetention = 
"whisk.scheduler.in-progress-job-retention"
+  val schedulerBlackboxMultiple = "whisk.scheduler.blackbox-multiple"
   val schedulerStaleThreshold = "whisk.scheduler.stale-threshold"
 
   val whiskClusterName = "whisk.cluster.name"
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
index ac0d2e1ea..5b0c283c3 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
@@ -88,14 +88,14 @@ class FunctionPullingContainerPool(
 
   implicit val ec = context.system.dispatcher
 
-  private var busyPool = immutable.Map.empty[ActorRef, Data]
-  private var inProgressPool = immutable.Map.empty[ActorRef, Data]
-  private var warmedPool = immutable.Map.empty[ActorRef, WarmData]
-  private var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
-  private var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, 
ByteSize)]
+  protected[containerpool] var busyPool = immutable.Map.empty[ActorRef, Data]
+  protected[containerpool] var inProgressPool = immutable.Map.empty[ActorRef, 
Data]
+  protected[containerpool] var warmedPool = immutable.Map.empty[ActorRef, 
WarmData]
+  protected[containerpool] var prewarmedPool = immutable.Map.empty[ActorRef, 
PreWarmData]
+  protected[containerpool] var prewarmStartingPool = 
immutable.Map.empty[ActorRef, (String, ByteSize)]
 
   // for shutting down
-  private var disablingPool = immutable.Set.empty[ActorRef]
+  protected[containerpool] var disablingPool = immutable.Set.empty[ActorRef]
 
   private var shuttingDown = false
 
diff --git a/core/scheduler/src/main/resources/application.conf 
b/core/scheduler/src/main/resources/application.conf
index 211ae5f0d..e73f764c1 100644
--- a/core/scheduler/src/main/resources/application.conf
+++ b/core/scheduler/src/main/resources/application.conf
@@ -76,6 +76,7 @@ whisk {
       graceful-shutdown-timeout = "5 seconds"
       max-retention-size = "10000"
       max-retention-ms = "60000"
+      max-blackbox-retention-ms = "300000"
       throttling-fraction = "0.9"
       duration-buffer-size = "10"
     }
@@ -85,6 +86,7 @@ whisk {
     }
     max-peek = "128"
     in-progress-job-retention = "20 seconds"
+    blackbox-multiple = "15"
     data-management-service {
         retry-interval = "1 second"
     }
diff --git 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala
 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala
index 1ac75e8ff..ecbc528a4 100644
--- 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala
+++ 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala
@@ -47,10 +47,11 @@ case class JobEntry(action: FullyQualifiedEntityName, 
timer: Cancellable)
 
 class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, 
Array[Byte] => Future[Unit]) => ActorRef,
                          schedulerInstanceId: SchedulerInstanceId,
-                         dataManagementService: ActorRef)(implicit 
actorSystem: ActorSystem, logging: Logging)
+                         dataManagementService: ActorRef,
+                         baseTimeout: FiniteDuration,
+                         blackboxMultiple: Int)(implicit actorSystem: 
ActorSystem, logging: Logging)
     extends Actor {
   private implicit val ec: ExecutionContext = actorSystem.dispatcher
-  private val baseTimeout = 
loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetention)
   private val retryLimit = 5
 
   /**
@@ -152,10 +153,10 @@ class CreationJobManager(feedFactory: (ActorRefFactory, 
String, String, Int, Arr
     // If there is a JobEntry, delete it.
     creationJobPool
       .remove(creationId)
-      .foreach(entry => {
-        sendState(state)
-        entry.timer.cancel()
-      })
+      .map(entry => entry.timer.cancel())
+
+    // even if there is no entry because of timeout, we still need to send the 
state to the queue if the queue exists
+    sendState(state)
 
     dataManagementService ! UnregisterData(key)
     Future.successful({})
@@ -176,7 +177,8 @@ class CreationJobManager(feedFactory: (ActorRefFactory, 
String, String, Int, Arr
                             revision: DocRevision,
                             creationId: CreationId,
                             isBlackbox: Boolean): Cancellable = {
-    val timeout = if (isBlackbox) FiniteDuration(baseTimeout.toSeconds * 3, 
TimeUnit.SECONDS) else baseTimeout
+    val timeout =
+      if (isBlackbox) FiniteDuration(baseTimeout.toSeconds * blackboxMultiple, 
TimeUnit.SECONDS) else baseTimeout
     actorSystem.scheduler.scheduleOnce(timeout) {
       logging.warn(
         this,
@@ -222,8 +224,12 @@ class CreationJobManager(feedFactory: (ActorRefFactory, 
String, String, Int, Arr
 }
 
 object CreationJobManager {
+  private val baseTimeout = 
loadConfigOrThrow[Int](ConfigKeys.schedulerInProgressJobRetention).seconds
+  private val blackboxMultiple = 
loadConfigOrThrow[Int](ConfigKeys.schedulerBlackboxMultiple)
+
   def props(feedFactory: (ActorRefFactory, String, String, Int, Array[Byte] => 
Future[Unit]) => ActorRef,
             schedulerInstanceId: SchedulerInstanceId,
             dataManagementService: ActorRef)(implicit actorSystem: 
ActorSystem, logging: Logging) =
-    Props(new CreationJobManager(feedFactory, schedulerInstanceId, 
dataManagementService))
+    Props(
+      new CreationJobManager(feedFactory, schedulerInstanceId, 
dataManagementService, baseTimeout, blackboxMultiple))
 }
diff --git 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index e4ba0022b..fabc785a4 100644
--- 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -85,7 +85,6 @@ case class QueueRemoved(invocationNamespace: String, action: 
DocInfo, leaderKey:
 case class QueueReactivated(invocationNamespace: String, action: 
FullyQualifiedEntityName, docInfo: DocInfo)
 case class CancelPoll(promise: Promise[Either[MemoryQueueError, 
ActivationMessage]])
 case object QueueRemovedCompleted
-case object FlushPulse
 
 // Events received by the actor
 case object Start
@@ -125,7 +124,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
                   checkToDropStaleActivation: 
(Queue[TimeSeriesActivationEntry],
                                                Long,
                                                String,
-                                               FullyQualifiedEntityName,
+                                               WhiskActionMetaData,
                                                MemoryQueueState,
                                                ActorRef) => Unit,
                   queueConfig: QueueConfig)(implicit logging: Logging)
@@ -151,6 +150,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
   private val memory = actionMetaData.limits.memory.megabytes.MB
   private val queueRemovedMsg = QueueRemoved(invocationNamespace, 
action.toDocId.asDocInfo(revision), Some(leaderKey))
   private val staleQueueRemovedMsg = QueueRemoved(invocationNamespace, 
action.toDocId.asDocInfo(revision), None)
+  private val actionRetentionTimeout = 
MemoryQueue.getRetentionTimeout(actionMetaData, queueConfig)
 
   private[queue] var containers = Set.empty[String]
   private[queue] var creationIds = Set.empty[String]
@@ -197,7 +197,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
 
   when(Uninitialized) {
     case Event(Start, _) =>
-      logging.info(this, s"[$invocationNamespace:$action:$stateName] a new 
queue is created.")
+      logging.info(
+        this,
+        s"[$invocationNamespace:$action:$stateName] a new queue is created, 
retentionTimeout: $actionRetentionTimeout, kind: ${actionMetaData.exec.kind}.")
       val (schedulerActor, droppingActor) = startMonitoring()
       initializeThrottling()
 
@@ -256,7 +258,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       // when there is no container, it moves to the Flushing state as no 
activations can be invoked
       if (containers.size <= 0) {
         val isWhiskError = ContainerCreationError.whiskErrors.contains(error)
-        completeAllActivations(message, isWhiskError)
+        if (!isWhiskError) {
+          completeAllActivations(message, isWhiskError)
+        }
         logging.error(
           this,
           s"[$invocationNamespace:$action:$stateName] Failed to create an 
initial container due to ${if (isWhiskError) "whiskError"
@@ -271,7 +275,11 @@ class MemoryQueue(private val etcdClient: EtcdClient,
   // there is no timeout for this state as when there is no further message, 
it would move to the Running state again.
   when(NamespaceThrottled) {
     case Event(msg: ActivationMessage, _: ThrottledData) =>
-      handleActivationMessage(msg)
+      if (containers.size + creationIds.size == 0) {
+        completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = 
false)
+      } else {
+        handleActivationMessage(msg)
+      }
       stay
 
     case Event(DisableNamespaceThrottling, data: ThrottledData) =>
@@ -328,33 +336,51 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       goto(Running) using RunningData(schedulerActor, droppingActor)
 
     // log the failed information
-    case Event(FailedCreationJob(creationId, _, _, _, _, message), data: 
FlushingData) =>
+    case Event(FailedCreationJob(creationId, _, _, _, error, message), data: 
FlushingData) =>
       creationIds -= creationId.asString
       logging.info(
         this,
         s"[$invocationNamespace:$action:$stateName][$creationId] Failed to 
create a container due to $message")
 
       // keep updating the reason
-      stay using data.copy(reason = message)
+      stay using data.copy(error = error, reason = message)
 
     // since there is no container, activations cannot be handled.
     case Event(msg: ActivationMessage, data: FlushingData) =>
-      completeErrorActivation(msg, data.reason, 
ContainerCreationError.whiskErrors.contains(data.error))
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] got a new 
activation message ${msg.activationId}")(
+        msg.transid)
+      val whiskError = isWhiskError(data.error)
+      if (whiskError)
+        queue = queue.enqueue(TimeSeriesActivationEntry(Instant.now, msg))
+      else
+        completeErrorActivation(msg, data.reason, whiskError)
       stay() using data.copy(activeDuringFlush = true)
 
     // Since SchedulingDecisionMaker keep sending a message to create a 
container, this state is not automatically timed out.
     // Instead, StateTimeout message will be sent by a timer.
-    case Event(StateTimeout, data: FlushingData) =>
-      completeAllActivations(data.reason, 
ContainerCreationError.whiskErrors.contains(data.error))
-      if (data.activeDuringFlush)
+    case Event(StateTimeout | DropOld, data: FlushingData) =>
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] Received 
StateTimeout, drop stale messages.")
+      queue =
+        MemoryQueue.dropOld(queue, Duration.ofMillis(actionRetentionTimeout), 
data.reason, completeErrorActivation)
+      if (data.activeDuringFlush || queue.nonEmpty)
         stay using data.copy(activeDuringFlush = false)
       else
         cleanUpActorsAndGotoRemoved(data)
 
     case Event(GracefulShutdown, data: FlushingData) =>
-      completeAllActivations(data.reason, 
ContainerCreationError.whiskErrors.contains(data.error))
+      completeAllActivations(data.reason, isWhiskError(data.error))
       logging.info(this, s"[$invocationNamespace:$action:$stateName] Received 
GracefulShutdown, stop the queue.")
       cleanUpActorsAndGotoRemoved(data)
+
+    case Event(StopSchedulingAsOutdated, data: FlushingData) =>
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] stop 
further scheduling.")
+      completeAllActivations(data.reason, isWhiskError(data.error))
+      // let QueueManager know this queue is no longer in charge.
+      context.parent ! staleQueueRemovedMsg
+      cleanUpActors(data)
+      cleanUpData()
+
+      goto(Removed) using NoData()
   }
 
   // in case there is any activation in the queue, it waits until all of them 
are handled.
@@ -399,6 +425,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
 
     // actors and data are already wiped
     case Event(QueueRemovedCompleted, _: NoData) =>
+      logging.info(this, "stop fsm")
       stop()
 
     // This is not supposed to happen. This will ensure the queue does not run 
forever.
@@ -523,7 +550,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
     case Event(DropOld, _) =>
       if (queue.nonEmpty && Duration
             .between(queue.head.timestamp, Instant.now)
-            .compareTo(Duration.ofMillis(queueConfig.maxRetentionMs)) < 0) {
+            .compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) {
         logging.error(
           this,
           s"[$invocationNamespace:$action:$stateName] Drop some stale 
activations for $revision, existing container is ${containers.size}, inProgress 
container is ${creationIds.size}, state data: $stateData, in is $in, current: 
${queue.size}.")
@@ -531,7 +558,11 @@ class MemoryQueue(private val etcdClient: EtcdClient,
           this,
           s"[$invocationNamespace:$action:$stateName] the head stale message: 
${queue.head.msg.activationId}")
       }
-      queue = MemoryQueue.dropOld(queue, 
Duration.ofMillis(queueConfig.maxRetentionMs), completeErrorActivation)
+      queue = MemoryQueue.dropOld(
+        queue,
+        Duration.ofMillis(actionRetentionTimeout),
+        s"Activation processing is not initiated for $actionRetentionTimeout 
ms",
+        completeErrorActivation)
 
       stay
 
@@ -861,7 +892,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
   // these schedulers will run forever and stop when the memory queue stops
   private def startMonitoring(): (ActorRef, ActorRef) = {
     val droppingScheduler = 
Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
-      checkToDropStaleActivation(queue, queueConfig.maxRetentionMs, 
invocationNamespace, action, stateName, self)
+      checkToDropStaleActivation(queue, actionRetentionTimeout, 
invocationNamespace, actionMetaData, stateName, self)
       Future.successful(())
     }
 
@@ -1055,11 +1086,12 @@ class MemoryQueue(private val etcdClient: EtcdClient,
           causedBy ++ limits ++ binding
       })
   }
+
+  private def isWhiskError(error: ContainerCreationError): Boolean = 
ContainerCreationError.whiskErrors.contains(error)
 }
 
 object MemoryQueue {
   private[queue] val queueConfig = 
loadConfigOrThrow[QueueConfig](ConfigKeys.schedulerQueue)
-  private[queue] val MaxRetentionTime = queueConfig.maxRetentionMs
 
   def props(etcdClient: EtcdClient,
             durationChecker: DurationChecker,
@@ -1105,21 +1137,27 @@ object MemoryQueue {
   def dropOld(
     queue: Queue[TimeSeriesActivationEntry],
     retention: Duration,
+    reason: String,
     completeErrorActivation: (ActivationMessage, String, Boolean) => 
Future[Any]): Queue[TimeSeriesActivationEntry] = {
     if (queue.isEmpty || Duration.between(queue.head.timestamp, 
Instant.now).compareTo(retention) < 0)
       queue
     else {
-      completeErrorActivation(queue.head.msg, s"activation processing is not 
initiated for $MaxRetentionTime ms", true)
-      dropOld(queue.tail, retention, completeErrorActivation)
+      completeErrorActivation(queue.head.msg, reason, true)
+      dropOld(queue.tail, retention, reason, completeErrorActivation)
     }
   }
 
   def checkToDropStaleActivation(queue: Queue[TimeSeriesActivationEntry],
                                  maxRetentionMs: Long,
                                  invocationNamespace: String,
-                                 action: FullyQualifiedEntityName,
+                                 actionMetaData: WhiskActionMetaData,
                                  stateName: MemoryQueueState,
                                  queueRef: ActorRef)(implicit logging: 
Logging) = {
+    val action = actionMetaData.fullyQualifiedName(true)
+    logging.debug(
+      this,
+      s"[$invocationNamespace:$action:$stateName] use the given retention 
timeout: $maxRetentionMs for this action kind: ${actionMetaData.exec.kind}.")
+
     if (queue.nonEmpty && Duration
           .between(queue.head.timestamp, Instant.now)
           .compareTo(Duration.ofMillis(maxRetentionMs)) >= 0) {
@@ -1130,6 +1168,14 @@ object MemoryQueue {
       queueRef ! DropOld
     }
   }
+
+  private def getRetentionTimeout(actionMetaData: WhiskActionMetaData, 
queueConfig: QueueConfig): Long = {
+    if (actionMetaData.exec.kind == ExecMetaDataBase.BLACKBOX) {
+      queueConfig.maxBlackboxRetentionMs
+    } else {
+      queueConfig.maxRetentionMs
+    }
+  }
 }
 
 case class QueueSnapshot(initialized: Boolean,
@@ -1151,6 +1197,7 @@ case class QueueConfig(idleGrace: FiniteDuration,
                        gracefulShutdownTimeout: FiniteDuration,
                        maxRetentionSize: Int,
                        maxRetentionMs: Long,
+                       maxBlackboxRetentionMs: Long,
                        throttlingFraction: Double,
                        durationBufferSize: Int)
 
diff --git a/tests/src/test/resources/application.conf.j2 
b/tests/src/test/resources/application.conf.j2
index 0ec5f839f..b0f393238 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -152,6 +152,7 @@ whisk {
             graceful-shutdown-timeout = "{{ 
scheduler.queue.gracefulShutdownTimeout | default('5 seconds') }}"
             max-retention-size = "{{ scheduler.queue.maxRetentionSize | 
default(10000) }}"
             max-retention-ms = "{{ scheduler.queue.maxRetentionMs | 
default(60000) }}"
+            max-blackbox-retention-ms = "{{ 
scheduler.queue.maxBlackboxRetentionMs}}"
             throttling-fraction = "{{ scheduler.queue.throttlingFraction | 
default(0.9) }}"
             duration-buffer-size = "{{ scheduler.queue.durationBufferSize | 
default(10) }}"
         }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
index 6f7787675..14cf432b5 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -20,7 +20,7 @@ package org.apache.openwhisk.core.containerpool.v2.test
 import java.time.Instant
 import java.util.concurrent.TimeUnit
 import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
-import akka.testkit.{ImplicitSender, TestActor, TestKit, TestProbe}
+import akka.testkit.{ImplicitSender, TestActor, TestActorRef, TestKit, 
TestProbe}
 import common.StreamLogging
 import org.apache.openwhisk.common.{Enable, GracefulShutdown, TransactionId}
 import org.apache.openwhisk.core.WhiskConfig
@@ -55,6 +55,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, 
FlatSpecLike, Match
 import org.scalatest.concurrent.Eventually
 
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -311,21 +312,38 @@ class FunctionPullingContainerPoolTests
     }
   }
 
+  private def retry[T](fn: => T) = org.apache.openwhisk.utils.retry(fn, 10, 
Some(1.second))
+
   it should "stop containers gradually when shut down" in within(timeout * 20) 
{
     val (containers, factory) = testContainers(10)
+    val disablingContainers = ListBuffer[ActorRef]()
+
+    for (container <- containers) {
+      container.setAutoPilot((_: ActorRef, msg: Any) =>
+        msg match {
+          case GracefulShutdown =>
+            disablingContainers += container.ref
+            TestActor.KeepRunning
+
+          case _ =>
+            TestActor.KeepRunning
+      })
+    }
+
     val doc = put(entityStore, bigWhiskAction)
     val topic = s"creationAck${schedulerInstanceId.asString}"
     val consumer = new TestConnector(topic, 4, true)
-    val pool = system.actorOf(
-      Props(new FunctionPullingContainerPool(
+    val pool = TestActorRef(
+      new FunctionPullingContainerPool(
         factory,
         invokerHealthService.ref,
         poolConfig(MemoryLimit.STD_MEMORY * 20, batchDeletionSize = 3),
         invokerInstance,
         List.empty,
-        sendAckToScheduler(consumer.getProducer()))))
+        sendAckToScheduler(consumer.getProducer())))
 
     (0 to 10).foreach(_ => pool ! 
CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)) // 11 
* stdMemory taken)
+
     (0 to 10).foreach(i => {
       containers(i).expectMsgPF() {
         case Initialize(invocationNamespace, fqn, executeAction, 
schedulerHost, rpcPort, _) => true
@@ -346,81 +364,54 @@ class FunctionPullingContainerPoolTests
               TestProbe().ref)))
     })
 
+    retry {
+      pool.underlyingActor.warmedPool.size shouldBe 6
+      pool.underlyingActor.busyPool.size shouldBe 5
+    }
+
     // disable
     pool ! GracefulShutdown
+
     // at first, 3 containers will be removed from busy pool, and left 
containers will not
-    var disablingContainers = Set.empty[Int]
-    (0 to 10).foreach(i => {
-      try {
-        containers(i).expectMsg(1.second, GracefulShutdown)
-        disablingContainers += i
-      } catch {
-        case _: Throwable =>
-      }
-    })
-    assert(disablingContainers.size == 3, "more than 3 containers is shutting 
down")
-    disablingContainers.foreach(i => containers(i).send(pool, 
ContainerRemoved(false)))
-
-    Thread.sleep(3000)
-    var completedContainer = -1
-    (0 to 10)
-      .filter(!disablingContainers.contains(_))
-      .foreach(i => {
-        try {
-          containers(i).expectMsg(1.second, GracefulShutdown)
-          disablingContainers += i
-          // only make one container complete shutting down
-          if (completedContainer == -1)
-            completedContainer = i
-        } catch {
-          case _: Throwable =>
-        }
-      })
-    assert(disablingContainers.size == 6, "more than 3 containers is shutting 
down")
-    containers(completedContainer).send(pool, ContainerRemoved(false))
-
-    Thread.sleep(3000)
-    (0 to 10)
-      .filter(!disablingContainers.contains(_))
-      .foreach(i => {
-        try {
-          containers(i).expectMsg(1.second, GracefulShutdown)
-          disablingContainers += i
-        } catch {
-          case _: Throwable =>
-        }
-      })
-    // there should be only one more container going to shut down
-    assert(disablingContainers.size == 7, "more than 3 containers is shutting 
down")
-    disablingContainers.foreach(i => containers(i).send(pool, 
ContainerRemoved(false)))
-
-    Thread.sleep(3000)
-    (0 to 10)
-      .filter(!disablingContainers.contains(_))
-      .foreach(i => {
-        try {
-          containers(i).expectMsg(1.second, GracefulShutdown)
-          disablingContainers += i
-        } catch {
-          case _: Throwable =>
-        }
-      })
-    assert(disablingContainers.size == 10, "more than 3 containers is shutting 
down")
-    disablingContainers.foreach(i => containers(i).send(pool, 
ContainerRemoved(false)))
-
-    Thread.sleep(3000)
-    (0 to 10)
-      .filter(!disablingContainers.contains(_))
-      .foreach(i => {
-        try {
-          containers(i).expectMsg(1.second, GracefulShutdown)
-          disablingContainers += i
-        } catch {
-          case _: Throwable =>
-        }
-      })
-    assert(disablingContainers.size == 11, "unexpected containers is shutting 
down")
-    disablingContainers.foreach(i => containers(i).send(pool, 
ContainerRemoved(false)))
+    retry {
+      disablingContainers.size shouldBe 3
+    }
+
+    // all 3 containers finish termination
+    disablingContainers.foreach(pool.tell(ContainerRemoved(false), _))
+
+    retry {
+      pool.underlyingActor.warmedPool.size + 
pool.underlyingActor.busyPool.size shouldBe 8
+    }
+
+    // it will disable 3 more containers.
+    retry {
+      disablingContainers.size shouldBe 6
+    }
+
+    // only one container of them finishes termination
+    pool.tell(ContainerRemoved(false), disablingContainers.last)
+
+    // there should be only one more container going to shut down as more than 
3 containers are shutting down.
+    retry {
+      disablingContainers.size shouldBe 7
+    }
+
+    // all 3 containers finish termination
+    disablingContainers.foreach(pool.tell(ContainerRemoved(false), _))
+
+    retry {
+      disablingContainers.size shouldBe 10
+    }
+
+    // all disabling containers finish termination
+    disablingContainers.foreach(pool.tell(ContainerRemoved(false), _))
+
+    // the last container is shutting down.
+    retry {
+      disablingContainers.size shouldBe 11
+    }
+    disablingContainers.foreach(pool.tell(ContainerRemoved(false), _))
   }
 
   it should "create prewarmed containers on startup" in within(timeout) {
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala
index 75db03a8d..61e8199b7 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala
@@ -17,12 +17,12 @@
 
 package org.apache.openwhisk.core.scheduler.container.test
 
-import java.util.concurrent.TimeUnit
-import akka.actor.{ActorRef, ActorRefFactory, ActorSystem}
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props}
+import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
 import com.ibm.etcd.client.{EtcdClient => Client}
 import common.StreamLogging
 import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.WhiskConfig
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, 
RuntimeManifest}
 import org.apache.openwhisk.core.entity._
@@ -32,15 +32,14 @@ import org.apache.openwhisk.core.scheduler.container._
 import org.apache.openwhisk.core.scheduler.message._
 import org.apache.openwhisk.core.scheduler.queue.{MemoryQueueKey, 
MemoryQueueValue, QueuePool}
 import org.apache.openwhisk.core.service.{RegisterData, UnregisterData}
-import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.junit.runner.RunWith
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, 
Matchers}
-import pureconfig.loadConfigOrThrow
 
-import scala.concurrent.duration.FiniteDuration
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
 import scala.concurrent.{ExecutionContextExecutor, Future}
 
 @RunWith(classOf[JUnitRunner])
@@ -55,8 +54,9 @@ class CreationJobManagerTests
     with BeforeAndAfterEach
     with StreamLogging {
 
-  private val timeout = 
loadConfigOrThrow[FiniteDuration](ConfigKeys.schedulerInProgressJobRetention)
-  val blackboxTimeout = FiniteDuration(timeout.toSeconds * 3, TimeUnit.SECONDS)
+  val timeout = 20.seconds
+  val blackboxMultiple = 2
+  val blackboxTimeout = FiniteDuration(timeout.toSeconds * blackboxMultiple, 
TimeUnit.SECONDS)
   implicit val ece: ExecutionContextExecutor = system.dispatcher
   val config = new WhiskConfig(ExecManifest.requiredProperties)
   val creationIdTest = CreationId.generate()
@@ -139,8 +139,7 @@ class CreationJobManagerTests
   it should "register creation job" in {
     val probe = TestProbe()
 
-    val manager =
-      system.actorOf(CreationJobManager.props(feedFactory, sid, probe.ref))
+    val manager = TestActorRef(new CreationJobManager(feedFactory, sid, 
probe.ref, timeout, blackboxMultiple))
 
     manager ! registerMessage
 
@@ -150,8 +149,7 @@ class CreationJobManagerTests
   it should "skip duplicated creation job" in {
     val probe = TestProbe()
 
-    val manager =
-      system.actorOf(CreationJobManager.props(feedFactory, sid, probe.ref))
+    val manager = TestActorRef(new CreationJobManager(feedFactory, sid, 
probe.ref, timeout, blackboxMultiple))
 
     manager ! registerMessage
     manager ! registerMessage
@@ -206,8 +204,9 @@ class CreationJobManagerTests
     val containerManager = TestProbe()
     val dataManagementService = TestProbe()
     val probe = TestProbe()
-    val jobManager =
-      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, 
dataManagementService.ref))
+    val jobManager = TestActorRef(
+      Props(new CreationJobManager(feedFactory, sid, 
dataManagementService.ref, timeout, blackboxMultiple)),
+      containerManager.ref)
 
     QueuePool.put(
       MemoryQueueKey(testInvocationNamespace, 
action.toDocId.asDocInfo(revision)),
@@ -238,8 +237,9 @@ class CreationJobManagerTests
     val containerManager = TestProbe()
     val dataManagementService = TestProbe()
 
-    val jobManager =
-      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, 
dataManagementService.ref))
+    val jobManager = TestActorRef(
+      Props(new CreationJobManager(feedFactory, sid, 
dataManagementService.ref, timeout, blackboxMultiple)),
+      containerManager.ref)
 
     jobManager ! registerMessage
 
@@ -257,8 +257,9 @@ class CreationJobManagerTests
     val containerManager = TestProbe()
     val dataManagementService = TestProbe()
     val probe = TestProbe()
-    val jobManager =
-      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, 
dataManagementService.ref))
+    val jobManager = TestActorRef(
+      Props(new CreationJobManager(feedFactory, sid, 
dataManagementService.ref, timeout, blackboxMultiple)),
+      containerManager.ref)
 
     QueuePool.put(
       MemoryQueueKey(testInvocationNamespace, 
action.toDocId.asDocInfo(revision)),
@@ -294,8 +295,9 @@ class CreationJobManagerTests
     val containerManager = TestProbe()
     val dataManagementService = TestProbe()
 
-    val jobManager =
-      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, 
dataManagementService.ref))
+    val jobManager = TestActorRef(
+      Props(new CreationJobManager(feedFactory, sid, 
dataManagementService.ref, timeout, blackboxMultiple)),
+      containerManager.ref)
 
     jobManager ! failedFinish.copy(ack = failedFinish.ack.copy(retryCount = 5))
 
@@ -306,8 +308,9 @@ class CreationJobManagerTests
     val containerManager = TestProbe()
     val dataManagementService = TestProbe()
 
-    val jobManager =
-      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, 
dataManagementService.ref))
+    val jobManager = TestActorRef(
+      Props(new CreationJobManager(feedFactory, sid, 
dataManagementService.ref, timeout, blackboxMultiple)),
+      containerManager.ref)
 
     jobManager ! registerMessage
 
@@ -329,8 +332,9 @@ class CreationJobManagerTests
     val containerManager = TestProbe()
     val dataManagementService = TestProbe()
 
-    val jobManager =
-      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, 
dataManagementService.ref))
+    val jobManager = TestActorRef(
+      Props(new CreationJobManager(feedFactory, sid, 
dataManagementService.ref, timeout, blackboxMultiple)),
+      containerManager.ref)
 
     val execMetadata =
       BlackBoxExecMetaData(ImageName("test image"), Some("main"), native = 
false);
@@ -364,7 +368,7 @@ class CreationJobManagerTests
 
     // no message for timeout
     dataManagementService.expectNoMessage(timeout)
-    Thread.sleep(timeout.toMillis * 2) // timeout is doubled for blackbox 
actions
+    Thread.sleep(timeout.toMillis * blackboxMultiple) // timeout is doubled 
for blackbox actions
     dataManagementService.expectMsg(UnregisterData(testKey))
     containerManager.expectMsg(
       FailedCreationJob(
@@ -380,8 +384,10 @@ class CreationJobManagerTests
     val containerManager = TestProbe()
     val dataManagementService = TestProbe()
     val probe = TestProbe()
-    val jobManager =
-      containerManager.childActorOf(CreationJobManager.props(feedFactory, sid, 
dataManagementService.ref))
+    val jobManager = TestActorRef(
+      Props(new CreationJobManager(feedFactory, sid, 
dataManagementService.ref, timeout, blackboxMultiple)),
+      containerManager.ref)
+
     QueuePool.put(
       MemoryQueueKey(testInvocationNamespace, 
action.toDocId.asDocInfo(revision)),
       MemoryQueueValue(probe.ref, true))
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index edacd26f3..267d3a81d 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -47,7 +47,7 @@ import spray.json.{JsObject, JsString}
 import java.time.Instant
 import scala.collection.immutable.Queue
 import scala.concurrent.Future
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.{DurationInt, FiniteDuration, MILLISECONDS}
 import scala.language.postfixOps
 
 @RunWith(classOf[JUnitRunner])
@@ -74,7 +74,6 @@ class MemoryQueueFlowTests
 
   behavior of "MemoryQueueFlow"
 
-  // this is 1. normal case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "normally be created and handle an activation and became idle an 
finally removed" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
@@ -168,7 +167,6 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  // this is 1-2. normal case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "became Idle and Running again if a message arrives" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
@@ -282,7 +280,6 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  // this is 2. NamespaceThrottled case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "go to the Flushing state dropping messages when it can't create 
an initial container" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
@@ -365,7 +362,6 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  // this is 3. NamespaceThrottled case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "go to the NamespaceThrottled state without dropping messages and 
get back to the Running container" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
@@ -488,7 +484,6 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  // this is 4. ActionThrottled case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "go to the ActionThrottled state when there are too many stale 
activations including transition to NamespaceThrottling" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
@@ -501,7 +496,7 @@ class MemoryQueueFlowTests
 
     // max retention size is 10 and throttling fraction is 0.8
     // queue will be action throttled at 10 messages and disabled action 
throttling at 8 messages
-    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 
seconds, 10, 5000, 0.8, 10)
+    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 
seconds, 10, 5000, 10000, 0.8, 10)
 
     // limit is 1
     val getUserLimit = (_: String) => Future.successful(1)
@@ -637,7 +632,6 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  // this is 5. Paused case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "be Flushing when the limit is 0 and restarted back to Running 
state when the limit is increased" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
@@ -691,26 +685,18 @@ class MemoryQueueFlowTests
     expectInitialData(watcher, dataMgmtService)
     probe.expectMsg(Transition(fsm, Uninitialized, Running))
 
-    awaitAssert({
-      ackedMessageCount shouldBe 1
-      lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString(namespaceLimitUnderZero)))
-      storedMessageCount shouldBe 1
-      lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString(namespaceLimitUnderZero)))
-      fsm.underlyingActor.queue.length shouldBe 0
-    }, 5.seconds)
-
     probe.expectMsg(Transition(fsm, Running, Flushing))
+    // activation received in Flushing state won't be flushed immediately if 
Flushing state is caused by a whisk error
+    Thread.sleep(flushGrace.toMillis)
+    fsm ! messages(1)
 
     awaitAssert({
-      // in the paused state, all incoming messages should be dropped 
immediately
-      fsm ! messages(1)
-      ackedMessageCount shouldBe 2
+      ackedMessageCount shouldBe 1
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString(namespaceLimitUnderZero)))
-      storedMessageCount shouldBe 2
+      storedMessageCount shouldBe 1
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString(namespaceLimitUnderZero)))
-      fsm.underlyingActor.queue.length shouldBe 0
-    }, 5.seconds)
-
+      fsm.underlyingActor.queue.length shouldBe 1
+    }, FiniteDuration(retentionTimeout, MILLISECONDS))
     // limit is increased by an operator
     limit = 10
 
@@ -728,14 +714,12 @@ class MemoryQueueFlowTests
     // Queue is now working
     probe.expectMsg(Transition(fsm, Flushing, Running))
 
-    fsm ! messages(2)
-
     // one container is created
     
fsm.underlyingActor.namespaceContainerCount.existingContainerNumByNamespace += 1
 
     // only one message is handled
     container.send(fsm, getActivation(true, "testContainerId1"))
-    container.expectMsg(ActivationResponse(Right(messages(2))))
+    container.expectMsg(ActivationResponse(Right(messages(1))))
 
     // deleting the container from containers set
     container.send(fsm, getActivation(false, "testContainerId1"))
@@ -758,7 +742,6 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  // this is 5-2. Paused case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "be Flushing when the limit is 0 and be terminated without 
recovering" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
@@ -770,7 +753,7 @@ class MemoryQueueFlowTests
       system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, 
fqn, schedulingConfig))
 
     // generate 2 activations
-    val messages = getActivationMessages(3)
+    val messages = getActivationMessages(2)
 
     val getUserLimit = (_: String) => Future.successful(0)
 
@@ -804,35 +787,25 @@ class MemoryQueueFlowTests
     registerCallback(probe, fsm)
 
     fsm ! Start
+    fsm ! messages(0)
     expectInitialData(watcher, dataMgmtService)
     fsm ! testInitialDataStorageResult
 
     probe.expectMsg(Transition(fsm, Uninitialized, Running))
 
-    fsm ! messages(0)
-    awaitAssert({
-      ackedMessageCount shouldBe 1
-      lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString(namespaceLimitUnderZero)))
-      storedMessageCount shouldBe 1
-      lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString(namespaceLimitUnderZero)))
-      fsm.underlyingActor.queue.length shouldBe 0
-    }, 5.seconds)
-
     probe.expectMsg(Transition(fsm, Running, Flushing))
-
-    // in the paused state, all incoming messages should be dropped immediately
     fsm ! messages(1)
 
+    // activation received in Flushing state won't be flushed immediately if 
Flushing state is caused by a whisk error
+    Thread.sleep(flushGrace.toMillis)
+
     awaitAssert({
       ackedMessageCount shouldBe 2
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString(namespaceLimitUnderZero)))
       storedMessageCount shouldBe 2
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString(namespaceLimitUnderZero)))
       fsm.underlyingActor.queue.length shouldBe 0
-    }, 5.seconds)
-
-    // normal termination process
-    Thread.sleep(flushGrace.toMillis * 2)
+    }, FiniteDuration(retentionTimeout, MILLISECONDS))
 
     // In this case data clean up happens first.
     expectDataCleanUp(watcher, dataMgmtService)
@@ -844,7 +817,6 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  // this is 6. Waiting case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "be the Flushing state when a whisk error happens" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
@@ -921,13 +893,15 @@ class MemoryQueueFlowTests
 
     fsm ! messages(1)
 
+    Thread.sleep(flushGrace.toMillis)
+
     awaitAssert({
       ackedMessageCount shouldBe 2
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString("whisk error")))
       storedMessageCount shouldBe 2
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString("whisk error")))
       fsm.underlyingActor.queue.length shouldBe 0
-    }, 5.seconds)
+    }, FiniteDuration(retentionTimeout, MILLISECONDS))
 
     Thread.sleep(flushGrace.toMillis * 2)
 
@@ -941,7 +915,6 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  // this is 6-2. Waiting case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "be the Flushing state when a whisk error happens and be recovered 
when a container is created" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
@@ -952,6 +925,8 @@ class MemoryQueueFlowTests
       system.actorOf(SchedulingDecisionMaker.props(testInvocationNamespace, 
fqn, schedulingConfig))
     val probe = TestProbe()
     val container = TestProbe()
+    // generate 2 activations
+    val messages = getActivationMessages(2)
 
     expectDurationChecking(mockEsClient, testInvocationNamespace)
 
@@ -988,36 +963,39 @@ class MemoryQueueFlowTests
 
     probe.expectMsg(Transition(fsm, Uninitialized, Running))
 
-    fsm ! message
-    // any id is fine because it would be overridden
-    var creationId = CreationId.generate()
+    fsm ! messages(0)
 
+    // Failed to create a container
     containerManager.expectMsgPF() {
       case ContainerCreation(List(ContainerCreationMessage(_, _, _, _, _, _, 
_, _, _, id)), _, _) =>
-        creationId = id
+        fsm ! FailedCreationJob(id, testInvocationNamespace, fqn, revision, 
WhiskError, "whisk error")
     }
-    // Failed to create a container
-    fsm ! FailedCreationJob(creationId, testInvocationNamespace, fqn, 
revision, WhiskError, "whisk error")
+
+    probe.expectMsg(Transition(fsm, Running, Flushing))
+    Thread.sleep(1000)
+    fsm ! messages(1)
+
+    // activation received in Flushing state won't be flushed immediately if 
Flushing state is caused by a whisk error
+    Thread.sleep(flushGrace.toMillis)
 
     awaitAssert({
       ackedMessageCount shouldBe 1
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString("whisk error")))
       storedMessageCount shouldBe 1
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString("whisk error")))
-      fsm.underlyingActor.queue.length shouldBe 0
-    }, 5.seconds)
+      fsm.underlyingActor.queue.length shouldBe 1
+    }, FiniteDuration(retentionTimeout, MILLISECONDS))
 
-    probe.expectMsg(Transition(fsm, Running, Flushing))
-
-    // Failed to create a container
-    fsm ! SuccessfulCreationJob(creationId, testInvocationNamespace, fqn, 
revision)
+    // Container creation succeeds
+    containerManager.expectMsgPF() {
+      case ContainerCreation(List(ContainerCreationMessage(_, _, _, _, _, _, 
_, _, _, id)), _, _) =>
+        fsm ! SuccessfulCreationJob(id, testInvocationNamespace, fqn, revision)
+    }
 
     probe.expectMsg(Transition(fsm, Flushing, Running))
 
-    fsm ! message
-
     container.send(fsm, getActivation())
-    container.expectMsg(ActivationResponse(Right(message)))
+    container.expectMsg(ActivationResponse(Right(messages(1))))
 
     // deleting the container from containers set
     container.send(fsm, getActivation(false))
@@ -1039,7 +1017,6 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  // this is 7. Flushing case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "be the Flushing state when a developer error happens" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
@@ -1143,7 +1120,6 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  // this is 8. GracefulShuttingDown case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "be gracefully terminated when it receives a GracefulShutDown 
message" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
@@ -1250,7 +1226,6 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  // this is 10. deprecated case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "be deprecated when a new queue supersedes it." in {
     // GracefulShuttingDown is not applicable
     val allStates = List(Running, Idle, Flushing, ActionThrottled, 
NamespaceThrottled, Removing, Removed)
@@ -1335,7 +1310,6 @@ class MemoryQueueFlowTests
     }
   }
 
-  // this is 10-2. deprecated case in 
https://yobi.navercorp.com/Lambda-dev/posts/240?referrerId=-2099518320#1612223450057
   it should "be deprecated and stops even if the queue manager could not 
respond." in {
     // GracefulShuttingDown is not applicable
     val allStates = List(Running, Idle, Flushing, ActionThrottled, 
NamespaceThrottled, Removing, Removed)
@@ -1564,6 +1538,20 @@ class MemoryQueueFlowTests
           fsm ! QueueRemovedCompleted
           probe.expectTerminated(fsm, 10.seconds)
 
+        case Flushing =>
+          // queue is stale and will be removed
+          parent.expectMsg(staleQueueRemovedMsg)
+          probe.expectMsg(Transition(fsm, state, Removed))
+
+          fsm ! QueueRemovedCompleted
+
+          Thread.sleep(gracefulShutdownTimeout.toMillis)
+
+          watcher.expectMsgAllOf(
+            UnwatchEndpoint(inProgressContainerKey, isPrefix = true, 
watcherName),
+            UnwatchEndpoint(existingContainerKey, isPrefix = true, 
watcherName),
+            UnwatchEndpoint(leaderKey, isPrefix = false, watcherName))
+
         case _ =>
           parent.expectMsg(staleQueueRemovedMsg)
           parent.expectMsg(message)
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index 15ac5f4c3..950af0292 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -20,7 +20,6 @@ package org.apache.openwhisk.core.scheduler.queue.test
 import java.time.Instant
 import java.util.concurrent.Executor
 import java.{lang, util}
-
 import akka.actor.ActorRef
 import akka.actor.FSM.{CurrentState, StateTimeout, 
SubscribeTransitionCallBack, Transition}
 import akka.pattern.ask
@@ -39,6 +38,7 @@ import org.apache.openwhisk.core.ack.ActiveAck
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.containerpool.ContainerId
 import org.apache.openwhisk.core.database.NoDocumentException
+import org.apache.openwhisk.core.entity.ExecManifest.ImageName
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
 import 
org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.{existingContainers, 
inProgressContainer}
@@ -953,8 +953,12 @@ class MemoryQueueTests
     parent.expectMsg(Transition(fsm, Running, Flushing))
     (1 to expectedCount).foreach(_ => 
probe.expectMsg(ActivationResponse.developerError("nonExecutbleAction error")))
 
+    // flush msg immediately
+    fsm ! message
+    probe.expectMsg(ActivationResponse.developerError("nonExecutbleAction 
error"))
+
     parent.expectMsg(
-      queueConfig.stopGrace + 5.seconds,
+      2 * queueConfig.flushGrace + 5.seconds,
       QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), 
Some(leaderKey)))
     parent.expectMsg(Transition(fsm, Flushing, Removed))
     fsm ! QueueRemovedCompleted
@@ -971,11 +975,9 @@ class MemoryQueueTests
     val parent = TestProbe()
     val expectedCount = 3
 
-    val probe = TestProbe()
-
     expectDurationChecking(mockEsClient, testInvocationNamespace)
 
-    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 
seconds, 10, 180000, 0.9, 10)
+    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 
seconds, 10, 10000, 20000, 0.9, 10)
 
     val fsm =
       TestFSMRef(
@@ -1018,6 +1020,15 @@ class MemoryQueueTests
       ContainerCreationError.NoAvailableInvokersError,
       "no available invokers")
 
+    parent.expectMsg(Transition(fsm, Running, Flushing))
+    parent.expectNoMessage(5.seconds)
+
+    // Add 3 more messages.
+    (1 to expectedCount).foreach(_ => fsm ! message)
+    parent.expectNoMessage(5.seconds)
+
+    // After 10 seconds (action retention timeout), the first 3 messages are 
timed out.
+    // It does not get removed as there are still 3 messages in the queue.
     awaitAssert({
       ackedMessageCount shouldBe 3
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString("no available invokers")))
@@ -1025,15 +1036,12 @@ class MemoryQueueTests
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString("no available invokers")))
     }, 5.seconds)
 
-    parent.expectMsg(Transition(fsm, Running, Flushing))
-
     // should goto Running
     fsm ! SuccessfulCreationJob(testCreationId, 
message.user.namespace.name.asString, message.action, message.revision)
-    (1 to expectedCount).foreach(_ => fsm ! message)
+
     parent.expectMsg(Transition(fsm, Flushing, Running))
-    probe.expectNoMessage(2.seconds)
 
-    // should goto WaitForFlush again as existing is always 0
+    // should goto Flushing again as there is no container running.
     fsm ! FailedCreationJob(
       testCreationId,
       message.user.namespace.name.asString,
@@ -1042,23 +1050,24 @@ class MemoryQueueTests
       ContainerCreationError.ResourceNotEnoughError,
       "resource not enough")
     parent.expectMsg(Transition(fsm, Running, Flushing))
-    (1 to expectedCount).foreach(_ => fsm ! message)
 
-    // wait for event `FlushPulse`, and then all existing activations will be 
flushed
-    Thread.sleep(flushGrace.toMillis + 3.seconds.toMillis)
+    // wait for the blackbox retention timeout plus the flush grace, and 
then all existing activations will be flushed
+    Thread.sleep(queueConfig.maxBlackboxRetentionMs + 
queueConfig.flushGrace.toMillis)
 
+    // The error message is updated from the recent error message of the 
FailedCreationJob.
     awaitAssert({
-      ackedMessageCount shouldBe 9
+      ackedMessageCount shouldBe 6
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString("resource not enough")))
-      storedMessageCount shouldBe 9
+      storedMessageCount shouldBe 6
       lastAckedActivationResult.response.result shouldBe Some(JsObject("error" 
-> JsString("resource not enough")))
     }, 5.seconds)
 
-    // should goto Running
-    fsm ! SuccessfulCreationJob(testCreationId, 
message.user.namespace.name.asString, message.action, message.revision)
-    (1 to expectedCount).foreach(_ => fsm ! message)
-    parent.expectMsg(Transition(fsm, Flushing, Running))
-    probe.expectNoMessage(2.seconds)
+    parent.expectMsg(queueRemovedMsg)
+
+    // should goto Removed
+    parent.expectMsg(Transition(fsm, Flushing, Removed))
+    fsm ! QueueRemovedCompleted
+
     fsm.stop()
   }
 
@@ -1149,6 +1158,109 @@ class MemoryQueueTests
     fsm.stop()
   }
 
+  it should "complete error activation after blackbox timeout when the action 
is a blackbox action and received FailedCreationJob with a whisk 
error(recoverable)" in {
+    val mockEtcdClient = mock[EtcdClient]
+    val testProbe = TestProbe()
+    val decisionMaker = TestProbe()
+    decisionMaker.ignoreMsg { case _: QueueSnapshot => true }
+    val parent = TestProbe()
+    val expectedCount = 3
+
+    val probe = TestProbe()
+    val newAck = new ActiveAck {
+      override def apply(tid: TransactionId,
+                         activationResult: WhiskActivation,
+                         blockingInvoke: Boolean,
+                         controllerInstance: ControllerInstanceId,
+                         userId: UUID,
+                         acknowledegment: AcknowledegmentMessage): Future[Any] 
= {
+        probe.ref ! activationResult.response
+        Future.successful({})
+      }
+    }
+
+    val execMetadata = BlackBoxExecMetaData(ImageName("test"), None, native = 
false)
+
+    val blackboxActionMetadata =
+      WhiskActionMetaData(
+        action.namespace,
+        action.name,
+        execMetadata,
+        action.parameters,
+        action.limits,
+        action.version,
+        action.publish,
+        action.annotations)
+        .revision[WhiskActionMetaData](action.rev)
+
+    expectDurationChecking(mockEsClient, testInvocationNamespace)
+
+    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 
seconds, 10, 10000, 20000, 0.9, 10)
+
+    val fsm =
+      TestFSMRef(
+        new MemoryQueue(
+          mockEtcdClient,
+          durationChecker,
+          fqn,
+          mockMessaging(),
+          schedulingConfig,
+          testInvocationNamespace,
+          revision,
+          endpoints,
+          blackboxActionMetadata,
+          testProbe.ref,
+          testProbe.ref,
+          testProbe.ref,
+          decisionMaker.ref,
+          schedulerId,
+          newAck,
+          store,
+          (s: String) => { Future.successful(10000) }, // avoid exceed user 
limit
+          checkToDropStaleActivation,
+          queueConfig),
+        parent.ref,
+        "MemoryQueue")
+
+    fsm ! SubscribeTransitionCallBack(parent.ref)
+    parent.expectMsg(CurrentState(fsm, Uninitialized))
+    parent watch fsm
+
+    fsm ! Start
+
+    parent.expectMsg(Transition(fsm, Uninitialized, Running))
+
+    (1 to expectedCount).foreach(_ => fsm ! message)
+    fsm ! FailedCreationJob(
+      testCreationId,
+      message.user.namespace.name.asString,
+      message.action,
+      message.revision,
+      ContainerCreationError.NoAvailableInvokersError,
+      "no available invokers")
+
+    parent.expectMsg(Transition(fsm, Running, Flushing))
+    probe.expectNoMessage()
+
+    // should wait for some time before flushing the message
+    fsm ! message
+
+    // wait for the blackbox retention timeout plus the flush grace, and 
then some existing activations will be flushed
+    Thread.sleep(queueConfig.maxBlackboxRetentionMs + 
queueConfig.flushGrace.toMillis)
+    (1 to expectedCount).foreach(_ => 
probe.expectMsg(ActivationResponse.whiskError("no available invokers")))
+
+    val duration = FiniteDuration(queueConfig.maxBlackboxRetentionMs, 
MILLISECONDS) + queueConfig.flushGrace
+    probe.expectMsg(duration, ActivationResponse.whiskError("no available 
invokers"))
+    parent.expectMsg(
+      duration,
+      QueueRemoved(testInvocationNamespace, fqn.toDocId.asDocInfo(action.rev), 
Some(leaderKey)))
+    parent.expectMsg(Transition(fsm, Flushing, Removed))
+    fsm ! QueueRemovedCompleted
+    parent.expectTerminated(fsm)
+
+    fsm.stop()
+  }
+
   it should "stop scheduling if the namespace does not exist" in {
     val mockEtcdClient = mock[EtcdClient]
     val getZeroLimit = (_: String) => { 
Future.failed(NoDocumentException("namespace does not exist")) }
@@ -1320,7 +1432,7 @@ class MemoryQueueTests
     // it always induces the throttling
     val getZeroLimit = (_: String) => { Future.successful(2) }
 
-    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 
seconds, 1, 5000, 0.9, 10)
+    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 
seconds, 1, 5000, 10000, 0.9, 10)
 
     expectDurationChecking(mockEsClient, testInvocationNamespace)
 
@@ -1366,7 +1478,7 @@ class MemoryQueueTests
     val probe = TestProbe()
     val parent = TestProbe()
 
-    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 
seconds, 10, 5000, 0.9, 10)
+    val queueConfig = QueueConfig(5 seconds, 10 seconds, 10 seconds, 5 
seconds, 10, 5000, 10000, 0.9, 10)
     val msgRetentionSize = queueConfig.maxRetentionSize
 
     val tid = TransactionId(TransactionId.generateTid())
@@ -1717,7 +1829,11 @@ class MemoryQueueTests
 
     Thread.sleep(5000)
 
-    queue = MemoryQueue.dropOld(queue, java.time.Duration.ofMillis(1000), 
completeErrorActivation)
+    queue = MemoryQueue.dropOld(
+      queue,
+      java.time.Duration.ofMillis(1000),
+      "activation processing is not initiated for 1000 ms",
+      completeErrorActivation)
 
     queue.size shouldBe 3
   }
@@ -1726,7 +1842,11 @@ class MemoryQueueTests
     var queue = Queue.empty[TimeSeriesActivationEntry]
 
     noException should be thrownBy {
-      queue = MemoryQueue.dropOld(queue, java.time.Duration.ofMillis(1000), 
completeErrorActivation)
+      queue = MemoryQueue.dropOld(
+        queue,
+        java.time.Duration.ofMillis(1000),
+        "activation processing is not initiated for 1000 ms",
+        completeErrorActivation)
     }
   }
 
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
index 38f13ceee..c7a3ff095 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTestsFixture.scala
@@ -154,10 +154,12 @@ class MemoryQueueTestsFixture
   val actionThrottlingKey = ThrottlingKeys.action(testInvocationNamespace, 
fqn.copy(version = None))
 
   // queue variables
-  val queueConfig = QueueConfig(5 seconds, 10 seconds, 5 seconds, 5 seconds, 
10, 10000, 0.9, 10)
+  val queueConfig = QueueConfig(5 seconds, 10 seconds, 5 seconds, 5 seconds, 
10, 10000, 20000, 0.9, 10)
   val idleGrace = queueConfig.idleGrace
   val stopGrace = queueConfig.stopGrace
   val flushGrace = queueConfig.flushGrace
+  val retentionTimeout = queueConfig.maxRetentionMs
+  val blackboxTimeout = queueConfig.maxBlackboxRetentionMs
   val gracefulShutdownTimeout = queueConfig.gracefulShutdownTimeout
   val testRetentionSize = queueConfig.maxRetentionSize
   val testThrottlingFraction = queueConfig.throttlingFraction

Reply via email to