This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 625c5f2ef Fix Orphaned Container Edge Case In Paused State of Container Proxy (#5326)
625c5f2ef is described below

commit 625c5f2ef64c9cc50da568998199ff623ad01f41
Author: Brendan Doyle <[email protected]>
AuthorDate: Fri Sep 23 09:57:13 2022 -0400

    Fix Orphaned Container Edge Case In Paused State of Container Proxy (#5326)
    
    * fix orphaned container edge case in proxy paused state
    
    * enhance test
    
    * feedback
    
    Co-authored-by: Brendan Doyle <[email protected]>
---
 .../ShardingContainerPoolBalancer.scala            |   2 +-
 .../v2/FunctionPullingContainerProxy.scala         |  50 +++---
 .../test/FunctionPullingContainerProxyTests.scala  | 186 +++++++++++++++++++++
 3 files changed, 218 insertions(+), 20 deletions(-)
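
The patch replaces the inline pipeTo of the keep/remove decision in the Paused state
with a DetermineKeepContainer(attempt) self-message that retries the live-container-count
lookup (up to 5 attempts, 500 ms apart) and falls back to Remove, so a failed lookup can
no longer leave a paused container orphaned. Below is a minimal, self-contained Scala
sketch of that bounded-retry-with-fallback idea; the names (KeepOrRemoveSketch, decide,
liveCountLookup) are illustrative only and are not part of the OpenWhisk codebase.

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object KeepOrRemoveSketch {
  // Stand-ins for the Keep/Remove messages the proxy sends to itself.
  sealed trait Decision
  final case class Keep(timeout: FiniteDuration) extends Decision
  case object Remove extends Decision

  // Bounded retry with a safe fallback: query the live container count up to
  // maxAttempts times and default to Remove, so a persistently failing lookup
  // can never leave a paused container running indefinitely.
  def decide(liveCountLookup: () => Future[Long],
             keepingCount: Long,
             keepingTimeout: FiniteDuration,
             attempt: Int = 0,
             maxAttempts: Int = 5)(implicit ec: ExecutionContext): Future[Decision] =
    liveCountLookup()
      .map(count => if (count <= keepingCount) Keep(keepingTimeout) else Remove)
      .recoverWith {
        case _ if attempt < maxAttempts =>
          // The real proxy schedules the retry 500 ms later via startSingleTimer;
          // this sketch simply recurses on the next attempt.
          decide(liveCountLookup, keepingCount, keepingTimeout, attempt + 1, maxAttempts)
        case _ =>
          Future.successful(Remove)
      }
}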

diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 20f1581de..5f7b9f05c 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -158,7 +158,7 @@ class ShardingContainerPoolBalancer(
     AkkaManagement(actorSystem).start()
     ClusterBootstrap(actorSystem).start()
     Some(Cluster(actorSystem))
-  } else if (loadConfigOrThrow[Seq[String]]("akka.cluster.seed-nodes").nonEmpty) {  
+  } else if (loadConfigOrThrow[Seq[String]]("akka.cluster.seed-nodes").nonEmpty) {
     Some(Cluster(actorSystem))
   } else {
     None
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index d290a6bde..eaa043679 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -87,6 +87,7 @@ case class Initialized(data: InitializedData)
 case class Resumed(data: WarmData)
 case class ResumeFailed(data: WarmData)
 case class RecreateClient(action: ExecutableWhiskAction)
+case class DetermineKeepContainer(attempt: Int)
 
 // States
 sealed trait ProxyState
@@ -661,6 +662,7 @@ class FunctionPullingContainerProxy(
       val parent = context.parent
       cancelTimer(IdleTimeoutName)
       cancelTimer(KeepingTimeoutName)
+      cancelTimer(DetermineKeepContainer.toString)
       data.container
         .resume()
         .map { _ =>
@@ -693,32 +695,43 @@ class FunctionPullingContainerProxy(
           instance,
           data.container.containerId))
       goto(Running)
-
-    case Event(StateTimeout, data: WarmData) =>
-      (for {
-        count <- getLiveContainerCount(data.invocationNamespace, data.action.fullyQualifiedName(false), data.revision)
-        (warmedContainerKeepingCount, warmedContainerKeepingTimeout) <- getWarmedContainerLimit(
-          data.invocationNamespace)
-      } yield {
-        logging.info(
-          this,
-          s"Live container count: ${count}, warmed container keeping count 
configuration: ${warmedContainerKeepingCount} in namespace: 
${data.invocationNamespace}")
-        if (count <= warmedContainerKeepingCount) {
-          Keep(warmedContainerKeepingTimeout)
-        } else {
-          Remove
-        }
-      }).pipeTo(self)
+    case Event(StateTimeout, _: WarmData) =>
+      self ! DetermineKeepContainer(0)
+      stay
+    case Event(DetermineKeepContainer(attempt), data: WarmData) =>
+      getLiveContainerCount(data.invocationNamespace, data.action.fullyQualifiedName(false), data.revision)
+        .flatMap(count => {
+          getWarmedContainerLimit(data.invocationNamespace).map(warmedContainerInfo => {
+            logging.info(
+              this,
+              s"Live container count: $count, warmed container keeping count 
configuration: ${warmedContainerInfo._1} in namespace: 
${data.invocationNamespace}")
+            if (count <= warmedContainerInfo._1) {
+              self ! Keep(warmedContainerInfo._2)
+            } else {
+              self ! Remove
+            }
+          })
+        })
+        .recover({
+          case t: Throwable =>
+            logging.error(
+              this,
+              s"Failed to determine whether to keep or remove container on 
pause timeout for ${data.container.containerId}, retrying. Caused by: $t")
+            if (attempt < 5) {
+              startSingleTimer(DetermineKeepContainer.toString, DetermineKeepContainer(attempt + 1), 500.milli)
+            } else {
+              self ! Remove
+            }
+        })
       stay
-
     case Event(Keep(warmedContainerKeepingTimeout), data: WarmData) =>
       logging.info(
         this,
         s"This is the remaining container for ${data.action}. The container 
will stop after $warmedContainerKeepingTimeout.")
       startSingleTimer(KeepingTimeoutName, Remove, 
warmedContainerKeepingTimeout)
       stay
-
     case Event(Remove | GracefulShutdown, data: WarmData) =>
+      cancelTimer(DetermineKeepContainer.toString)
       dataManagementService ! UnregisterData(
         ContainerKeys.warmedContainers(
           data.invocationNamespace,
@@ -732,7 +745,6 @@ class FunctionPullingContainerProxy(
         data.action.fullyQualifiedName(false),
         data.action.rev,
         Some(data.clientProxy))
-
     case _ => delay
   }
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index 8f6561218..150757b11 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -291,6 +291,22 @@ class FunctionPullingContainerProxyTests
     Future.successful(count)
   }
 
+  def getLiveContainerCountFail(count: Long) = LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
+    Future.failed(new Exception("failure"))
+  }
+
+  def getLiveContainerCountFailFirstCall(count: Long) = {
+    var firstCall = true
+    LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
+      if (firstCall) {
+        firstCall = false
+        Future.failed(new Exception("failure"))
+      } else {
+        Future.successful(count)
+      }
+    }
+  }
+
   def getWarmedContainerLimit(limit: Future[(Int, FiniteDuration)]) = LoggedFunction { (_: String) =>
     limit
   }
@@ -1036,6 +1052,176 @@ class FunctionPullingContainerProxyTests
     }
   }
 
+  it should "destroy container proxy when stopping due to timeout and getting 
live count fails once" in within(timeout) {
+    val authStore = mock[ArtifactWhiskAuthStore]
+    val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
+    val get = getWhiskAction(Future(action.toWhiskAction))
+    val dataManagementService = TestProbe()
+    val container = new TestContainer
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker()
+    val store = createStore
+    val collector = createCollector()
+    val counter = getLiveContainerCountFailFirstCall(2)
+    val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
+    val (client, clientFactory) = testClient
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        FunctionPullingContainerProxy
+          .props(
+            factory,
+            entityStore,
+            namespaceBlacklist,
+            get,
+            dataManagementService.ref,
+            clientFactory,
+            acker,
+            store,
+            collector,
+            counter,
+            limit,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            invokerHealthManager.ref,
+            poolConfig,
+            timeoutConfig))
+
+    registerCallback(machine, probe)
+    probe watch machine
+
+    machine ! Initialize(invocationNamespace.asString, fqn, action, schedulerHost, rpcPort, messageTransId)
+
+    probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
+    client.expectMsg(StartClient)
+    client.send(machine, ClientCreationCompleted())
+
+    probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
+    expectInitialized(probe)
+    //register running container
+    dataManagementService.expectMsgType[RegisterData]
+
+    client.expectMsg(RequestActivation())
+    client.send(machine, message)
+
+    probe.expectMsg(Transition(machine, ClientCreated, Running))
+    client.expectMsg(ContainerWarmed)
+    client.expectMsgPF() {
+      case RequestActivation(Some(_), None) => true
+    }
+
+    machine ! StateTimeout
+    client.send(machine, RetryRequestActivation)
+    probe.expectMsg(Transition(machine, Running, Pausing))
+    probe.expectMsgType[ContainerIsPaused]
+    probe.expectMsg(Transition(machine, Pausing, Paused))
+    //register paused warmed container
+    dataManagementService.expectMsgType[RegisterData]
+
+    machine ! StateTimeout
+    client.expectMsg(StopClientProxy)
+    dataManagementService.expectMsgType[UnregisterData]
+    probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
+    client.send(machine, ClientClosed)
+
+    probe expectTerminated machine
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 1
+      collector.calls.length shouldBe 1
+      container.destroyCount shouldBe 1
+      acker.calls.length shouldBe 1
+      store.calls.length shouldBe 1
+    }
+  }
+
+  it should "destroy container proxy when stopping due to timeout and getting 
live count fails permanently" in within(timeout) {
+    val authStore = mock[ArtifactWhiskAuthStore]
+    val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
+    val get = getWhiskAction(Future(action.toWhiskAction))
+    val dataManagementService = TestProbe()
+    val container = new TestContainer
+    val factory = createFactory(Future.successful(container))
+    val acker = createAcker()
+    val store = createStore
+    val collector = createCollector()
+    val counter = getLiveContainerCountFail(2)
+    val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
+    val (client, clientFactory) = testClient
+
+    val probe = TestProbe()
+    val machine =
+      probe.childActorOf(
+        FunctionPullingContainerProxy
+          .props(
+            factory,
+            entityStore,
+            namespaceBlacklist,
+            get,
+            dataManagementService.ref,
+            clientFactory,
+            acker,
+            store,
+            collector,
+            counter,
+            limit,
+            InvokerInstanceId(0, userMemory = defaultUserMemory),
+            invokerHealthManager.ref,
+            poolConfig,
+            timeoutConfig))
+
+    registerCallback(machine, probe)
+    probe watch machine
+
+    machine ! Initialize(invocationNamespace.asString, fqn, action, schedulerHost, rpcPort, messageTransId)
+
+    probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
+    client.expectMsg(StartClient)
+    client.send(machine, ClientCreationCompleted())
+
+    probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
+    expectInitialized(probe)
+    //register running container
+    dataManagementService.expectMsgType[RegisterData]
+
+    client.expectMsg(RequestActivation())
+    client.send(machine, message)
+
+    probe.expectMsg(Transition(machine, ClientCreated, Running))
+    client.expectMsg(ContainerWarmed)
+    client.expectMsgPF() {
+      case RequestActivation(Some(_), None) => true
+    }
+
+    machine ! StateTimeout
+    client.send(machine, RetryRequestActivation)
+    probe.expectMsg(Transition(machine, Running, Pausing))
+    probe.expectMsgType[ContainerIsPaused]
+    probe.expectMsg(Transition(machine, Pausing, Paused))
+    //register paused warmed container
+    dataManagementService.expectMsgType[RegisterData]
+
+    machine ! StateTimeout
+    client.expectMsg(StopClientProxy)
+    dataManagementService.expectMsgType[UnregisterData]
+    probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
+    client.send(machine, ClientClosed)
+
+    probe expectTerminated machine
+
+    awaitAssert {
+      factory.calls should have size 1
+      container.initializeCount shouldBe 1
+      container.runCount shouldBe 1
+      collector.calls.length shouldBe 1
+      container.destroyCount shouldBe 1
+      acker.calls.length shouldBe 1
+      store.calls.length shouldBe 1
+    }
+  }
+
   it should "destroy container proxy even if there is no message from the 
client when stopping due to timeout" in within(
     timeout) {
     val authStore = mock[ArtifactWhiskAuthStore]
