This is an automated email from the ASF dual-hosted git repository.

ningyougang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 80de54e1f Remove containers gradually when disable invoker (#5253)
80de54e1f is described below

commit 80de54e1fce7c900d694ec4427ec8730a0432697
Author: jiangpengcheng <[email protected]>
AuthorDate: Thu Jun 2 11:34:47 2022 +0800

    Remove containers gradually when disable invoker (#5253)
---
 ansible/roles/invoker/tasks/deploy.yml             |   1 +
 .../core/containerpool/ContainerFactory.scala      |   2 +
 core/invoker/src/main/resources/application.conf   |   1 +
 .../v2/FunctionPullingContainerPool.scala          |  40 ++++---
 .../containerpool/test/ContainerPoolTests.scala    |  14 ++-
 .../containerpool/test/ContainerProxyTests.scala   |   2 +-
 .../test/FunctionPullingContainerPoolTests.scala   | 124 ++++++++++++++++++++-
 .../test/FunctionPullingContainerProxyTests.scala  |   3 +-
 8 files changed, 162 insertions(+), 25 deletions(-)

diff --git a/ansible/roles/invoker/tasks/deploy.yml 
b/ansible/roles/invoker/tasks/deploy.yml
index 674fab91f..112c721a0 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -287,6 +287,7 @@
       "CONFIG_whisk_containerPool_prewarmExpirationCheckIntervalVariance": "{{ 
container_pool_prewarm_expirationCheckIntervalVariance | default('10 seconds') 
}}"
       "CONFIG_whisk_containerPool_prewarmPromotion": "{{ container_pool_strict 
| default('false') | lower }}"
       "CONFIG_whisk_containerPool_prewarmMaxRetryLimit": "{{ 
container_pool_prewarm_max_retry_limit | default(5) }}"
+      "CONFIG_whisk_containerPool_batchDeletionSize": "{{ 
container_pool_batchDeletionSize | default(10) }}"
       "CONFIG_whisk_invoker_username": "{{ invoker.username }}"
       "CONFIG_whisk_invoker_password": "{{ invoker.password }}"
 
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index 3b5a6c4ca..e2cba1331 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -54,12 +54,14 @@ case class ContainerPoolConfig(userMemory: ByteSize,
                                prewarmMaxRetryLimit: Int,
                                prewarmPromotion: Boolean,
                                memorySyncInterval: FiniteDuration,
+                               batchDeletionSize: Int,
                                prewarmContainerCreationConfig: 
Option[PrewarmContainerCreationConfig] = None) {
   require(
     concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
     s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
 
   require(prewarmExpirationCheckInterval.toSeconds > 0, 
"prewarmExpirationCheckInterval must be > 0")
+  require(batchDeletionSize > 0, "batch deletion size must be > 0")
 
   /**
    * The shareFactor indicates the number of containers that would share a 
single core, on average.
diff --git a/core/invoker/src/main/resources/application.conf 
b/core/invoker/src/main/resources/application.conf
index 946b4717e..61a2ec631 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -67,6 +67,7 @@ whisk {
     prewarm-max-retry-limit: 5 # max subsequent retry limit to create prewarm 
containers
     prewarm-promotion: false # if true, action can take prewarm container 
which has bigger memory
     memory-sync-interval: 1 second # period to sync memory info to etcd
+    batch-deletion-size: 10 # batch size for removing containers when disabling the invoker; too large a value may overload docker/k8s
   }
 
   kubernetes {
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
index 94f7bd2f1..46e84b514 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
@@ -94,6 +94,9 @@ class FunctionPullingContainerPool(
   private var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
   private var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, 
ByteSize)]
 
+  // for shutting down
+  private var disablingPool = immutable.Set.empty[ActorRef]
+
   private var shuttingDown = false
 
   private val creationMessages = TrieMap[ActorRef, ContainerCreationMessage]()
@@ -353,18 +356,12 @@ class FunctionPullingContainerPool(
 
     // Container got removed
     case ContainerRemoved(replacePrewarm) =>
-      inProgressPool.get(sender()).foreach { _ =>
-        inProgressPool = inProgressPool - sender()
-      }
-
-      warmedPool.get(sender()).foreach { _ =>
-        warmedPool = warmedPool - sender()
-      }
+      inProgressPool = inProgressPool - sender()
+      warmedPool = warmedPool - sender()
+      disablingPool -= sender()
 
       // container was busy (busy indicates at full capacity), so there is 
capacity to accept another job request
-      busyPool.get(sender()).foreach { _ =>
-        busyPool = busyPool - sender()
-      }
+      busyPool = busyPool - sender()
 
       //in case this was a prewarm
       prewarmedPool.get(sender()).foreach { data =>
@@ -601,11 +598,26 @@ class FunctionPullingContainerPool(
    * Make all busyPool's memoryQueue actor shutdown gracefully
    */
   private def waitForPoolToClear(): Unit = {
-    busyPool.keys.foreach(_ ! GracefulShutdown)
-    warmedPool.keys.foreach(_ ! GracefulShutdown)
-    if (inProgressPool.nonEmpty) {
+    val pool = self
+    // how many busy containers will be removed in this round
+    val slotsForBusyPool = math.max(poolConfig.batchDeletionSize - 
disablingPool.size, 0)
+    (busyPool.keySet &~ disablingPool)
+      .take(slotsForBusyPool)
+      .foreach(container => {
+        disablingPool += container
+        container ! GracefulShutdown
+      })
+    // how many warm containers will be removed in this round
+    val slotsForWarmPool = math.max(poolConfig.batchDeletionSize - 
disablingPool.size, 0)
+    (warmedPool.keySet &~ disablingPool)
+      .take(slotsForWarmPool)
+      .foreach(container => {
+        disablingPool += container
+        container ! GracefulShutdown
+      })
+    if (inProgressPool.nonEmpty || busyPool.size + warmedPool.size > 
slotsForBusyPool + slotsForWarmPool) {
       context.system.scheduler.scheduleOnce(5.seconds) {
-        waitForPoolToClear()
+        pool ! GracefulShutdown
       }
     }
   }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index eff4b926d..eccf04cd7 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -134,7 +134,7 @@ class ContainerPoolTests
   }
 
   def poolConfig(userMemory: ByteSize) =
-    ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 
3, false, 1.second)
+    ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 
3, false, 1.second, 10)
 
   behavior of "ContainerPool"
 
@@ -818,7 +818,8 @@ class ContainerPoolTests
         100,
         3,
         false,
-        1.second)
+        1.second,
+        10)
     val initialCount = 2
     val pool =
       system.actorOf(
@@ -864,7 +865,8 @@ class ContainerPoolTests
         100,
         3,
         false,
-        1.second)
+        1.second,
+        10)
     val minCount = 0
     val initialCount = 2
     val maxCount = 4
@@ -1237,7 +1239,7 @@ class ContainerPoolObjectTests extends FlatSpec with 
Matchers with MockFactory {
   }
 
   it should "remove expired in order of expiration" in {
-    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 
10.seconds, None, 1, 3, false, 1.second)
+    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 
10.seconds, None, 1, 3, false, 1.second, 10)
     val exec = CodeExecAsString(RuntimeManifest("actionKind", 
ImageName("testImage")), "testCode", None)
     //use a second kind so that we know sorting is not isolated to the expired 
of each kind
     val exec2 = CodeExecAsString(RuntimeManifest("actionKind2", 
ImageName("testImage")), "testCode", None)
@@ -1261,7 +1263,7 @@ class ContainerPoolObjectTests extends FlatSpec with 
Matchers with MockFactory {
 
   it should "remove only the prewarmExpirationLimit of expired prewarms" in {
     //limit prewarm removal to 2
-    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 
10.seconds, None, 2, 3, false, 1.second)
+    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 
10.seconds, None, 2, 3, false, 1.second, 10)
     val exec = CodeExecAsString(RuntimeManifest("actionKind", 
ImageName("testImage")), "testCode", None)
     val memoryLimit = 256.MB
     val prewarmConfig =
@@ -1287,7 +1289,7 @@ class ContainerPoolObjectTests extends FlatSpec with 
Matchers with MockFactory {
 
   it should "remove only the expired prewarms regardless of minCount" in {
     //limit prewarm removal to 100
-    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 
10.seconds, None, 100, 3, false, 1.second)
+    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 
10.seconds, None, 100, 3, false, 1.second, 10)
     val exec = CodeExecAsString(RuntimeManifest("actionKind", 
ImageName("testImage")), "testCode", None)
     val memoryLimit = 256.MB
     //minCount is 2 - should leave at least 2 prewarms when removing expired
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 87aeaf19e..193f9f66a 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -276,7 +276,7 @@ class ContainerProxyTests
     (transid: TransactionId, activation: WhiskActivation, 
isBlockingActivation: Boolean, context: UserContext) =>
       Future.successful(())
   }
-  val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, 
None, 100, 3, false, 1.second)
+  val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, 
None, 100, 3, false, 1.second, 10)
   def healthchecksConfig(enabled: Boolean = false) = 
ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
   val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))
 
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
index 99fc09f92..ca6c26061 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -180,6 +180,7 @@ class FunctionPullingContainerPoolTests
                  memorySyncInterval: FiniteDuration = FiniteDuration(1, 
TimeUnit.SECONDS),
                  prewarmMaxRetryLimit: Int = 3,
                  prewarmPromotion: Boolean = false,
+                 batchDeletionSize: Int = 10,
                  prewarmContainerCreationConfig: 
Option[PrewarmContainerCreationConfig] = None) =
     ContainerPoolConfig(
       userMemory,
@@ -192,6 +193,7 @@ class FunctionPullingContainerPoolTests
       prewarmMaxRetryLimit,
       prewarmPromotion,
       memorySyncInterval,
+      batchDeletionSize,
       prewarmContainerCreationConfig)
 
   def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: 
SchedulerInstanceId,
@@ -309,6 +311,118 @@ class FunctionPullingContainerPoolTests
     }
   }
 
+  it should "stop containers gradually when shut down" in within(timeout * 20) 
{
+    val (containers, factory) = testContainers(10)
+    val doc = put(entityStore, bigWhiskAction)
+    val topic = s"creationAck${schedulerInstanceId.asString}"
+    val consumer = new TestConnector(topic, 4, true)
+    val pool = system.actorOf(
+      Props(new FunctionPullingContainerPool(
+        factory,
+        invokerHealthService.ref,
+        poolConfig(MemoryLimit.STD_MEMORY * 20, batchDeletionSize = 3),
+        invokerInstance,
+        List.empty,
+        sendAckToScheduler(consumer.getProducer()))))
+
+    (0 to 10).foreach(_ => pool ! 
CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)) // 11 * stdMemory taken
+    (0 to 10).foreach(i => {
+      containers(i).expectMsgPF() {
+        case Initialize(invocationNamespace, executeAction, schedulerHost, 
rpcPort, _) => true
+      }
+      // create 5 containers in the busy pool, and 6 in the warmed pool
+      if (i < 5)
+        containers(i).send(pool, Initialized(initializedData)) // container is 
initialized
+      else
+        containers(i).send(
+          pool,
+          ContainerIsPaused(
+            WarmData(
+              stub[DockerContainer],
+              invocationNamespace.asString,
+              whiskAction.toExecutableWhiskAction.get,
+              doc.rev,
+              Instant.now,
+              TestProbe().ref)))
+    })
+
+    // disable
+    pool ! GracefulShutdown
+    // at first, 3 containers will be removed from the busy pool, and the remaining containers will not
+    var disablingContainers = Set.empty[Int]
+    (0 to 10).foreach(i => {
+      try {
+        containers(i).expectMsg(1.second, GracefulShutdown)
+        disablingContainers += i
+      } catch {
+        case _: Throwable =>
+      }
+    })
+    assert(disablingContainers.size == 3, "more than 3 containers is shutting 
down")
+    disablingContainers.foreach(i => containers(i).send(pool, 
ContainerRemoved(false)))
+
+    Thread.sleep(3000)
+    var completedContainer = -1
+    (0 to 10)
+      .filter(!disablingContainers.contains(_))
+      .foreach(i => {
+        try {
+          containers(i).expectMsg(1.second, GracefulShutdown)
+          disablingContainers += i
+          // only make one container complete shutting down
+          if (completedContainer == -1)
+            completedContainer = i
+        } catch {
+          case _: Throwable =>
+        }
+      })
+    assert(disablingContainers.size == 6, "more than 3 containers is shutting 
down")
+    containers(completedContainer).send(pool, ContainerRemoved(false))
+
+    Thread.sleep(3000)
+    (0 to 10)
+      .filter(!disablingContainers.contains(_))
+      .foreach(i => {
+        try {
+          containers(i).expectMsg(1.second, GracefulShutdown)
+          disablingContainers += i
+        } catch {
+          case _: Throwable =>
+        }
+      })
+    // there should be only one more container going to shut down
+    assert(disablingContainers.size == 7, "more than 3 containers is shutting 
down")
+    disablingContainers.foreach(i => containers(i).send(pool, 
ContainerRemoved(false)))
+
+    Thread.sleep(3000)
+    (0 to 10)
+      .filter(!disablingContainers.contains(_))
+      .foreach(i => {
+        try {
+          containers(i).expectMsg(1.second, GracefulShutdown)
+          disablingContainers += i
+        } catch {
+          case _: Throwable =>
+        }
+      })
+    assert(disablingContainers.size == 10, "more than 3 containers is shutting 
down")
+    disablingContainers.foreach(i => containers(i).send(pool, 
ContainerRemoved(false)))
+
+    Thread.sleep(3000)
+    (0 to 10)
+      .filter(!disablingContainers.contains(_))
+      .foreach(i => {
+        try {
+          containers(i).expectMsg(1.second, GracefulShutdown)
+          disablingContainers += i
+        } catch {
+          case _: Throwable =>
+        }
+      })
+    assert(disablingContainers.size == 11, "unexpected containers is shutting 
down")
+    disablingContainers.foreach(i => containers(i).send(pool, 
ContainerRemoved(false)))
+  }
+
   it should "create prewarmed containers on startup" in within(timeout) {
     stream.reset()
     val (containers, factory) = testContainers(1)
@@ -343,6 +457,7 @@ class FunctionPullingContainerPoolTests
       3,
       false,
       FiniteDuration(10, TimeUnit.SECONDS),
+      10,
       prewarmContainerCreationConfig)
 
     val pool = system.actorOf(
@@ -906,7 +1021,8 @@ class FunctionPullingContainerPoolTests
         100,
         3,
         false,
-        1.second)
+        1.second,
+        10)
     val initialCount = 2
     val pool = system.actorOf(
       Props(
@@ -958,7 +1074,8 @@ class FunctionPullingContainerPoolTests
         100,
         3,
         false,
-        1.second)
+        1.second,
+        10)
     val minCount = 0
     val initialCount = 2
     val maxCount = 4
@@ -1105,7 +1222,8 @@ class FunctionPullingContainerPoolTests
         100,
         maxRetryLimit,
         false,
-        1.second)
+        1.second,
+        10)
     val initialCount = 1
     val pool = system.actorOf(
       Props(
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index c77932007..570440e1f 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -115,7 +115,8 @@ class FunctionPullingContainerProxyTests
       100,
       3,
       false,
-      1.second)
+      1.second,
+      10)
 
   val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds, 
5.seconds)
 

Reply via email to