This is an automated email from the ASF dual-hosted git repository.
ningyougang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 80de54e1f Remove containers gradually when disable invoker (#5253)
80de54e1f is described below
commit 80de54e1fce7c900d694ec4427ec8730a0432697
Author: jiangpengcheng <[email protected]>
AuthorDate: Thu Jun 2 11:34:47 2022 +0800
Remove containers gradually when disable invoker (#5253)
---
ansible/roles/invoker/tasks/deploy.yml | 1 +
.../core/containerpool/ContainerFactory.scala | 2 +
core/invoker/src/main/resources/application.conf | 1 +
.../v2/FunctionPullingContainerPool.scala | 40 ++++---
.../containerpool/test/ContainerPoolTests.scala | 14 ++-
.../containerpool/test/ContainerProxyTests.scala | 2 +-
.../test/FunctionPullingContainerPoolTests.scala | 124 ++++++++++++++++++++-
.../test/FunctionPullingContainerProxyTests.scala | 3 +-
8 files changed, 162 insertions(+), 25 deletions(-)
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 674fab91f..112c721a0 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -287,6 +287,7 @@
"CONFIG_whisk_containerPool_prewarmExpirationCheckIntervalVariance": "{{
container_pool_prewarm_expirationCheckIntervalVariance | default('10 seconds')
}}"
"CONFIG_whisk_containerPool_prewarmPromotion": "{{ container_pool_strict
| default('false') | lower }}"
"CONFIG_whisk_containerPool_prewarmMaxRetryLimit": "{{
container_pool_prewarm_max_retry_limit | default(5) }}"
+ "CONFIG_whisk_containerPool_batchDeletionSize": "{{
container_pool_batchDeletionSize | default(10) }}"
"CONFIG_whisk_invoker_username": "{{ invoker.username }}"
"CONFIG_whisk_invoker_password": "{{ invoker.password }}"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index 3b5a6c4ca..e2cba1331 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -54,12 +54,14 @@ case class ContainerPoolConfig(userMemory: ByteSize,
prewarmMaxRetryLimit: Int,
prewarmPromotion: Boolean,
memorySyncInterval: FiniteDuration,
+ batchDeletionSize: Int,
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) {
require(
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
require(prewarmExpirationCheckInterval.toSeconds > 0,
"prewarmExpirationCheckInterval must be > 0")
+ require(batchDeletionSize > 0, "batch deletion size must be > 0")
/**
* The shareFactor indicates the number of containers that would share a single core, on average.
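
Callers that construct ContainerPoolConfig directly now have to pass the extra batchDeletionSize argument (the test updates further down show the same change). A minimal sketch of a construction after this change, with placeholder values borrowed from the test fixtures rather than recommended settings, assuming the usual OpenWhisk size and duration imports:

    import scala.concurrent.duration._
    import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
    import org.apache.openwhisk.core.entity.size._

    // Placeholder values only; the trailing 10 is the new batchDeletionSize.
    // Passing a value <= 0 now fails the require(...) check added above.
    val poolConfig =
      ContainerPoolConfig(512.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second, 10)
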
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 946b4717e..61a2ec631 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -67,6 +67,7 @@ whisk {
prewarm-max-retry-limit: 5 # max subsequent retry limit to create prewarm containers
prewarm-promotion: false # if true, action can take prewarm container which has bigger memory
memory-sync-interval: 1 second # period to sync memory info to etcd
+ batch-deletion-size: 10 # batch size for removing containers when disable invoker, too big value may cause docker/k8s overload
}
kubernetes {
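
The new batch-deletion-size key sits next to the other container-pool settings, so a deployment-specific override only needs the same path (assuming the whisk.container-pool prefix the invoker's config uses; the block is loaded into ContainerPoolConfig via pureconfig). As a quick sanity check, a sketch of reading the value back with plain Typesafe Config, not code from this commit:

    import com.typesafe.config.ConfigFactory

    // Read the new setting from the merged application.conf; 10 is the default
    // shipped with this commit.
    val conf = ConfigFactory.load()
    val batchDeletionSize = conf.getInt("whisk.container-pool.batch-deletion-size")
    println(s"containers drained per round while the invoker is disabled: $batchDeletionSize")
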
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
index 94f7bd2f1..46e84b514 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
@@ -94,6 +94,9 @@ class FunctionPullingContainerPool(
private var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
private var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
+ // for shutting down
+ private var disablingPool = immutable.Set.empty[ActorRef]
+
private var shuttingDown = false
private val creationMessages = TrieMap[ActorRef, ContainerCreationMessage]()
@@ -353,18 +356,12 @@ class FunctionPullingContainerPool(
// Container got removed
case ContainerRemoved(replacePrewarm) =>
- inProgressPool.get(sender()).foreach { _ =>
- inProgressPool = inProgressPool - sender()
- }
-
- warmedPool.get(sender()).foreach { _ =>
- warmedPool = warmedPool - sender()
- }
+ inProgressPool = inProgressPool - sender()
+ warmedPool = warmedPool - sender()
+ disablingPool -= sender()
// container was busy (busy indicates at full capacity), so there is capacity to accept another job request
- busyPool.get(sender()).foreach { _ =>
- busyPool = busyPool - sender()
- }
+ busyPool = busyPool - sender()
//in case this was a prewarm
prewarmedPool.get(sender()).foreach { data =>
@@ -601,11 +598,26 @@ class FunctionPullingContainerPool(
* Make all busyPool's memoryQueue actor shutdown gracefully
*/
private def waitForPoolToClear(): Unit = {
- busyPool.keys.foreach(_ ! GracefulShutdown)
- warmedPool.keys.foreach(_ ! GracefulShutdown)
- if (inProgressPool.nonEmpty) {
+ val pool = self
+ // how many busy containers will be removed in this term
+ val slotsForBusyPool = math.max(poolConfig.batchDeletionSize - disablingPool.size, 0)
+ (busyPool.keySet &~ disablingPool)
+ .take(slotsForBusyPool)
+ .foreach(container => {
+ disablingPool += container
+ container ! GracefulShutdown
+ })
+ // how many warm containers will be removed in this term
+ val slotsForWarmPool = math.max(poolConfig.batchDeletionSize - disablingPool.size, 0)
+ (warmedPool.keySet &~ disablingPool)
+ .take(slotsForWarmPool)
+ .foreach(container => {
+ disablingPool += container
+ container ! GracefulShutdown
+ })
+ if (inProgressPool.nonEmpty || busyPool.size + warmedPool.size > slotsForBusyPool + slotsForWarmPool) {
context.system.scheduler.scheduleOnce(5.seconds) {
- waitForPoolToClear()
+ pool ! GracefulShutdown
}
}
}
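
With this change, waitForPoolToClear no longer shuts everything down at once: each round it picks at most batchDeletionSize containers that are not already in disablingPool (busy containers first, then warm ones), sends them GracefulShutdown, and, if in-flight containers remain or more containers are left than this round covered, re-sends GracefulShutdown to itself after 5 seconds to start the next round. A stand-alone sketch of the per-round selection, with invented names for illustration and not code from the commit:

    // Pick at most batchSize containers that are not draining yet: busy pool first,
    // then the warmed pool with whatever budget is left over.
    def nextBatch[A](busy: Set[A], warmed: Set[A], draining: Set[A], batchSize: Int): Set[A] = {
      val fromBusy = (busy -- draining).take(math.max(batchSize - draining.size, 0))
      val drainingAfterBusy = draining ++ fromBusy
      val fromWarmed =
        (warmed -- drainingAfterBusy).take(math.max(batchSize - drainingAfterBusy.size, 0))
      fromBusy ++ fromWarmed
    }

    // With batchSize = 3 and nothing draining yet, only three of the five busy
    // containers are selected; the warm ones wait for a later round:
    // nextBatch(Set("b1", "b2", "b3", "b4", "b5"), Set("w1", "w2"), Set.empty, 3).size == 3
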
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index eff4b926d..eccf04cd7 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -134,7 +134,7 @@ class ContainerPoolTests
}
def poolConfig(userMemory: ByteSize) =
- ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
+ ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second, 10)
behavior of "ContainerPool"
@@ -818,7 +818,8 @@ class ContainerPoolTests
100,
3,
false,
- 1.second)
+ 1.second,
+ 10)
val initialCount = 2
val pool =
system.actorOf(
@@ -864,7 +865,8 @@ class ContainerPoolTests
100,
3,
false,
- 1.second)
+ 1.second,
+ 10)
val minCount = 0
val initialCount = 2
val maxCount = 4
@@ -1237,7 +1239,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
}
it should "remove expired in order of expiration" in {
- val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second)
+ val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second, 10)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
//use a second kind so that we know sorting is not isolated to the expired of each kind
val exec2 = CodeExecAsString(RuntimeManifest("actionKind2", ImageName("testImage")), "testCode", None)
@@ -1261,7 +1263,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
it should "remove only the prewarmExpirationLimit of expired prewarms" in {
//limit prewarm removal to 2
- val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 2, 3, false, 1.second)
+ val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 2, 3, false, 1.second, 10)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
val prewarmConfig =
@@ -1287,7 +1289,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
it should "remove only the expired prewarms regardless of minCount" in {
//limit prewarm removal to 100
- val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 100, 3, false, 1.second)
+ val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 100, 3, false, 1.second, 10)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
//minCount is 2 - should leave at least 2 prewarms when removing expired
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 87aeaf19e..193f9f66a 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -276,7 +276,7 @@ class ContainerProxyTests
(transid: TransactionId, activation: WhiskActivation, isBlockingActivation: Boolean, context: UserContext) =>
Future.successful(())
}
- val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
+ val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second, 10)
def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
index 99fc09f92..ca6c26061 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -180,6 +180,7 @@ class FunctionPullingContainerPoolTests
memorySyncInterval: FiniteDuration = FiniteDuration(1, TimeUnit.SECONDS),
prewarmMaxRetryLimit: Int = 3,
prewarmPromotion: Boolean = false,
+ batchDeletionSize: Int = 10,
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) =
ContainerPoolConfig(
userMemory,
@@ -192,6 +193,7 @@ class FunctionPullingContainerPoolTests
prewarmMaxRetryLimit,
prewarmPromotion,
memorySyncInterval,
+ batchDeletionSize,
prewarmContainerCreationConfig)
def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
@@ -309,6 +311,118 @@ class FunctionPullingContainerPoolTests
}
}
+ it should "stop containers gradually when shut down" in within(timeout * 20)
{
+ val (containers, factory) = testContainers(10)
+ val doc = put(entityStore, bigWhiskAction)
+ val topic = s"creationAck${schedulerInstanceId.asString}"
+ val consumer = new TestConnector(topic, 4, true)
+ val pool = system.actorOf(
+ Props(new FunctionPullingContainerPool(
+ factory,
+ invokerHealthService.ref,
+ poolConfig(MemoryLimit.STD_MEMORY * 20, batchDeletionSize = 3),
+ invokerInstance,
+ List.empty,
+ sendAckToScheduler(consumer.getProducer()))))
+
+ (0 to 10).foreach(_ => pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)) // 11 * stdMemory taken)
+ (0 to 10).foreach(i => {
+ containers(i).expectMsgPF() {
+ case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+ }
+ // create 5 container in busy pool, and 6 in warmed pool
+ if (i < 5)
+ containers(i).send(pool, Initialized(initializedData)) // container is initialized
+ else
+ containers(i).send(
+ pool,
+ ContainerIsPaused(
+ WarmData(
+ stub[DockerContainer],
+ invocationNamespace.asString,
+ whiskAction.toExecutableWhiskAction.get,
+ doc.rev,
+ Instant.now,
+ TestProbe().ref)))
+ })
+
+ // disable
+ pool ! GracefulShutdown
+ // at first, 3 containers will be removed from busy pool, and left containers will not
+ var disablingContainers = Set.empty[Int]
+ (0 to 10).foreach(i => {
+ try {
+ containers(i).expectMsg(1.second, GracefulShutdown)
+ disablingContainers += i
+ } catch {
+ case _: Throwable =>
+ }
+ })
+ assert(disablingContainers.size == 3, "more than 3 containers is shutting down")
+ disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
+
+ Thread.sleep(3000)
+ var completedContainer = -1
+ (0 to 10)
+ .filter(!disablingContainers.contains(_))
+ .foreach(i => {
+ try {
+ containers(i).expectMsg(1.second, GracefulShutdown)
+ disablingContainers += i
+ // only make one container complete shutting down
+ if (completedContainer == -1)
+ completedContainer = i
+ } catch {
+ case _: Throwable =>
+ }
+ })
+ assert(disablingContainers.size == 6, "more than 3 containers is shutting down")
+ containers(completedContainer).send(pool, ContainerRemoved(false))
+
+ Thread.sleep(3000)
+ (0 to 10)
+ .filter(!disablingContainers.contains(_))
+ .foreach(i => {
+ try {
+ containers(i).expectMsg(1.second, GracefulShutdown)
+ disablingContainers += i
+ } catch {
+ case _: Throwable =>
+ }
+ })
+ // there should be only one more container going to shut down
+ assert(disablingContainers.size == 7, "more than 3 containers is shutting down")
+ disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
+
+ Thread.sleep(3000)
+ (0 to 10)
+ .filter(!disablingContainers.contains(_))
+ .foreach(i => {
+ try {
+ containers(i).expectMsg(1.second, GracefulShutdown)
+ disablingContainers += i
+ } catch {
+ case _: Throwable =>
+ }
+ })
+ assert(disablingContainers.size == 10, "more than 3 containers is shutting down")
+ disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
+
+ Thread.sleep(3000)
+ (0 to 10)
+ .filter(!disablingContainers.contains(_))
+ .foreach(i => {
+ try {
+ containers(i).expectMsg(1.second, GracefulShutdown)
+ disablingContainers += i
+ } catch {
+ case _: Throwable =>
+ }
+ })
+ assert(disablingContainers.size == 11, "unexpected containers is shutting down")
+ disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
+ }
+
it should "create prewarmed containers on startup" in within(timeout) {
stream.reset()
val (containers, factory) = testContainers(1)
@@ -343,6 +457,7 @@ class FunctionPullingContainerPoolTests
3,
false,
FiniteDuration(10, TimeUnit.SECONDS),
+ 10,
prewarmContainerCreationConfig)
val pool = system.actorOf(
@@ -906,7 +1021,8 @@ class FunctionPullingContainerPoolTests
100,
3,
false,
- 1.second)
+ 1.second,
+ 10)
val initialCount = 2
val pool = system.actorOf(
Props(
@@ -958,7 +1074,8 @@ class FunctionPullingContainerPoolTests
100,
3,
false,
- 1.second)
+ 1.second,
+ 10)
val minCount = 0
val initialCount = 2
val maxCount = 4
@@ -1105,7 +1222,8 @@ class FunctionPullingContainerPoolTests
100,
maxRetryLimit,
false,
- 1.second)
+ 1.second,
+ 10)
val initialCount = 1
val pool = system.actorOf(
Props(
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index c77932007..570440e1f 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -115,7 +115,8 @@ class FunctionPullingContainerProxyTests
100,
3,
false,
- 1.second)
+ 1.second,
+ 10)
val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds, 5.seconds)