This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 625c5f2ef Fix Orphaned Container Edge Case In Paused State of Container Proxy (#5326)
625c5f2ef is described below
commit 625c5f2ef64c9cc50da568998199ff623ad01f41
Author: Brendan Doyle <[email protected]>
AuthorDate: Fri Sep 23 09:57:13 2022 -0400
Fix Orphaned Container Edge Case In Paused State of Container Proxy (#5326)
* fix orphaned container edge case in proxy paused state
* enhance test
* address review feedback
Co-authored-by: Brendan Doyle <[email protected]>
---
.../ShardingContainerPoolBalancer.scala | 2 +-
.../v2/FunctionPullingContainerProxy.scala | 50 +++---
.../test/FunctionPullingContainerProxyTests.scala | 186 +++++++++++++++++++++
3 files changed, 218 insertions(+), 20 deletions(-)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 20f1581de..5f7b9f05c 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -158,7 +158,7 @@ class ShardingContainerPoolBalancer(
AkkaManagement(actorSystem).start()
ClusterBootstrap(actorSystem).start()
Some(Cluster(actorSystem))
- } else if (loadConfigOrThrow[Seq[String]]("akka.cluster.seed-nodes").nonEmpty) {
+ } else if (loadConfigOrThrow[Seq[String]]("akka.cluster.seed-nodes").nonEmpty) {
Some(Cluster(actorSystem))
} else {
None
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index d290a6bde..eaa043679 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -87,6 +87,7 @@ case class Initialized(data: InitializedData)
case class Resumed(data: WarmData)
case class ResumeFailed(data: WarmData)
case class RecreateClient(action: ExecutableWhiskAction)
+case class DetermineKeepContainer(attempt: Int)
// States
sealed trait ProxyState
@@ -661,6 +662,7 @@ class FunctionPullingContainerProxy(
val parent = context.parent
cancelTimer(IdleTimeoutName)
cancelTimer(KeepingTimeoutName)
+ cancelTimer(DetermineKeepContainer.toString)
data.container
.resume()
.map { _ =>
@@ -693,32 +695,43 @@ class FunctionPullingContainerProxy(
instance,
data.container.containerId))
goto(Running)
-
- case Event(StateTimeout, data: WarmData) =>
- (for {
- count <- getLiveContainerCount(data.invocationNamespace, data.action.fullyQualifiedName(false), data.revision)
- (warmedContainerKeepingCount, warmedContainerKeepingTimeout) <- getWarmedContainerLimit(
- data.invocationNamespace)
- } yield {
- logging.info(
- this,
- s"Live container count: ${count}, warmed container keeping count
configuration: ${warmedContainerKeepingCount} in namespace:
${data.invocationNamespace}")
- if (count <= warmedContainerKeepingCount) {
- Keep(warmedContainerKeepingTimeout)
- } else {
- Remove
- }
- }).pipeTo(self)
+ case Event(StateTimeout, _: WarmData) =>
+ self ! DetermineKeepContainer(0)
+ stay
+ case Event(DetermineKeepContainer(attempt), data: WarmData) =>
+ getLiveContainerCount(data.invocationNamespace, data.action.fullyQualifiedName(false), data.revision)
+ .flatMap(count => {
+ getWarmedContainerLimit(data.invocationNamespace).map(warmedContainerInfo => {
+ logging.info(
+ this,
+ s"Live container count: $count, warmed container keeping count
configuration: ${warmedContainerInfo._1} in namespace:
${data.invocationNamespace}")
+ if (count <= warmedContainerInfo._1) {
+ self ! Keep(warmedContainerInfo._2)
+ } else {
+ self ! Remove
+ }
+ })
+ })
+ .recover({
+ case t: Throwable =>
+ logging.error(
+ this,
+ s"Failed to determine whether to keep or remove container on
pause timeout for ${data.container.containerId}, retrying. Caused by: $t")
+ if (attempt < 5) {
+ startSingleTimer(DetermineKeepContainer.toString, DetermineKeepContainer(attempt + 1), 500.milli)
+ } else {
+ self ! Remove
+ }
+ })
stay
-
case Event(Keep(warmedContainerKeepingTimeout), data: WarmData) =>
logging.info(
this,
s"This is the remaining container for ${data.action}. The container
will stop after $warmedContainerKeepingTimeout.")
startSingleTimer(KeepingTimeoutName, Remove, warmedContainerKeepingTimeout)
stay
-
case Event(Remove | GracefulShutdown, data: WarmData) =>
+ cancelTimer(DetermineKeepContainer.toString)
dataManagementService ! UnregisterData(
ContainerKeys.warmedContainers(
data.invocationNamespace,
@@ -732,7 +745,6 @@ class FunctionPullingContainerProxy(
data.action.fullyQualifiedName(false),
data.action.rev,
Some(data.clientProxy))
-
case _ => delay
}
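(For context on the hunk above: the patch replaces the single pipeTo-based StateTimeout handler with a retryable DetermineKeepContainer(attempt) self-message, so a transient failure while looking up the live container count no longer strands a paused container. The standalone Scala sketch below illustrates that retry-then-fallback pattern with an ordinary Akka actor; the actor name, constructor parameters, timer keys, and retry delay are simplified assumptions for illustration and are not the production code.)

import akka.actor.{Actor, Timers}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Messages mirroring the ones used in the patch above.
case class DetermineKeepContainer(attempt: Int)
case class Keep(timeout: FiniteDuration)
case object Remove

// Hypothetical, stripped-down actor showing the retry-with-timer idea:
// a failed live-count lookup schedules another DetermineKeepContainer with an
// incremented attempt counter; after maxRetries the actor gives up and removes.
class KeepOrRemoveSketch(getLiveContainerCount: () => Future[Long],
                         keepingCount: Int,
                         keepingTimeout: FiniteDuration,
                         maxRetries: Int = 5)
    extends Actor
    with Timers {
  import context.dispatcher

  override def preStart(): Unit = self ! DetermineKeepContainer(0)

  def receive: Receive = {
    case DetermineKeepContainer(attempt) =>
      getLiveContainerCount().onComplete {
        case Success(count) =>
          // enough other live containers -> remove, otherwise keep for a bounded time
          if (count <= keepingCount) self ! Keep(keepingTimeout) else self ! Remove
        case Failure(_) if attempt < maxRetries =>
          // transient failure: retry shortly, carrying the attempt count forward
          timers.startSingleTimer("determineKeep", DetermineKeepContainer(attempt + 1), 500.millis)
        case Failure(_) =>
          // give up after maxRetries so the container cannot be orphaned
          self ! Remove
      }

    case Keep(timeout) =>
      // keep the warmed container, but always schedule an eventual Remove
      timers.startSingleTimer("keeping", Remove, timeout)

    case Remove =>
      // the real proxy also unregisters its etcd data and destroys the container here
      context.stop(self)
  }
}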
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index 8f6561218..150757b11 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -291,6 +291,22 @@ class FunctionPullingContainerProxyTests
Future.successful(count)
}
+ def getLiveContainerCountFail(count: Long) = LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
+ Future.failed(new Exception("failure"))
+ }
+
+ def getLiveContainerCountFailFirstCall(count: Long) = {
+ var firstCall = true
+ LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
+ if (firstCall) {
+ firstCall = false
+ Future.failed(new Exception("failure"))
+ } else {
+ Future.successful(count)
+ }
+ }
+ }
+
def getWarmedContainerLimit(limit: Future[(Int, FiniteDuration)]) = LoggedFunction { (_: String) =>
limit
}
@@ -1036,6 +1052,176 @@ class FunctionPullingContainerProxyTests
}
}
+ it should "destroy container proxy when stopping due to timeout and getting
live count fails once" in within(timeout) {
+ val authStore = mock[ArtifactWhiskAuthStore]
+ val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
+ val get = getWhiskAction(Future(action.toWhiskAction))
+ val dataManagementService = TestProbe()
+ val container = new TestContainer
+ val factory = createFactory(Future.successful(container))
+ val acker = createAcker()
+ val store = createStore
+ val collector = createCollector()
+ val counter = getLiveContainerCountFailFirstCall(2)
+ val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
+ val (client, clientFactory) = testClient
+
+ val probe = TestProbe()
+ val machine =
+ probe.childActorOf(
+ FunctionPullingContainerProxy
+ .props(
+ factory,
+ entityStore,
+ namespaceBlacklist,
+ get,
+ dataManagementService.ref,
+ clientFactory,
+ acker,
+ store,
+ collector,
+ counter,
+ limit,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ invokerHealthManager.ref,
+ poolConfig,
+ timeoutConfig))
+
+ registerCallback(machine, probe)
+ probe watch machine
+
+ machine ! Initialize(invocationNamespace.asString, fqn, action, schedulerHost, rpcPort, messageTransId)
+
+ probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
+ client.expectMsg(StartClient)
+ client.send(machine, ClientCreationCompleted())
+
+ probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
+ expectInitialized(probe)
+ //register running container
+ dataManagementService.expectMsgType[RegisterData]
+
+ client.expectMsg(RequestActivation())
+ client.send(machine, message)
+
+ probe.expectMsg(Transition(machine, ClientCreated, Running))
+ client.expectMsg(ContainerWarmed)
+ client.expectMsgPF() {
+ case RequestActivation(Some(_), None) => true
+ }
+
+ machine ! StateTimeout
+ client.send(machine, RetryRequestActivation)
+ probe.expectMsg(Transition(machine, Running, Pausing))
+ probe.expectMsgType[ContainerIsPaused]
+ probe.expectMsg(Transition(machine, Pausing, Paused))
+ //register paused warmed container
+ dataManagementService.expectMsgType[RegisterData]
+
+ machine ! StateTimeout
+ client.expectMsg(StopClientProxy)
+ dataManagementService.expectMsgType[UnregisterData]
+ probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
+ client.send(machine, ClientClosed)
+
+ probe expectTerminated machine
+
+ awaitAssert {
+ factory.calls should have size 1
+ container.initializeCount shouldBe 1
+ container.runCount shouldBe 1
+ collector.calls.length shouldBe 1
+ container.destroyCount shouldBe 1
+ acker.calls.length shouldBe 1
+ store.calls.length shouldBe 1
+ }
+ }
+
+ it should "destroy container proxy when stopping due to timeout and getting
live count fails permanently" in within(timeout) {
+ val authStore = mock[ArtifactWhiskAuthStore]
+ val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
+ val get = getWhiskAction(Future(action.toWhiskAction))
+ val dataManagementService = TestProbe()
+ val container = new TestContainer
+ val factory = createFactory(Future.successful(container))
+ val acker = createAcker()
+ val store = createStore
+ val collector = createCollector()
+ val counter = getLiveContainerCountFail(2)
+ val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
+ val (client, clientFactory) = testClient
+
+ val probe = TestProbe()
+ val machine =
+ probe.childActorOf(
+ FunctionPullingContainerProxy
+ .props(
+ factory,
+ entityStore,
+ namespaceBlacklist,
+ get,
+ dataManagementService.ref,
+ clientFactory,
+ acker,
+ store,
+ collector,
+ counter,
+ limit,
+ InvokerInstanceId(0, userMemory = defaultUserMemory),
+ invokerHealthManager.ref,
+ poolConfig,
+ timeoutConfig))
+
+ registerCallback(machine, probe)
+ probe watch machine
+
+ machine ! Initialize(invocationNamespace.asString, fqn, action, schedulerHost, rpcPort, messageTransId)
+
+ probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
+ client.expectMsg(StartClient)
+ client.send(machine, ClientCreationCompleted())
+
+ probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
+ expectInitialized(probe)
+ //register running container
+ dataManagementService.expectMsgType[RegisterData]
+
+ client.expectMsg(RequestActivation())
+ client.send(machine, message)
+
+ probe.expectMsg(Transition(machine, ClientCreated, Running))
+ client.expectMsg(ContainerWarmed)
+ client.expectMsgPF() {
+ case RequestActivation(Some(_), None) => true
+ }
+
+ machine ! StateTimeout
+ client.send(machine, RetryRequestActivation)
+ probe.expectMsg(Transition(machine, Running, Pausing))
+ probe.expectMsgType[ContainerIsPaused]
+ probe.expectMsg(Transition(machine, Pausing, Paused))
+ //register paused warmed container
+ dataManagementService.expectMsgType[RegisterData]
+
+ machine ! StateTimeout
+ client.expectMsg(StopClientProxy)
+ dataManagementService.expectMsgType[UnregisterData]
+ probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
+ client.send(machine, ClientClosed)
+
+ probe expectTerminated machine
+
+ awaitAssert {
+ factory.calls should have size 1
+ container.initializeCount shouldBe 1
+ container.runCount shouldBe 1
+ collector.calls.length shouldBe 1
+ container.destroyCount shouldBe 1
+ acker.calls.length shouldBe 1
+ store.calls.length shouldBe 1
+ }
+ }
+
it should "destroy container proxy even if there is no message from the
client when stopping due to timeout" in within(
timeout) {
val authStore = mock[ArtifactWhiskAuthStore]