bdoyle0182 commented on a change in pull request #5110:
URL: https://github.com/apache/openwhisk/pull/5110#discussion_r657493878



##########
File path: 
core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
##########
@@ -0,0 +1,1097 @@
+package org.apache.openwhisk.core.scheduler.queue
+
+import java.time.{Duration, Instant}
+
+import akka.actor.Status.{Failure => FailureMessage}
+import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
+import akka.util.Timeout
+import org.apache.openwhisk.common._
+import org.apache.openwhisk.core.ack.ActiveAck
+import 
org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests,
 ZeroNamespaceLimit}
+import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.database.{NoDocumentException, UserContext}
+import org.apache.openwhisk.core.entity._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.etcd.EtcdClient
+import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
+import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, 
ThrottlingKeys}
+import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
+import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, 
ActivationResponse => GetActivationResponse}
+import org.apache.openwhisk.core.scheduler.message.{
+  ContainerCreation,
+  ContainerDeletion,
+  FailedCreationJob,
+  SuccessfulCreationJob
+}
+import org.apache.openwhisk.core.service._
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
+import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, 
tooManyConcurrentRequests}
+import pureconfig.generic.auto._
+import pureconfig.loadConfigOrThrow
+import spray.json._
+
+import scala.annotation.tailrec
+import scala.collection.immutable.Queue
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise}
+import scala.language.postfixOps
+import scala.util.{Failure, Success}
+
+// States
+sealed trait MemoryQueueState
+case object Uninitialized extends MemoryQueueState
+case object Running extends MemoryQueueState
+case object Idle extends MemoryQueueState
+case object Flushing extends MemoryQueueState
+case object Removing extends MemoryQueueState
+case object Removed extends MemoryQueueState
+case object ActionThrottled extends MemoryQueueState
+case object NamespaceThrottled extends MemoryQueueState
+
+// Data
+sealed abstract class MemoryQueueData()
+case class NoData() extends MemoryQueueData()
+case class NoActors() extends MemoryQueueData()
+case class RunningData(schedulerActor: ActorRef, droppingActor: ActorRef) 
extends MemoryQueueData()
+case class ThrottledData(schedulerActor: ActorRef, droppingActor: ActorRef) 
extends MemoryQueueData()
+case class FlushingData(schedulerActor: ActorRef,
+                        droppingActor: ActorRef,
+                        error: ContainerCreationError,
+                        reason: String,
+                        activeDuringFlush: Boolean = false)
+    extends MemoryQueueData()
+case class RemovingData(schedulerActor: ActorRef, droppingActor: ActorRef, 
outdated: Boolean) extends MemoryQueueData()
+
+// Events sent by the actor
+case class QueueRemoved(invocationNamespace: String, action: DocInfo, 
leaderKey: Option[String])
+case class QueueReactivated(invocationNamespace: String, action: 
FullyQualifiedEntityName, docInfo: DocInfo)
+case class CancelPoll(promise: Promise[Either[MemoryQueueError, 
ActivationMessage]])
+case object QueueRemovedCompleted
+case object FlushPulse
+
+// Events received by the actor
+case object Start
+case object VersionUpdated
+case object StopSchedulingAsOutdated
+
+sealed trait RequiredAction
+case object Skip extends RequiredAction
+case object AddInitialContainer extends RequiredAction
+case object AddContainer extends RequiredAction
+case class EnableNamespaceThrottling(dropMsg: Boolean) extends RequiredAction
+case object DisableNamespaceThrottling extends RequiredAction
+case object EnableActionThrottling extends RequiredAction
+case object DisableActionThrottling extends RequiredAction
+case object Pausing extends RequiredAction
+case class DecisionResults(required: RequiredAction, num: Int)
+
+case class TimeSeriesActivationEntry(timestamp: Instant, msg: 
ActivationMessage)
+
+class MemoryQueue(private val etcdClient: EtcdClient,
+                  private val durationChecker: DurationChecker,
+                  private val action: FullyQualifiedEntityName,
+                  messagingProducer: MessageProducer,
+                  config: WhiskConfig,
+                  invocationNamespace: String,
+                  revision: DocRevision,
+                  endpoints: SchedulerEndpoints,
+                  actionMetaData: WhiskActionMetaData,
+                  dataManagementService: ActorRef,
+                  watcherService: ActorRef,
+                  containerManager: ActorRef,
+                  decisionMaker: ActorRef,
+                  schedulerId: SchedulerInstanceId,
+                  ack: ActiveAck,
+                  store: (TransactionId, WhiskActivation, UserContext) => 
Future[Any],
+                  getUserLimit: String => Future[Int],
+                  checkToDropStaleActivation: 
(Queue[TimeSeriesActivationEntry],
+                                               Long,
+                                               String,
+                                               FullyQualifiedEntityName,
+                                               MemoryQueueState,
+                                               ActorRef) => Unit,
+                  queueConfig: QueueConfig)(implicit logging: Logging)
+    extends FSM[MemoryQueueState, MemoryQueueData]
+    with Stash {
+
+  private implicit val ec: ExecutionContextExecutor = context.dispatcher
+  private implicit val actorSystem: ActorSystem = context.system
+  private implicit val timeout = Timeout(5.seconds)
+  private implicit val order: Ordering[BufferedRequest] = 
Ordering.by(_.containerId)
+
+  private val unversionedAction = action.copy(version = None)
+  private val checkInterval: FiniteDuration = 100 milliseconds
+  private val StaleThreshold: Double = 100.0
+  private val StaleDuration = Duration.ofMillis(StaleThreshold.toLong)
+  private val dropInterval: FiniteDuration = 10 seconds
+  private val leaderKey = QueueKeys.queue(invocationNamespace, 
unversionedAction, leader = true)
+  private val inProgressContainerPrefixKey =
+    containerPrefix(ContainerKeys.inProgressPrefix, invocationNamespace, 
action, Some(revision))
+  private val existingContainerPrefixKey =
+    containerPrefix(ContainerKeys.namespacePrefix, invocationNamespace, 
action, Some(revision))
+  private val namespaceThrottlingKey = 
ThrottlingKeys.namespace(EntityName(invocationNamespace))
+  private val actionThrottlingKey = ThrottlingKeys.action(invocationNamespace, 
unversionedAction)
+  private val pollTimeOut = 1.seconds
+  private var requestBuffer = mutable.PriorityQueue.empty[BufferedRequest]
+  private val memory = actionMetaData.limits.memory.megabytes.MB
+  private val queueRemovedMsg = QueueRemoved(invocationNamespace, 
action.toDocId.asDocInfo(revision), Some(leaderKey))
+  private val staleQueueRemovedMsg = QueueRemoved(invocationNamespace, 
action.toDocId.asDocInfo(revision), None)
+
+  private[queue] var containers = Set.empty[String]
+  private[queue] var creationIds = Set.empty[String]
+
+  private[queue] var queue = Queue.empty[TimeSeriesActivationEntry]
+  private[queue] var in: Int = 0
+  private[queue] val namespaceContainerCount = 
NamespaceContainerCount(invocationNamespace, etcdClient, watcherService)
+  private[queue] var averageDuration: Option[Double] = None
+  private[queue] var averageDurationBuffer = 
AverageRingBuffer(queueConfig.durationBufferSize)
+  private[queue] var limit: Option[Int] = None
+  private[queue] var initialized = false
+
+  private val logScheduler: Cancellable = 
context.system.scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) { () =>
+    MetricEmitter.emitGaugeMetric(
+      
LoggingMarkers.SCHEDULER_QUEUE_WAITING_ACTIVATION(s"$invocationNamespace/$action"),
+      queue.size)
+
+    MetricEmitter.emitGaugeMetric(
+      LoggingMarkers.SCHEDULER_NAMESPACE_CONTAINER(invocationNamespace),
+      namespaceContainerCount.existingContainerNumByNamespace)
+    MetricEmitter.emitGaugeMetric(
+      
LoggingMarkers.SCHEDULER_NAMESPACE_INPROGRESS_CONTAINER(invocationNamespace),
+      namespaceContainerCount.inProgressContainerNumByNamespace)
+
+    MetricEmitter.emitGaugeMetric(
+      LoggingMarkers.SCHEDULER_ACTION_CONTAINER(invocationNamespace, 
action.asString),
+      containers.size)
+    MetricEmitter.emitGaugeMetric(
+      
LoggingMarkers.SCHEDULER_ACTION_INPROGRESS_CONTAINER(invocationNamespace, 
action.asString),
+      creationIds.size)
+  }
+
+  getAverageDuration()
+
+  private val watcherName = s"memory-queue-$action-$revision"
+  // watch existing containers for action and namespace
+  private val watchedKeys = Seq(inProgressContainerPrefixKey, 
existingContainerPrefixKey)
+
+  watchedKeys.foreach { key =>
+    watcherService ! WatchEndpoint(key, "", isPrefix = true, watcherName, 
Set(PutEvent, DeleteEvent))
+  }
+
+  startWith(Uninitialized, NoData())
+
+  when(Uninitialized) {
+    case Event(Start, _) =>
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] a new 
queue is created.")
+      val (schedulerActor, droppingActor) = startMonitoring()
+      initializeThrottling()
+
+      watcherService ! WatchEndpoint(leaderKey, endpoints.serialize, isPrefix 
= false, watcherName, Set(DeleteEvent))
+
+      goto(Running) using RunningData(schedulerActor, droppingActor)
+
+    // this is the case that the action version is updated, so no data needs 
to be stored
+    case Event(VersionUpdated, _) =>
+      val (schedulerActor, droppingActor) = startMonitoring()
+
+      goto(Running) using RunningData(schedulerActor, droppingActor)
+
+    // other messages should not be handled in this state.
+    case _ =>
+      stash()
+      stay
+  }
+
+  when(Running, stateTimeout = queueConfig.idleGrace) {
+    case Event(EnableNamespaceThrottling(dropMsg), data: RunningData) =>
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] Enable 
namespace throttling.")
+      enableNamespaceThrottling()
+
+      // if no container could be created, it is same with Flushing state.
+      if (dropMsg) {
+        completeAllActivations(tooManyConcurrentRequests, isWhiskError = false)
+        goto(Flushing) using FlushingData(
+          data.schedulerActor,
+          data.droppingActor,
+          TooManyConcurrentRequests,
+          tooManyConcurrentRequests)
+      } else {
+        // if there are already some containers running, activations can still 
be processed so goto the NamespaceThrottled state.
+        goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, 
data.droppingActor)
+      }
+
+    case Event(StateTimeout, data: RunningData) =>
+      if (queue.isEmpty && (containers.size + creationIds.size) <= 0) {
+        logging.info(
+          this,
+          s"[$invocationNamespace:$action:$stateName] No activations coming in 
${queueConfig.idleGrace}")
+        actorSystem.stop(data.schedulerActor)
+        actorSystem.stop(data.droppingActor)
+
+        goto(Idle) using NoActors()
+      } else {
+        logging.info(
+          this,
+          s"[$invocationNamespace:$action:$stateName] The queue is timed out 
but there are still ${queue.size} activation messages or (running: 
${containers.size}, in-progress: ${creationIds.size}) containers")
+        stay
+      }
+
+    case Event(FailedCreationJob(creationId, _, _, _, error, message), 
RunningData(schedulerActor, droppingActor)) =>
+      creationIds -= creationId.asString
+      // when there is no container, it moves to the Flushing state as no 
activations can be invoked
+      if (containers.size <= 0) {
+        val isWhiskError = ContainerCreationError.whiskErrors.contains(error)
+        completeAllActivations(message, isWhiskError)
+        logging.error(
+          this,
+          s"[$invocationNamespace:$action:$stateName] Failed to create an 
initial container due to ${if (isWhiskError) "whiskError"
+          else "developerError"}, reason: $message.")
+
+        goto(Flushing) using FlushingData(schedulerActor, droppingActor, 
error, message)
+      } else
+        // if there are already some containers running, activations can be 
handled anyway.
+        stay
+  }
+
+  // there is no timeout for this state as when there is no further message, 
it would move to the Running state again.
+  when(NamespaceThrottled) {
+    case Event(msg: ActivationMessage, _: ThrottledData) =>
+      handleActivationMessage(msg)
+      stay
+
+    case Event(DisableNamespaceThrottling, data: ThrottledData) =>
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] Disable 
namespace throttling.")
+      disableNamespaceThrottling()
+      goto(Running) using RunningData(data.schedulerActor, data.schedulerActor)
+  }
+
+  // there is no timeout for this state as when there is no further message, 
it would move to the Running state again.
+  when(ActionThrottled) {
+    // since there are already too many activation messages, it drops the new 
messages
+    case Event(msg: ActivationMessage, ThrottledData(_, _)) =>
+      completeErrorActivation(msg, tooManyConcurrentRequests, isWhiskError = 
false)
+      stay
+  }
+
+  when(Idle, stateTimeout = queueConfig.stopGrace) {
+    case Event(msg: ActivationMessage, _: NoActors) =>
+      val (schedulerActor, droppingActor) = startMonitoring()
+      handleActivationMessage(msg)
+      goto(Running) using RunningData(schedulerActor, droppingActor)
+
+    case Event(request: GetActivation, _) if request.action == action =>
+      sender ! GetActivationResponse(Left(NoActivationMessage()))
+      stay
+
+    case Event(StateTimeout, _: NoActors) =>
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] The queue 
is timed out, stop the queue.")
+      cleanUpDataAndGotoRemoved()
+
+    case Event(GracefulShutdown, _: NoActors) =>
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] Received 
GracefulShutdown, stop the queue.")
+      cleanUpDataAndGotoRemoved()
+
+    case Event(StopSchedulingAsOutdated, _: NoActors) =>
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] stop 
further scheduling.")
+
+      cleanUpWatcher()
+
+      // let QueueManager know this queue is no longer in charge.
+      context.parent ! staleQueueRemovedMsg
+
+      // since the queue is outdated and there is no activation, delete all 
old containers.
+      containerManager ! ContainerDeletion(invocationNamespace, action, 
revision, actionMetaData)
+
+      goto(Removed) using NoData()
+  }
+
+  when(Flushing) {
+    // an initial container is successfully created.
+    case Event(SuccessfulCreationJob(creationId, _, _, _), 
FlushingData(schedulerActor, droppingActor, _, _, _)) =>
+      creationIds -= creationId.asString
+
+      goto(Running) using RunningData(schedulerActor, droppingActor)
+
+    // log the failed information
+    case Event(FailedCreationJob(creationId, _, _, _, _, message), data: 
FlushingData) =>
+      creationIds -= creationId.asString
+      logging.info(
+        this,
+        s"[$invocationNamespace:$action:$stateName][$creationId] Failed to 
create a container due to $message")
+
+      // keep updating the reason
+      stay using data.copy(reason = message)
+
+    // since there is no container, activations cannot be handled.
+    case Event(msg: ActivationMessage, data: FlushingData) =>
+      completeErrorActivation(msg, data.reason, 
ContainerCreationError.whiskErrors.contains(data.error))
+      stay() using data.copy(activeDuringFlush = true)
+
+    // Since SchedulingDecisionMaker keep sending a message to create a 
container, this state is not automatically timed out.
+    // Instead, StateTimeout message will be sent by a timer.
+    case Event(StateTimeout, data: FlushingData) =>
+      completeAllActivations(data.reason, 
ContainerCreationError.whiskErrors.contains(data.error))
+      if (data.activeDuringFlush)
+        stay using data.copy(activeDuringFlush = false)
+      else
+        cleanUpActorsAndGotoRemoved(data)
+
+    case Event(GracefulShutdown, data: FlushingData) =>
+      completeAllActivations(data.reason, 
ContainerCreationError.whiskErrors.contains(data.error))
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] Received 
GracefulShutdown, stop the queue.")
+      cleanUpActorsAndGotoRemoved(data)
+  }
+
+  // in case there is any activation in the queue, it waits until all of them 
are handled.
+  when(Removing, stateTimeout = queueConfig.gracefulShutdownTimeout) {
+    // When there is no message in the queue, SchedulingDecisionMaker would 
stop sending any message
+    // So the queue can be timed out on every gracefulShutdownTimeout
+    case Event(QueueRemovedCompleted | StateTimeout, data: RemovingData) =>
+      cleanUpActorsAndGotoRemovedIfPossible(data)
+
+    case Event(GracefulShutdown, data: RemovingData) =>
+      logging.info(
+        this,
+        s"[$invocationNamespace:$action:$stateName] The queue received 
GracefulShutdown trying to stop the queue.")
+      cleanUpActorsAndGotoRemovedIfPossible(data)
+
+    case Event(StopSchedulingAsOutdated, data: RemovingData) =>
+      logging.info(
+        this,
+        s"[$invocationNamespace:$action:$stateName] The queue received 
StopSchedulingAsOutdated trying to stop the queue.")
+      cleanUpActorsAndGotoRemovedIfPossible(data.copy(outdated = true))
+  }
+
+  when(Removed, stateTimeout = queueConfig.gracefulShutdownTimeout) {
+    // since this Queue will be terminated, rescheduling the msg
+    case Event(msg: ActivationMessage, _: NoData) =>
+      context.parent ! msg
+      stay()
+
+    // this queue is going to stop so let client connect to a new queue
+    case Event(request: GetActivation, _: NoData) if request.action == action 
=>
+      implicit val tid = request.transactionId
+      logging.info(
+        this,
+        s"[$invocationNamespace:$action:$stateName] Get activation request 
${request.containerId}, let client connect to a new queue.")
+      forwardAllActivations(context.parent)
+      sender ! GetActivationResponse(Left(NoMemoryQueue()))
+
+      stay
+
+    // actors and data are already wiped
+    case Event(QueueRemovedCompleted, _: NoData) =>
+      stop()
+
+    // This is not supposed to happen. This will ensure the queue does not run 
forever.
+    // This can happen when QueueManager could not respond with 
QueueRemovedCompleted for some reason.
+    case Event(StateTimeout, _: NoData) =>
+      context.parent ! queueRemovedMsg
+
+      stop()
+
+    // This queue is going to stop, do nothing
+    case Event(msg @ (StopSchedulingAsOutdated | GracefulShutdown), _: NoData) 
=>
+      logging.info(
+        this,
+        s"[$invocationNamespace:$action:$stateName] The queue received $msg 
but do nothing as it is going to stop.")
+      stay
+  }
+
+  whenUnhandled {
+    // The queue endpoint is removed, trying to restore it.
+    case Event(WatchEndpointRemoved(_, `leaderKey`, value, false), data) =>
+      data match {
+        case RemovingData(_, _, _) =>
+          logging.info(
+            this,
+            s"[$invocationNamespace:$action:$stateName] This queue is shutdown 
by `/disable` api, do nothing here.")
+        case _ =>
+          dataManagementService ! RegisterInitialData(leaderKey, value, 
failoverEnabled = false, Some(self)) // the watcher is already setup
+      }
+      stay
+
+    // we don't care the storage results for namespaceThrottlingKey
+    case Event(InitialDataStorageResults(`namespaceThrottlingKey`, _), _) =>
+      stay
+
+    // The queue endpoint is restored
+    case Event(InitialDataStorageResults(`leaderKey`, Right(_)), _) =>
+      stay
+
+    // this can be a case that there is another queue already running.
+    // it can happen if a node is segregated by the temporal network rupture 
and the queue endpoint is removed.
+    case Event(InitialDataStorageResults(`leaderKey`, Left(_)), data) =>
+      logging.warn(this, s"[$invocationNamespace:$action:$stateName] the queue 
is superseded by a new queue.")
+      // let QueueManager know this queue is no longer in charge.
+      context.parent ! queueRemovedMsg
+
+      // forward all activations to the parent queue manager.
+      // parent queue manager is supposed to removed the reference of this 
queue and forward messages to a new queue
+      forwardAllActivations(context.parent)
+
+      // only clean up actors because etcd data is already being used by 
another queue
+      cleanUpActors(data)
+
+      goto(Removed) using NoData()
+
+    case Event(WatchEndpointRemoved(watchKey, key, _, true), _) =>
+      watchKey match {
+        case `inProgressContainerPrefixKey` =>
+          creationIds -= key.split("/").last
+        case `existingContainerPrefixKey` =>
+          containers -= key.split("/").last
+        case _ =>
+      }
+      stay
+
+    case Event(WatchEndpointInserted(watchKey, key, _, true), _) =>
+      watchKey match {
+        case `inProgressContainerPrefixKey` =>
+          creationIds += key.split("/").last
+        case `existingContainerPrefixKey` =>
+          containers += key.split("/").last
+        case _ =>
+      }
+      stay
+
+    // common case for Running, NamespaceThrottled, ActionThrottled
+    case Event(SuccessfulCreationJob(creationId, _, _, _), _) =>
+      creationIds -= creationId.asString
+      stay()
+
+    // for other cases
+    case Event(FailedCreationJob(creationId, invocationNamespace, action, 
revision, _, message), _) =>
+      creationIds -= creationId.asString
+      logging.info(
+        this,
+        s"[$invocationNamespace:$action:$stateName][$creationId] Got failed 
creation job with revision $revision and error $message.")
+      stay()
+
+    // common case for Running, NamespaceThrottled, ActionThrottled, Removing
+    case Event(cancel: CancelPoll, _) =>
+      cancel.promise.trySuccess(Left(NoActivationMessage()))
+
+      stay
+
+    // common case for Running, NamespaceThrottled, ActionThrottled, Removing
+    case Event(msg: ActivationMessage, _) =>
+      handleActivationMessage(msg)
+
+    // common case for Running, NamespaceThrottled, ActionThrottled, Removing
+    case Event(request: GetActivation, _) if request.action == action =>
+      implicit val tid = request.transactionId
+      if (request.alive) {
+        containers += request.containerId
+        handleActivationRequest(request)
+      } else {
+        logging.info(this, s"Remove containerId because ${request.containerId} 
is not alive")
+        sender ! GetActivationResponse(Left(NoActivationMessage()))
+        containers -= request.containerId
+        stay
+      }
+
+    // common case for Running, NamespaceThrottled, ActionThrottled, Removing
+    case Event(request: GetActivation, _) if request.action != action =>
+      implicit val tid = request.transactionId
+      logging.warn(this, s"[$invocationNamespace:$action:$stateName] version 
mismatch ${request.action}")
+      sender ! GetActivationResponse(Left(ActionMismatch()))
+
+      stay
+
+    case Event(DropOld, _) =>
+      if (queue.nonEmpty && Duration
+            .between(queue.head.timestamp, Instant.now)
+            .compareTo(Duration.ofMillis(queueConfig.maxRetentionMs)) < 0) {
+        logging.error(
+          this,
+          s"[$invocationNamespace:$action:$stateName] Drop some stale 
activations for $revision, existing container is ${containers.size}, inProgress 
container is ${creationIds.size}, state data: $stateData, in is $in, current: 
${queue.size}.")
+        logging.error(
+          this,
+          s"[$invocationNamespace:$action:$stateName] the head stale message: 
${queue.head.msg.activationId}")
+      }
+      queue = MemoryQueue.dropOld(queue, 
Duration.ofMillis(queueConfig.maxRetentionMs), completeErrorActivation)
+
+      stay
+
+    // common case for all statuses
+    case Event(StatusQuery, _) =>
+      sender ! StatusData(invocationNamespace, action.asString, queue.size, 
stateName.toString, stateData.toString)
+      stay
+
+    // Common case for all cases
+    case Event(GracefulShutdown, data) =>
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] 
Gracefully shutdown the memory queue.")
+      // delete relative data, e.g leaderKey, namespaceThrottlingKey, 
actionThrottlingKey
+      cleanUpData()
+
+      // let queue manager knows this queue is going to stop and let it 
forward incoming activations to a new queue
+      context.parent ! queueRemovedMsg
+
+      goto(Removing) using getRemovingData(data, outdated = false)
+
+    // the version is updated. it's a shared case for all states
+    case Event(StopSchedulingAsOutdated, data) =>
+      logging.info(this, s"[$invocationNamespace:$action:$stateName] stop 
further scheduling.")
+      // let QueueManager know this queue is no longer in charge.
+      context.parent ! staleQueueRemovedMsg
+
+      goto(Removing) using getRemovingData(data, outdated = true)
+
+    case Event(t: FailureMessage, _) =>
+      logging.error(this, s"[$invocationNamespace:$action:$stateName] got an 
unexpected failure message: $t")
+
+      stay
+
+    case Event(msg: DecisionResults, _) =>
+      val DecisionResults(result, num) = msg
+      result match {
+        case AddInitialContainer if num > 0 =>

Review comment:
       Should this be `if num = 0`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to