This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 365938d cleaning up logic and test issues with ContainerProxy (#4971)
365938d is described below
commit 365938d1199c0b3b53530568ac13ae253bb3b923
Author: tysonnorris <[email protected]>
AuthorDate: Thu Sep 10 11:12:12 2020 -0700
cleaning up logic and test issues with ContainerProxy (#4971)
* cleaning up logic and test issues with ContainerProxy
---
.../core/containerpool/ContainerProxy.scala | 24 ++++++++++++++--------
.../containerpool/test/ContainerProxyTests.scala | 20 +++++++++---------
2 files changed, 25 insertions(+), 19 deletions(-)
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
index ab26b01..51027e4 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
@@ -335,7 +335,6 @@ class ContainerProxy(factory: (TransactionId,
// implicitly via a FailureMessage which will be processed later
when the state
// transitions to Running
val activation = ContainerProxy.constructWhiskActivation(job,
None, Interval.zero, false, response)
-
sendActiveAck(
transid,
activation,
@@ -673,10 +672,11 @@ class ContainerProxy(factory: (TransactionId,
} else {
context.parent ! RescheduleJob
}
- if (abort && runBuffer.nonEmpty) {
+ val abortProcess = if (abort && runBuffer.nonEmpty) {
abortBuffered(abortResponse)
} else {
rejectBuffered()
+ Future.successful(())
}
val unpause = stateName match {
@@ -686,6 +686,7 @@ class ContainerProxy(factory: (TransactionId,
unpause
.flatMap(_ => container.destroy()(TransactionId.invokerNanny))
+ .flatMap(_ => abortProcess)
.map(_ => ContainerRemoved(replacePrewarm))
.pipeTo(self)
if (stateName != Removing) {
@@ -695,9 +696,9 @@ class ContainerProxy(factory: (TransactionId,
}
}
- def abortBuffered(abortResponse: Option[ActivationResponse] = None) = {
+ def abortBuffered(abortResponse: Option[ActivationResponse] = None):
Future[Any] = {
logging.info(this, s"aborting ${runBuffer.length} queued activations after
failed init or failed cold start")
- runBuffer.foreach { job =>
+ val f = runBuffer.flatMap { job =>
implicit val tid = job.msg.transid
logging.info(
this,
@@ -714,14 +715,19 @@ class ContainerProxy(factory: (TransactionId,
} else {
CompletionMessage(tid, result, instance)
}
- sendActiveAck(tid, result, job.msg.blocking,
job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
+ val ack =
+ sendActiveAck(tid, result, job.msg.blocking,
job.msg.rootControllerIndex, job.msg.user.namespace.uuid, msg)
+ .andThen {
+ case Failure(e) => logging.error(this, s"failed to send abort ack
$e")
+ }
+ val store = storeActivation(tid, result, job.msg.blocking, context)
.andThen {
- case Failure(e) => logging.error(this, s"failed to send abort ack
$e")
+ case Failure(e) => logging.error(this, s"failed to store aborted
activation $e")
}
- storeActivation(tid, result, job.msg.blocking, context).andThen {
- case Failure(e) => logging.error(this, s"failed to store aborted
activation $e")
- }
+ //return both futures
+ Seq(ack, store)
}
+ Future.sequence(f)
}
/**
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 51285cf..968a5ea 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -1276,13 +1276,12 @@ class ContainerProxyTests
it should "terminate buffered concurrent activations when cold init fails to
launch container" in {
assume(Option(WhiskProperties.getProperty("whisk.action.concurrency")).exists(_.toBoolean))
- val initPromise = Promise[Interval]()
- val container = new TestContainer(Some(initPromise))
- val factory = createFactory(Future.failed(new Exception("simulating a
container creation failure")))
+ val containerPromise = Promise[Container]
+ val factory = createFactory(containerPromise.future)
val acker = createSyncAcker(concurrentAction)
val store = createSyncStore
val collector =
- createCollector(Future.successful(ActivationLogs()), () =>
container.logs(0.MB, false)(TransactionId.testing))
+ createCollector(Future.successful(ActivationLogs()), () => ())
val machine =
childActorOf(
@@ -1303,6 +1302,9 @@ class ContainerProxyTests
machine ! Run(concurrentAction, message) //first in Uninitialized state
machine ! Run(concurrentAction, message) //second in Uninitialized or
Running state
+ //wait for buffering before failing the container
+ containerPromise.failure(new Exception("simulating a container creation
failure"))
+
expectMsg(Transition(machine, Uninitialized, Running))
expectMsg(ContainerRemoved(true))
@@ -1310,11 +1312,6 @@ class ContainerProxyTests
expectMsg(Transition(machine, Running, Removing))
awaitAssert {
factory.calls should have size 1
- container.initializeCount shouldBe 0
- container.runCount shouldBe 0
- container.atomicLogsCount.get() shouldBe 0
- container.suspendCount shouldBe 0
- container.resumeCount shouldBe 0
acker.calls should have size 2
store.calls should have size 2
@@ -1369,6 +1366,9 @@ class ContainerProxyTests
//second one will succeed
run(machine, Ready)
+ timeout(machine) // times out Ready state so container suspends
+ expectMsg(Transition(machine, Ready, Pausing))
+ expectMsg(Transition(machine, Pausing, Paused))
//With exception of the error on first run, the assertions should be the
same as in
// `run an action and continue with a next run without pausing the
container`
awaitAssert {
@@ -1376,7 +1376,7 @@ class ContainerProxyTests
container.initializeCount shouldBe 1
container.runCount shouldBe 2
collector.calls should have size 2
- container.suspendCount shouldBe 0
+ container.suspendCount shouldBe 1
container.destroyCount shouldBe 0
acker.calls should have size 2