This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 365938d  cleaning up logic and test issues with ContainerProxy (#4971)
365938d is described below

commit 365938d1199c0b3b53530568ac13ae253bb3b923
Author: tysonnorris <[email protected]>
AuthorDate: Thu Sep 10 11:12:12 2020 -0700

    cleaning up logic and test issues with ContainerProxy (#4971)
    
    * cleaning up logic and test issues with ContainerProxy
---
 .../core/containerpool/ContainerProxy.scala        | 24 ++++++++++++++--------
 .../containerpool/test/ContainerProxyTests.scala   | 20 +++++++++---------
 2 files changed, 25 insertions(+), 19 deletions(-)

diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index ab26b01..51027e4 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -335,7 +335,6 @@ class ContainerProxy(factory: (TransactionId,
             // implicitly via a FailureMessage which will be processed later 
when the state
             // transitions to Running
             val activation = ContainerProxy.constructWhiskActivation(job, 
None, Interval.zero, false, response)
-
             sendActiveAck(
               transid,
               activation,
@@ -673,10 +672,11 @@ class ContainerProxy(factory: (TransactionId,
     } else {
       context.parent ! RescheduleJob
     }
-    if (abort && runBuffer.nonEmpty) {
+    val abortProcess = if (abort && runBuffer.nonEmpty) {
       abortBuffered(abortResponse)
     } else {
       rejectBuffered()
+      Future.successful(())
     }
 
     val unpause = stateName match {
@@ -686,6 +686,7 @@ class ContainerProxy(factory: (TransactionId,
 
     unpause
       .flatMap(_ => container.destroy()(TransactionId.invokerNanny))
+      .flatMap(_ => abortProcess)
       .map(_ => ContainerRemoved(replacePrewarm))
       .pipeTo(self)
     if (stateName != Removing) {
@@ -695,9 +696,9 @@ class ContainerProxy(factory: (TransactionId,
     }
   }
 
-  def abortBuffered(abortResponse: Option[ActivationResponse] = None) = {
+  def abortBuffered(abortResponse: Option[ActivationResponse] = None): 
Future[Any] = {
     logging.info(this, s"aborting ${runBuffer.length} queued activations after 
failed init or failed cold start")
-    runBuffer.foreach { job =>
+    val f = runBuffer.flatMap { job =>
       implicit val tid = job.msg.transid
       logging.info(
         this,
@@ -714,14 +715,19 @@ class ContainerProxy(factory: (TransactionId,
       } else {
         CompletionMessage(tid, result, instance)
       }
-      sendActiveAck(tid, result, job.msg.blocking, 
job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
+      val ack =
+        sendActiveAck(tid, result, job.msg.blocking, 
job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
+          .andThen {
+            case Failure(e) => logging.error(this, s"failed to send abort ack 
$e")
+          }
+      val store = storeActivation(tid, result, job.msg.blocking, context)
         .andThen {
-          case Failure(e) => logging.error(this, s"failed to send abort ack 
$e")
+          case Failure(e) => logging.error(this, s"failed to store aborted 
activation $e")
         }
-      storeActivation(tid, result, job.msg.blocking, context).andThen {
-        case Failure(e) => logging.error(this, s"failed to store aborted 
activation $e")
-      }
+      //return both futures
+      Seq(ack, store)
     }
+    Future.sequence(f)
   }
 
   /**
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 51285cf..968a5ea 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -1276,13 +1276,12 @@ class ContainerProxyTests
   it should "terminate buffered concurrent activations when cold init fails to 
launch container" in {
     
assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
 
-    val initPromise = Promise[Interval]()
-    val container = new TestContainer(Some(initPromise))
-    val factory = createFactory(Future.failed(new Exception("simulating a 
container creation failure")))
+    val containerPromise = Promise[Container]
+    val factory = createFactory(containerPromise.future)
     val acker = createSyncAcker(concurrentAction)
     val store = createSyncStore
     val collector =
-      createCollector(Future.successful(ActivationLogs()), () => 
container.logs(0.MB, false)(TransactionId.testing))
+      createCollector(Future.successful(ActivationLogs()), () => ())
 
     val machine =
       childActorOf(
@@ -1303,6 +1302,9 @@ class ContainerProxyTests
     machine ! Run(concurrentAction, message) //first in Uninitialized state
     machine ! Run(concurrentAction, message) //second in Uninitialized or 
Running state
 
+    //wait for buffering before failing the container
+    containerPromise.failure(new Exception("simulating a container creation 
failure"))
+
     expectMsg(Transition(machine, Uninitialized, Running))
 
     expectMsg(ContainerRemoved(true))
@@ -1310,11 +1312,6 @@ class ContainerProxyTests
     expectMsg(Transition(machine, Running, Removing))
     awaitAssert {
       factory.calls should have size 1
-      container.initializeCount shouldBe 0
-      container.runCount shouldBe 0
-      container.atomicLogsCount.get() shouldBe 0
-      container.suspendCount shouldBe 0
-      container.resumeCount shouldBe 0
       acker.calls should have size 2
 
       store.calls should have size 2
@@ -1369,6 +1366,9 @@ class ContainerProxyTests
     //second one will succeed
     run(machine, Ready)
 
+    timeout(machine) // times out Ready state so container suspends
+    expectMsg(Transition(machine, Ready, Pausing))
+    expectMsg(Transition(machine, Pausing, Paused))
     //With exception of the error on first run, the assertions should be the 
same as in
     //         `run an action and continue with a next run without pausing the 
container`
     awaitAssert {
@@ -1376,7 +1376,7 @@ class ContainerProxyTests
       container.initializeCount shouldBe 1
       container.runCount shouldBe 2
       collector.calls should have size 2
-      container.suspendCount shouldBe 0
+      container.suspendCount shouldBe 1
       container.destroyCount shouldBe 0
       acker.calls should have size 2
 

Reply via email to