This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fd21565a Go to the NamespaceThrottled state rather than Flushing 
state. (#5303)
8fd21565a is described below

commit 8fd21565a293eb548e6aceddb925cec49bbb6b03
Author: Dominic Kim <[email protected]>
AuthorDate: Wed Aug 3 17:39:32 2022 +0900

    Go to the NamespaceThrottled state rather than Flushing state. (#5303)
    
    * Currently MemoryQueue will go to Flushing state when receive a 
EnableNamespaceThrottling(dropMsg=true) message, but the Flushing state doesn't 
have a case to disable namespace throttling at all.
    
    * Remove unused import.
---
 .../core/scheduler/queue/MemoryQueue.scala          | 21 ++++++---------------
 .../scheduler/queue/test/MemoryQueueFlowTests.scala | 12 ++++++++----
 .../scheduler/queue/test/MemoryQueueTests.scala     |  2 +-
 3 files changed, 15 insertions(+), 20 deletions(-)

diff --git 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index fabc785a4..312e9ecff 100644
--- 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -17,15 +17,13 @@
 
 package org.apache.openwhisk.core.scheduler.queue
 
-import java.time.{Duration, Instant}
-import java.util.concurrent.atomic.AtomicInteger
 import akka.actor.Status.{Failure => FailureMessage}
 import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
 import akka.util.Timeout
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.ack.ActiveAck
-import 
org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests,
 ZeroNamespaceLimit}
+import 
org.apache.openwhisk.core.connector.ContainerCreationError.ZeroNamespaceLimit
 import org.apache.openwhisk.core.connector._
 import org.apache.openwhisk.core.containerpool.Interval
 import org.apache.openwhisk.core.database.{NoDocumentException, UserContext}
@@ -44,10 +42,12 @@ import org.apache.openwhisk.core.scheduler.message.{
 import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, 
SchedulingConfig}
 import org.apache.openwhisk.core.service._
 import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, 
tooManyConcurrentRequests}
-import pureconfig.generic.auto._
 import pureconfig.loadConfigOrThrow
 import spray.json._
+import pureconfig.generic.auto._
 
+import java.time.{Duration, Instant}
+import java.util.concurrent.atomic.AtomicInteger
 import scala.annotation.tailrec
 import scala.collection.immutable.Queue
 import scala.collection.mutable
@@ -224,18 +224,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
       logging.info(this, s"[$invocationNamespace:$action:$stateName] Enable 
namespace throttling.")
       enableNamespaceThrottling()
 
-      // if no container could be created, it is same with Flushing state.
-      if (dropMsg) {
+      if (dropMsg)
         completeAllActivations(tooManyConcurrentRequests, isWhiskError = false)
-        goto(Flushing) using FlushingData(
-          data.schedulerActor,
-          data.droppingActor,
-          TooManyConcurrentRequests,
-          tooManyConcurrentRequests)
-      } else {
-        // if there are already some containers running, activations can still 
be processed so goto the NamespaceThrottled state.
-        goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, 
data.droppingActor)
-      }
+      goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, 
data.droppingActor)
 
     case Event(StateTimeout, data: RunningData) =>
       if (queue.isEmpty && (containers.size + creationIds.size) <= 0) {
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
index 267d3a81d..52568327f 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala
@@ -280,7 +280,7 @@ class MemoryQueueFlowTests
     probe.expectTerminated(fsm, 10.seconds)
   }
 
-  it should "go to the Flushing state dropping messages when it can't create 
an initial container" in {
+  it should "go to the NamespaceThrottled state dropping messages when it 
can't create an initial container" in {
     val mockEtcdClient = mock[EtcdClient]
     val parent = TestProbe()
     val watcher = TestProbe()
@@ -340,7 +340,7 @@ class MemoryQueueFlowTests
     fsm ! message
 
     dataMgmtService.expectMsg(RegisterData(namespaceThrottlingKey, 
true.toString, failoverEnabled = false))
-    probe.expectMsg(Transition(fsm, Running, Flushing))
+    probe.expectMsg(Transition(fsm, Running, NamespaceThrottled))
 
     awaitAssert({
       ackedMessageCount shouldBe 1
@@ -352,13 +352,17 @@ class MemoryQueueFlowTests
       fsm.underlyingActor.queue.size shouldBe 0
     }, 5.seconds)
 
-    parent.expectMsg(flushGrace * 2 + 5.seconds, queueRemovedMsg)
-    probe.expectMsg(Transition(fsm, Flushing, Removed))
+    fsm ! GracefulShutdown
+
+    parent.expectMsg(queueRemovedMsg)
+    probe.expectMsg(Transition(fsm, NamespaceThrottled, Removing))
 
     fsm ! QueueRemovedCompleted
 
     expectDataCleanUp(watcher, dataMgmtService)
 
+    probe.expectMsg(Transition(fsm, Removing, Removed))
+
     probe.expectTerminated(fsm, 10.seconds)
   }
 
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
index 950af0292..fd2ac7363 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala
@@ -1357,7 +1357,7 @@ class MemoryQueueTests
     parent.expectMsg(10 seconds, Transition(fsm, Uninitialized, Running))
 
     fsm ! EnableNamespaceThrottling(dropMsg = true)
-    parent.expectMsg(10 seconds, Transition(fsm, Running, Flushing))
+    parent.expectMsg(10 seconds, Transition(fsm, Running, NamespaceThrottled))
     dataManagementService.expectMsg(RegisterData(namespaceThrottlingKey, 
true.toString, false))
 
     fsm.stop()

Reply via email to